rogue
Loading...
Searching...
No Matches
ZmqClient.cpp
Go to the documentation of this file.
1
18#include "rogue/Directives.h"
19
21
22#include <inttypes.h>
23#include <zmq.h>
24
25#include <cstdio>
26#include <memory>
27#include <string>
28
29#include "rogue/GeneralError.h"
30#include "rogue/GilRelease.h"
31#include "rogue/ScopedGil.h"
32
33#ifndef NO_PYTHON
34 #include <boost/python.hpp>
35namespace bp = boost::python;
36#endif
37
38rogue::interfaces::ZmqClientPtr rogue::interfaces::ZmqClient::create(const std::string& addr, uint16_t port, bool doString) {
39 rogue::interfaces::ZmqClientPtr ret = std::make_shared<rogue::interfaces::ZmqClient>(addr, port, doString);
40 return (ret);
41}
42
45#ifndef NO_PYTHON
46
47 bp::class_<rogue::interfaces::ZmqClientWrap, rogue::interfaces::ZmqClientWrapPtr, boost::noncopyable>(
48 "ZmqClient",
49 bp::init<std::string, uint16_t, bool>())
58#endif
59}
60
61rogue::interfaces::ZmqClient::ZmqClient(const std::string& addr, uint16_t port, bool doString) {
62 std::string temp;
63 uint32_t val;
64 uint32_t reqPort;
65
66 this->doString_ = doString;
67 this->zmqCtx_ = zmq_ctx_new();
68 this->zmqSub_ = zmq_socket(this->zmqCtx_, ZMQ_SUB);
69 this->zmqReq_ = zmq_socket(this->zmqCtx_, ZMQ_REQ);
70
71 log_ = rogue::Logging::create("ZmqClient");
72
73 if (!doString_) {
74 // Setup sub port
75 temp = "tcp://";
76 temp.append(addr);
77 temp.append(":");
78 temp.append(std::to_string(static_cast<int64_t>(port)));
79
80 if (zmq_setsockopt(this->zmqSub_, ZMQ_SUBSCRIBE, "", 0) != 0)
81 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket subscribe"));
82
83 val = 0;
84 if (zmq_setsockopt(this->zmqSub_, ZMQ_LINGER, &val, sizeof(int32_t)) != 0)
85 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket linger"));
86
87 if (zmq_connect(this->zmqSub_, temp.c_str()) < 0)
88 throw(rogue::GeneralError::create("ZmqClient::ZmqClient",
89 "Failed to connect to port %" PRIu16 " at address %s",
90 port,
91 addr.c_str()));
92
93 reqPort = port + 1;
94 } else {
95 reqPort = port + 2;
96 }
97
98 // Setup request port
99 temp = "tcp://";
100 temp.append(addr);
101 temp.append(":");
102 temp.append(std::to_string(static_cast<int64_t>(reqPort)));
103
104 waitRetry_ = false; // Don't keep waiting after timeout
105 timeout_ = 1000; // 1 second
106 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &timeout_, sizeof(int32_t)) != 0)
107 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket timeout"));
108
109 val = 1;
110 if (zmq_setsockopt(this->zmqReq_, ZMQ_REQ_CORRELATE, &val, sizeof(int32_t)) != 0)
111 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket correlate"));
112
113 if (zmq_setsockopt(this->zmqReq_, ZMQ_REQ_RELAXED, &val, sizeof(int32_t)) != 0)
114 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket relaxed"));
115
116 val = 0;
117 if (zmq_setsockopt(this->zmqReq_, ZMQ_LINGER, &val, sizeof(int32_t)) != 0)
118 throw(rogue::GeneralError("ZmqClient::ZmqClient", "Failed to set socket linger"));
119
120 if (zmq_connect(this->zmqReq_, temp.c_str()) < 0)
121 throw(rogue::GeneralError::create("ZmqClient::ZmqClient",
122 "Failed to connect to port %" PRIu32 " at address %s",
123 reqPort,
124 addr.c_str()));
125
126 if (doString_) {
127 threadEn_ = false;
128 log_->info("Connected to Rogue server at port %" PRIu32, reqPort);
129 } else {
130 log_->info("Connected to Rogue server at ports %" PRIu16 ":%" PRIu32, port, reqPort);
131
132 threadEn_ = true;
133 thread_ = new std::thread(&rogue::interfaces::ZmqClient::runThread, this);
134 }
135 running_ = true;
136}
137
141
143 if (running_) {
144 running_ = false;
145 if (threadEn_) {
146 rogue::GilRelease noGil;
147 waitRetry_ = false;
148 threadEn_ = false;
149 thread_->join();
150 }
151 if (!doString_) zmq_close(this->zmqSub_);
152 zmq_close(this->zmqReq_);
153 zmq_ctx_destroy(this->zmqCtx_);
154 }
155}
156
157void rogue::interfaces::ZmqClient::setTimeout(uint32_t msecs, bool waitRetry) {
158 waitRetry_ = waitRetry;
159 timeout_ = msecs;
160
161 printf("ZmqClient::setTimeout: Setting timeout to %" PRIu32 " msecs, waitRetry = %" PRIu8 "\n",
162 timeout_,
163 waitRetry_);
164
165 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &timeout_, sizeof(int32_t)) != 0)
166 throw(rogue::GeneralError("ZmqClient::setTimeout", "Failed to set socket timeout"));
167}
168
169std::string rogue::interfaces::ZmqClient::sendString(const std::string& path, const std::string& attr, const std::string& arg) {
170 std::string snd;
171 std::string ret;
172 zmq_msg_t msg;
173 std::string data;
174 double seconds = 0;
175
176 if (!doString_) throw rogue::GeneralError::create("ZmqClient::sendString", "Invalid send call in standard mode");
177
178 snd = "{\"attr\": \"" + attr + "\",";
179 snd += "\"path\": \"" + path + "\"";
180
181 if (arg != "") snd += ",\"args\": [\"" + arg + "\"]";
182
183 snd += "}";
184
185 rogue::GilRelease noGil;
186 zmq_send(this->zmqReq_, snd.c_str(), snd.size(), 0);
187
188 while (1) {
189 zmq_msg_init(&msg);
190 if (zmq_recvmsg(this->zmqReq_, &msg, 0) <= 0) {
191 seconds += static_cast<double>(timeout_) / 1000.0;
192 if (waitRetry_) {
193 log_->error("Timeout waiting for response after %d Seconds, server may be busy! Waiting...", static_cast<int>(seconds));
194 zmq_msg_close(&msg);
195 } else {
196 throw rogue::GeneralError::create("ZmqClient::sendString",
197 "Timeout waiting for response after %d Seconds.",
198 static_cast<int>(seconds));
199 }
200 } else {
201 break;
202 }
203 }
204
205 if (seconds != 0) log_->error("Finally got response from server after %d seconds!", static_cast<int>(seconds));
206
207 data = std::string((const char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
208 zmq_msg_close(&msg);
209 return data;
210}
211
212std::string rogue::interfaces::ZmqClient::getDisp(const std::string& path) {
213 return sendString(path, "getDisp", "");
214}
215
216void rogue::interfaces::ZmqClient::setDisp(const std::string& path, const std::string& value) {
217 sendString(path, "setDisp", value);
218}
219
220std::string rogue::interfaces::ZmqClient::exec(const std::string& path, const std::string& arg) {
221 return sendString(path, "__call__", arg);
222}
223
224std::string rogue::interfaces::ZmqClient::valueDisp(const std::string& path) {
225 return sendString(path, "valueDisp", "");
226}
227
228#ifndef NO_PYTHON
229
230bp::object rogue::interfaces::ZmqClient::send(bp::object value) {
231 zmq_msg_t txMsg;
232 zmq_msg_t rxMsg;
233 Py_buffer valueBuf;
234 bp::object ret;
235 double seconds = 0;
236
237 if (doString_) throw rogue::GeneralError::create("ZmqClient::send", "Invalid send call in string mode");
238
239 if (PyObject_GetBuffer(value.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0)
240 throw(rogue::GeneralError::create("ZmqClient::send", "Failed to extract object data"));
241
242 zmq_msg_init_size(&txMsg, valueBuf.len);
243 memcpy(zmq_msg_data(&txMsg), valueBuf.buf, valueBuf.len);
244 PyBuffer_Release(&valueBuf);
245
246 {
247 rogue::GilRelease noGil;
248 zmq_sendmsg(this->zmqReq_, &txMsg, 0);
249
250 while (1) {
251 zmq_msg_init(&rxMsg);
252 if (zmq_recvmsg(this->zmqReq_, &rxMsg, 0) <= 0) {
253 seconds += static_cast<double>(timeout_) / 1000.0;
254 if (waitRetry_) {
255 log_->error("Timeout waiting for response after %d Seconds, server may be busy! Waiting...",
256 static_cast<int>(seconds));
257 zmq_msg_close(&rxMsg);
258 } else {
260 "ZmqClient::send",
261 "Timeout waiting for response after %d Seconds, server may be busy!",
262 static_cast<int>(seconds));
263 }
264 } else {
265 break;
266 }
267 }
268 }
269
270 if (seconds != 0) log_->error("Finally got response from server after %d seconds!", static_cast<int>(seconds));
271
272 PyObject* val = Py_BuildValue("y#", zmq_msg_data(&rxMsg), zmq_msg_size(&rxMsg));
273
274 if (val == NULL) throw(rogue::GeneralError::create("ZmqClient::send", "Failed to generate bytearray"));
275
276 zmq_msg_close(&rxMsg);
277
278 bp::handle<> handle(val);
279 ret = bp::object(handle);
280 return ret;
281}
282
284
285rogue::interfaces::ZmqClientWrap::ZmqClientWrap(const std::string& addr, uint16_t port, bool doString)
286 : rogue::interfaces::ZmqClient(addr, port, doString) {}
287
289 if (bp::override f = this->get_override("_doUpdate")) {
290 try {
291 f(data);
292 } catch (...) {
293 PyErr_Print();
294 }
295 }
297}
298
302
303#endif
304
305void rogue::interfaces::ZmqClient::runThread() {
306 zmq_msg_t msg;
307
308 log_->logThreadId();
309
310 while (threadEn_) {
311 zmq_msg_init(&msg);
312
313 // Get the message
314 if (zmq_recvmsg(this->zmqSub_, &msg, 0) > 0) {
315#ifndef NO_PYTHON
317 PyObject* val = Py_BuildValue("y#", zmq_msg_data(&msg), zmq_msg_size(&msg));
318 bp::handle<> handle(val);
319 bp::object dat = bp::object(handle);
320 this->doUpdate(dat);
321#endif
322 zmq_msg_close(&msg);
323 }
324 }
325}
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:60
RAII helper that acquires the Python GIL for a scope.
Definition ScopedGil.h:35
void defDoUpdate(boost::python::object data)
Calls base-class doUpdate() implementation.
void doUpdate(boost::python::object data)
Handles an update message from the subscription path.
ZmqClientWrap(const std::string &addr, uint16_t port, bool doString)
Constructs wrapper client.
ZeroMQ client for Rogue control and update messaging.
Definition ZmqClient.h:48
std::string sendString(const std::string &path, const std::string &attr, const std::string &arg)
Sends a string-mode request.
void stop()
Stops client sockets and background thread.
std::string getDisp(const std::string &path)
Reads display-formatted value at a path (string mode).
virtual void doUpdate(boost::python::object data)
Handles async update payloads received on subscriber socket.
std::string exec(const std::string &path, const std::string &arg="")
Executes callable node at path (string mode).
boost::python::object send(boost::python::object data)
Sends binary request payload and receives binary response.
static void setup_python()
Registers Python bindings for this class.
Definition ZmqClient.cpp:44
std::string valueDisp(const std::string &path)
Reads compact value display at a path (string mode).
static std::shared_ptr< rogue::interfaces::ZmqClient > create(const std::string &addr, uint16_t port, bool doString)
Creates a ZeroMQ client.
Definition ZmqClient.cpp:38
void setTimeout(uint32_t msecs, bool waitRetry)
Sets request timeout behavior.
ZmqClient(const std::string &addr, uint16_t port, bool doString)
Constructs a ZeroMQ client and connects sockets.
Definition ZmqClient.cpp:61
virtual ~ZmqClient()
Destroys client and stops background activity.
void setDisp(const std::string &path, const std::string &value)
Writes display-formatted value at a path (string mode).
std::shared_ptr< rogue::interfaces::ZmqClient > ZmqClientPtr
Definition ZmqClient.h:177