34 #include <boost/python.hpp>
35namespace bp = boost::python;
40constexpr uint32_t kRetryLogIntervalSeconds = 30;
42void logWaitRetry(
rogue::LoggingPtr log,
double seconds, uint32_t& lastLoggedSeconds) {
43 uint32_t elapsedSeconds =
static_cast<uint32_t
>(seconds);
45 if (elapsedSeconds == 0)
return;
47 if (lastLoggedSeconds == 0) {
49 "Timeout waiting for response after %" PRIu32
" seconds, server may be busy. Continuing to wait...",
51 lastLoggedSeconds = elapsedSeconds;
55 if (elapsedSeconds >= (lastLoggedSeconds + kRetryLogIntervalSeconds)) {
56 log->warning(
"Still waiting for response after %" PRIu32
" seconds, server may be busy...", elapsedSeconds);
57 lastLoggedSeconds = elapsedSeconds;
62 uint32_t elapsedSeconds =
static_cast<uint32_t
>(seconds);
64 if (elapsedSeconds != 0) {
65 log->warning(
"Received response from server after %" PRIu32
" seconds.", elapsedSeconds);
76 if (PyBool_Check(warnTime.ptr()) || PyBool_Check(failTime.ptr())) {
77 PyErr_SetString(PyExc_TypeError,
78 "ZmqClient.setTimeout(warnTime, failTime=0): arguments must be int "
79 "milliseconds, not bool. The legacy (msecs, waitRetry[, maxRetries]) "
80 "form was removed; pass failTime=0 to retry forever.");
81 bp::throw_error_already_set();
83 self.
setTimeout(bp::extract<uint32_t>(warnTime), bp::extract<uint32_t>(failTime));
99 bp::class_<rogue::interfaces::ZmqClientWrap, rogue::interfaces::ZmqClientWrapPtr, boost::noncopyable>(
101 bp::init<std::string, uint16_t, bool>())
104 .def(
"setTimeout", &setTimeoutPy, (bp::arg(
"warnTime"), bp::arg(
"failTime") = bp::object(0u)))
118 this->doString_ = doString;
119 this->zmqCtx_ = zmq_ctx_new();
120 this->zmqSub_ = zmq_socket(this->zmqCtx_, ZMQ_SUB);
121 this->zmqReq_ = zmq_socket(this->zmqCtx_, ZMQ_REQ);
133 temp.append(std::to_string(
static_cast<int64_t
>(port)));
135 if (zmq_setsockopt(this->zmqSub_, ZMQ_SUBSCRIBE,
"", 0) != 0)
139 if (zmq_setsockopt(this->zmqSub_, ZMQ_LINGER, &val,
sizeof(int32_t)) != 0)
143 if (zmq_setsockopt(this->zmqSub_, ZMQ_RCVTIMEO, &val,
sizeof(int32_t)) != 0)
146 if (zmq_connect(this->zmqSub_, temp.c_str()) < 0)
148 "Failed to connect to port %" PRIu16
" at address %s",
161 temp.append(std::to_string(
static_cast<int64_t
>(reqPort)));
165 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &timeout_,
sizeof(int32_t)) != 0)
169 if (zmq_setsockopt(this->zmqReq_, ZMQ_REQ_CORRELATE, &val,
sizeof(int32_t)) != 0)
172 if (zmq_setsockopt(this->zmqReq_, ZMQ_REQ_RELAXED, &val,
sizeof(int32_t)) != 0)
176 if (zmq_setsockopt(this->zmqReq_, ZMQ_LINGER, &val,
sizeof(int32_t)) != 0)
179 if (zmq_connect(this->zmqReq_, temp.c_str()) < 0)
181 "Failed to connect to port %" PRIu32
" at address %s",
187 log_->info(
"Connected to Rogue server at port %" PRIu32, reqPort);
189 log_->info(
"Connected to Rogue server at ports %" PRIu16
":%" PRIu32, port, reqPort);
192 thread_ =
new std::thread(&rogue::interfaces::ZmqClient::runThread,
this);
196 if (zmqSub_ !=
nullptr) {
200 if (zmqReq_ !=
nullptr) {
204 if (zmqCtx_ !=
nullptr) {
205 zmq_ctx_destroy(zmqCtx_);
237 std::lock_guard<std::mutex> lock(reqLock_);
238 if (!doString_) zmq_close(this->zmqSub_);
239 zmq_close(this->zmqReq_);
240 zmq_ctx_destroy(this->zmqCtx_);
249 throw rogue::GeneralError(
"ZmqClient::setTimeout",
"warnTime must be greater than 0 milliseconds");
253 std::lock_guard<std::mutex> lock(reqLock_);
256 failTime_ = failTime;
258 log_->debug(
"Setting timeout to warnTime = %" PRIu32
" msecs, failTime = %" PRIu32
" msecs",
262 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &timeout_,
sizeof(int32_t)) != 0)
272 uint32_t lastLoggedSeconds = 0;
276 snd =
"{\"attr\": \"" + attr +
"\",";
277 snd +=
"\"path\": \"" + path +
"\"";
279 if (arg !=
"") snd +=
",\"args\": [\"" + arg +
"\"]";
284 std::lock_guard<std::mutex> lock(reqLock_);
285 zmq_send(this->zmqReq_, snd.c_str(), snd.size(), 0);
289 if (zmq_recvmsg(this->zmqReq_, &msg, 0) <= 0) {
290 seconds +=
static_cast<double>(timeout_) / 1000.0;
294 const bool deadlineReached = (failTime_ != 0 && seconds * 1000.0 >=
static_cast<double>(failTime_));
295 if (!deadlineReached && !stopping_) {
296 logWaitRetry(log_, seconds, lastLoggedSeconds);
301 "Timeout waiting for response after %d Seconds.",
302 static_cast<int>(seconds));
309 logWaitRecovered(log_, seconds);
311 data = std::string((
const char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
317 return sendString(path,
"getDisp",
"");
321 sendString(path,
"setDisp", value);
325 return sendString(path,
"__call__", arg);
329 return sendString(path,
"valueDisp",
"");
340 uint32_t lastLoggedSeconds = 0;
348 if (PyObject_GetBuffer(value.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0) {
356 if (zmq_msg_init_size(&txMsg, valueBuf.len) < 0) {
357 PyBuffer_Release(&valueBuf);
360 memcpy(zmq_msg_data(&txMsg), valueBuf.buf, valueBuf.len);
361 PyBuffer_Release(&valueBuf);
365 std::lock_guard<std::mutex> lock(reqLock_);
371 if (zmq_sendmsg(this->zmqReq_, &txMsg, 0) < 0) {
376 int err = zmq_errno();
377 zmq_msg_close(&txMsg);
379 "zmq_sendmsg failed: errno=%d %s",
385 zmq_msg_init(&rxMsg);
386 if (zmq_recvmsg(this->zmqReq_, &rxMsg, 0) <= 0) {
387 seconds +=
static_cast<double>(timeout_) / 1000.0;
391 const bool deadlineReached = (failTime_ != 0 && seconds * 1000.0 >=
static_cast<double>(failTime_));
392 if (!deadlineReached && !stopping_) {
393 logWaitRetry(log_, seconds, lastLoggedSeconds);
394 zmq_msg_close(&rxMsg);
396 zmq_msg_close(&rxMsg);
399 "Timeout waiting for response after %d Seconds, server may be busy!",
400 static_cast<int>(seconds));
408 logWaitRecovered(log_, seconds);
410 PyObject* val = Py_BuildValue(
"y#", zmq_msg_data(&rxMsg), zmq_msg_size(&rxMsg));
417 zmq_msg_close(&rxMsg);
421 zmq_msg_close(&rxMsg);
423 bp::handle<> handle(val);
424 ret = bp::object(handle);
434 if (bp::override f = this->get_override(
"_doUpdate")) {
450void rogue::interfaces::ZmqClient::runThread() {
459 if (zmq_recvmsg(this->zmqSub_, &msg, 0) > 0) {
463 PyObject* val = Py_BuildValue(
"y#", zmq_msg_data(&msg), zmq_msg_size(&msg));
473 bp::handle<> handle(val);
474 bp::object dat = bp::object(handle);
476 }
catch (
const std::exception& e) {
477 log_->warning(
"ZmqClient::runThread: dropping update after exception: %s", e.what());
479 log_->warning(
"ZmqClient::runThread: dropping update after unknown exception");
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
RAII helper that acquires the Python GIL for a scope.
void defDoUpdate(boost::python::object data)
Calls base-class doUpdate() implementation.
void doUpdate(boost::python::object data)
Handles an update message from the subscription path.
ZmqClientWrap(const std::string &addr, uint16_t port, bool doString)
Constructs wrapper client.
ZeroMQ client for Rogue control and update messaging.
std::string sendString(const std::string &path, const std::string &attr, const std::string &arg)
Sends a string-mode request.
void stop()
Stops client sockets and background thread.
std::string getDisp(const std::string &path)
Reads display-formatted value at a path (string mode).
virtual void doUpdate(boost::python::object data)
Handles async update payloads received on subscriber socket.
std::string exec(const std::string &path, const std::string &arg="")
Executes callable node at path (string mode).
boost::python::object send(boost::python::object data)
Sends binary request payload and receives binary response.
static void setup_python()
Registers Python bindings for this class.
void setTimeout(uint32_t warnTime, uint32_t failTime=0)
Sets request timeout behavior.
std::string valueDisp(const std::string &path)
Reads compact value display at a path (string mode).
static std::shared_ptr< rogue::interfaces::ZmqClient > create(const std::string &addr, uint16_t port, bool doString)
Creates a ZeroMQ client.
ZmqClient(const std::string &addr, uint16_t port, bool doString)
Constructs a ZeroMQ client and connects sockets.
virtual ~ZmqClient()
Destroys client and stops background activity.
void setDisp(const std::string &path, const std::string &value)
Writes display-formatted value at a path (string mode).
std::shared_ptr< rogue::interfaces::ZmqClient > ZmqClientPtr
std::shared_ptr< rogue::Logging > LoggingPtr
Shared pointer alias for Logging.