34 #include <boost/python.hpp>
35namespace bp = boost::python;
40constexpr uint32_t kRetryLogIntervalSeconds = 30;
42void logWaitRetry(
rogue::LoggingPtr log,
double seconds, uint32_t& lastLoggedSeconds) {
43 uint32_t elapsedSeconds =
static_cast<uint32_t
>(seconds);
45 if (elapsedSeconds == 0)
return;
47 if (lastLoggedSeconds == 0) {
49 "Timeout waiting for response after %" PRIu32
" seconds, server may be busy. Continuing to wait...",
51 lastLoggedSeconds = elapsedSeconds;
55 if (elapsedSeconds >= (lastLoggedSeconds + kRetryLogIntervalSeconds)) {
56 log->warning(
"Still waiting for response after %" PRIu32
" seconds, server may be busy...", elapsedSeconds);
57 lastLoggedSeconds = elapsedSeconds;
62 uint32_t elapsedSeconds =
static_cast<uint32_t
>(seconds);
64 if (elapsedSeconds != 0) {
65 log->warning(
"Received response from server after %" PRIu32
" seconds.", elapsedSeconds);
80 bp::class_<rogue::interfaces::ZmqClientWrap, rogue::interfaces::ZmqClientWrapPtr, boost::noncopyable>(
82 bp::init<std::string, uint16_t, bool>())
99 this->doString_ = doString;
100 this->zmqCtx_ = zmq_ctx_new();
101 this->zmqSub_ = zmq_socket(this->zmqCtx_, ZMQ_SUB);
102 this->zmqReq_ = zmq_socket(this->zmqCtx_, ZMQ_REQ);
111 temp.append(std::to_string(
static_cast<int64_t
>(port)));
113 if (zmq_setsockopt(this->zmqSub_, ZMQ_SUBSCRIBE,
"", 0) != 0)
117 if (zmq_setsockopt(this->zmqSub_, ZMQ_LINGER, &val,
sizeof(int32_t)) != 0)
121 if (zmq_setsockopt(this->zmqSub_, ZMQ_RCVTIMEO, &val,
sizeof(int32_t)) != 0)
124 if (zmq_connect(this->zmqSub_, temp.c_str()) < 0)
126 "Failed to connect to port %" PRIu16
" at address %s",
139 temp.append(std::to_string(
static_cast<int64_t
>(reqPort)));
143 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &timeout_,
sizeof(int32_t)) != 0)
147 if (zmq_setsockopt(this->zmqReq_, ZMQ_REQ_CORRELATE, &val,
sizeof(int32_t)) != 0)
150 if (zmq_setsockopt(this->zmqReq_, ZMQ_REQ_RELAXED, &val,
sizeof(int32_t)) != 0)
154 if (zmq_setsockopt(this->zmqReq_, ZMQ_LINGER, &val,
sizeof(int32_t)) != 0)
157 if (zmq_connect(this->zmqReq_, temp.c_str()) < 0)
159 "Failed to connect to port %" PRIu32
" at address %s",
165 log_->info(
"Connected to Rogue server at port %" PRIu32, reqPort);
167 log_->info(
"Connected to Rogue server at ports %" PRIu16
":%" PRIu32, port, reqPort);
170 thread_ =
new std::thread(&rogue::interfaces::ZmqClient::runThread,
this);
188 if (!doString_) zmq_close(this->zmqSub_);
189 zmq_close(this->zmqReq_);
190 zmq_ctx_destroy(this->zmqCtx_);
195 waitRetry_ = waitRetry;
198 log_->debug(
"Setting timeout to %" PRIu32
" msecs, waitRetry = %" PRIu8, timeout_, waitRetry_);
200 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &timeout_,
sizeof(int32_t)) != 0)
210 uint32_t lastLoggedSeconds = 0;
214 snd =
"{\"attr\": \"" + attr +
"\",";
215 snd +=
"\"path\": \"" + path +
"\"";
217 if (arg !=
"") snd +=
",\"args\": [\"" + arg +
"\"]";
222 zmq_send(this->zmqReq_, snd.c_str(), snd.size(), 0);
226 if (zmq_recvmsg(this->zmqReq_, &msg, 0) <= 0) {
227 seconds +=
static_cast<double>(timeout_) / 1000.0;
229 logWaitRetry(log_, seconds, lastLoggedSeconds);
233 "Timeout waiting for response after %d Seconds.",
234 static_cast<int>(seconds));
241 logWaitRecovered(log_, seconds);
243 data = std::string((
const char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
249 return sendString(path,
"getDisp",
"");
253 sendString(path,
"setDisp", value);
257 return sendString(path,
"__call__", arg);
261 return sendString(path,
"valueDisp",
"");
272 uint32_t lastLoggedSeconds = 0;
276 if (PyObject_GetBuffer(value.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0)
279 zmq_msg_init_size(&txMsg, valueBuf.len);
280 memcpy(zmq_msg_data(&txMsg), valueBuf.buf, valueBuf.len);
281 PyBuffer_Release(&valueBuf);
285 zmq_sendmsg(this->zmqReq_, &txMsg, 0);
288 zmq_msg_init(&rxMsg);
289 if (zmq_recvmsg(this->zmqReq_, &rxMsg, 0) <= 0) {
290 seconds +=
static_cast<double>(timeout_) / 1000.0;
292 logWaitRetry(log_, seconds, lastLoggedSeconds);
293 zmq_msg_close(&rxMsg);
297 "Timeout waiting for response after %d Seconds, server may be busy!",
298 static_cast<int>(seconds));
306 logWaitRecovered(log_, seconds);
308 PyObject* val = Py_BuildValue(
"y#", zmq_msg_data(&rxMsg), zmq_msg_size(&rxMsg));
312 zmq_msg_close(&rxMsg);
314 bp::handle<> handle(val);
315 ret = bp::object(handle);
325 if (bp::override f = this->get_override(
"_doUpdate")) {
341void rogue::interfaces::ZmqClient::runThread() {
350 if (zmq_recvmsg(this->zmqSub_, &msg, 0) > 0) {
353 PyObject* val = Py_BuildValue(
"y#", zmq_msg_data(&msg), zmq_msg_size(&msg));
354 bp::handle<> handle(val);
355 bp::object dat = bp::object(handle);
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
RAII helper that acquires the Python GIL for a scope.
void defDoUpdate(boost::python::object data)
Calls base-class doUpdate() implementation.
void doUpdate(boost::python::object data)
Handles an update message from the subscription path.
ZmqClientWrap(const std::string &addr, uint16_t port, bool doString)
Constructs wrapper client.
ZeroMQ client for Rogue control and update messaging.
std::string sendString(const std::string &path, const std::string &attr, const std::string &arg)
Sends a string-mode request.
void stop()
Stops client sockets and background thread.
std::string getDisp(const std::string &path)
Reads display-formatted value at a path (string mode).
virtual void doUpdate(boost::python::object data)
Handles async update payloads received on subscriber socket.
std::string exec(const std::string &path, const std::string &arg="")
Executes callable node at path (string mode).
boost::python::object send(boost::python::object data)
Sends binary request payload and receives binary response.
static void setup_python()
Registers Python bindings for this class.
std::string valueDisp(const std::string &path)
Reads compact value display at a path (string mode).
static std::shared_ptr< rogue::interfaces::ZmqClient > create(const std::string &addr, uint16_t port, bool doString)
Creates a ZeroMQ client.
void setTimeout(uint32_t msecs, bool waitRetry)
Sets request timeout behavior.
ZmqClient(const std::string &addr, uint16_t port, bool doString)
Constructs a ZeroMQ client and connects sockets.
virtual ~ZmqClient()
Destroys client and stops background activity.
void setDisp(const std::string &path, const std::string &value)
Writes display-formatted value at a path (string mode).
std::shared_ptr< rogue::interfaces::ZmqClient > ZmqClientPtr
std::shared_ptr< rogue::Logging > LoggingPtr
Shared pointer alias for Logging.