36 #include <boost/python.hpp>
37namespace bp = boost::python;
47rim::TcpServer::TcpServer(std::string addr, uint16_t port) {
51 logstr =
"memory.TcpServer.";
54 logstr.append(std::to_string(port));
59 this->respAddr_ =
"tcp://";
60 this->respAddr_.append(addr);
61 this->respAddr_.append(
":");
62 this->reqAddr_ = this->respAddr_;
64 this->zmqCtx_ = zmq_ctx_new();
65 this->zmqResp_ = zmq_socket(this->zmqCtx_, ZMQ_PUSH);
66 this->zmqReq_ = zmq_socket(this->zmqCtx_, ZMQ_PULL);
68 this->respAddr_.append(std::to_string(
static_cast<int64_t
>(port + 1)));
69 this->reqAddr_.append(std::to_string(
static_cast<int64_t
>(port)));
71 this->bridgeLog_->debug(
"Creating response client port: %s", this->respAddr_.c_str());
74 if (zmq_setsockopt(this->zmqResp_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
77 if (zmq_setsockopt(this->zmqReq_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
81 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &opt,
sizeof(int32_t)) != 0)
82 throw(
rogue::GeneralError(
"memory::TcpServer::TcpServer",
"Failed to set socket receive timeout"));
84 if (zmq_bind(this->zmqResp_, this->respAddr_.c_str()) < 0)
86 "Failed to bind server to port %" PRIu16
87 " at address %s, another process may be using this port",
91 this->bridgeLog_->debug(
"Creating request client port: %s", this->reqAddr_.c_str());
93 if (zmq_bind(this->zmqReq_, this->reqAddr_.c_str()) < 0)
95 "Failed to bind server to port %" PRIu16
96 " at address %s, another process may be using this port",
102 this->thread_ =
new std::thread(&rim::TcpServer::runThread,
this);
106 pthread_setname_np(thread_->native_handle(),
"TcpServer");
111rim::TcpServer::~TcpServer() {
115void rim::TcpServer::close() {
119void rim::TcpServer::start() {
125void rim::TcpServer::stop() {
130 zmq_close(this->zmqResp_);
131 zmq_close(this->zmqReq_);
132 zmq_ctx_destroy(this->zmqCtx_);
137void rim::TcpServer::runThread() {
150 bridgeLog_->logThreadId();
153 for (x = 0; x < 6; x++) zmq_msg_init(&(msg[x]));
160 if (zmq_recvmsg(this->zmqReq_, &(msg[x]), 0) >= 0) {
167 zmq_getsockopt(this->zmqReq_, ZMQ_RCVMORE, &more, &moreSize);
171 }
while (threadEn_ && more);
174 if (threadEn_ && (msgCnt == 4 || msgCnt == 5)) {
176 if ((zmq_msg_size(&(msg[0])) != 4) || (zmq_msg_size(&(msg[1])) != 8) || (zmq_msg_size(&(msg[2])) != 4) ||
177 (zmq_msg_size(&(msg[3])) != 4)) {
178 bridgeLog_->warning(
"Bad message sizes");
179 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
184 std::memcpy(&
id, zmq_msg_data(&(msg[0])), 4);
185 std::memcpy(&addr, zmq_msg_data(&(msg[1])), 8);
186 std::memcpy(&size, zmq_msg_data(&(msg[2])), 4);
187 std::memcpy(&type, zmq_msg_data(&(msg[3])), 4);
191 if ((msgCnt != 4) || (size != 0)) {
192 bridgeLog_->warning(
"Malformed readiness probe. Id=%" PRIu32,
id);
193 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
196 zmq_msg_init_size(&(msg[4]), 0);
201 if ((msgCnt != 5) || (zmq_msg_size(&(msg[4])) != size)) {
202 bridgeLog_->warning(
"Transaction write data error. Id=%" PRIu32,
id);
203 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
207 zmq_msg_init_size(&(msg[4]), size);
212 data =
reinterpret_cast<uint8_t*
>(zmq_msg_data(&(msg[4])));
214 bridgeLog_->debug(
"Starting transaction id=%" PRIu32
", addr=0x%" PRIx64
", size=%" PRIu32
223 reqTransaction(addr, size, data, type);
227 bridgeLog_->debug(
"Done transaction id=%" PRIu32
", addr=0x%" PRIx64
", size=%" PRIu32
228 ", type=%" PRIu32
", result=(%s)",
237 if (result.length() == 0) result =
"OK";
238 zmq_msg_init_size(&(msg[5]), result.length());
239 std::memcpy(zmq_msg_data(&(msg[5])), result.c_str(), result.length());
242 for (x = 0; x < 6; x++) zmq_sendmsg(this->zmqResp_, &(msg[x]), (x == 5) ? 0 : ZMQ_SNDMORE);
244 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
249void rim::TcpServer::setup_python() {
252 bp::class_<rim::TcpServer, rim::TcpServerPtr, bp::bases<rim::Master>, boost::noncopyable>(
254 bp::init<std::string, uint16_t>())
255 .def(
"close", &rim::TcpServer::close)
256 .def(
"_start", &rim::TcpServer::start)
257 .def(
"_stop", &rim::TcpServer::stop);
259 bp::implicitly_convertible<rim::TcpServerPtr, rim::MasterPtr>();
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
std::shared_ptr< rogue::interfaces::memory::TcpServer > TcpServerPtr
Shared pointer alias for TcpServer.
static const uint32_t TcpBridgeProbe
Internal TCP bridge readiness probe transaction type.
static const uint32_t Write
Memory write transaction type.
static const uint32_t Post
Memory posted write transaction type.