rogue
Loading...
Searching...
No Matches
ZmqServer.cpp
Go to the documentation of this file.
1
18#include "rogue/Directives.h"
19
21
22#include <inttypes.h>
23#include <zmq.h>
24
25#include <memory>
26#include <string>
27
28#include "rogue/GeneralError.h"
29#include "rogue/GilRelease.h"
30#include "rogue/ScopedGil.h"
31
32#ifndef NO_PYTHON
33 #include <boost/python.hpp>
34namespace bp = boost::python;
35#endif
36
// Factory: constructs a ZmqServer for addr/port and returns it as a
// ZmqServerPtr (std::shared_ptr). The server is not started here.
38 rogue::interfaces::ZmqServerPtr ret = std::make_shared<rogue::interfaces::ZmqServer>(addr, port);
39 return (ret);
40}
41
44#ifndef NO_PYTHON
45
46 bp::class_<rogue::interfaces::ZmqServerWrap, rogue::interfaces::ZmqServerWrapPtr, boost::noncopyable>(
47 "ZmqServer",
48 bp::init<std::string, uint16_t>())
55#endif
56}
57
58rogue::interfaces::ZmqServer::ZmqServer(const std::string& addr, uint16_t port) {
59 log_ = rogue::Logging::create("ZmqServer");
60
61 this->addr_ = addr;
62 this->zmqCtx_ = zmq_ctx_new();
63 this->threadEn_ = false;
64 this->basePort_ = port;
65}
66
// Destructor body: shut the server down through stop(), then release the ZMQ
// context if stop() did not already do so.
68 stop();
69 // stop() only frees the context when start() ran; covers construct-without-start.
70 if (zmqCtx_ != nullptr) {
71 zmq_ctx_destroy(zmqCtx_);
72 zmqCtx_ = nullptr;
73 }
74}
75
// start(): binds the publish/request/string sockets on basePort_, +1 and +2,
// then launches the request (runThread) and string (strThread) worker
// threads. Returns immediately if the server is already running. A base port
// of 0 means "auto": probe base ports starting at 9099. Throws
// rogue::GeneralError when no binding succeeds (tryConnect() may also throw).
77 std::string dummy;
78 bool res = false;
79 uint16_t port;
80
81 if (this->threadEn_) return;
82 port = this->basePort_;
83
84 // Auto port
// Tries base ports 9099, 9103, ..., 9195. The step of 4 keeps each attempt's
// three consecutive ports clear of the previous attempt's.
// NOTE(review): if every attempt fails, basePort_ is left at 9199 (the loop's
// exit value), and port() will report that value.
85 if (port == 0) {
86 for (this->basePort_ = 9099; this->basePort_ < (9099 + 100); this->basePort_ += 4) {
87 res = this->tryConnect();
88 if (res) break;
89 }
90 } else {
91 res = this->tryConnect();
92 }
93
94 if (!res) {
95 if (port == 0)
96 throw(rogue::GeneralError::create("ZmqServer::ZmqServer",
97 "Failed to auto bind server on interface %s.",
98 this->addr_.c_str()));
99 else
100 throw(rogue::GeneralError::create("ZmqServer::ZmqServer",
101 "Failed to bind server to port %" PRIu16
102 " on interface %s. Another process may be using this port.",
// NOTE(review): this reports port + 1, not the requested base port; the
// failing bind may have been on port, port + 1 or port + 2. Confirm intent.
103 port + 1,
104 this->addr_.c_str()));
105 }
106
107 log_->debug("Started Rogue server on %s at ports %" PRIu16 ":%" PRIu16 ":%" PRIu16,
108 this->addr_.c_str(),
109 this->basePort_,
110 this->basePort_ + 1,
111 this->basePort_ + 2);
112
// threadEn_ must be true before the threads are created, because both worker
// loops run only while it is set.
113 this->threadEn_ = true;
114 this->rThread_ = new std::thread(&rogue::interfaces::ZmqServer::runThread, this);
115 this->sThread_ = new std::thread(&rogue::interfaces::ZmqServer::strThread, this);
116
117 // Send empty frame
// i.e. publish the literal text "null\n" on the publish socket.
118 dummy = "null\n";
119 zmq_send(this->zmqPub_, dummy.c_str(), dummy.size(), 0);
120}
121
// stop(): if the server is running, clears threadEn_, joins and frees both
// worker threads, closes the three sockets and destroys the ZMQ context.
// Does nothing when the server is not running.
123 if (threadEn_) {
// Drop the Python GIL while blocking in join(). Presumably this lets a worker
// that needs the GIL to finish its current request do so instead of
// deadlocking against the calling (Python) thread.
124 rogue::GilRelease noGil;
// The worker loops re-check threadEn_ after each receive; the 100 ms receive
// timeout set in tryConnect() bounds how long an idle worker takes to exit.
125 threadEn_ = false;
126 log_->info("Waiting for server thread to exit");
127 // Null-guard: a bad_alloc on the second start()-allocated thread leaves
128 // one pointer valid and the other null.
// NOTE(review): that holds only if rThread_/sThread_ are initialized to
// nullptr before start(); confirm in the constructor or header.
129 if (rThread_ != nullptr) {
130 rThread_->join();
131 delete rThread_;
132 rThread_ = nullptr;
133 }
134 if (sThread_ != nullptr) {
135 sThread_->join();
136 delete sThread_;
137 sThread_ = nullptr;
138 }
139 log_->info("Closing pub socket");
140 zmq_close(this->zmqPub_);
141 zmqPub_ = nullptr;
142 log_->info("Closing request socket");
143 zmq_close(this->zmqRep_);
144 zmqRep_ = nullptr;
145 log_->info("Closing string socket");
146 zmq_close(this->zmqStr_);
147 zmqStr_ = nullptr;
148 log_->info("Destroying Context");
149 zmq_ctx_destroy(this->zmqCtx_);
150 zmqCtx_ = nullptr;
151 log_->info("Zmq server done. Exiting");
152 }
153}
154
155bool rogue::interfaces::ZmqServer::tryConnect() {
156 std::string temp;
157 uint32_t opt;
158
159 log_->debug("Trying to serve on ports %" PRIu16 ":%" PRIu16 ":%" PRIu16,
160 this->basePort_,
161 this->basePort_ + 1,
162 this->basePort_ + 2);
163
164 this->zmqPub_ = zmq_socket(this->zmqCtx_, ZMQ_PUB);
165 this->zmqRep_ = zmq_socket(this->zmqCtx_, ZMQ_REP);
166 this->zmqStr_ = zmq_socket(this->zmqCtx_, ZMQ_REP);
167
168 // Throws before start() skip the dtor's stop(); free the context here.
169 try {
170 opt = 0;
171 if (zmq_setsockopt(this->zmqPub_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
172 throw(rogue::GeneralError("ZmqServer::tryConnect", "Failed to set socket linger"));
173
174 if (zmq_setsockopt(this->zmqRep_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
175 throw(rogue::GeneralError("ZmqServer::tryConnect", "Failed to set socket linger"));
176
177 if (zmq_setsockopt(this->zmqStr_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
178 throw(rogue::GeneralError("ZmqServer::tryConnect", "Failed to set socket linger"));
179
180 opt = 100;
181 if (zmq_setsockopt(this->zmqRep_, ZMQ_RCVTIMEO, &opt, sizeof(int32_t)) != 0)
182 throw(rogue::GeneralError("ZmqServer::tryConnect", "Failed to set socket receive timeout"));
183
184 if (zmq_setsockopt(this->zmqStr_, ZMQ_RCVTIMEO, &opt, sizeof(int32_t)) != 0)
185 throw(rogue::GeneralError("ZmqServer::tryConnect", "Failed to set socket receive timeout"));
186 } catch (...) {
187 zmq_close(this->zmqPub_);
188 zmqPub_ = nullptr;
189 zmq_close(this->zmqRep_);
190 zmqRep_ = nullptr;
191 zmq_close(this->zmqStr_);
192 zmqStr_ = nullptr;
193 throw;
194 }
195
196 // Setup publish port
197 temp = "tcp://";
198 temp.append(this->addr_);
199 temp.append(":");
200 temp.append(std::to_string(static_cast<int64_t>(this->basePort_)));
201
202 // Null after close so the auto-port retry loop does not double-close stale handles.
203 if (zmq_bind(this->zmqPub_, temp.c_str()) < 0) {
204 zmq_close(this->zmqPub_);
205 zmqPub_ = nullptr;
206 zmq_close(this->zmqRep_);
207 zmqRep_ = nullptr;
208 zmq_close(this->zmqStr_);
209 zmqStr_ = nullptr;
210 log_->debug("Failed to bind publish socket to %s: %s", temp.c_str(), zmq_strerror(zmq_errno()));
211 return false;
212 }
213
214 // Setup response port
215 temp = "tcp://";
216 temp.append(this->addr_);
217 temp.append(":");
218 temp.append(std::to_string(static_cast<int64_t>(this->basePort_ + 1)));
219
220 if (zmq_bind(this->zmqRep_, temp.c_str()) < 0) {
221 zmq_close(this->zmqPub_);
222 zmqPub_ = nullptr;
223 zmq_close(this->zmqRep_);
224 zmqRep_ = nullptr;
225 zmq_close(this->zmqStr_);
226 zmqStr_ = nullptr;
227 log_->debug("Failed to bind request socket to %s: %s", temp.c_str(), zmq_strerror(zmq_errno()));
228 return false;
229 }
230
231 // Setup string port
232 temp = "tcp://";
233 temp.append(this->addr_);
234 temp.append(":");
235 temp.append(std::to_string(static_cast<int64_t>(this->basePort_ + 2)));
236
237 if (zmq_bind(this->zmqStr_, temp.c_str()) < 0) {
238 zmq_close(this->zmqPub_);
239 zmqPub_ = nullptr;
240 zmq_close(this->zmqRep_);
241 zmqRep_ = nullptr;
242 zmq_close(this->zmqStr_);
243 zmqStr_ = nullptr;
244 log_->debug("Failed to bind string request socket to %s: %s", temp.c_str(), zmq_strerror(zmq_errno()));
245 return false;
246 }
247
248 return true;
249}
250
// port(): returns basePort_. That is the port given at construction, or, when
// auto-port (0) was requested, the base port selected by start().
252 return this->basePort_;
253}
254
255std::string rogue::interfaces::ZmqServer::doString(const std::string& data) {
256 return "";
257}
258
259#ifndef NO_PYTHON
260
// publish(value): copies the bytes of a buffer-protocol Python object into a
// ZMQ message and sends it on the publish socket. Returns silently when the
// server is not running. Throws rogue::GeneralError if the buffer cannot be
// extracted, the message cannot be allocated, or the send fails.
// NOTE(review): nothing visible here synchronizes with stop(); a publish that
// races a concurrent stop() could use zmqPub_ after it is closed. Confirm
// that callers serialize these.
262 zmq_msg_t msg;
263 Py_buffer valueBuf;
264
265 if (!this->threadEn_) return;
266
267 // PyErr_Print surfaces the Python-level error (e.g. TypeError from a
268 // non-buffer argument) AND clears the thread's error indicator; without
269 // it boost::python may observe "exception already set" when translating
270 // our throw, masking the original cause.
271 if (PyObject_GetBuffer(value.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0) {
272 PyErr_Print();
273 throw(rogue::GeneralError::create("ZmqServer::publish", "Failed to extract object data"));
274 }
275
276 // zmq_msg_init_size returns -1 on allocation failure; the message is left
277 // uninitialized so memcpy/zmq_msg_data must not run, and we still own
278 // valueBuf and have to release it before propagating the error.
279 if (zmq_msg_init_size(&msg, valueBuf.len) < 0) {
280 PyBuffer_Release(&valueBuf);
281 throw(rogue::GeneralError::create("ZmqServer::publish", "zmq_msg_init_size failed"));
282 }
283 memcpy(zmq_msg_data(&msg), valueBuf.buf, valueBuf.len);
284 PyBuffer_Release(&valueBuf);
285
// The payload has been copied out of the Python object, so the GIL is no
// longer needed; it is released for the rest of the function (the send).
286 rogue::GilRelease noGil;
287 // On success, zmq_sendmsg transfers ownership and zeroes the message;
288 // on failure (-1, e.g. ETERM during shutdown) we retain ownership and
289 // must close to avoid leaking the libzmq message buffer.
290 if (zmq_sendmsg(this->zmqPub_, &msg, 0) < 0) {
291 zmq_msg_close(&msg);
292 throw(rogue::GeneralError::create("ZmqServer::publish", "zmq_sendmsg failed"));
293 }
294}
295
296bp::object rogue::interfaces::ZmqServer::doRequest(bp::object data) {
297 bp::handle<> handle(bp::borrowed(Py_None));
298 return bp::object(handle);
299}
300
302 : rogue::interfaces::ZmqServer(addr, port) {}
303
305 if (bp::override f = this->get_override("_doRequest")) {
306 try {
307 return (f(data));
308 } catch (...) {
309 PyErr_Print();
310 }
311 }
313}
314
318
319std::string rogue::interfaces::ZmqServerWrap::doString(const std::string& data) {
320 {
322 if (bp::override f = this->get_override("_doString")) {
323 try {
324 return (f(data));
325 } catch (...) {
326 PyErr_Print();
327 }
328 }
329 }
331}
332
333std::string rogue::interfaces::ZmqServerWrap::defDoString(const std::string& data) {
335}
336
337#endif
338
339void rogue::interfaces::ZmqServer::runThread() {
340 zmq_msg_t rxMsg;
341 zmq_msg_t txMsg;
342
343 log_->logThreadId();
344 log_->info("Started Rogue server thread");
345
346 while (threadEn_) {
347 zmq_msg_init(&rxMsg);
348
349 // Get the message
350 if (zmq_recvmsg(this->zmqRep_, &rxMsg, 0) > 0) {
351#ifndef NO_PYTHON
352 // ZMQ_REP requires send-after-recv to keep the FSM healthy; the
353 // catch blocks below send an empty reply so a single bad request
354 // cannot wedge the socket into EFSM and drop all later requests.
355 bool txInit = false;
356 try {
357 Py_buffer valueBuf;
359 PyObject* val = Py_BuildValue("y#", zmq_msg_data(&rxMsg), zmq_msg_size(&rxMsg));
360
361 // PyErr_Print surfaces the Python-level error (e.g. MemoryError) AND clears
362 // the thread's error indicator; without this, the pending exception leaks
363 // into the next loop iteration's Py_BuildValue/bp::object calls.
364 if (val == NULL) {
365 PyErr_Print();
366 throw(rogue::GeneralError::create("ZmqServer::runThread", "Failed to generate bytearray"));
367 }
368
369 bp::handle<> handle(val);
370
371 bp::object ret = this->doRequest(bp::object(handle));
372
373 if (PyObject_GetBuffer(ret.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0) {
374 PyErr_Print();
375 throw(rogue::GeneralError::create("ZmqServer::runThread", "Failed to extract object data"));
376 }
377
378 // zmq_msg_init_size returns -1 on allocation failure; the message
379 // stays uninitialized, so memcpy/zmq_msg_data must not run. Release
380 // valueBuf before throwing so the catch block (which sends the
381 // empty-REP keepalive) does not leak the Python buffer.
382 if (zmq_msg_init_size(&txMsg, valueBuf.len) < 0) {
383 PyBuffer_Release(&valueBuf);
384 throw(rogue::GeneralError::create("ZmqServer::runThread", "zmq_msg_init_size failed"));
385 }
386 txInit = true;
387 memcpy(zmq_msg_data(&txMsg), valueBuf.buf, valueBuf.len);
388 PyBuffer_Release(&valueBuf);
389
390 // On success, zmq_sendmsg transfers ownership and zeroes the
391 // message; on failure (-1) we retain ownership and must close.
392 // Throw so the catch block sends the empty REP keepalive that
393 // clears the FSM -- otherwise the REP socket stays in the
394 // "must send" state and every subsequent zmq_recvmsg returns
395 // EFSM, wedging the worker on failed recvs until shutdown.
396 if (zmq_sendmsg(this->zmqRep_, &txMsg, 0) < 0) {
397 zmq_msg_close(&txMsg);
398 txInit = false;
399 throw(rogue::GeneralError::create("ZmqServer::runThread", "zmq_sendmsg failed"));
400 }
401 txInit = false;
402 } catch (const std::exception& e) {
403 log_->warning("ZmqServer::runThread: dropping request after exception: %s", e.what());
404 if (txInit) zmq_msg_close(&txMsg);
405 zmq_send(this->zmqRep_, "", 0, 0);
406 } catch (...) {
407 log_->warning("ZmqServer::runThread: dropping request after unknown exception");
408 if (txInit) zmq_msg_close(&txMsg);
409 zmq_send(this->zmqRep_, "", 0, 0);
410 }
411#endif
412 }
413 // Close even on RCVTIMEO: zmq_recvmsg() initializes rxMsg regardless.
414 zmq_msg_close(&rxMsg);
415 }
416 log_->info("Stopped Rogue server thread");
417}
418
419void rogue::interfaces::ZmqServer::strThread() {
420 zmq_msg_t msg;
421
422 log_->logThreadId();
423 log_->info("Started Rogue string server thread");
424
425 while (threadEn_) {
426 zmq_msg_init(&msg);
427
428 // Get the message
429 if (zmq_recvmsg(this->zmqStr_, &msg, 0) > 0) {
430 // ZMQ_REP requires send-after-recv to keep the FSM healthy; the
431 // catch blocks below send an empty reply so a single bad request
432 // cannot wedge the socket into EFSM and drop all later requests.
433 try {
434 std::string data((const char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
435 std::string ret = this->doString(data);
436 zmq_send(this->zmqStr_, ret.c_str(), ret.size(), 0);
437 } catch (const std::exception& e) {
438 log_->warning("ZmqServer::strThread: dropping request after exception: %s", e.what());
439 zmq_send(this->zmqStr_, "", 0, 0);
440 } catch (...) {
441 log_->warning("ZmqServer::strThread: dropping request after unknown exception");
442 zmq_send(this->zmqStr_, "", 0, 0);
443 }
444 }
445 // Close even on RCVTIMEO: zmq_recvmsg() initializes msg regardless.
446 zmq_msg_close(&msg);
447 }
448 log_->info("Stopped Rogue string server thread");
449}
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:95
RAII helper that acquires the Python GIL for a scope.
Definition ScopedGil.h:35
boost::python::object defDoRequest(boost::python::object data)
Calls base-class doRequest() implementation.
std::string defDoString(const std::string &data)
Calls base-class doString() implementation.
boost::python::object doRequest(boost::python::object data)
Processes a Python-object request message.
std::string doString(const std::string &data)
Processes a string request message.
ZmqServerWrap(std::string addr, uint16_t port)
Constructs wrapper server.
ZeroMQ server for Rogue control, request/reply, and publish updates.
Definition ZmqServer.h:48
static void setup_python()
Registers Python bindings for this class.
Definition ZmqServer.cpp:43
uint16_t port()
Returns currently bound base port.
void start()
Starts server, binds sockets, and launches worker threads.
Definition ZmqServer.cpp:76
virtual ~ZmqServer()
Destroys server and stops worker threads/sockets.
Definition ZmqServer.cpp:67
void stop()
Stops server threads and closes sockets.
static std::shared_ptr< rogue::interfaces::ZmqServer > create(const std::string &addr, uint16_t port)
Creates a ZeroMQ server.
Definition ZmqServer.cpp:37
virtual boost::python::object doRequest(boost::python::object data)
Handles one binary request payload.
virtual std::string doString(const std::string &data)
Handles one string request payload.
void publish(boost::python::object data)
Publishes an update payload on the publish socket.
ZmqServer(const std::string &addr, uint16_t port)
Constructs a ZeroMQ server.
Definition ZmqServer.cpp:58
std::shared_ptr< rogue::interfaces::ZmqServer > ZmqServerPtr
Definition ZmqServer.h:143