rogue
Loading...
Searching...
No Matches
TcpServer.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <inttypes.h>
22#include <zmq.h>
23
24#include <cstring>
25#include <memory>
26#include <string>
27
28#include "rogue/GeneralError.h"
29#include "rogue/GilRelease.h"
30#include "rogue/Logging.h"
32
34
35#ifndef NO_PYTHON
36 #include <boost/python.hpp>
37namespace bp = boost::python;
38#endif
39
41rim::TcpServerPtr rim::TcpServer::create(std::string addr, uint16_t port) {
42 rim::TcpServerPtr r = std::make_shared<rim::TcpServer>(addr, port);
43 return (r);
44}
45
47rim::TcpServer::TcpServer(std::string addr, uint16_t port) {
48 std::string logstr;
49 uint32_t opt;
50
51 logstr = "memory.TcpServer.";
52 logstr.append(addr);
53 logstr.append(".");
54 logstr.append(std::to_string(port));
55
56 this->bridgeLog_ = rogue::Logging::create(logstr);
57
58 // Format address
59 this->respAddr_ = "tcp://";
60 this->respAddr_.append(addr);
61 this->respAddr_.append(":");
62 this->reqAddr_ = this->respAddr_;
63
64 this->zmqCtx_ = zmq_ctx_new();
65 this->zmqResp_ = zmq_socket(this->zmqCtx_, ZMQ_PUSH);
66 this->zmqReq_ = zmq_socket(this->zmqCtx_, ZMQ_PULL);
67
68 // Throws before threadEn_=true skip the dtor's stop(); free the context here.
69 try {
70 this->respAddr_.append(std::to_string(static_cast<int64_t>(port + 1)));
71 this->reqAddr_.append(std::to_string(static_cast<int64_t>(port)));
72
73 this->bridgeLog_->debug("Creating response client port: %s", this->respAddr_.c_str());
74
75 opt = 0;
76 if (zmq_setsockopt(this->zmqResp_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
77 throw(rogue::GeneralError("memory::TcpServer::TcpServer", "Failed to set socket linger"));
78
79 if (zmq_setsockopt(this->zmqReq_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
80 throw(rogue::GeneralError("memory::TcpServer::TcpServer", "Failed to set socket linger"));
81
82 opt = 100;
83 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &opt, sizeof(int32_t)) != 0)
84 throw(rogue::GeneralError("memory::TcpServer::TcpServer", "Failed to set socket receive timeout"));
85
86 if (zmq_bind(this->zmqResp_, this->respAddr_.c_str()) < 0)
87 throw(rogue::GeneralError::create("memory::TcpServer::TcpServer",
88 "Failed to bind server to port %" PRIu16
89 " at address %s, another process may be using this port",
90 port + 1,
91 addr.c_str()));
92
93 this->bridgeLog_->debug("Creating request client port: %s", this->reqAddr_.c_str());
94
95 if (zmq_bind(this->zmqReq_, this->reqAddr_.c_str()) < 0)
96 throw(rogue::GeneralError::create("memory::TcpServer::TcpServer",
97 "Failed to bind server to port %" PRIu16
98 " at address %s, another process may be using this port",
99 port,
100 addr.c_str()));
101
102 this->bridgeLog_->debug("TCP memory bridge ready. request=%s response=%s",
103 this->reqAddr_.c_str(),
104 this->respAddr_.c_str());
105
106 threadEn_ = true;
107 this->thread_ = std::make_unique<std::thread>(&rim::TcpServer::runThread, this);
108
109 // Set a thread name
110#ifndef __MACH__
111 pthread_setname_np(thread_->native_handle(), "TcpServer");
112#endif
113 } catch (...) {
114 // ~std::thread on a joinable thread calls std::terminate(); join first.
115 // RCVTIMEO=100 was already set on zmqReq_ above so the worker exits
116 // within ~100 ms of threadEn_=false. Release the GIL around join() to
117 // match stop(): the worker may be blocked acquiring the GIL to deliver
118 // a transaction into Python, and joining while holding the GIL would
119 // deadlock.
120 threadEn_ = false;
121 if (thread_) {
122 {
123 rogue::GilRelease noGil;
124 thread_->join();
125 }
126 thread_.reset();
127 }
128 if (zmqResp_ != nullptr) {
129 zmq_close(zmqResp_);
130 zmqResp_ = nullptr;
131 }
132 if (zmqReq_ != nullptr) {
133 zmq_close(zmqReq_);
134 zmqReq_ = nullptr;
135 }
136 if (zmqCtx_ != nullptr) {
137 zmq_ctx_destroy(zmqCtx_);
138 zmqCtx_ = nullptr;
139 }
140 throw;
141 }
142}
143
145rim::TcpServer::~TcpServer() {
146 this->stop();
147}
148
149void rim::TcpServer::close() {
150 this->stop();
151}
152
153void rim::TcpServer::start() {
154 // The bridge is fully bound and the worker thread is already running by
155 // the time the constructor returns. This hook exists for managed-lifecycle
156 // symmetry with TcpClient.
157}
158
159int rim::TcpServer::sendResponseMsg_(void* msg, int flags) {
160 return zmq_sendmsg(this->zmqResp_, reinterpret_cast<zmq_msg_t*>(msg), flags);
161}
162
163void rim::TcpServer::stop() {
164 if (threadEn_) {
165 rogue::GilRelease noGil;
166 threadEn_ = false;
167 thread_->join();
168 thread_.reset();
169 this->bridgeLog_->debug("Stopping TCP memory bridge. request=%s response=%s",
170 this->reqAddr_.c_str(),
171 this->respAddr_.c_str());
172 if (zmqResp_ != nullptr) {
173 zmq_close(zmqResp_);
174 zmqResp_ = nullptr;
175 }
176 if (zmqReq_ != nullptr) {
177 zmq_close(zmqReq_);
178 zmqReq_ = nullptr;
179 }
180 if (zmqCtx_ != nullptr) {
181 zmq_ctx_destroy(zmqCtx_);
182 zmqCtx_ = nullptr;
183 }
184 }
185}
186
188void rim::TcpServer::runThread() {
189 uint8_t* data;
190 uint64_t more;
191 size_t moreSize;
192 uint32_t x;
193 uint32_t msgCnt;
194 zmq_msg_t msg[6];
195 uint32_t id;
196 uint64_t addr;
197 uint32_t size;
198 uint32_t type;
199 std::string result;
200
201 bridgeLog_->logThreadId();
202
203 while (threadEn_) {
204 for (x = 0; x < 6; x++) zmq_msg_init(&(msg[x]));
205 msgCnt = 0;
206 x = 0;
207
208 // Get message
209 do {
210 // Get the message
211 if (zmq_recvmsg(this->zmqReq_, &(msg[x]), 0) >= 0) {
212 if (x != 4) x++;
213 msgCnt++;
214
215 // Is there more data?
216 more = 0;
217 moreSize = 8;
218 zmq_getsockopt(this->zmqReq_, ZMQ_RCVMORE, &more, &moreSize);
219 } else {
220 more = 1;
221 }
222 } while (threadEn_ && more);
223
224 // Proper message received
225 if (threadEn_ && (msgCnt == 4 || msgCnt == 5)) {
226 // Check sizes
227 if ((zmq_msg_size(&(msg[0])) != 4) || (zmq_msg_size(&(msg[1])) != 8) || (zmq_msg_size(&(msg[2])) != 4) ||
228 (zmq_msg_size(&(msg[3])) != 4)) {
229 bridgeLog_->warning(
230 "Bad message sizes. id=%zu addr=%zu size=%zu type=%zu",
231 zmq_msg_size(&(msg[0])),
232 zmq_msg_size(&(msg[1])),
233 zmq_msg_size(&(msg[2])),
234 zmq_msg_size(&(msg[3])));
235 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
236 continue; // while (1)
237 }
238
239 // Get return fields
240 std::memcpy(&id, zmq_msg_data(&(msg[0])), 4);
241 std::memcpy(&addr, zmq_msg_data(&(msg[1])), 8);
242 std::memcpy(&size, zmq_msg_data(&(msg[2])), 4);
243 std::memcpy(&type, zmq_msg_data(&(msg[3])), 4);
244
245 // Bridge readiness probe is handled locally.
246 if (type == rim::TcpBridgeProbe) {
247 if ((msgCnt != 4) || (size != 0)) {
248 bridgeLog_->warning("Malformed readiness probe. Id=%" PRIu32, id);
249 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
250 continue; // while (1)
251 }
252 zmq_msg_init_size(&(msg[4]), 0);
253 result = "OK";
254
255 // Write data is expected
256 } else if ((type == rim::Write) || (type == rim::Post)) {
257 if ((msgCnt != 5) || (zmq_msg_size(&(msg[4])) != size)) {
258 bridgeLog_->warning("Transaction write data error. Id=%" PRIu32, id);
259 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
260 continue; // while (1)
261 }
262 } else {
263 zmq_msg_init_size(&(msg[4]), size);
264 }
265
266 if (type != rim::TcpBridgeProbe) {
267 // Data pointer
268 data = reinterpret_cast<uint8_t*>(zmq_msg_data(&(msg[4])));
269
270 bridgeLog_->debug("Starting transaction id=%" PRIu32 ", addr=0x%" PRIx64 ", size=%" PRIu32
271 ", type=%" PRIu32,
272 id,
273 addr,
274 size,
275 type);
276
277 // Execute transaction and wait for result
278 this->clearError();
279 reqTransaction(addr, size, data, type);
280 waitTransaction(0);
281 result = getError();
282
283 bridgeLog_->debug("Done transaction id=%" PRIu32 ", addr=0x%" PRIx64 ", size=%" PRIu32
284 ", type=%" PRIu32 ", result=(%s)",
285 id,
286 addr,
287 size,
288 type,
289 result.c_str());
290 }
291
292 // Result message, at least one char needs to be sent
293 if (result.length() == 0) result = "OK";
294 if (zmq_msg_init_size(&(msg[5]), result.length()) < 0) {
295 bridgeLog_->warning("zmq_msg_init_size failed for result (%" PRIu32 " bytes): %s",
296 static_cast<uint32_t>(result.length()), zmq_strerror(zmq_errno()));
297 for (x = 0; x < 5; x++) zmq_msg_close(&(msg[x]));
298 continue;
299 }
300 std::memcpy(zmq_msg_data(&(msg[5])), result.c_str(), result.length());
301
302 uint32_t sendFailed = 0;
303 for (x = 0; x < 6; x++) {
304 if (this->sendResponseMsg_(&(msg[x]), (x == 5) ? 0 : ZMQ_SNDMORE) < 0) {
305 bridgeLog_->warning("zmq_sendmsg failed on part %" PRIu32 " for id=%" PRIu32 ": %s",
306 x, id, zmq_strerror(zmq_errno()));
307 sendFailed = 1;
308 zmq_msg_close(&(msg[x]));
309 for (uint32_t y = x + 1; y < 6; y++) zmq_msg_close(&(msg[y]));
310 break;
311 }
312 }
313 if (sendFailed) {
314 bridgeLog_->error("Multi-part reply for id=%" PRIu32
315 " failed mid-stream on part %" PRIu32
316 "; peer may have received a torso-only response. "
317 "Resetting response socket to clear PUSH multipart FSM.",
318 id, x);
319
320 // Rebuild response socket to reset multipart FSM.
321 if (this->zmqResp_ != nullptr) {
322 if (zmq_unbind(this->zmqResp_, this->respAddr_.c_str()) != 0) {
323 bridgeLog_->warning("Failed to unbind response socket from %s during recovery: %s",
324 this->respAddr_.c_str(), zmq_strerror(zmq_errno()));
325 }
326 zmq_close(this->zmqResp_);
327 this->zmqResp_ = nullptr;
328 }
329
330 this->zmqResp_ = zmq_socket(this->zmqCtx_, ZMQ_PUSH);
331 bool rebuilt = (this->zmqResp_ != nullptr);
332
333 if (rebuilt) {
334 int32_t lopt = 0;
335 if (zmq_setsockopt(this->zmqResp_, ZMQ_LINGER, &lopt, sizeof(lopt)) != 0) {
336 bridgeLog_->error("Failed to set ZMQ_LINGER on rebuilt response socket: %s",
337 zmq_strerror(zmq_errno()));
338 rebuilt = false;
339 } else if (zmq_bind(this->zmqResp_, this->respAddr_.c_str()) < 0) {
340 bridgeLog_->error("Failed to rebind response socket to %s: %s",
341 this->respAddr_.c_str(), zmq_strerror(zmq_errno()));
342 rebuilt = false;
343 }
344 }
345
346 if (!rebuilt) {
347 if (this->zmqResp_ != nullptr) {
348 zmq_close(this->zmqResp_);
349 this->zmqResp_ = nullptr;
350 }
351 bridgeLog_->error("Unable to recover TcpServer response socket; "
352 "exiting bridge worker thread (stop()/dtor will "
353 "complete teardown)");
354 return;
355 }
356 }
357 } else {
358 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
359 }
360 }
361}
362
363void rim::TcpServer::setup_python() {
364#ifndef NO_PYTHON
365
366#pragma GCC diagnostic push
367#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
368 bp::class_<rim::TcpServer, rim::TcpServerPtr, bp::bases<rim::Master>, boost::noncopyable>(
369 "TcpServer",
370 bp::init<std::string, uint16_t>())
371 .def("close", &rim::TcpServer::close)
372 .def("_start", &rim::TcpServer::start)
373 .def("_stop", &rim::TcpServer::stop);
374#pragma GCC diagnostic pop
375
376 bp::implicitly_convertible<rim::TcpServerPtr, rim::MasterPtr>();
377#endif
378}
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:95
std::shared_ptr< rogue::interfaces::memory::TcpServer > TcpServerPtr
Shared pointer alias for TcpServer.
Definition TcpServer.h:156
static const uint32_t TcpBridgeProbe
Internal TCP bridge readiness probe transaction type.
Definition Constants.h:67
static const uint32_t Write
Memory write transaction type.
Definition Constants.h:43
static const uint32_t Post
Memory posted write transaction type.
Definition Constants.h:50