33 #include <boost/python.hpp>
34namespace bp = boost::python;
46 bp::class_<rogue::interfaces::ZmqServerWrap, rogue::interfaces::ZmqServerWrapPtr, boost::noncopyable>(
48 bp::init<std::string, uint16_t>())
62 this->zmqCtx_ = zmq_ctx_new();
63 this->threadEn_ =
false;
64 this->basePort_ = port;
76 if (this->threadEn_)
return;
77 port = this->basePort_;
81 for (this->basePort_ = 9099; this->basePort_ < (9099 + 100); this->basePort_ += 4) {
82 res = this->tryConnect();
86 res = this->tryConnect();
92 "Failed to auto bind server on interface %s.",
93 this->addr_.c_str()));
96 "Failed to bind server to port %" PRIu16
97 " on interface %s. Another process may be using this port.",
99 this->addr_.c_str()));
102 log_->info(
"Started Rogue server at ports %" PRIu16
":%" PRIu16, this->basePort_, this->basePort_ + 1);
104 this->threadEn_ =
true;
105 this->rThread_ =
new std::thread(&rogue::interfaces::ZmqServer::runThread,
this);
106 this->sThread_ =
new std::thread(&rogue::interfaces::ZmqServer::strThread,
this);
110 zmq_send(this->zmqPub_, dummy.c_str(), dummy.size(), 0);
117 log_->info(
"Waiting for server thread to exit");
120 log_->info(
"Closing pub socket");
121 zmq_close(this->zmqPub_);
122 log_->info(
"Closing request socket");
123 zmq_close(this->zmqRep_);
124 log_->info(
"Closing string socket");
125 zmq_close(this->zmqStr_);
126 log_->info(
"Destroying Context");
127 zmq_ctx_destroy(this->zmqCtx_);
128 log_->info(
"Zmq server done. Exiting");
132bool rogue::interfaces::ZmqServer::tryConnect() {
136 log_->debug(
"Trying to serve on ports %" PRIu16
":%" PRIu16
":%" PRIu16,
139 this->basePort_ + 2);
141 this->zmqPub_ = zmq_socket(this->zmqCtx_, ZMQ_PUB);
142 this->zmqRep_ = zmq_socket(this->zmqCtx_, ZMQ_REP);
143 this->zmqStr_ = zmq_socket(this->zmqCtx_, ZMQ_REP);
146 if (zmq_setsockopt(this->zmqPub_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
149 if (zmq_setsockopt(this->zmqRep_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
152 if (zmq_setsockopt(this->zmqStr_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
156 if (zmq_setsockopt(this->zmqRep_, ZMQ_RCVTIMEO, &opt,
sizeof(int32_t)) != 0)
157 throw(
rogue::GeneralError(
"ZmqServer::tryConnect",
"Failed to set socket receive timeout"));
159 if (zmq_setsockopt(this->zmqStr_, ZMQ_RCVTIMEO, &opt,
sizeof(int32_t)) != 0)
160 throw(
rogue::GeneralError(
"ZmqServer::tryConnect",
"Failed to set socket receive timeout"));
164 temp.append(this->addr_);
166 temp.append(std::to_string(
static_cast<int64_t
>(this->basePort_)));
168 if (zmq_bind(this->zmqPub_, temp.c_str()) < 0) {
169 zmq_close(this->zmqPub_);
170 zmq_close(this->zmqRep_);
171 zmq_close(this->zmqStr_);
172 log_->debug(
"Failed to bind publish to port %" PRIu16, this->basePort_);
178 temp.append(this->addr_);
180 temp.append(std::to_string(
static_cast<int64_t
>(this->basePort_ + 1)));
182 if (zmq_bind(this->zmqRep_, temp.c_str()) < 0) {
183 zmq_close(this->zmqPub_);
184 zmq_close(this->zmqRep_);
185 zmq_close(this->zmqStr_);
186 log_->debug(
"Failed to bind resp to port %" PRIu16, this->basePort_ + 1);
192 temp.append(this->addr_);
194 temp.append(std::to_string(
static_cast<int64_t
>(this->basePort_ + 2)));
196 if (zmq_bind(this->zmqStr_, temp.c_str()) < 0) {
197 zmq_close(this->zmqPub_);
198 zmq_close(this->zmqRep_);
199 zmq_close(this->zmqStr_);
200 log_->debug(
"Failed to bind str resp to port %" PRIu16, this->basePort_ + 2);
208 return this->basePort_;
221 if (!this->threadEn_)
return;
223 if (PyObject_GetBuffer(value.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0)
226 zmq_msg_init_size(&msg, valueBuf.len);
227 memcpy(zmq_msg_data(&msg), valueBuf.buf, valueBuf.len);
228 PyBuffer_Release(&valueBuf);
231 zmq_sendmsg(this->zmqPub_, &msg, 0);
235 bp::handle<> handle(bp::borrowed(Py_None));
236 return bp::object(handle);
243 if (bp::override f = this->get_override(
"_doRequest")) {
260 if (bp::override f = this->get_override(
"_doString")) {
277void rogue::interfaces::ZmqServer::runThread() {
282 log_->info(
"Started Rogue server thread");
285 zmq_msg_init(&rxMsg);
288 if (zmq_recvmsg(this->zmqRep_, &rxMsg, 0) > 0) {
292 PyObject* val = Py_BuildValue(
"y#", zmq_msg_data(&rxMsg), zmq_msg_size(&rxMsg));
296 bp::handle<> handle(val);
298 bp::object ret = this->doRequest(bp::object(handle));
300 if (PyObject_GetBuffer(ret.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0)
303 zmq_msg_init_size(&txMsg, valueBuf.len);
304 memcpy(zmq_msg_data(&txMsg), valueBuf.buf, valueBuf.len);
305 PyBuffer_Release(&valueBuf);
307 zmq_sendmsg(this->zmqRep_, &txMsg, 0);
310 zmq_msg_close(&rxMsg);
313 log_->info(
"Stopped Rogue server thread");
316void rogue::interfaces::ZmqServer::strThread() {
322 log_->info(
"Started Rogue string server thread");
328 if (zmq_recvmsg(this->zmqStr_, &msg, 0) > 0) {
329 data = std::string((
const char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
330 ret = this->doString(data);
331 zmq_send(this->zmqStr_, ret.c_str(), ret.size(), 0);
335 log_->info(
"Stopped Rogue string server thread");
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
RAII helper that acquires the Python GIL for a scope.
boost::python::object defDoRequest(boost::python::object data)
Calls base-class doRequest() implementation.
std::string defDoString(const std::string &data)
Calls base-class doString() implementation.
boost::python::object doRequest(boost::python::object data)
Processes a Python-object request message.
std::string doString(const std::string &data)
Processes a string request message.
ZmqServerWrap(std::string addr, uint16_t port)
Constructs wrapper server.
ZeroMQ server for Rogue control, request/reply, and publish updates.
static void setup_python()
Registers Python bindings for this class.
uint16_t port()
Returns currently bound base port.
void start()
Starts server, binds sockets, and launches worker threads.
virtual ~ZmqServer()
Destroys server and stops worker threads/sockets.
void stop()
Stops server threads and closes sockets.
static std::shared_ptr< rogue::interfaces::ZmqServer > create(const std::string &addr, uint16_t port)
Creates a ZeroMQ server.
virtual boost::python::object doRequest(boost::python::object data)
Handles one binary request payload.
virtual std::string doString(const std::string &data)
Handles one string request payload.
void publish(boost::python::object data)
Publishes an update payload on the publish socket.
ZmqServer(const std::string &addr, uint16_t port)
Constructs a ZeroMQ server.
std::shared_ptr< rogue::interfaces::ZmqServer > ZmqServerPtr