rogue
Loading...
Searching...
No Matches
ZmqClient.cpp
Go to the documentation of this file.
1
18#include "rogue/Directives.h"
19
21
22#include <inttypes.h>
23#include <zmq.h>
24
25#include <cstdio>
26#include <memory>
27#include <string>
28
29#include "rogue/GeneralError.h"
30#include "rogue/GilRelease.h"
31#include "rogue/ScopedGil.h"
32
33#ifndef NO_PYTHON
34 #include <boost/python.hpp>
35namespace bp = boost::python;
36#endif
37
38namespace {
39
40constexpr uint32_t kRetryLogIntervalSeconds = 30;
41
42void logWaitRetry(rogue::LoggingPtr log, double seconds, uint32_t& lastLoggedSeconds) {
43 uint32_t elapsedSeconds = static_cast<uint32_t>(seconds);
44
45 if (elapsedSeconds == 0) return;
46
47 if (lastLoggedSeconds == 0) {
48 log->warning(
49 "Timeout waiting for response after %" PRIu32 " seconds, server may be busy. Continuing to wait...",
50 elapsedSeconds);
51 lastLoggedSeconds = elapsedSeconds;
52 return;
53 }
54
55 if (elapsedSeconds >= (lastLoggedSeconds + kRetryLogIntervalSeconds)) {
56 log->warning("Still waiting for response after %" PRIu32 " seconds, server may be busy...", elapsedSeconds);
57 lastLoggedSeconds = elapsedSeconds;
58 }
59}
60
61void logWaitRecovered(rogue::LoggingPtr log, double seconds) {
62 uint32_t elapsedSeconds = static_cast<uint32_t>(seconds);
63
64 if (elapsedSeconds != 0) {
65 log->warning("Received response from server after %" PRIu32 " seconds.", elapsedSeconds);
66 }
67}
68
69#ifndef NO_PYTHON
70
71// Python-facing setTimeout wrapper. A Python bool is an int subclass, so the
72// removed (msecs, waitRetry) form would silently coerce `True` to failTime=1.
73// Reject bools loudly so stale 2-arg callers (e.g. pysmurf's
74// setTimeout(10000, True)) get a clear TypeError instead of misbehaving.
75void setTimeoutPy(rogue::interfaces::ZmqClient& self, bp::object warnTime, bp::object failTime) {
76 if (PyBool_Check(warnTime.ptr()) || PyBool_Check(failTime.ptr())) {
77 PyErr_SetString(PyExc_TypeError,
78 "ZmqClient.setTimeout(warnTime, failTime=0): arguments must be int "
79 "milliseconds, not bool. The legacy (msecs, waitRetry[, maxRetries]) "
80 "form was removed; pass failTime=0 to retry forever.");
81 bp::throw_error_already_set();
82 }
83 self.setTimeout(bp::extract<uint32_t>(warnTime), bp::extract<uint32_t>(failTime));
84}
85
86#endif
87
88} // namespace
89
90rogue::interfaces::ZmqClientPtr rogue::interfaces::ZmqClient::create(const std::string& addr, uint16_t port, bool doString) {
91 rogue::interfaces::ZmqClientPtr ret = std::make_shared<rogue::interfaces::ZmqClient>(addr, port, doString);
92 return (ret);
93}
94
97#ifndef NO_PYTHON
98
99 bp::class_<rogue::interfaces::ZmqClientWrap, rogue::interfaces::ZmqClientWrapPtr, boost::noncopyable>(
100 "ZmqClient",
101 bp::init<std::string, uint16_t, bool>())
104 .def("setTimeout", &setTimeoutPy, (bp::arg("warnTime"), bp::arg("failTime") = bp::object(0u)))
110#endif
111}
112
113rogue::interfaces::ZmqClient::ZmqClient(const std::string& addr, uint16_t port, bool doString) {
114 std::string temp;
115 uint32_t val;
116 uint32_t reqPort;
117
118 this->doString_ = doString;
119 this->zmqCtx_ = zmq_ctx_new();
120 this->zmqSub_ = zmq_socket(this->zmqCtx_, ZMQ_SUB);
121 this->zmqReq_ = zmq_socket(this->zmqCtx_, ZMQ_REQ);
122
123 log_ = rogue::Logging::create("ZmqClient");
124
125 // running_ is the dtor's cleanup sentinel; it is set last on success, so a
126 // throw in here would otherwise leak the context and sockets.
127 try {
128 if (!doString_) {
129 // Setup sub port
130 temp = "tcp://";
131 temp.append(addr);
132 temp.append(":");
133 temp.append(std::to_string(static_cast<int64_t>(port)));
134
135 if (zmq_setsockopt(this->zmqSub_, ZMQ_SUBSCRIBE, "", 0) != 0)
136 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket subscribe"));
137
138 val = 0;
139 if (zmq_setsockopt(this->zmqSub_, ZMQ_LINGER, &val, sizeof(int32_t)) != 0)
140 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket linger"));
141
142 val = 100;
143 if (zmq_setsockopt(this->zmqSub_, ZMQ_RCVTIMEO, &val, sizeof(int32_t)) != 0)
144 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket timeout"));
145
146 if (zmq_connect(this->zmqSub_, temp.c_str()) < 0)
147 throw(rogue::GeneralError::create("ZmqClient::ZmqClient",
148 "Failed to connect to port %" PRIu16 " at address %s",
149 port,
150 addr.c_str()));
151
152 reqPort = port + 1;
153 } else {
154 reqPort = port + 2;
155 }
156
157 // Setup request port
158 temp = "tcp://";
159 temp.append(addr);
160 temp.append(":");
161 temp.append(std::to_string(static_cast<int64_t>(reqPort)));
162
163 timeout_ = 1000; // 1 second warn/poll period
164 failTime_ = 1000; // fail on first timeout by default (no retry)
165 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &timeout_, sizeof(int32_t)) != 0)
166 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket timeout"));
167
168 val = 1;
169 if (zmq_setsockopt(this->zmqReq_, ZMQ_REQ_CORRELATE, &val, sizeof(int32_t)) != 0)
170 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket correlate"));
171
172 if (zmq_setsockopt(this->zmqReq_, ZMQ_REQ_RELAXED, &val, sizeof(int32_t)) != 0)
173 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket relaxed"));
174
175 val = 0;
176 if (zmq_setsockopt(this->zmqReq_, ZMQ_LINGER, &val, sizeof(int32_t)) != 0)
177 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket linger"));
178
179 if (zmq_connect(this->zmqReq_, temp.c_str()) < 0)
180 throw(rogue::GeneralError::create("ZmqClient::ZmqClient",
181 "Failed to connect to port %" PRIu32 " at address %s",
182 reqPort,
183 addr.c_str()));
184
185 if (doString_) {
186 threadEn_ = false;
187 log_->info("Connected to Rogue server at port %" PRIu32, reqPort);
188 } else {
189 log_->info("Connected to Rogue server at ports %" PRIu16 ":%" PRIu32, port, reqPort);
190
191 threadEn_ = true;
192 thread_ = new std::thread(&rogue::interfaces::ZmqClient::runThread, this);
193 }
194 running_ = true;
195 } catch (...) {
196 if (zmqSub_ != nullptr) {
197 zmq_close(zmqSub_);
198 zmqSub_ = nullptr;
199 }
200 if (zmqReq_ != nullptr) {
201 zmq_close(zmqReq_);
202 zmqReq_ = nullptr;
203 }
204 if (zmqCtx_ != nullptr) {
205 zmq_ctx_destroy(zmqCtx_);
206 zmqCtx_ = nullptr;
207 }
208 throw;
209 }
210}
211
215
217 if (running_) {
218 running_ = false;
219 // Break any in-flight forever loop (failTime_ == 0) in send()/sendString()
220 // so the recv loop throws and releases reqLock_ within one RCVTIMEO
221 // period. Covers both string and binary modes.
222 stopping_ = true;
223
224 rogue::GilRelease noGil;
225
226 if (threadEn_) {
227 threadEn_ = false;
228 thread_->join();
229 delete thread_;
230 thread_ = nullptr;
231 }
232
233 // ZMQ sockets are not thread-safe. Serialize with any in-flight
234 // send()/sendString() before tearing down zmqReq_ and the context:
235 // stopping_ guarantees such a call releases reqLock_ promptly, so this
236 // cannot deadlock. The SUB thread is already joined, so zmqSub_ is idle.
237 std::lock_guard<std::mutex> lock(reqLock_);
238 if (!doString_) zmq_close(this->zmqSub_);
239 zmq_close(this->zmqReq_);
240 zmq_ctx_destroy(this->zmqCtx_);
241 }
242}
243
244void rogue::interfaces::ZmqClient::setTimeout(uint32_t warnTime, uint32_t failTime) {
245 // warnTime drives ZMQ_RCVTIMEO and the warn cadence. A value of 0 would make
246 // recv non-blocking, so the send()/sendString() loop would busy-spin without
247 // ever advancing seconds toward failTime. Reject it.
248 if (warnTime == 0)
249 throw rogue::GeneralError("ZmqClient::setTimeout", "warnTime must be greater than 0 milliseconds");
250
251 // ZMQ sockets are not thread-safe; serialize zmqReq_ access.
252 rogue::GilRelease noGil;
253 std::lock_guard<std::mutex> lock(reqLock_);
254
255 timeout_ = warnTime;
256 failTime_ = failTime;
257
258 log_->debug("Setting timeout to warnTime = %" PRIu32 " msecs, failTime = %" PRIu32 " msecs",
259 timeout_,
260 failTime_);
261
262 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &timeout_, sizeof(int32_t)) != 0)
263 throw(rogue::GeneralError("ZmqClient::setTimeout", "Failed to set socket timeout"));
264}
265
266std::string rogue::interfaces::ZmqClient::sendString(const std::string& path, const std::string& attr, const std::string& arg) {
267 std::string snd;
268 std::string ret;
269 zmq_msg_t msg;
270 std::string data;
271 double seconds = 0;
272 uint32_t lastLoggedSeconds = 0;
273
274 if (!doString_) throw rogue::GeneralError::create("ZmqClient::sendString", "Invalid send call in standard mode");
275
276 snd = "{\"attr\": \"" + attr + "\",";
277 snd += "\"path\": \"" + path + "\"";
278
279 if (arg != "") snd += ",\"args\": [\"" + arg + "\"]";
280
281 snd += "}";
282
283 rogue::GilRelease noGil;
284 std::lock_guard<std::mutex> lock(reqLock_);
285 zmq_send(this->zmqReq_, snd.c_str(), snd.size(), 0);
286
287 while (1) {
288 zmq_msg_init(&msg);
289 if (zmq_recvmsg(this->zmqReq_, &msg, 0) <= 0) {
290 seconds += static_cast<double>(timeout_) / 1000.0;
291 // failTime_ == 0 preserves the historic forever-retry contract
292 // (issue #1236). A finite failTime_ throws once the accumulated
293 // wait reaches the deadline; stopping_ breaks a forever loop on stop().
294 const bool deadlineReached = (failTime_ != 0 && seconds * 1000.0 >= static_cast<double>(failTime_));
295 if (!deadlineReached && !stopping_) {
296 logWaitRetry(log_, seconds, lastLoggedSeconds);
297 zmq_msg_close(&msg);
298 } else {
299 zmq_msg_close(&msg);
300 throw rogue::GeneralError::create("ZmqClient::sendString",
301 "Timeout waiting for response after %d Seconds.",
302 static_cast<int>(seconds));
303 }
304 } else {
305 break;
306 }
307 }
308
309 logWaitRecovered(log_, seconds);
310
311 data = std::string((const char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
312 zmq_msg_close(&msg);
313 return data;
314}
315
316std::string rogue::interfaces::ZmqClient::getDisp(const std::string& path) {
317 return sendString(path, "getDisp", "");
318}
319
320void rogue::interfaces::ZmqClient::setDisp(const std::string& path, const std::string& value) {
321 sendString(path, "setDisp", value);
322}
323
324std::string rogue::interfaces::ZmqClient::exec(const std::string& path, const std::string& arg) {
325 return sendString(path, "__call__", arg);
326}
327
328std::string rogue::interfaces::ZmqClient::valueDisp(const std::string& path) {
329 return sendString(path, "valueDisp", "");
330}
331
332#ifndef NO_PYTHON
333
334bp::object rogue::interfaces::ZmqClient::send(bp::object value) {
335 zmq_msg_t txMsg;
336 zmq_msg_t rxMsg;
337 Py_buffer valueBuf;
338 bp::object ret;
339 double seconds = 0;
340 uint32_t lastLoggedSeconds = 0;
341
342 if (doString_) throw rogue::GeneralError::create("ZmqClient::send", "Invalid send call in string mode");
343
344 // PyErr_Print surfaces the Python-level error (e.g. TypeError from a
345 // non-buffer argument) AND clears the thread's error indicator; without
346 // it boost::python may observe "exception already set" when translating
347 // our throw, masking the original cause.
348 if (PyObject_GetBuffer(value.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0) {
349 PyErr_Print();
350 throw(rogue::GeneralError::create("ZmqClient::send", "Failed to extract object data"));
351 }
352
353 // zmq_msg_init_size returns -1 on allocation failure; the message is left
354 // uninitialized so memcpy/zmq_msg_data must not run, and we still own
355 // valueBuf and have to release it before propagating the error.
356 if (zmq_msg_init_size(&txMsg, valueBuf.len) < 0) {
357 PyBuffer_Release(&valueBuf);
358 throw(rogue::GeneralError::create("ZmqClient::send", "zmq_msg_init_size failed"));
359 }
360 memcpy(zmq_msg_data(&txMsg), valueBuf.buf, valueBuf.len);
361 PyBuffer_Release(&valueBuf);
362
363 {
364 rogue::GilRelease noGil;
365 std::lock_guard<std::mutex> lock(reqLock_);
366 // On success, zmq_sendmsg transfers ownership and zeroes the message;
367 // on failure (-1, e.g. ETERM during shutdown) we retain ownership and
368 // must close to avoid leaking the libzmq message buffer. Throwing here
369 // also prevents the recv loop below from blocking on a request that
370 // was never put on the wire.
371 if (zmq_sendmsg(this->zmqReq_, &txMsg, 0) < 0) {
372 // Capture errno before zmq_msg_close, which may overwrite it.
373 // ETERM here is the issue #1234 signature: VirtualClient.__init__
374 // tore down the context via _cleanupFailedInit() but returned
375 // without raising, so the caller is now sending on a dead socket.
376 int err = zmq_errno();
377 zmq_msg_close(&txMsg);
378 throw rogue::GeneralError::create("ZmqClient::send",
379 "zmq_sendmsg failed: errno=%d %s",
380 err,
381 zmq_strerror(err));
382 }
383
384 while (1) {
385 zmq_msg_init(&rxMsg);
386 if (zmq_recvmsg(this->zmqReq_, &rxMsg, 0) <= 0) {
387 seconds += static_cast<double>(timeout_) / 1000.0;
388 // failTime_ == 0 preserves the historic forever-retry contract
389 // (issue #1236). A finite failTime_ throws once the accumulated
390 // wait reaches the deadline; stopping_ breaks a forever loop on stop().
391 const bool deadlineReached = (failTime_ != 0 && seconds * 1000.0 >= static_cast<double>(failTime_));
392 if (!deadlineReached && !stopping_) {
393 logWaitRetry(log_, seconds, lastLoggedSeconds);
394 zmq_msg_close(&rxMsg);
395 } else {
396 zmq_msg_close(&rxMsg);
398 "ZmqClient::send",
399 "Timeout waiting for response after %d Seconds, server may be busy!",
400 static_cast<int>(seconds));
401 }
402 } else {
403 break;
404 }
405 }
406 }
407
408 logWaitRecovered(log_, seconds);
409
410 PyObject* val = Py_BuildValue("y#", zmq_msg_data(&rxMsg), zmq_msg_size(&rxMsg));
411
412 // PyErr_Print surfaces the underlying Python error (e.g. MemoryError) AND
413 // clears the thread's error indicator; without it boost::python may
414 // observe "exception already set" when translating our throw.
415 if (val == NULL) {
416 PyErr_Print();
417 zmq_msg_close(&rxMsg);
418 throw(rogue::GeneralError::create("ZmqClient::send", "Failed to generate bytearray"));
419 }
420
421 zmq_msg_close(&rxMsg);
422
423 bp::handle<> handle(val);
424 ret = bp::object(handle);
425 return ret;
426}
427
429
430rogue::interfaces::ZmqClientWrap::ZmqClientWrap(const std::string& addr, uint16_t port, bool doString)
431 : rogue::interfaces::ZmqClient(addr, port, doString) {}
432
434 if (bp::override f = this->get_override("_doUpdate")) {
435 try {
436 f(data);
437 } catch (...) {
438 PyErr_Print();
439 }
440 }
442}
443
447
448#endif
449
450void rogue::interfaces::ZmqClient::runThread() {
451 zmq_msg_t msg;
452
453 log_->logThreadId();
454
455 while (threadEn_) {
456 zmq_msg_init(&msg);
457
458 // Get the message
459 if (zmq_recvmsg(this->zmqSub_, &msg, 0) > 0) {
460#ifndef NO_PYTHON
461 try {
463 PyObject* val = Py_BuildValue("y#", zmq_msg_data(&msg), zmq_msg_size(&msg));
464
465 // PyErr_Print surfaces the Python-level error (e.g. MemoryError) AND clears
466 // the thread's error indicator; without this, the pending exception leaks
467 // into the next loop iteration's Py_BuildValue/bp::object calls.
468 if (val == NULL) {
469 PyErr_Print();
470 throw(rogue::GeneralError::create("ZmqClient::runThread", "Failed to generate bytearray"));
471 }
472
473 bp::handle<> handle(val);
474 bp::object dat = bp::object(handle);
475 this->doUpdate(dat);
476 } catch (const std::exception& e) {
477 log_->warning("ZmqClient::runThread: dropping update after exception: %s", e.what());
478 } catch (...) {
479 log_->warning("ZmqClient::runThread: dropping update after unknown exception");
480 }
481#endif
482 }
483 // Close even on RCVTIMEO: zmq_recvmsg() initializes msg regardless.
484 zmq_msg_close(&msg);
485 }
486}
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:95
RAII helper that acquires the Python GIL for a scope.
Definition ScopedGil.h:35
void defDoUpdate(boost::python::object data)
Calls base-class doUpdate() implementation.
void doUpdate(boost::python::object data)
Handles an update message from the subscription path.
ZmqClientWrap(const std::string &addr, uint16_t port, bool doString)
Constructs wrapper client.
ZeroMQ client for Rogue control and update messaging.
Definition ZmqClient.h:50
std::string sendString(const std::string &path, const std::string &attr, const std::string &arg)
Sends a string-mode request.
void stop()
Stops client sockets and background thread.
std::string getDisp(const std::string &path)
Reads display-formatted value at a path (string mode).
virtual void doUpdate(boost::python::object data)
Handles async update payloads received on subscriber socket.
std::string exec(const std::string &path, const std::string &arg="")
Executes callable node at path (string mode).
boost::python::object send(boost::python::object data)
Sends binary request payload and receives binary response.
static void setup_python()
Registers Python bindings for this class.
Definition ZmqClient.cpp:96
void setTimeout(uint32_t warnTime, uint32_t failTime=0)
Sets request timeout behavior.
std::string valueDisp(const std::string &path)
Reads compact value display at a path (string mode).
static std::shared_ptr< rogue::interfaces::ZmqClient > create(const std::string &addr, uint16_t port, bool doString)
Creates a ZeroMQ client.
Definition ZmqClient.cpp:90
ZmqClient(const std::string &addr, uint16_t port, bool doString)
Constructs a ZeroMQ client and connects sockets.
virtual ~ZmqClient()
Destroys client and stops background activity.
void setDisp(const std::string &path, const std::string &value)
Writes display-formatted value at a path (string mode).
std::shared_ptr< rogue::interfaces::ZmqClient > ZmqClientPtr
Definition ZmqClient.h:209
std::shared_ptr< rogue::Logging > LoggingPtr
Shared pointer alias for Logging.
Definition Logging.h:205