rogue
Loading...
Searching...
No Matches
ZmqServer.cpp
Go to the documentation of this file.
1
18#include "rogue/Directives.h"
19
21
22#include <inttypes.h>
23#include <zmq.h>
24
25#include <memory>
26#include <string>
27
28#include "rogue/GeneralError.h"
29#include "rogue/GilRelease.h"
30#include "rogue/ScopedGil.h"
31
32#ifndef NO_PYTHON
33 #include <boost/python.hpp>
34namespace bp = boost::python;
35#endif
36
38 rogue::interfaces::ZmqServerPtr ret = std::make_shared<rogue::interfaces::ZmqServer>(addr, port);
39 return (ret);
40}
41
44#ifndef NO_PYTHON
45
46 bp::class_<rogue::interfaces::ZmqServerWrap, rogue::interfaces::ZmqServerWrapPtr, boost::noncopyable>(
47 "ZmqServer",
48 bp::init<std::string, uint16_t>())
55#endif
56}
57
58rogue::interfaces::ZmqServer::ZmqServer(const std::string& addr, uint16_t port) {
59 log_ = rogue::Logging::create("ZmqServer");
60
61 this->addr_ = addr;
62 this->zmqCtx_ = zmq_ctx_new();
63 this->threadEn_ = false;
64 this->basePort_ = port;
65}
66
70
72 std::string dummy;
73 bool res = false;
74 uint16_t port;
75
76 if (this->threadEn_) return;
77 port = this->basePort_;
78
79 // Auto port
80 if (port == 0) {
81 for (this->basePort_ = 9099; this->basePort_ < (9099 + 100); this->basePort_ += 4) {
82 res = this->tryConnect();
83 if (res) break;
84 }
85 } else {
86 res = this->tryConnect();
87 }
88
89 if (!res) {
90 if (port == 0)
91 throw(rogue::GeneralError::create("ZmqServer::ZmqServer",
92 "Failed to auto bind server on interface %s.",
93 this->addr_.c_str()));
94 else
95 throw(rogue::GeneralError::create("ZmqServer::ZmqServer",
96 "Failed to bind server to port %" PRIu16
97 " on interface %s. Another process may be using this port.",
98 port + 1,
99 this->addr_.c_str()));
100 }
101
102 log_->debug("Started Rogue server on %s at ports %" PRIu16 ":%" PRIu16 ":%" PRIu16,
103 this->addr_.c_str(),
104 this->basePort_,
105 this->basePort_ + 1,
106 this->basePort_ + 2);
107
108 this->threadEn_ = true;
109 this->rThread_ = new std::thread(&rogue::interfaces::ZmqServer::runThread, this);
110 this->sThread_ = new std::thread(&rogue::interfaces::ZmqServer::strThread, this);
111
112 // Send empty frame
113 dummy = "null\n";
114 zmq_send(this->zmqPub_, dummy.c_str(), dummy.size(), 0);
115}
116
118 if (threadEn_) {
119 rogue::GilRelease noGil;
120 threadEn_ = false;
121 log_->info("Waiting for server thread to exit");
122 rThread_->join();
123 sThread_->join();
124 log_->info("Closing pub socket");
125 zmq_close(this->zmqPub_);
126 log_->info("Closing request socket");
127 zmq_close(this->zmqRep_);
128 log_->info("Closing string socket");
129 zmq_close(this->zmqStr_);
130 log_->info("Destroying Context");
131 zmq_ctx_destroy(this->zmqCtx_);
132 log_->info("Zmq server done. Exiting");
133 }
134}
135
136bool rogue::interfaces::ZmqServer::tryConnect() {
137 std::string temp;
138 uint32_t opt;
139
140 log_->debug("Trying to serve on ports %" PRIu16 ":%" PRIu16 ":%" PRIu16,
141 this->basePort_,
142 this->basePort_ + 1,
143 this->basePort_ + 2);
144
145 this->zmqPub_ = zmq_socket(this->zmqCtx_, ZMQ_PUB);
146 this->zmqRep_ = zmq_socket(this->zmqCtx_, ZMQ_REP);
147 this->zmqStr_ = zmq_socket(this->zmqCtx_, ZMQ_REP);
148
149 opt = 0;
150 if (zmq_setsockopt(this->zmqPub_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
151 throw(rogue::GeneralError("ZmqServer::tryConnect", "Failed to set socket linger"));
152
153 if (zmq_setsockopt(this->zmqRep_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
154 throw(rogue::GeneralError("ZmqServer::tryConnect", "Failed to set socket linger"));
155
156 if (zmq_setsockopt(this->zmqStr_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
157 throw(rogue::GeneralError("ZmqServer::tryConnect", "Failed to set socket linger"));
158
159 opt = 100;
160 if (zmq_setsockopt(this->zmqRep_, ZMQ_RCVTIMEO, &opt, sizeof(int32_t)) != 0)
161 throw(rogue::GeneralError("ZmqServer::tryConnect", "Failed to set socket receive timeout"));
162
163 if (zmq_setsockopt(this->zmqStr_, ZMQ_RCVTIMEO, &opt, sizeof(int32_t)) != 0)
164 throw(rogue::GeneralError("ZmqServer::tryConnect", "Failed to set socket receive timeout"));
165
166 // Setup publish port
167 temp = "tcp://";
168 temp.append(this->addr_);
169 temp.append(":");
170 temp.append(std::to_string(static_cast<int64_t>(this->basePort_)));
171
172 if (zmq_bind(this->zmqPub_, temp.c_str()) < 0) {
173 zmq_close(this->zmqPub_);
174 zmq_close(this->zmqRep_);
175 zmq_close(this->zmqStr_);
176 log_->debug("Failed to bind publish socket to %s: %s", temp.c_str(), zmq_strerror(zmq_errno()));
177 return false;
178 }
179
180 // Setup response port
181 temp = "tcp://";
182 temp.append(this->addr_);
183 temp.append(":");
184 temp.append(std::to_string(static_cast<int64_t>(this->basePort_ + 1)));
185
186 if (zmq_bind(this->zmqRep_, temp.c_str()) < 0) {
187 zmq_close(this->zmqPub_);
188 zmq_close(this->zmqRep_);
189 zmq_close(this->zmqStr_);
190 log_->debug("Failed to bind request socket to %s: %s", temp.c_str(), zmq_strerror(zmq_errno()));
191 return false;
192 }
193
194 // Setup string port
195 temp = "tcp://";
196 temp.append(this->addr_);
197 temp.append(":");
198 temp.append(std::to_string(static_cast<int64_t>(this->basePort_ + 2)));
199
200 if (zmq_bind(this->zmqStr_, temp.c_str()) < 0) {
201 zmq_close(this->zmqPub_);
202 zmq_close(this->zmqRep_);
203 zmq_close(this->zmqStr_);
204 log_->debug("Failed to bind string request socket to %s: %s", temp.c_str(), zmq_strerror(zmq_errno()));
205 return false;
206 }
207
208 return true;
209}
210
212 return this->basePort_;
213}
214
215std::string rogue::interfaces::ZmqServer::doString(const std::string& data) {
216 return "";
217}
218
219#ifndef NO_PYTHON
220
222 zmq_msg_t msg;
223 Py_buffer valueBuf;
224
225 if (!this->threadEn_) return;
226
227 if (PyObject_GetBuffer(value.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0)
228 throw(rogue::GeneralError::create("ZmqServer::publish", "Failed to extract object data"));
229
230 zmq_msg_init_size(&msg, valueBuf.len);
231 memcpy(zmq_msg_data(&msg), valueBuf.buf, valueBuf.len);
232 PyBuffer_Release(&valueBuf);
233
234 rogue::GilRelease noGil;
235 zmq_sendmsg(this->zmqPub_, &msg, 0);
236}
237
238bp::object rogue::interfaces::ZmqServer::doRequest(bp::object data) {
239 bp::handle<> handle(bp::borrowed(Py_None));
240 return bp::object(handle);
241}
242
244 : rogue::interfaces::ZmqServer(addr, port) {}
245
247 if (bp::override f = this->get_override("_doRequest")) {
248 try {
249 return (f(data));
250 } catch (...) {
251 PyErr_Print();
252 }
253 }
255}
256
260
// String-request handler of the Python wrapper class.
// Looks up a Python-side override named "_doString"; when one exists its
// result for `data` is returned. Any exception raised by that call is caught
// and reported through PyErr_Print() instead of propagating.
// NOTE(review): listing lines 263 and 272 are absent from this view, so the
// first statement of the inner scope and the fall-through return are not
// visible here - confirm what is returned when there is no override or the
// override fails, and that the GIL is held around the Python calls.
261std::string rogue::interfaces::ZmqServerWrap::doString(const std::string& data) {
262 {
    // Dispatch to the Python override when the subclass defines one.
264 if (bp::override f = this->get_override("_doString")) {
265 try {
266 return (f(data));
267 } catch (...) {
    // Swallow the failure after printing the pending Python error.
268 PyErr_Print();
269 }
270 }
271 }
273}
274
// Default string handler exposed by the Python wrapper class.
// NOTE(review): the body (listing line 276) is absent from this view; the
// generated API summary describes it as calling the base-class doString()
// implementation - confirm against the real source.
275std::string rogue::interfaces::ZmqServerWrap::defDoString(const std::string& data) {
277}
278
279#endif
280
// Worker loop for the request/reply socket (zmqRep_).
// While threadEn_ is set it receives one message, hands the payload to
// doRequest() as a Python object, and sends the buffer contents of the
// returned object back on the same socket.
// When NO_PYTHON is defined a received message is closed without any reply.
// NOTE(review): the two throw statements below are not caught anywhere in
// this function - confirm what happens when they fire on the worker thread.
281void rogue::interfaces::ZmqServer::runThread() {
282 zmq_msg_t rxMsg;
283 zmq_msg_t txMsg;
284
285 log_->logThreadId();
286 log_->info("Started Rogue server thread");
287
    // threadEn_ is cleared elsewhere to end the loop; tryConnect() sets a
    // receive timeout on this socket, so the flag is re-checked periodically.
288 while (threadEn_) {
289 zmq_msg_init(&rxMsg);
290
291 // Get the message
    // Only a strictly positive byte count is processed: receive errors
    // (including timeouts) and zero-length messages skip the whole body,
    // and on that path rxMsg is not closed.
292 if (zmq_recvmsg(this->zmqRep_, &rxMsg, 0) > 0) {
293#ifndef NO_PYTHON
294 Py_buffer valueBuf;
    // NOTE(review): listing line 295 is absent from this view. The Python
    // C-API calls that follow need the GIL - confirm it is acquired there.
    // The "y#" format builds a Python bytes object that copies the payload.
296 PyObject* val = Py_BuildValue("y#", zmq_msg_data(&rxMsg), zmq_msg_size(&rxMsg));
297
298 if (val == NULL) throw(rogue::GeneralError::create("ZmqServer::runThread", "Failed to generate bytearray"));
299
    // bp::handle<> takes over the new reference returned by Py_BuildValue.
300 bp::handle<> handle(val);
301
    // Virtual call: a subclass supplies the actual request processing.
302 bp::object ret = this->doRequest(bp::object(handle));
303
    // The reply object must expose the buffer protocol.
304 if (PyObject_GetBuffer(ret.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0)
305 throw(rogue::GeneralError::create("ZmqServer::runThread", "Failed to extract object data"));
306
    // Copy the reply into a ZeroMQ message so the Python buffer can be
    // released before sending.
307 zmq_msg_init_size(&txMsg, valueBuf.len);
308 memcpy(zmq_msg_data(&txMsg), valueBuf.buf, valueBuf.len);
309 PyBuffer_Release(&valueBuf);
310
311 zmq_sendmsg(this->zmqRep_, &txMsg, 0);
312#endif
313
314 zmq_msg_close(&rxMsg);
315 }
316 }
317 log_->info("Stopped Rogue server thread");
318}
319
320void rogue::interfaces::ZmqServer::strThread() {
321 std::string data;
322 std::string ret;
323 zmq_msg_t msg;
324
325 log_->logThreadId();
326 log_->info("Started Rogue string server thread");
327
328 while (threadEn_) {
329 zmq_msg_init(&msg);
330
331 // Get the message
332 if (zmq_recvmsg(this->zmqStr_, &msg, 0) > 0) {
333 data = std::string((const char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
334 ret = this->doString(data);
335 zmq_send(this->zmqStr_, ret.c_str(), ret.size(), 0);
336 zmq_msg_close(&msg);
337 }
338 }
339 log_->info("Stopped Rogue string server thread");
340}
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:95
RAII helper that acquires the Python GIL for a scope.
Definition ScopedGil.h:35
boost::python::object defDoRequest(boost::python::object data)
Calls base-class doRequest() implementation.
std::string defDoString(const std::string &data)
Calls base-class doString() implementation.
boost::python::object doRequest(boost::python::object data)
Processes a Python-object request message.
std::string doString(const std::string &data)
Processes a string request message.
ZmqServerWrap(std::string addr, uint16_t port)
Constructs wrapper server.
ZeroMQ server for Rogue control, request/reply, and publish updates.
Definition ZmqServer.h:47
static void setup_python()
Registers Python bindings for this class.
Definition ZmqServer.cpp:43
uint16_t port()
Returns currently bound base port.
void start()
Starts server, binds sockets, and launches worker threads.
Definition ZmqServer.cpp:71
virtual ~ZmqServer()
Destroys server and stops worker threads/sockets.
Definition ZmqServer.cpp:67
void stop()
Stops server threads and closes sockets.
static std::shared_ptr< rogue::interfaces::ZmqServer > create(const std::string &addr, uint16_t port)
Creates a ZeroMQ server.
Definition ZmqServer.cpp:37
virtual boost::python::object doRequest(boost::python::object data)
Handles one binary request payload.
virtual std::string doString(const std::string &data)
Handles one string request payload.
void publish(boost::python::object data)
Publishes an update payload on the publish socket.
ZmqServer(const std::string &addr, uint16_t port)
Constructs a ZeroMQ server.
Definition ZmqServer.cpp:58
std::shared_ptr< rogue::interfaces::ZmqServer > ZmqServerPtr
Definition ZmqServer.h:146