33 #include <boost/python.hpp>
34namespace bp = boost::python;
46 bp::class_<rogue::interfaces::ZmqServerWrap, rogue::interfaces::ZmqServerWrapPtr, boost::noncopyable>(
48 bp::init<std::string, uint16_t>())
62 this->zmqCtx_ = zmq_ctx_new();
63 this->threadEn_ =
false;
64 this->basePort_ = port;
70 if (zmqCtx_ !=
nullptr) {
71 zmq_ctx_destroy(zmqCtx_);
81 if (this->threadEn_)
return;
82 port = this->basePort_;
86 for (this->basePort_ = 9099; this->basePort_ < (9099 + 100); this->basePort_ += 4) {
87 res = this->tryConnect();
91 res = this->tryConnect();
97 "Failed to auto bind server on interface %s.",
98 this->addr_.c_str()));
101 "Failed to bind server to port %" PRIu16
102 " on interface %s. Another process may be using this port.",
104 this->addr_.c_str()));
107 log_->debug(
"Started Rogue server on %s at ports %" PRIu16
":%" PRIu16
":%" PRIu16,
111 this->basePort_ + 2);
113 this->threadEn_ =
true;
114 this->rThread_ =
new std::thread(&rogue::interfaces::ZmqServer::runThread,
this);
115 this->sThread_ =
new std::thread(&rogue::interfaces::ZmqServer::strThread,
this);
119 zmq_send(this->zmqPub_, dummy.c_str(), dummy.size(), 0);
126 log_->info(
"Waiting for server thread to exit");
129 if (rThread_ !=
nullptr) {
134 if (sThread_ !=
nullptr) {
139 log_->info(
"Closing pub socket");
140 zmq_close(this->zmqPub_);
142 log_->info(
"Closing request socket");
143 zmq_close(this->zmqRep_);
145 log_->info(
"Closing string socket");
146 zmq_close(this->zmqStr_);
148 log_->info(
"Destroying Context");
149 zmq_ctx_destroy(this->zmqCtx_);
151 log_->info(
"Zmq server done. Exiting");
// Creates, configures and binds the three server sockets: the publish socket on
// basePort_, the request socket on basePort_ + 1 and the string request socket on
// basePort_ + 2.
// NOTE(review): this listing omits several lines of the function, including its
// return statements and closing brace. Presumably a failed bind is reported to the
// caller through the bool result rather than by throwing -- confirm against the full file.
155bool rogue::interfaces::ZmqServer::tryConnect() {
159 log_->debug(
"Trying to serve on ports %" PRIu16
":%" PRIu16
":%" PRIu16,
162 this->basePort_ + 2);
// One PUB socket for published updates and two REP sockets (binary requests and
// string requests), all created from the same ZeroMQ context.
164 this->zmqPub_ = zmq_socket(this->zmqCtx_, ZMQ_PUB);
165 this->zmqRep_ = zmq_socket(this->zmqCtx_, ZMQ_REP);
166 this->zmqStr_ = zmq_socket(this->zmqCtx_, ZMQ_REP);
// Set ZMQ_LINGER on every socket. The value stored in opt and the handling of a
// failed call are not visible in this listing.
171 if (zmq_setsockopt(this->zmqPub_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
174 if (zmq_setsockopt(this->zmqRep_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
177 if (zmq_setsockopt(this->zmqStr_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
// Set a receive timeout on both REP sockets; a failure here raises GeneralError.
181 if (zmq_setsockopt(this->zmqRep_, ZMQ_RCVTIMEO, &opt,
sizeof(int32_t)) != 0)
182 throw(
rogue::GeneralError(
"ZmqServer::tryConnect",
"Failed to set socket receive timeout"));
184 if (zmq_setsockopt(this->zmqStr_, ZMQ_RCVTIMEO, &opt,
sizeof(int32_t)) != 0)
185 throw(
rogue::GeneralError(
"ZmqServer::tryConnect",
"Failed to set socket receive timeout"));
// NOTE(review): all three sockets are closed here; the condition or handler that
// guards this cleanup path is not visible in this listing.
187 zmq_close(this->zmqPub_);
189 zmq_close(this->zmqRep_);
191 zmq_close(this->zmqStr_);
// Publish endpoint: addr_ followed by basePort_ (the start of the endpoint string
// is built on lines not shown here).
198 temp.append(this->addr_);
200 temp.append(std::to_string(
static_cast<int64_t
>(this->basePort_)));
// If the bind fails, close every socket and log the ZeroMQ error text at debug level.
203 if (zmq_bind(this->zmqPub_, temp.c_str()) < 0) {
204 zmq_close(this->zmqPub_);
206 zmq_close(this->zmqRep_);
208 zmq_close(this->zmqStr_);
210 log_->debug(
"Failed to bind publish socket to %s: %s", temp.c_str(), zmq_strerror(zmq_errno()));
// Request endpoint: addr_ followed by basePort_ + 1.
216 temp.append(this->addr_);
218 temp.append(std::to_string(
static_cast<int64_t
>(this->basePort_ + 1)));
// Same failure handling as for the publish socket.
220 if (zmq_bind(this->zmqRep_, temp.c_str()) < 0) {
221 zmq_close(this->zmqPub_);
223 zmq_close(this->zmqRep_);
225 zmq_close(this->zmqStr_);
227 log_->debug(
"Failed to bind request socket to %s: %s", temp.c_str(), zmq_strerror(zmq_errno()));
// String request endpoint: addr_ followed by basePort_ + 2.
233 temp.append(this->addr_);
235 temp.append(std::to_string(
static_cast<int64_t
>(this->basePort_ + 2)));
// Same failure handling as for the other two sockets.
237 if (zmq_bind(this->zmqStr_, temp.c_str()) < 0) {
238 zmq_close(this->zmqPub_);
240 zmq_close(this->zmqRep_);
242 zmq_close(this->zmqStr_);
244 log_->debug(
"Failed to bind string request socket to %s: %s", temp.c_str(), zmq_strerror(zmq_errno()));
252 return this->basePort_;
265 if (!this->threadEn_)
return;
271 if (PyObject_GetBuffer(value.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0) {
279 if (zmq_msg_init_size(&msg, valueBuf.len) < 0) {
280 PyBuffer_Release(&valueBuf);
283 memcpy(zmq_msg_data(&msg), valueBuf.buf, valueBuf.len);
284 PyBuffer_Release(&valueBuf);
290 if (zmq_sendmsg(this->zmqPub_, &msg, 0) < 0) {
297 bp::handle<> handle(bp::borrowed(Py_None));
298 return bp::object(handle);
305 if (bp::override f = this->get_override(
"_doRequest")) {
322 if (bp::override f = this->get_override(
"_doString")) {
// Worker for the binary request socket (zmqRep_): each received message is handed
// to doRequest() as a Python bytes object and the buffer it returns is sent back
// as the reply.
// NOTE(review): this listing omits lines of the function (the loop header, the GIL
// handling, the try opener and the catch-all clause are not visible) -- confirm
// details against the full file.
339void rogue::interfaces::ZmqServer::runThread() {
344 log_->info(
"Started Rogue server thread");
347 zmq_msg_init(&rxMsg);
// Only a receive that delivered at least one byte is processed.
350 if (zmq_recvmsg(this->zmqRep_, &rxMsg, 0) > 0) {
// "y#" builds a Python bytes object from the raw message payload.
359 PyObject* val = Py_BuildValue(
"y#", zmq_msg_data(&rxMsg), zmq_msg_size(&rxMsg));
// The handle is constructed directly from the PyObject* returned by Py_BuildValue
// and is then used to build the bp::object passed to doRequest().
369 bp::handle<> handle(val);
371 bp::object ret = this->doRequest(bp::object(handle));
// Read the reply through the Python buffer protocol (simple, contiguous view).
373 if (PyObject_GetBuffer(ret.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0) {
// Allocate the outgoing message; the Python buffer is released on this failure path.
382 if (zmq_msg_init_size(&txMsg, valueBuf.len) < 0) {
383 PyBuffer_Release(&valueBuf);
// Copy the reply bytes into the message, then release the Python buffer.
387 memcpy(zmq_msg_data(&txMsg), valueBuf.buf, valueBuf.len);
388 PyBuffer_Release(&valueBuf);
// A message that could not be sent is closed here.
396 if (zmq_sendmsg(this->zmqRep_, &txMsg, 0) < 0) {
397 zmq_msg_close(&txMsg);
402 }
catch (
const std::exception& e) {
// Recovery path: log the error, close txMsg if it was initialized and send an
// empty reply -- presumably so the REP socket can accept the next request.
403 log_->warning(
"ZmqServer::runThread: dropping request after exception: %s", e.what());
404 if (txInit) zmq_msg_close(&txMsg);
405 zmq_send(this->zmqRep_,
"", 0, 0);
// Same recovery for exceptions that are not std::exception (the catch clause
// itself is not visible in this listing).
407 log_->warning(
"ZmqServer::runThread: dropping request after unknown exception");
408 if (txInit) zmq_msg_close(&txMsg);
409 zmq_send(this->zmqRep_,
"", 0, 0);
// Close the receive message.
414 zmq_msg_close(&rxMsg);
416 log_->info(
"Stopped Rogue server thread");
// Worker for the string request socket (zmqStr_): each received message is
// converted to a std::string, passed to doString(), and the returned string is
// sent back as the reply.
// NOTE(review): this listing omits lines of the function (the loop header, message
// setup/teardown, the try opener and the catch-all clause are not visible) --
// confirm details against the full file.
419void rogue::interfaces::ZmqServer::strThread() {
423 log_->info(
"Started Rogue string server thread");
// Only a receive that delivered at least one byte is processed.
429 if (zmq_recvmsg(this->zmqStr_, &msg, 0) > 0) {
// Build the request string from the payload pointer and its explicit size.
434 std::string data((
const char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
435 std::string ret = this->doString(data);
436 zmq_send(this->zmqStr_, ret.c_str(), ret.size(), 0);
437 }
catch (
const std::exception& e) {
// Recovery path: log the error and send an empty reply -- presumably so the REP
// socket can accept the next request.
438 log_->warning(
"ZmqServer::strThread: dropping request after exception: %s", e.what());
439 zmq_send(this->zmqStr_,
"", 0, 0);
// Same recovery for exceptions that are not std::exception (the catch clause
// itself is not visible in this listing).
441 log_->warning(
"ZmqServer::strThread: dropping request after unknown exception");
442 zmq_send(this->zmqStr_,
"", 0, 0);
448 log_->info(
"Stopped Rogue string server thread");
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
RAII helper that acquires the Python GIL for a scope.
boost::python::object defDoRequest(boost::python::object data)
Calls base-class doRequest() implementation.
std::string defDoString(const std::string &data)
Calls base-class doString() implementation.
boost::python::object doRequest(boost::python::object data)
Processes a Python-object request message.
std::string doString(const std::string &data)
Processes a string request message.
ZmqServerWrap(std::string addr, uint16_t port)
Constructs wrapper server.
ZeroMQ server for Rogue control, request/reply, and publish updates.
static void setup_python()
Registers Python bindings for this class.
uint16_t port()
Returns currently bound base port.
void start()
Starts server, binds sockets, and launches worker threads.
virtual ~ZmqServer()
Destroys server and stops worker threads/sockets.
void stop()
Stops server threads and closes sockets.
static std::shared_ptr< rogue::interfaces::ZmqServer > create(const std::string &addr, uint16_t port)
Creates a ZeroMQ server.
virtual boost::python::object doRequest(boost::python::object data)
Handles one binary request payload.
virtual std::string doString(const std::string &data)
Handles one string request payload.
void publish(boost::python::object data)
Publishes an update payload on the publish socket.
ZmqServer(const std::string &addr, uint16_t port)
Constructs a ZeroMQ server.
std::shared_ptr< rogue::interfaces::ZmqServer > ZmqServerPtr