rogue
Loading...
Searching...
No Matches
ZmqServer.cpp
Go to the documentation of this file.
1
18#include "rogue/Directives.h"
19
21
22#include <inttypes.h>
23#include <zmq.h>
24
25#include <memory>
26#include <string>
27
28#include "rogue/GeneralError.h"
29#include "rogue/GilRelease.h"
30#include "rogue/ScopedGil.h"
31
32#ifndef NO_PYTHON
33 #include <boost/python.hpp>
34namespace bp = boost::python;
35#endif
36
38 rogue::interfaces::ZmqServerPtr ret = std::make_shared<rogue::interfaces::ZmqServer>(addr, port);
39 return (ret);
40}
41
44#ifndef NO_PYTHON
45
46 bp::class_<rogue::interfaces::ZmqServerWrap, rogue::interfaces::ZmqServerWrapPtr, boost::noncopyable>(
47 "ZmqServer",
48 bp::init<std::string, uint16_t>())
55#endif
56}
57
58rogue::interfaces::ZmqServer::ZmqServer(const std::string& addr, uint16_t port) {
59 log_ = rogue::Logging::create("ZmqServer");
60
61 this->addr_ = addr;
62 this->zmqCtx_ = zmq_ctx_new();
63 this->threadEn_ = false;
64 this->basePort_ = port;
65}
66
70
72 std::string dummy;
73 bool res = false;
74 uint16_t port;
75
76 if (this->threadEn_) return;
77 port = this->basePort_;
78
79 // Auto port
80 if (port == 0) {
81 for (this->basePort_ = 9099; this->basePort_ < (9099 + 100); this->basePort_ += 4) {
82 res = this->tryConnect();
83 if (res) break;
84 }
85 } else {
86 res = this->tryConnect();
87 }
88
89 if (!res) {
90 if (port == 0)
91 throw(rogue::GeneralError::create("ZmqServer::ZmqServer",
92 "Failed to auto bind server on interface %s.",
93 this->addr_.c_str()));
94 else
95 throw(rogue::GeneralError::create("ZmqServer::ZmqServer",
96 "Failed to bind server to port %" PRIu16
97 " on interface %s. Another process may be using this port.",
98 port + 1,
99 this->addr_.c_str()));
100 }
101
102 log_->info("Started Rogue server at ports %" PRIu16 ":%" PRIu16, this->basePort_, this->basePort_ + 1);
103
104 this->threadEn_ = true;
105 this->rThread_ = new std::thread(&rogue::interfaces::ZmqServer::runThread, this);
106 this->sThread_ = new std::thread(&rogue::interfaces::ZmqServer::strThread, this);
107
108 // Send empty frame
109 dummy = "null\n";
110 zmq_send(this->zmqPub_, dummy.c_str(), dummy.size(), 0);
111}
112
114 if (threadEn_) {
115 rogue::GilRelease noGil;
116 threadEn_ = false;
117 log_->info("Waiting for server thread to exit");
118 rThread_->join();
119 sThread_->join();
120 log_->info("Closing pub socket");
121 zmq_close(this->zmqPub_);
122 log_->info("Closing request socket");
123 zmq_close(this->zmqRep_);
124 log_->info("Closing string socket");
125 zmq_close(this->zmqStr_);
126 log_->info("Destroying Context");
127 zmq_ctx_destroy(this->zmqCtx_);
128 log_->info("Zmq server done. Exiting");
129 }
130}
131
132bool rogue::interfaces::ZmqServer::tryConnect() {
133 std::string temp;
134 uint32_t opt;
135
136 log_->debug("Trying to serve on ports %" PRIu16 ":%" PRIu16 ":%" PRIu16,
137 this->basePort_,
138 this->basePort_ + 1,
139 this->basePort_ + 2);
140
141 this->zmqPub_ = zmq_socket(this->zmqCtx_, ZMQ_PUB);
142 this->zmqRep_ = zmq_socket(this->zmqCtx_, ZMQ_REP);
143 this->zmqStr_ = zmq_socket(this->zmqCtx_, ZMQ_REP);
144
145 opt = 0;
146 if (zmq_setsockopt(this->zmqPub_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
147 throw(rogue::GeneralError("ZmqServer::tryConnect", "Failed to set socket linger"));
148
149 if (zmq_setsockopt(this->zmqRep_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
150 throw(rogue::GeneralError("ZmqServer::tryConnect", "Failed to set socket linger"));
151
152 if (zmq_setsockopt(this->zmqStr_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
153 throw(rogue::GeneralError("ZmqServer::tryConnect", "Failed to set socket linger"));
154
155 opt = 100;
156 if (zmq_setsockopt(this->zmqRep_, ZMQ_RCVTIMEO, &opt, sizeof(int32_t)) != 0)
157 throw(rogue::GeneralError("ZmqServer::tryConnect", "Failed to set socket receive timeout"));
158
159 if (zmq_setsockopt(this->zmqStr_, ZMQ_RCVTIMEO, &opt, sizeof(int32_t)) != 0)
160 throw(rogue::GeneralError("ZmqServer::tryConnect", "Failed to set socket receive timeout"));
161
162 // Setup publish port
163 temp = "tcp://";
164 temp.append(this->addr_);
165 temp.append(":");
166 temp.append(std::to_string(static_cast<int64_t>(this->basePort_)));
167
168 if (zmq_bind(this->zmqPub_, temp.c_str()) < 0) {
169 zmq_close(this->zmqPub_);
170 zmq_close(this->zmqRep_);
171 zmq_close(this->zmqStr_);
172 log_->debug("Failed to bind publish to port %" PRIu16, this->basePort_);
173 return false;
174 }
175
176 // Setup response port
177 temp = "tcp://";
178 temp.append(this->addr_);
179 temp.append(":");
180 temp.append(std::to_string(static_cast<int64_t>(this->basePort_ + 1)));
181
182 if (zmq_bind(this->zmqRep_, temp.c_str()) < 0) {
183 zmq_close(this->zmqPub_);
184 zmq_close(this->zmqRep_);
185 zmq_close(this->zmqStr_);
186 log_->debug("Failed to bind resp to port %" PRIu16, this->basePort_ + 1);
187 return false;
188 }
189
190 // Setup string port
191 temp = "tcp://";
192 temp.append(this->addr_);
193 temp.append(":");
194 temp.append(std::to_string(static_cast<int64_t>(this->basePort_ + 2)));
195
196 if (zmq_bind(this->zmqStr_, temp.c_str()) < 0) {
197 zmq_close(this->zmqPub_);
198 zmq_close(this->zmqRep_);
199 zmq_close(this->zmqStr_);
200 log_->debug("Failed to bind str resp to port %" PRIu16, this->basePort_ + 2);
201 return false;
202 }
203
204 return true;
205}
206
208 return this->basePort_;
209}
210
211std::string rogue::interfaces::ZmqServer::doString(const std::string& data) {
212 return "";
213}
214
215#ifndef NO_PYTHON
216
218 zmq_msg_t msg;
219 Py_buffer valueBuf;
220
221 if (!this->threadEn_) return;
222
223 if (PyObject_GetBuffer(value.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0)
224 throw(rogue::GeneralError::create("ZmqServer::publish", "Failed to extract object data"));
225
226 zmq_msg_init_size(&msg, valueBuf.len);
227 memcpy(zmq_msg_data(&msg), valueBuf.buf, valueBuf.len);
228 PyBuffer_Release(&valueBuf);
229
230 rogue::GilRelease noGil;
231 zmq_sendmsg(this->zmqPub_, &msg, 0);
232}
233
234bp::object rogue::interfaces::ZmqServer::doRequest(bp::object data) {
235 bp::handle<> handle(bp::borrowed(Py_None));
236 return bp::object(handle);
237}
238
240 : rogue::interfaces::ZmqServer(addr, port) {}
241
243 if (bp::override f = this->get_override("_doRequest")) {
244 try {
245 return (f(data));
246 } catch (...) {
247 PyErr_Print();
248 }
249 }
251}
252
256
257std::string rogue::interfaces::ZmqServerWrap::doString(const std::string& data) {
258 {
260 if (bp::override f = this->get_override("_doString")) {
261 try {
262 return (f(data));
263 } catch (...) {
264 PyErr_Print();
265 }
266 }
267 }
269}
270
271std::string rogue::interfaces::ZmqServerWrap::defDoString(const std::string& data) {
273}
274
275#endif
276
// Worker loop for the request/reply socket (zmqRep_).
// Runs until threadEn_ becomes false. Each iteration waits for one request on
// zmqRep_; when Python support is compiled in, the request bytes are handed
// to doRequest() and the returned buffer is sent back on the same socket.
// NOTE(review): the GeneralError throws below happen inside the thread
// function and nothing in this function catches them.
277void rogue::interfaces::ZmqServer::runThread() {
278 zmq_msg_t rxMsg;
279 zmq_msg_t txMsg;
280
281 log_->logThreadId();
282 log_->info("Started Rogue server thread");
283
284 while (threadEn_) {
// rxMsg is re-initialised on every pass; it is only closed inside the
// success branch below.
285 zmq_msg_init(&rxMsg);
286
287 // Get the message
// Only a strictly positive byte count is processed, so a failed receive and a
// zero-length request both fall through without a reply being sent.
// NOTE(review): a zero-length request would leave this REP socket waiting for
// a send that never happens - confirm whether clients can send one.
288 if (zmq_recvmsg(this->zmqRep_, &rxMsg, 0) > 0) {
// When NO_PYTHON is defined the request is received and then dropped without
// any reply.
289#ifndef NO_PYTHON
290 Py_buffer valueBuf;
// NOTE(review): listing line 291 is absent from this extract; presumably it
// acquires the Python GIL for the calls below - confirm against the real file.
// "y#" builds a Python bytes object holding a copy of the received payload.
292 PyObject* val = Py_BuildValue("y#", zmq_msg_data(&rxMsg), zmq_msg_size(&rxMsg));
293
294 if (val == NULL) throw(rogue::GeneralError::create("ZmqServer::runThread", "Failed to generate bytearray"));
295
// The handle takes over the new reference returned by Py_BuildValue.
296 bp::handle<> handle(val);
297
// Virtual dispatch: a subclass may supply the actual request handling.
298 bp::object ret = this->doRequest(bp::object(handle));
299
// The reply object must expose the buffer protocol; its bytes are copied into
// a freshly sized ZeroMQ message before the buffer view is released.
300 if (PyObject_GetBuffer(ret.ptr(), &(valueBuf), PyBUF_SIMPLE) < 0)
301 throw(rogue::GeneralError::create("ZmqServer::runThread", "Failed to extract object data"));
302
303 zmq_msg_init_size(&txMsg, valueBuf.len);
304 memcpy(zmq_msg_data(&txMsg), valueBuf.buf, valueBuf.len);
305 PyBuffer_Release(&valueBuf);
306
// The return value of the send is not checked.
307 zmq_sendmsg(this->zmqRep_, &txMsg, 0);
308#endif
309
310 zmq_msg_close(&rxMsg);
311 }
312 }
313 log_->info("Stopped Rogue server thread");
314}
315
316void rogue::interfaces::ZmqServer::strThread() {
317 std::string data;
318 std::string ret;
319 zmq_msg_t msg;
320
321 log_->logThreadId();
322 log_->info("Started Rogue string server thread");
323
324 while (threadEn_) {
325 zmq_msg_init(&msg);
326
327 // Get the message
328 if (zmq_recvmsg(this->zmqStr_, &msg, 0) > 0) {
329 data = std::string((const char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
330 ret = this->doString(data);
331 zmq_send(this->zmqStr_, ret.c_str(), ret.size(), 0);
332 zmq_msg_close(&msg);
333 }
334 }
335 log_->info("Stopped Rogue string server thread");
336}
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:60
RAII helper that acquires the Python GIL for a scope.
Definition ScopedGil.h:35
boost::python::object defDoRequest(boost::python::object data)
Calls base-class doRequest() implementation.
std::string defDoString(const std::string &data)
Calls base-class doString() implementation.
boost::python::object doRequest(boost::python::object data)
Processes a Python-object request message.
std::string doString(const std::string &data)
Processes a string request message.
ZmqServerWrap(std::string addr, uint16_t port)
Constructs wrapper server.
ZeroMQ server for Rogue control, request/reply, and publish updates.
Definition ZmqServer.h:47
static void setup_python()
Registers Python bindings for this class.
Definition ZmqServer.cpp:43
uint16_t port()
Returns currently bound base port.
void start()
Starts server, binds sockets, and launches worker threads.
Definition ZmqServer.cpp:71
virtual ~ZmqServer()
Destroys server and stops worker threads/sockets.
Definition ZmqServer.cpp:67
void stop()
Stops server threads and closes sockets.
static std::shared_ptr< rogue::interfaces::ZmqServer > create(const std::string &addr, uint16_t port)
Creates a ZeroMQ server.
Definition ZmqServer.cpp:37
virtual boost::python::object doRequest(boost::python::object data)
Handles one binary request payload.
virtual std::string doString(const std::string &data)
Handles one string request payload.
void publish(boost::python::object data)
Publishes an update payload on the publish socket.
ZmqServer(const std::string &addr, uint16_t port)
Constructs a ZeroMQ server.
Definition ZmqServer.cpp:58
std::shared_ptr< rogue::interfaces::ZmqServer > ZmqServerPtr
Definition ZmqServer.h:146