rogue
Loading...
Searching...
No Matches
ZmqClient.cpp
Go to the documentation of this file.
1
18#include "rogue/Directives.h"
19
21
22#include <inttypes.h>
23#include <zmq.h>
24
25#include <cstdio>
26#include <memory>
27#include <string>
28
29#include "rogue/GeneralError.h"
30#include "rogue/GilRelease.h"
31#include "rogue/ScopedGil.h"
32
33#ifndef NO_PYTHON
34 #include <boost/python.hpp>
35namespace bp = boost::python;
36#endif
37
38namespace {
39
40constexpr uint32_t kRetryLogIntervalSeconds = 30;
41
42void logWaitRetry(rogue::LoggingPtr log, double seconds, uint32_t& lastLoggedSeconds) {
43 uint32_t elapsedSeconds = static_cast<uint32_t>(seconds);
44
45 if (elapsedSeconds == 0) return;
46
47 if (lastLoggedSeconds == 0) {
48 log->warning(
49 "Timeout waiting for response after %" PRIu32 " seconds, server may be busy. Continuing to wait...",
50 elapsedSeconds);
51 lastLoggedSeconds = elapsedSeconds;
52 return;
53 }
54
55 if (elapsedSeconds >= (lastLoggedSeconds + kRetryLogIntervalSeconds)) {
56 log->warning("Still waiting for response after %" PRIu32 " seconds, server may be busy...", elapsedSeconds);
57 lastLoggedSeconds = elapsedSeconds;
58 }
59}
60
61void logWaitRecovered(rogue::LoggingPtr log, double seconds) {
62 uint32_t elapsedSeconds = static_cast<uint32_t>(seconds);
63
64 if (elapsedSeconds != 0) {
65 log->warning("Received response from server after %" PRIu32 " seconds.", elapsedSeconds);
66 }
67}
68
69} // namespace
70
71rogue::interfaces::ZmqClientPtr rogue::interfaces::ZmqClient::create(const std::string& addr, uint16_t port, bool doString) {
72 rogue::interfaces::ZmqClientPtr ret = std::make_shared<rogue::interfaces::ZmqClient>(addr, port, doString);
73 return (ret);
74}
75
78#ifndef NO_PYTHON
79
80 bp::class_<rogue::interfaces::ZmqClientWrap, rogue::interfaces::ZmqClientWrapPtr, boost::noncopyable>(
81 "ZmqClient",
82 bp::init<std::string, uint16_t, bool>())
91#endif
92}
93
94rogue::interfaces::ZmqClient::ZmqClient(const std::string& addr, uint16_t port, bool doString) {
95 std::string temp;
96 uint32_t val;
97 uint32_t reqPort;
98
99 this->doString_ = doString;
100 this->zmqCtx_ = zmq_ctx_new();
101 this->zmqSub_ = zmq_socket(this->zmqCtx_, ZMQ_SUB);
102 this->zmqReq_ = zmq_socket(this->zmqCtx_, ZMQ_REQ);
103
104 log_ = rogue::Logging::create("ZmqClient");
105
106 if (!doString_) {
107 // Setup sub port
108 temp = "tcp://";
109 temp.append(addr);
110 temp.append(":");
111 temp.append(std::to_string(static_cast<int64_t>(port)));
112
113 if (zmq_setsockopt(this->zmqSub_, ZMQ_SUBSCRIBE, "", 0) != 0)
114 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket subscribe"));
115
116 val = 0;
117 if (zmq_setsockopt(this->zmqSub_, ZMQ_LINGER, &val, sizeof(int32_t)) != 0)
118 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket linger"));
119
120 val = 100;
121 if (zmq_setsockopt(this->zmqSub_, ZMQ_RCVTIMEO, &val, sizeof(int32_t)) != 0)
122 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket timeout"));
123
124 if (zmq_connect(this->zmqSub_, temp.c_str()) < 0)
125 throw(rogue::GeneralError::create("ZmqClient::ZmqClient",
126 "Failed to connect to port %" PRIu16 " at address %s",
127 port,
128 addr.c_str()));
129
130 reqPort = port + 1;
131 } else {
132 reqPort = port + 2;
133 }
134
135 // Setup request port
136 temp = "tcp://";
137 temp.append(addr);
138 temp.append(":");
139 temp.append(std::to_string(static_cast<int64_t>(reqPort)));
140
141 waitRetry_ = false; // Don't keep waiting after timeout
142 timeout_ = 1000; // 1 second
143 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &timeout_, sizeof(int32_t)) != 0)
144 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket timeout"));
145
146 val = 1;
147 if (zmq_setsockopt(this->zmqReq_, ZMQ_REQ_CORRELATE, &val, sizeof(int32_t)) != 0)
148 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket correlate"));
149
150 if (zmq_setsockopt(this->zmqReq_, ZMQ_REQ_RELAXED, &val, sizeof(int32_t)) != 0)
151 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket relaxed"));
152
153 val = 0;
154 if (zmq_setsockopt(this->zmqReq_, ZMQ_LINGER, &val, sizeof(int32_t)) != 0)
155 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket linger"));
156
157 if (zmq_connect(this->zmqReq_, temp.c_str()) < 0)
158 throw(rogue::GeneralError::create("ZmqClient::ZmqClient",
159 "Failed to connect to port %" PRIu32 " at address %s",
160 reqPort,
161 addr.c_str()));
162
163 if (doString_) {
164 threadEn_ = false;
165 log_->info("Connected to Rogue server at port %" PRIu32, reqPort);
166 } else {
167 log_->info("Connected to Rogue server at ports %" PRIu16 ":%" PRIu32, port, reqPort);
168
169 threadEn_ = true;
170 thread_ = new std::thread(&rogue::interfaces::ZmqClient::runThread, this);
171 }
172 running_ = true;
173}
174
178
180 if (running_) {
181 running_ = false;
182 if (threadEn_) {
183 rogue::GilRelease noGil;
184 waitRetry_ = false;
185 threadEn_ = false;
186 thread_->join();
187 }
188 if (!doString_) zmq_close(this->zmqSub_);
189 zmq_close(this->zmqReq_);
190 zmq_ctx_destroy(this->zmqCtx_);
191 }
192}
193
194void rogue::interfaces::ZmqClient::setTimeout(uint32_t msecs, bool waitRetry) {
195 waitRetry_ = waitRetry;
196 timeout_ = msecs;
197
198 log_->debug("Setting timeout to %" PRIu32 " msecs, waitRetry = %" PRIu8, timeout_, waitRetry_);
199
200 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &timeout_, sizeof(int32_t)) != 0)
201 throw(rogue::GeneralError("ZmqClient::setTimeout", "Failed to set socket timeout"));
202}
203
204std::string rogue::interfaces::ZmqClient::sendString(const std::string& path, const std::string& attr, const std::string& arg) {
205 std::string snd;
206 std::string ret;
207 zmq_msg_t msg;
208 std::string data;
209 double seconds = 0;
210 uint32_t lastLoggedSeconds = 0;
211
212 if (!doString_) throw rogue::GeneralError::create("ZmqClient::sendString", "Invalid send call in standard mode");
213
214 snd = "{\"attr\": \"" + attr + "\",";
215 snd += "\"path\": \"" + path + "\"";
216
217 if (arg != "") snd += ",\"args\": [\"" + arg + "\"]";
218
219 snd += "}";
220
221 rogue::GilRelease noGil;
222 zmq_send(this->zmqReq_, snd.c_str(), snd.size(), 0);
223
224 while (1) {
225 zmq_msg_init(&msg);
226 if (zmq_recvmsg(this->zmqReq_, &msg, 0) <= 0) {
227 seconds += static_cast<double>(timeout_) / 1000.0;
228 if (waitRetry_) {
229 logWaitRetry(log_, seconds, lastLoggedSeconds);
230 zmq_msg_close(&msg);
231 } else {
232 throw rogue::GeneralError::create("ZmqClient::sendString",
233 "Timeout waiting for response after %d Seconds.",
234 static_cast<int>(seconds));
235 }
236 } else {
237 break;
238 }
239 }
240
241 logWaitRecovered(log_, seconds);
242
243 data = std::string((const char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
244 zmq_msg_close(&msg);
245 return data;
246}
247
248std::string rogue::interfaces::ZmqClient::getDisp(const std::string& path) {
249 return sendString(path, "getDisp", "");
250}
251
252void rogue::interfaces::ZmqClient::setDisp(const std::string& path, const std::string& value) {
253 sendString(path, "setDisp", value);
254}
255
256std::string rogue::interfaces::ZmqClient::exec(const std::string& path, const std::string& arg) {
257 return sendString(path, "__call__", arg);
258}
259
260std::string rogue::interfaces::ZmqClient::valueDisp(const std::string& path) {
261 return sendString(path, "valueDisp", "");
262}
263
264#ifndef NO_PYTHON
265
266bp::object rogue::interfaces::ZmqClient::send(bp::object value) {
267 zmq_msg_t txMsg;
268 zmq_msg_t rxMsg;
269 Py_buffer valueBuf;
270 bp::object ret;
271 double seconds = 0;
272 uint32_t lastLoggedSeconds = 0;
273
274 if (doString_) throw rogue::GeneralError::create("ZmqClient::send", "Invalid send call in string mode");
275
276 if (PyObject_GetBuffer(value.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0)
277 throw(rogue::GeneralError::create("ZmqClient::send", "Failed to extract object data"));
278
279 zmq_msg_init_size(&txMsg, valueBuf.len);
280 memcpy(zmq_msg_data(&txMsg), valueBuf.buf, valueBuf.len);
281 PyBuffer_Release(&valueBuf);
282
283 {
284 rogue::GilRelease noGil;
285 zmq_sendmsg(this->zmqReq_, &txMsg, 0);
286
287 while (1) {
288 zmq_msg_init(&rxMsg);
289 if (zmq_recvmsg(this->zmqReq_, &rxMsg, 0) <= 0) {
290 seconds += static_cast<double>(timeout_) / 1000.0;
291 if (waitRetry_) {
292 logWaitRetry(log_, seconds, lastLoggedSeconds);
293 zmq_msg_close(&rxMsg);
294 } else {
296 "ZmqClient::send",
297 "Timeout waiting for response after %d Seconds, server may be busy!",
298 static_cast<int>(seconds));
299 }
300 } else {
301 break;
302 }
303 }
304 }
305
306 logWaitRecovered(log_, seconds);
307
308 PyObject* val = Py_BuildValue("y#", zmq_msg_data(&rxMsg), zmq_msg_size(&rxMsg));
309
310 if (val == NULL) throw(rogue::GeneralError::create("ZmqClient::send", "Failed to generate bytearray"));
311
312 zmq_msg_close(&rxMsg);
313
314 bp::handle<> handle(val);
315 ret = bp::object(handle);
316 return ret;
317}
318
320
321rogue::interfaces::ZmqClientWrap::ZmqClientWrap(const std::string& addr, uint16_t port, bool doString)
322 : rogue::interfaces::ZmqClient(addr, port, doString) {}
323
325 if (bp::override f = this->get_override("_doUpdate")) {
326 try {
327 f(data);
328 } catch (...) {
329 PyErr_Print();
330 }
331 }
333}
334
338
339#endif
340
341void rogue::interfaces::ZmqClient::runThread() {
342 zmq_msg_t msg;
343
344 log_->logThreadId();
345
346 while (threadEn_) {
347 zmq_msg_init(&msg);
348
349 // Get the message
350 if (zmq_recvmsg(this->zmqSub_, &msg, 0) > 0) {
351#ifndef NO_PYTHON
353 PyObject* val = Py_BuildValue("y#", zmq_msg_data(&msg), zmq_msg_size(&msg));
354 bp::handle<> handle(val);
355 bp::object dat = bp::object(handle);
356 this->doUpdate(dat);
357#endif
358 zmq_msg_close(&msg);
359 } else {
360 zmq_msg_close(&msg);
361 }
362 }
363}
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:95
RAII helper that acquires the Python GIL for a scope.
Definition ScopedGil.h:35
void defDoUpdate(boost::python::object data)
Calls base-class doUpdate() implementation.
void doUpdate(boost::python::object data)
Handles an update message from the subscription path.
ZmqClientWrap(const std::string &addr, uint16_t port, bool doString)
Constructs wrapper client.
ZeroMQ client for Rogue control and update messaging.
Definition ZmqClient.h:48
std::string sendString(const std::string &path, const std::string &attr, const std::string &arg)
Sends a string-mode request.
void stop()
Stops client sockets and background thread.
std::string getDisp(const std::string &path)
Reads display-formatted value at a path (string mode).
virtual void doUpdate(boost::python::object data)
Handles async update payloads received on subscriber socket.
std::string exec(const std::string &path, const std::string &arg="")
Executes callable node at path (string mode).
boost::python::object send(boost::python::object data)
Sends binary request payload and receives binary response.
static void setup_python()
Registers Python bindings for this class.
Definition ZmqClient.cpp:77
std::string valueDisp(const std::string &path)
Reads compact value display at a path (string mode).
static std::shared_ptr< rogue::interfaces::ZmqClient > create(const std::string &addr, uint16_t port, bool doString)
Creates a ZeroMQ client.
Definition ZmqClient.cpp:71
void setTimeout(uint32_t msecs, bool waitRetry)
Sets request timeout behavior.
ZmqClient(const std::string &addr, uint16_t port, bool doString)
Constructs a ZeroMQ client and connects sockets.
Definition ZmqClient.cpp:94
virtual ~ZmqClient()
Destroys client and stops background activity.
void setDisp(const std::string &path, const std::string &value)
Writes display-formatted value at a path (string mode).
std::shared_ptr< rogue::interfaces::ZmqClient > ZmqClientPtr
Definition ZmqClient.h:177
std::shared_ptr< rogue::Logging > LoggingPtr
Shared pointer alias for Logging.
Definition Logging.h:205