34 #include <boost/python.hpp>
35namespace bp = boost::python;
47 bp::class_<rogue::interfaces::ZmqClientWrap, rogue::interfaces::ZmqClientWrapPtr, boost::noncopyable>(
49 bp::init<std::string, uint16_t, bool>())
66 this->doString_ = doString;
67 this->zmqCtx_ = zmq_ctx_new();
68 this->zmqSub_ = zmq_socket(this->zmqCtx_, ZMQ_SUB);
69 this->zmqReq_ = zmq_socket(this->zmqCtx_, ZMQ_REQ);
78 temp.append(std::to_string(
static_cast<int64_t
>(port)));
80 if (zmq_setsockopt(this->zmqSub_, ZMQ_SUBSCRIBE,
"", 0) != 0)
84 if (zmq_setsockopt(this->zmqSub_, ZMQ_LINGER, &val,
sizeof(int32_t)) != 0)
87 if (zmq_connect(this->zmqSub_, temp.c_str()) < 0)
89 "Failed to connect to port %" PRIu16
" at address %s",
102 temp.append(std::to_string(
static_cast<int64_t
>(reqPort)));
106 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &timeout_,
sizeof(int32_t)) != 0)
110 if (zmq_setsockopt(this->zmqReq_, ZMQ_REQ_CORRELATE, &val,
sizeof(int32_t)) != 0)
113 if (zmq_setsockopt(this->zmqReq_, ZMQ_REQ_RELAXED, &val,
sizeof(int32_t)) != 0)
117 if (zmq_setsockopt(this->zmqReq_, ZMQ_LINGER, &val,
sizeof(int32_t)) != 0)
120 if (zmq_connect(this->zmqReq_, temp.c_str()) < 0)
122 "Failed to connect to port %" PRIu32
" at address %s",
128 log_->info(
"Connected to Rogue server at port %" PRIu32, reqPort);
130 log_->info(
"Connected to Rogue server at ports %" PRIu16
":%" PRIu32, port, reqPort);
133 thread_ =
new std::thread(&rogue::interfaces::ZmqClient::runThread,
this);
151 if (!doString_) zmq_close(this->zmqSub_);
152 zmq_close(this->zmqReq_);
153 zmq_ctx_destroy(this->zmqCtx_);
158 waitRetry_ = waitRetry;
161 printf(
"ZmqClient::setTimeout: Setting timeout to %" PRIu32
" msecs, waitRetry = %" PRIu8
"\n",
165 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &timeout_,
sizeof(int32_t)) != 0)
178 snd =
"{\"attr\": \"" + attr +
"\",";
179 snd +=
"\"path\": \"" + path +
"\"";
181 if (arg !=
"") snd +=
",\"args\": [\"" + arg +
"\"]";
186 zmq_send(this->zmqReq_, snd.c_str(), snd.size(), 0);
190 if (zmq_recvmsg(this->zmqReq_, &msg, 0) <= 0) {
191 seconds +=
static_cast<double>(timeout_) / 1000.0;
193 log_->error(
"Timeout waiting for response after %d Seconds, server may be busy! Waiting...",
static_cast<int>(seconds));
197 "Timeout waiting for response after %d Seconds.",
198 static_cast<int>(seconds));
205 if (seconds != 0) log_->error(
"Finally got response from server after %d seconds!",
static_cast<int>(seconds));
207 data = std::string((
const char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
213 return sendString(path,
"getDisp",
"");
217 sendString(path,
"setDisp", value);
221 return sendString(path,
"__call__", arg);
225 return sendString(path,
"valueDisp",
"");
239 if (PyObject_GetBuffer(value.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0)
242 zmq_msg_init_size(&txMsg, valueBuf.len);
243 memcpy(zmq_msg_data(&txMsg), valueBuf.buf, valueBuf.len);
244 PyBuffer_Release(&valueBuf);
248 zmq_sendmsg(this->zmqReq_, &txMsg, 0);
251 zmq_msg_init(&rxMsg);
252 if (zmq_recvmsg(this->zmqReq_, &rxMsg, 0) <= 0) {
253 seconds +=
static_cast<double>(timeout_) / 1000.0;
255 log_->error(
"Timeout waiting for response after %d Seconds, server may be busy! Waiting...",
256 static_cast<int>(seconds));
257 zmq_msg_close(&rxMsg);
261 "Timeout waiting for response after %d Seconds, server may be busy!",
262 static_cast<int>(seconds));
270 if (seconds != 0) log_->error(
"Finally got response from server after %d seconds!",
static_cast<int>(seconds));
272 PyObject* val = Py_BuildValue(
"y#", zmq_msg_data(&rxMsg), zmq_msg_size(&rxMsg));
276 zmq_msg_close(&rxMsg);
278 bp::handle<> handle(val);
279 ret = bp::object(handle);
289 if (bp::override f = this->get_override(
"_doUpdate")) {
305void rogue::interfaces::ZmqClient::runThread() {
314 if (zmq_recvmsg(this->zmqSub_, &msg, 0) > 0) {
317 PyObject* val = Py_BuildValue(
"y#", zmq_msg_data(&msg), zmq_msg_size(&msg));
318 bp::handle<> handle(val);
319 bp::object dat = bp::object(handle);
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
RAII helper that acquires the Python GIL for a scope.
void defDoUpdate(boost::python::object data)
Calls base-class doUpdate() implementation.
void doUpdate(boost::python::object data)
Handles an update message from the subscription path.
ZmqClientWrap(const std::string &addr, uint16_t port, bool doString)
Constructs wrapper client.
ZeroMQ client for Rogue control and update messaging.
std::string sendString(const std::string &path, const std::string &attr, const std::string &arg)
Sends a string-mode request.
void stop()
Stops client sockets and background thread.
std::string getDisp(const std::string &path)
Reads display-formatted value at a path (string mode).
virtual void doUpdate(boost::python::object data)
Handles async update payloads received on subscriber socket.
std::string exec(const std::string &path, const std::string &arg="")
Executes callable node at path (string mode).
boost::python::object send(boost::python::object data)
Sends binary request payload and receives binary response.
static void setup_python()
Registers Python bindings for this class.
std::string valueDisp(const std::string &path)
Reads compact value display at a path (string mode).
static std::shared_ptr< rogue::interfaces::ZmqClient > create(const std::string &addr, uint16_t port, bool doString)
Creates a ZeroMQ client.
void setTimeout(uint32_t msecs, bool waitRetry)
Sets request timeout behavior.
ZmqClient(const std::string &addr, uint16_t port, bool doString)
Constructs a ZeroMQ client and connects sockets.
virtual ~ZmqClient()
Destroys client and stops background activity.
void setDisp(const std::string &path, const std::string &value)
Writes display-formatted value at a path (string mode).
std::shared_ptr< rogue::interfaces::ZmqClient > ZmqClientPtr