rogue
Loading...
Searching...
No Matches
ZmqClient.cpp
Go to the documentation of this file.
1
18#include "rogue/Directives.h"
19
21
22#include <inttypes.h>
23#include <zmq.h>
24
25#include <cstdio>
26#include <memory>
27#include <string>
28
29#include "rogue/GeneralError.h"
30#include "rogue/GilRelease.h"
31#include "rogue/ScopedGil.h"
32
33#ifndef NO_PYTHON
34 #include <boost/python.hpp>
35namespace bp = boost::python;
36#endif
37
38namespace {
39
40constexpr uint32_t kRetryLogIntervalSeconds = 30;
41
42void logWaitRetry(rogue::LoggingPtr log, double seconds, uint32_t& lastLoggedSeconds) {
43 uint32_t elapsedSeconds = static_cast<uint32_t>(seconds);
44
45 if (elapsedSeconds == 0) return;
46
47 if (lastLoggedSeconds == 0) {
48 log->warning(
49 "Timeout waiting for response after %" PRIu32 " seconds, server may be busy. Continuing to wait...",
50 elapsedSeconds);
51 lastLoggedSeconds = elapsedSeconds;
52 return;
53 }
54
55 if (elapsedSeconds >= (lastLoggedSeconds + kRetryLogIntervalSeconds)) {
56 log->warning("Still waiting for response after %" PRIu32 " seconds, server may be busy...", elapsedSeconds);
57 lastLoggedSeconds = elapsedSeconds;
58 }
59}
60
61void logWaitRecovered(rogue::LoggingPtr log, double seconds) {
62 uint32_t elapsedSeconds = static_cast<uint32_t>(seconds);
63
64 if (elapsedSeconds != 0) {
65 log->warning("Received response from server after %" PRIu32 " seconds.", elapsedSeconds);
66 }
67}
68
69} // namespace
70
71rogue::interfaces::ZmqClientPtr rogue::interfaces::ZmqClient::create(const std::string& addr, uint16_t port, bool doString) {
72 rogue::interfaces::ZmqClientPtr ret = std::make_shared<rogue::interfaces::ZmqClient>(addr, port, doString);
73 return (ret);
74}
75
// NOTE(review): the function header and several .def(...) entries are not
// present in this listing; only the visible lines are annotated here.
78#ifndef NO_PYTHON
79
// Registers the wrapper class with Boost.Python under the Python name
// "ZmqClient"; instances are held through ZmqClientWrapPtr, are non-copyable,
// and are constructed from (std::string, uint16_t, bool).
80 bp::class_<rogue::interfaces::ZmqClientWrap, rogue::interfaces::ZmqClientWrapPtr, boost::noncopyable>(
81 "ZmqClient",
82 bp::init<std::string, uint16_t, bool>())
// setTimeout keyword arguments: msecs, waitRetry, and maxRetries, the last
// defaulting to 0.
85 .def("setTimeout",
87 (bp::arg("msecs"), bp::arg("waitRetry"), bp::arg("maxRetries") = 0u))
93#endif
94}
95
96rogue::interfaces::ZmqClient::ZmqClient(const std::string& addr, uint16_t port, bool doString) {
97 std::string temp;
98 uint32_t val;
99 uint32_t reqPort;
100
101 this->doString_ = doString;
102 this->zmqCtx_ = zmq_ctx_new();
103 this->zmqSub_ = zmq_socket(this->zmqCtx_, ZMQ_SUB);
104 this->zmqReq_ = zmq_socket(this->zmqCtx_, ZMQ_REQ);
105
106 log_ = rogue::Logging::create("ZmqClient");
107
108 // running_ is the dtor's cleanup sentinel; it is set last on success, so a
109 // throw in here would otherwise leak the context and sockets.
110 try {
111 if (!doString_) {
112 // Setup sub port
113 temp = "tcp://";
114 temp.append(addr);
115 temp.append(":");
116 temp.append(std::to_string(static_cast<int64_t>(port)));
117
118 if (zmq_setsockopt(this->zmqSub_, ZMQ_SUBSCRIBE, "", 0) != 0)
119 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket subscribe"));
120
121 val = 0;
122 if (zmq_setsockopt(this->zmqSub_, ZMQ_LINGER, &val, sizeof(int32_t)) != 0)
123 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket linger"));
124
125 val = 100;
126 if (zmq_setsockopt(this->zmqSub_, ZMQ_RCVTIMEO, &val, sizeof(int32_t)) != 0)
127 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket timeout"));
128
129 if (zmq_connect(this->zmqSub_, temp.c_str()) < 0)
130 throw(rogue::GeneralError::create("ZmqClient::ZmqClient",
131 "Failed to connect to port %" PRIu16 " at address %s",
132 port,
133 addr.c_str()));
134
135 reqPort = port + 1;
136 } else {
137 reqPort = port + 2;
138 }
139
140 // Setup request port
141 temp = "tcp://";
142 temp.append(addr);
143 temp.append(":");
144 temp.append(std::to_string(static_cast<int64_t>(reqPort)));
145
146 waitRetry_ = false; // Don't keep waiting after timeout
147 timeout_ = 1000; // 1 second
148 maxRetries_ = 0; // unbounded retry budget when waitRetry_ becomes true
149 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &timeout_, sizeof(int32_t)) != 0)
150 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket timeout"));
151
152 val = 1;
153 if (zmq_setsockopt(this->zmqReq_, ZMQ_REQ_CORRELATE, &val, sizeof(int32_t)) != 0)
154 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket correlate"));
155
156 if (zmq_setsockopt(this->zmqReq_, ZMQ_REQ_RELAXED, &val, sizeof(int32_t)) != 0)
157 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket relaxed"));
158
159 val = 0;
160 if (zmq_setsockopt(this->zmqReq_, ZMQ_LINGER, &val, sizeof(int32_t)) != 0)
161 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket linger"));
162
163 if (zmq_connect(this->zmqReq_, temp.c_str()) < 0)
164 throw(rogue::GeneralError::create("ZmqClient::ZmqClient",
165 "Failed to connect to port %" PRIu32 " at address %s",
166 reqPort,
167 addr.c_str()));
168
169 if (doString_) {
170 threadEn_ = false;
171 log_->info("Connected to Rogue server at port %" PRIu32, reqPort);
172 } else {
173 log_->info("Connected to Rogue server at ports %" PRIu16 ":%" PRIu32, port, reqPort);
174
175 threadEn_ = true;
176 thread_ = new std::thread(&rogue::interfaces::ZmqClient::runThread, this);
177 }
178 running_ = true;
179 } catch (...) {
180 if (zmqSub_ != nullptr) {
181 zmq_close(zmqSub_);
182 zmqSub_ = nullptr;
183 }
184 if (zmqReq_ != nullptr) {
185 zmq_close(zmqReq_);
186 zmqReq_ = nullptr;
187 }
188 if (zmqCtx_ != nullptr) {
189 zmq_ctx_destroy(zmqCtx_);
190 zmqCtx_ = nullptr;
191 }
192 throw;
193 }
194}
195
199
// NOTE(review): the function header (listing line 200) is not present in this
// listing; only the visible body is annotated here.
// Teardown runs only while running_ is set, and clears it first so a second
// call does nothing.
201 if (running_) {
202 running_ = false;
203 if (threadEn_) {
// The Python GIL is released for the scope of the join.
204 rogue::GilRelease noGil;
205 waitRetry_ = false;
// Clearing threadEn_ ends the `while (threadEn_)` loop in runThread();
// the thread is then joined and its object freed.
206 threadEn_ = false;
207 thread_->join();
208 delete thread_;
209 thread_ = nullptr;
210 }
// The SUB socket is closed only outside string mode; the REQ socket always.
// Sockets are closed before the context is destroyed.
211 if (!doString_) zmq_close(this->zmqSub_);
212 zmq_close(this->zmqReq_);
213 zmq_ctx_destroy(this->zmqCtx_);
214 }
215}
216
217void rogue::interfaces::ZmqClient::setTimeout(uint32_t msecs, bool waitRetry, uint32_t maxRetries) {
218 // ZMQ sockets are not thread-safe; serialize zmqReq_ access.
219 rogue::GilRelease noGil;
220 std::lock_guard<std::mutex> lock(reqLock_);
221
222 waitRetry_ = waitRetry;
223 timeout_ = msecs;
224 maxRetries_ = maxRetries;
225
226 // %d (not PRIu8): bool promotes to int through varargs.
227 log_->debug("Setting timeout to %" PRIu32 " msecs, waitRetry = %d, maxRetries = %" PRIu32,
228 timeout_,
229 waitRetry_,
230 maxRetries_);
231
232 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &timeout_, sizeof(int32_t)) != 0)
233 throw(rogue::GeneralError("ZmqClient::setTimeout", "Failed to set socket timeout"));
234}
235
236std::string rogue::interfaces::ZmqClient::sendString(const std::string& path, const std::string& attr, const std::string& arg) {
237 std::string snd;
238 std::string ret;
239 zmq_msg_t msg;
240 std::string data;
241 double seconds = 0;
242 uint32_t lastLoggedSeconds = 0;
243
244 if (!doString_) throw rogue::GeneralError::create("ZmqClient::sendString", "Invalid send call in standard mode");
245
246 snd = "{\"attr\": \"" + attr + "\",";
247 snd += "\"path\": \"" + path + "\"";
248
249 if (arg != "") snd += ",\"args\": [\"" + arg + "\"]";
250
251 snd += "}";
252
253 rogue::GilRelease noGil;
254 std::lock_guard<std::mutex> lock(reqLock_);
255 zmq_send(this->zmqReq_, snd.c_str(), snd.size(), 0);
256
257 uint32_t retryCount = 0;
258
259 while (1) {
260 zmq_msg_init(&msg);
261 if (zmq_recvmsg(this->zmqReq_, &msg, 0) <= 0) {
262 seconds += static_cast<double>(timeout_) / 1000.0;
263 // maxRetries_ == 0 preserves the historic unbounded-retry contract
264 // (issue #1236). Caps the loop only when the caller explicitly
265 // requested a finite retry budget via setTimeout(..., maxRetries).
266 const bool retryBudgetExhausted = (maxRetries_ != 0 && ++retryCount >= maxRetries_);
267 if (waitRetry_ && !retryBudgetExhausted) {
268 logWaitRetry(log_, seconds, lastLoggedSeconds);
269 zmq_msg_close(&msg);
270 } else {
271 zmq_msg_close(&msg);
272 throw rogue::GeneralError::create("ZmqClient::sendString",
273 "Timeout waiting for response after %d Seconds.",
274 static_cast<int>(seconds));
275 }
276 } else {
277 break;
278 }
279 }
280
281 logWaitRecovered(log_, seconds);
282
283 data = std::string((const char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
284 zmq_msg_close(&msg);
285 return data;
286}
287
288std::string rogue::interfaces::ZmqClient::getDisp(const std::string& path) {
289 return sendString(path, "getDisp", "");
290}
291
292void rogue::interfaces::ZmqClient::setDisp(const std::string& path, const std::string& value) {
293 sendString(path, "setDisp", value);
294}
295
296std::string rogue::interfaces::ZmqClient::exec(const std::string& path, const std::string& arg) {
297 return sendString(path, "__call__", arg);
298}
299
300std::string rogue::interfaces::ZmqClient::valueDisp(const std::string& path) {
301 return sendString(path, "valueDisp", "");
302}
303
304#ifndef NO_PYTHON
305
306bp::object rogue::interfaces::ZmqClient::send(bp::object value) {
307 zmq_msg_t txMsg;
308 zmq_msg_t rxMsg;
309 Py_buffer valueBuf;
310 bp::object ret;
311 double seconds = 0;
312 uint32_t lastLoggedSeconds = 0;
313
314 if (doString_) throw rogue::GeneralError::create("ZmqClient::send", "Invalid send call in string mode");
315
316 // PyErr_Print surfaces the Python-level error (e.g. TypeError from a
317 // non-buffer argument) AND clears the thread's error indicator; without
318 // it boost::python may observe "exception already set" when translating
319 // our throw, masking the original cause.
320 if (PyObject_GetBuffer(value.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0) {
321 PyErr_Print();
322 throw(rogue::GeneralError::create("ZmqClient::send", "Failed to extract object data"));
323 }
324
325 // zmq_msg_init_size returns -1 on allocation failure; the message is left
326 // uninitialized so memcpy/zmq_msg_data must not run, and we still own
327 // valueBuf and have to release it before propagating the error.
328 if (zmq_msg_init_size(&txMsg, valueBuf.len) < 0) {
329 PyBuffer_Release(&valueBuf);
330 throw(rogue::GeneralError::create("ZmqClient::send", "zmq_msg_init_size failed"));
331 }
332 memcpy(zmq_msg_data(&txMsg), valueBuf.buf, valueBuf.len);
333 PyBuffer_Release(&valueBuf);
334
335 {
336 rogue::GilRelease noGil;
337 std::lock_guard<std::mutex> lock(reqLock_);
338 // On success, zmq_sendmsg transfers ownership and zeroes the message;
339 // on failure (-1, e.g. ETERM during shutdown) we retain ownership and
340 // must close to avoid leaking the libzmq message buffer. Throwing here
341 // also prevents the recv loop below from blocking on a request that
342 // was never put on the wire.
343 if (zmq_sendmsg(this->zmqReq_, &txMsg, 0) < 0) {
344 // Capture errno before zmq_msg_close, which may overwrite it.
345 // ETERM here is the issue #1234 signature: VirtualClient.__init__
346 // tore down the context via _cleanupFailedInit() but returned
347 // without raising, so the caller is now sending on a dead socket.
348 int err = zmq_errno();
349 zmq_msg_close(&txMsg);
350 throw rogue::GeneralError::create("ZmqClient::send",
351 "zmq_sendmsg failed: errno=%d %s",
352 err,
353 zmq_strerror(err));
354 }
355
356 uint32_t retryCount = 0;
357
358 while (1) {
359 zmq_msg_init(&rxMsg);
360 if (zmq_recvmsg(this->zmqReq_, &rxMsg, 0) <= 0) {
361 seconds += static_cast<double>(timeout_) / 1000.0;
362 // maxRetries_ == 0 preserves the historic unbounded-retry contract
363 // (issue #1236). Caps the loop only when the caller explicitly
364 // requested a finite retry budget via setTimeout(..., maxRetries).
365 const bool retryBudgetExhausted = (maxRetries_ != 0 && ++retryCount >= maxRetries_);
366 if (waitRetry_ && !retryBudgetExhausted) {
367 logWaitRetry(log_, seconds, lastLoggedSeconds);
368 zmq_msg_close(&rxMsg);
369 } else {
370 zmq_msg_close(&rxMsg);
372 "ZmqClient::send",
373 "Timeout waiting for response after %d Seconds, server may be busy!",
374 static_cast<int>(seconds));
375 }
376 } else {
377 break;
378 }
379 }
380 }
381
382 logWaitRecovered(log_, seconds);
383
384 PyObject* val = Py_BuildValue("y#", zmq_msg_data(&rxMsg), zmq_msg_size(&rxMsg));
385
386 // PyErr_Print surfaces the underlying Python error (e.g. MemoryError) AND
387 // clears the thread's error indicator; without it boost::python may
388 // observe "exception already set" when translating our throw.
389 if (val == NULL) {
390 PyErr_Print();
391 zmq_msg_close(&rxMsg);
392 throw(rogue::GeneralError::create("ZmqClient::send", "Failed to generate bytearray"));
393 }
394
395 zmq_msg_close(&rxMsg);
396
397 bp::handle<> handle(val);
398 ret = bp::object(handle);
399 return ret;
400}
401
403
404rogue::interfaces::ZmqClientWrap::ZmqClientWrap(const std::string& addr, uint16_t port, bool doString)
405 : rogue::interfaces::ZmqClient(addr, port, doString) {}
406
// NOTE(review): the function header (listing line 407) and listing line 415
// are not present in this listing; only the visible body is annotated here.
// Dispatch to a Python-side "_doUpdate" override when one is defined.
408 if (bp::override f = this->get_override("_doUpdate")) {
409 try {
410 f(data);
411 } catch (...) {
// Anything thrown by the override is reported through PyErr_Print() and
// swallowed here, so it does not propagate to the caller.
412 PyErr_Print();
413 }
414 }
416}
417
421
422#endif
423
424void rogue::interfaces::ZmqClient::runThread() {
425 zmq_msg_t msg;
426
427 log_->logThreadId();
428
429 while (threadEn_) {
430 zmq_msg_init(&msg);
431
432 // Get the message
433 if (zmq_recvmsg(this->zmqSub_, &msg, 0) > 0) {
434#ifndef NO_PYTHON
435 try {
437 PyObject* val = Py_BuildValue("y#", zmq_msg_data(&msg), zmq_msg_size(&msg));
438
439 // PyErr_Print surfaces the Python-level error (e.g. MemoryError) AND clears
440 // the thread's error indicator; without this, the pending exception leaks
441 // into the next loop iteration's Py_BuildValue/bp::object calls.
442 if (val == NULL) {
443 PyErr_Print();
444 throw(rogue::GeneralError::create("ZmqClient::runThread", "Failed to generate bytearray"));
445 }
446
447 bp::handle<> handle(val);
448 bp::object dat = bp::object(handle);
449 this->doUpdate(dat);
450 } catch (const std::exception& e) {
451 log_->warning("ZmqClient::runThread: dropping update after exception: %s", e.what());
452 } catch (...) {
453 log_->warning("ZmqClient::runThread: dropping update after unknown exception");
454 }
455#endif
456 }
457 // Close even on RCVTIMEO: zmq_recvmsg() initializes msg regardless.
458 zmq_msg_close(&msg);
459 }
460}
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:95
RAII helper that acquires the Python GIL for a scope.
Definition ScopedGil.h:35
void defDoUpdate(boost::python::object data)
Calls base-class doUpdate() implementation.
void doUpdate(boost::python::object data)
Handles an update message from the subscription path.
ZmqClientWrap(const std::string &addr, uint16_t port, bool doString)
Constructs wrapper client.
ZeroMQ client for Rogue control and update messaging.
Definition ZmqClient.h:50
std::string sendString(const std::string &path, const std::string &attr, const std::string &arg)
Sends a string-mode request.
void stop()
Stops client sockets and background thread.
std::string getDisp(const std::string &path)
Reads display-formatted value at a path (string mode).
virtual void doUpdate(boost::python::object data)
Handles async update payloads received on subscriber socket.
std::string exec(const std::string &path, const std::string &arg="")
Executes callable node at path (string mode).
boost::python::object send(boost::python::object data)
Sends binary request payload and receives binary response.
static void setup_python()
Registers Python bindings for this class.
Definition ZmqClient.cpp:77
std::string valueDisp(const std::string &path)
Reads compact value display at a path (string mode).
static std::shared_ptr< rogue::interfaces::ZmqClient > create(const std::string &addr, uint16_t port, bool doString)
Creates a ZeroMQ client.
Definition ZmqClient.cpp:71
ZmqClient(const std::string &addr, uint16_t port, bool doString)
Constructs a ZeroMQ client and connects sockets.
Definition ZmqClient.cpp:96
virtual ~ZmqClient()
Destroys client and stops background activity.
void setTimeout(uint32_t msecs, bool waitRetry, uint32_t maxRetries=0)
Sets request timeout behavior.
void setDisp(const std::string &path, const std::string &value)
Writes display-formatted value at a path (string mode).
std::shared_ptr< rogue::interfaces::ZmqClient > ZmqClientPtr
Definition ZmqClient.h:197
std::shared_ptr< rogue::Logging > LoggingPtr
Shared pointer alias for Logging.
Definition Logging.h:205