36 #include <boost/python.hpp>
37namespace bp = boost::python;
47rim::TcpServer::TcpServer(std::string addr, uint16_t port) {
51 logstr =
"memory.TcpServer.";
54 logstr.append(std::to_string(port));
59 this->respAddr_ =
"tcp://";
60 this->respAddr_.append(addr);
61 this->respAddr_.append(
":");
62 this->reqAddr_ = this->respAddr_;
64 this->zmqCtx_ = zmq_ctx_new();
65 this->zmqResp_ = zmq_socket(this->zmqCtx_, ZMQ_PUSH);
66 this->zmqReq_ = zmq_socket(this->zmqCtx_, ZMQ_PULL);
70 this->respAddr_.append(std::to_string(
static_cast<int64_t
>(port + 1)));
71 this->reqAddr_.append(std::to_string(
static_cast<int64_t
>(port)));
73 this->bridgeLog_->debug(
"Creating response client port: %s", this->respAddr_.c_str());
76 if (zmq_setsockopt(this->zmqResp_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
79 if (zmq_setsockopt(this->zmqReq_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
83 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &opt,
sizeof(int32_t)) != 0)
84 throw(
rogue::GeneralError(
"memory::TcpServer::TcpServer",
"Failed to set socket receive timeout"));
86 if (zmq_bind(this->zmqResp_, this->respAddr_.c_str()) < 0)
88 "Failed to bind server to port %" PRIu16
89 " at address %s, another process may be using this port",
93 this->bridgeLog_->debug(
"Creating request client port: %s", this->reqAddr_.c_str());
95 if (zmq_bind(this->zmqReq_, this->reqAddr_.c_str()) < 0)
97 "Failed to bind server to port %" PRIu16
98 " at address %s, another process may be using this port",
102 this->bridgeLog_->debug(
"TCP memory bridge ready. request=%s response=%s",
103 this->reqAddr_.c_str(),
104 this->respAddr_.c_str());
107 this->thread_ = std::make_unique<std::thread>(&rim::TcpServer::runThread,
this);
111 pthread_setname_np(thread_->native_handle(),
"TcpServer");
128 if (zmqResp_ !=
nullptr) {
132 if (zmqReq_ !=
nullptr) {
136 if (zmqCtx_ !=
nullptr) {
137 zmq_ctx_destroy(zmqCtx_);
145rim::TcpServer::~TcpServer() {
149void rim::TcpServer::close() {
153void rim::TcpServer::start() {
159int rim::TcpServer::sendResponseMsg_(
void* msg,
int flags) {
160 return zmq_sendmsg(this->zmqResp_,
reinterpret_cast<zmq_msg_t*
>(msg), flags);
163void rim::TcpServer::stop() {
169 this->bridgeLog_->debug(
"Stopping TCP memory bridge. request=%s response=%s",
170 this->reqAddr_.c_str(),
171 this->respAddr_.c_str());
172 if (zmqResp_ !=
nullptr) {
176 if (zmqReq_ !=
nullptr) {
180 if (zmqCtx_ !=
nullptr) {
181 zmq_ctx_destroy(zmqCtx_);
188void rim::TcpServer::runThread() {
201 bridgeLog_->logThreadId();
204 for (x = 0; x < 6; x++) zmq_msg_init(&(msg[x]));
211 if (zmq_recvmsg(this->zmqReq_, &(msg[x]), 0) >= 0) {
218 zmq_getsockopt(this->zmqReq_, ZMQ_RCVMORE, &more, &moreSize);
222 }
while (threadEn_ && more);
225 if (threadEn_ && (msgCnt == 4 || msgCnt == 5)) {
227 if ((zmq_msg_size(&(msg[0])) != 4) || (zmq_msg_size(&(msg[1])) != 8) || (zmq_msg_size(&(msg[2])) != 4) ||
228 (zmq_msg_size(&(msg[3])) != 4)) {
230 "Bad message sizes. id=%zu addr=%zu size=%zu type=%zu",
231 zmq_msg_size(&(msg[0])),
232 zmq_msg_size(&(msg[1])),
233 zmq_msg_size(&(msg[2])),
234 zmq_msg_size(&(msg[3])));
235 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
240 std::memcpy(&
id, zmq_msg_data(&(msg[0])), 4);
241 std::memcpy(&addr, zmq_msg_data(&(msg[1])), 8);
242 std::memcpy(&size, zmq_msg_data(&(msg[2])), 4);
243 std::memcpy(&type, zmq_msg_data(&(msg[3])), 4);
247 if ((msgCnt != 4) || (size != 0)) {
248 bridgeLog_->warning(
"Malformed readiness probe. Id=%" PRIu32,
id);
249 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
252 zmq_msg_init_size(&(msg[4]), 0);
257 if ((msgCnt != 5) || (zmq_msg_size(&(msg[4])) != size)) {
258 bridgeLog_->warning(
"Transaction write data error. Id=%" PRIu32,
id);
259 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
263 zmq_msg_init_size(&(msg[4]), size);
268 data =
reinterpret_cast<uint8_t*
>(zmq_msg_data(&(msg[4])));
270 bridgeLog_->debug(
"Starting transaction id=%" PRIu32
", addr=0x%" PRIx64
", size=%" PRIu32
279 reqTransaction(addr, size, data, type);
283 bridgeLog_->debug(
"Done transaction id=%" PRIu32
", addr=0x%" PRIx64
", size=%" PRIu32
284 ", type=%" PRIu32
", result=(%s)",
293 if (result.length() == 0) result =
"OK";
294 if (zmq_msg_init_size(&(msg[5]), result.length()) < 0) {
295 bridgeLog_->warning(
"zmq_msg_init_size failed for result (%" PRIu32
" bytes): %s",
296 static_cast<uint32_t
>(result.length()), zmq_strerror(zmq_errno()));
297 for (x = 0; x < 5; x++) zmq_msg_close(&(msg[x]));
300 std::memcpy(zmq_msg_data(&(msg[5])), result.c_str(), result.length());
302 uint32_t sendFailed = 0;
303 for (x = 0; x < 6; x++) {
304 if (this->sendResponseMsg_(&(msg[x]), (x == 5) ? 0 : ZMQ_SNDMORE) < 0) {
305 bridgeLog_->warning(
"zmq_sendmsg failed on part %" PRIu32
" for id=%" PRIu32
": %s",
306 x,
id, zmq_strerror(zmq_errno()));
308 zmq_msg_close(&(msg[x]));
309 for (uint32_t y = x + 1; y < 6; y++) zmq_msg_close(&(msg[y]));
314 bridgeLog_->error(
"Multi-part reply for id=%" PRIu32
315 " failed mid-stream on part %" PRIu32
316 "; peer may have received a torso-only response. "
317 "Resetting response socket to clear PUSH multipart FSM.",
321 if (this->zmqResp_ !=
nullptr) {
322 if (zmq_unbind(this->zmqResp_, this->respAddr_.c_str()) != 0) {
323 bridgeLog_->warning(
"Failed to unbind response socket from %s during recovery: %s",
324 this->respAddr_.c_str(), zmq_strerror(zmq_errno()));
326 zmq_close(this->zmqResp_);
327 this->zmqResp_ =
nullptr;
330 this->zmqResp_ = zmq_socket(this->zmqCtx_, ZMQ_PUSH);
331 bool rebuilt = (this->zmqResp_ !=
nullptr);
335 if (zmq_setsockopt(this->zmqResp_, ZMQ_LINGER, &lopt,
sizeof(lopt)) != 0) {
336 bridgeLog_->error(
"Failed to set ZMQ_LINGER on rebuilt response socket: %s",
337 zmq_strerror(zmq_errno()));
339 }
else if (zmq_bind(this->zmqResp_, this->respAddr_.c_str()) < 0) {
340 bridgeLog_->error(
"Failed to rebind response socket to %s: %s",
341 this->respAddr_.c_str(), zmq_strerror(zmq_errno()));
347 if (this->zmqResp_ !=
nullptr) {
348 zmq_close(this->zmqResp_);
349 this->zmqResp_ =
nullptr;
351 bridgeLog_->error(
"Unable to recover TcpServer response socket; "
352 "exiting bridge worker thread (stop()/dtor will "
353 "complete teardown)");
358 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
363void rim::TcpServer::setup_python() {
366#pragma GCC diagnostic push
367#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
368 bp::class_<rim::TcpServer, rim::TcpServerPtr, bp::bases<rim::Master>, boost::noncopyable>(
370 bp::init<std::string, uint16_t>())
371 .def(
"close", &rim::TcpServer::close)
372 .def(
"_start", &rim::TcpServer::start)
373 .def(
"_stop", &rim::TcpServer::stop);
374#pragma GCC diagnostic pop
376 bp::implicitly_convertible<rim::TcpServerPtr, rim::MasterPtr>();
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
std::shared_ptr< rogue::interfaces::memory::TcpServer > TcpServerPtr
Shared pointer alias for TcpServer.
static const uint32_t TcpBridgeProbe
Internal TCP bridge readiness probe transaction type.
static const uint32_t Write
Memory write transaction type.
static const uint32_t Post
Memory posted write transaction type.