34 #include <boost/python.hpp>
35namespace bp = boost::python;
// Interval, in seconds, used to rate-limit the repeated "Still waiting..."
// warning: the retry logger compares the current elapsed time against the last
// logged value plus this constant before logging again.
40constexpr uint32_t kRetryLogIntervalSeconds = 30;
// Emits rate-limited warnings while a request is still waiting for a response.
//
// log               - logger whose warning() receives the messages.
// seconds           - elapsed wait time; converted to whole seconds before use.
// lastLoggedSeconds - in/out: elapsed-seconds value stored when a warning was
//                     last logged. A value of 0 selects the first-warning
//                     branch below.
//
// NOTE(review): this listing omits some original lines (for example the
// statement that opens the first warning call, and the closing braces), so the
// comments here describe only the statements that are visible.
42void logWaitRetry(
rogue::LoggingPtr log,
double seconds, uint32_t& lastLoggedSeconds) {
// The cast discards the fractional part of the elapsed time.
43 uint32_t elapsedSeconds =
static_cast<uint32_t
>(seconds);
// Nothing is logged until at least one whole second has elapsed.
45 if (elapsedSeconds == 0)
return;
// First warning for this wait: no elapsed value has been recorded yet.
47 if (lastLoggedSeconds == 0) {
49 "Timeout waiting for response after %" PRIu32
" seconds, server may be busy. Continuing to wait...",
// Remember when this warning was issued so later calls can rate-limit against it.
51 lastLoggedSeconds = elapsedSeconds;
// Repeat the warning only once kRetryLogIntervalSeconds more seconds have
// passed since the last logged value, then advance the recorded value.
55 if (elapsedSeconds >= (lastLoggedSeconds + kRetryLogIntervalSeconds)) {
56 log->warning(
"Still waiting for response after %" PRIu32
" seconds, server may be busy...", elapsedSeconds);
57 lastLoggedSeconds = elapsedSeconds;
62 uint32_t elapsedSeconds =
static_cast<uint32_t
>(seconds);
64 if (elapsedSeconds != 0) {
65 log->warning(
"Received response from server after %" PRIu32
" seconds.", elapsedSeconds);
80 bp::class_<rogue::interfaces::ZmqClientWrap, rogue::interfaces::ZmqClientWrapPtr, boost::noncopyable>(
82 bp::init<std::string, uint16_t, bool>())
87 (bp::arg(
"msecs"), bp::arg(
"waitRetry"), bp::arg(
"maxRetries") = 0u))
101 this->doString_ = doString;
102 this->zmqCtx_ = zmq_ctx_new();
103 this->zmqSub_ = zmq_socket(this->zmqCtx_, ZMQ_SUB);
104 this->zmqReq_ = zmq_socket(this->zmqCtx_, ZMQ_REQ);
116 temp.append(std::to_string(
static_cast<int64_t
>(port)));
118 if (zmq_setsockopt(this->zmqSub_, ZMQ_SUBSCRIBE,
"", 0) != 0)
122 if (zmq_setsockopt(this->zmqSub_, ZMQ_LINGER, &val,
sizeof(int32_t)) != 0)
126 if (zmq_setsockopt(this->zmqSub_, ZMQ_RCVTIMEO, &val,
sizeof(int32_t)) != 0)
129 if (zmq_connect(this->zmqSub_, temp.c_str()) < 0)
131 "Failed to connect to port %" PRIu16
" at address %s",
144 temp.append(std::to_string(
static_cast<int64_t
>(reqPort)));
149 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &timeout_,
sizeof(int32_t)) != 0)
153 if (zmq_setsockopt(this->zmqReq_, ZMQ_REQ_CORRELATE, &val,
sizeof(int32_t)) != 0)
156 if (zmq_setsockopt(this->zmqReq_, ZMQ_REQ_RELAXED, &val,
sizeof(int32_t)) != 0)
160 if (zmq_setsockopt(this->zmqReq_, ZMQ_LINGER, &val,
sizeof(int32_t)) != 0)
163 if (zmq_connect(this->zmqReq_, temp.c_str()) < 0)
165 "Failed to connect to port %" PRIu32
" at address %s",
171 log_->info(
"Connected to Rogue server at port %" PRIu32, reqPort);
173 log_->info(
"Connected to Rogue server at ports %" PRIu16
":%" PRIu32, port, reqPort);
176 thread_ =
new std::thread(&rogue::interfaces::ZmqClient::runThread,
this);
180 if (zmqSub_ !=
nullptr) {
184 if (zmqReq_ !=
nullptr) {
188 if (zmqCtx_ !=
nullptr) {
189 zmq_ctx_destroy(zmqCtx_);
211 if (!doString_) zmq_close(this->zmqSub_);
212 zmq_close(this->zmqReq_);
213 zmq_ctx_destroy(this->zmqCtx_);
220 std::lock_guard<std::mutex> lock(reqLock_);
222 waitRetry_ = waitRetry;
224 maxRetries_ = maxRetries;
227 log_->debug(
"Setting timeout to %" PRIu32
" msecs, waitRetry = %d, maxRetries = %" PRIu32,
232 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &timeout_,
sizeof(int32_t)) != 0)
242 uint32_t lastLoggedSeconds = 0;
246 snd =
"{\"attr\": \"" + attr +
"\",";
247 snd +=
"\"path\": \"" + path +
"\"";
249 if (arg !=
"") snd +=
",\"args\": [\"" + arg +
"\"]";
254 std::lock_guard<std::mutex> lock(reqLock_);
255 zmq_send(this->zmqReq_, snd.c_str(), snd.size(), 0);
257 uint32_t retryCount = 0;
261 if (zmq_recvmsg(this->zmqReq_, &msg, 0) <= 0) {
262 seconds +=
static_cast<double>(timeout_) / 1000.0;
266 const bool retryBudgetExhausted = (maxRetries_ != 0 && ++retryCount >= maxRetries_);
267 if (waitRetry_ && !retryBudgetExhausted) {
268 logWaitRetry(log_, seconds, lastLoggedSeconds);
273 "Timeout waiting for response after %d Seconds.",
274 static_cast<int>(seconds));
281 logWaitRecovered(log_, seconds);
283 data = std::string((
const char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
289 return sendString(path,
"getDisp",
"");
293 sendString(path,
"setDisp", value);
297 return sendString(path,
"__call__", arg);
301 return sendString(path,
"valueDisp",
"");
312 uint32_t lastLoggedSeconds = 0;
320 if (PyObject_GetBuffer(value.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0) {
328 if (zmq_msg_init_size(&txMsg, valueBuf.len) < 0) {
329 PyBuffer_Release(&valueBuf);
332 memcpy(zmq_msg_data(&txMsg), valueBuf.buf, valueBuf.len);
333 PyBuffer_Release(&valueBuf);
337 std::lock_guard<std::mutex> lock(reqLock_);
343 if (zmq_sendmsg(this->zmqReq_, &txMsg, 0) < 0) {
348 int err = zmq_errno();
349 zmq_msg_close(&txMsg);
351 "zmq_sendmsg failed: errno=%d %s",
356 uint32_t retryCount = 0;
359 zmq_msg_init(&rxMsg);
360 if (zmq_recvmsg(this->zmqReq_, &rxMsg, 0) <= 0) {
361 seconds +=
static_cast<double>(timeout_) / 1000.0;
365 const bool retryBudgetExhausted = (maxRetries_ != 0 && ++retryCount >= maxRetries_);
366 if (waitRetry_ && !retryBudgetExhausted) {
367 logWaitRetry(log_, seconds, lastLoggedSeconds);
368 zmq_msg_close(&rxMsg);
370 zmq_msg_close(&rxMsg);
373 "Timeout waiting for response after %d Seconds, server may be busy!",
374 static_cast<int>(seconds));
382 logWaitRecovered(log_, seconds);
384 PyObject* val = Py_BuildValue(
"y#", zmq_msg_data(&rxMsg), zmq_msg_size(&rxMsg));
391 zmq_msg_close(&rxMsg);
395 zmq_msg_close(&rxMsg);
397 bp::handle<> handle(val);
398 ret = bp::object(handle);
408 if (bp::override f = this->get_override(
"_doUpdate")) {
// Entry point of the background thread started by the constructor
// (std::thread is created with &ZmqClient::runThread and this).
//
// Visible behavior: a message received on the subscriber socket (zmqSub_) is
// converted to a Python object; if handling the update throws, a warning is
// logged and the update is dropped rather than letting the exception escape.
//
// NOTE(review): this listing omits many original lines (declarations, the
// enclosing control flow, the call that consumes `dat`, and the closing
// braces); the comments below cover only the statements that are visible.
424void rogue::interfaces::ZmqClient::runThread() {
// Only a positive byte count is processed; other return values (including the
// receive timeout configured on zmqSub_ via ZMQ_RCVTIMEO) skip this branch.
// NOTE(review): libzmq documents zmq_recvmsg() as deprecated in favor of
// zmq_msg_recv() - consider migrating.
433 if (zmq_recvmsg(this->zmqSub_, &msg, 0) > 0) {
// "y#" builds a Python bytes object from a pointer/length pair.
// NOTE(review): the Python C API documents the '#' length argument as
// Py_ssize_t, while zmq_msg_size() returns size_t - confirm the widths match
// on all supported targets or cast explicitly.
437 PyObject* val = Py_BuildValue(
"y#", zmq_msg_data(&msg), zmq_msg_size(&msg));
// Wrap the raw PyObject* in Boost.Python handle/object types.
447 bp::handle<> handle(val);
448 bp::object dat = bp::object(handle);
// Failures while processing an update are logged at warning level and the
// update is dropped.
450 }
catch (
const std::exception& e) {
451 log_->warning(
"ZmqClient::runThread: dropping update after exception: %s", e.what());
// Same policy for exceptions that do not derive from std::exception
// (presumably inside a catch-all handler that is not visible here).
453 log_->warning(
"ZmqClient::runThread: dropping update after unknown exception");
GeneralError
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
RAII helper that acquires the Python GIL for a scope.
void defDoUpdate(boost::python::object data)
Calls base-class doUpdate() implementation.
void doUpdate(boost::python::object data)
Handles an update message from the subscription path.
ZmqClientWrap(const std::string &addr, uint16_t port, bool doString)
Constructs wrapper client.
rogue::interfaces::ZmqClient
ZeroMQ client for Rogue control and update messaging.
std::string sendString(const std::string &path, const std::string &attr, const std::string &arg)
Sends a string-mode request.
void stop()
Stops client sockets and background thread.
std::string getDisp(const std::string &path)
Reads display-formatted value at a path (string mode).
virtual void doUpdate(boost::python::object data)
Handles async update payloads received on subscriber socket.
std::string exec(const std::string &path, const std::string &arg="")
Executes callable node at path (string mode).
boost::python::object send(boost::python::object data)
Sends binary request payload and receives binary response.
static void setup_python()
Registers Python bindings for this class.
std::string valueDisp(const std::string &path)
Reads compact value display at a path (string mode).
static std::shared_ptr< rogue::interfaces::ZmqClient > create(const std::string &addr, uint16_t port, bool doString)
Creates a ZeroMQ client.
ZmqClient(const std::string &addr, uint16_t port, bool doString)
Constructs a ZeroMQ client and connects sockets.
virtual ~ZmqClient()
Destroys client and stops background activity.
void setTimeout(uint32_t msecs, bool waitRetry, uint32_t maxRetries=0)
Sets request timeout behavior.
void setDisp(const std::string &path, const std::string &value)
Writes display-formatted value at a path (string mode).
std::shared_ptr< rogue::interfaces::ZmqClient > ZmqClientPtr
Shared pointer alias for ZmqClient.
std::shared_ptr< rogue::Logging > LoggingPtr
Shared pointer alias for Logging.