// Constructor: sets up a ZeroMQ PULL/PUSH socket pair for the TCP stream
// bridge and starts the background receive thread.
//
// addr   - host/interface string placed after "tcp://" in both endpoints.
// port   - base port; the partner socket uses port + 1 (see below).
// server - NOTE(review): presumably selects the bind (server) path versus the
//          connect (client) path; the branch statements are not visible in
//          this excerpt.
//
// Throws rogue::GeneralError when setting the receive timeout fails. The
// bind/connect failure messages below are presumably also raised as errors;
// their throw statements are not visible here.
50ris::TcpCore::TcpCore(
const std::string& addr, uint16_t port,
bool server) {
// Build the logger name: "stream.TcpCore." + role + port.
54 logstr =
"stream.TcpCore.";
// NOTE(review): only one of the two role suffixes is presumably appended,
// depending on `server`; the selecting condition is not visible here.
58 logstr.append(
"Server.");
60 logstr.append(
"Client.");
61 logstr.append(std::to_string(port));
// Common endpoint prefix "tcp://<addr>:"; the port number is appended later.
66 this->pullAddr_ =
"tcp://";
67 this->pullAddr_.append(addr);
68 this->pullAddr_.append(
":");
69 this->pushAddr_ = this->pullAddr_;
// One ZeroMQ context with a PULL socket (inbound) and a PUSH socket (outbound).
71 this->zmqCtx_ = zmq_ctx_new();
72 this->zmqPull_ = zmq_socket(this->zmqCtx_, ZMQ_PULL);
73 this->zmqPush_ = zmq_socket(this->zmqCtx_, ZMQ_PUSH);
// Socket options. The value assigned to `opt` before each call, and the
// handling of the first three failures, are not visible in this excerpt.
77 if (zmq_setsockopt(this->zmqPush_, ZMQ_IMMEDIATE, &opt,
sizeof(int32_t)) != 0)
81 if (zmq_setsockopt(this->zmqPush_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
84 if (zmq_setsockopt(this->zmqPull_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
88 if (zmq_setsockopt(this->zmqPull_, ZMQ_RCVTIMEO, &opt,
sizeof(int32_t)) != 0)
89 throw(
rogue::GeneralError(
"stream::TcpCore::TcpCore",
"Failed to set socket receive timeout"));
// Server side: pull endpoint uses `port`, push endpoint uses `port + 1`,
// and both sockets are bound.
93 this->pullAddr_.append(std::to_string(
static_cast<int64_t
>(port)));
94 this->pushAddr_.append(std::to_string(
static_cast<int64_t
>(port + 1)));
96 this->bridgeLog_->debug(
"Creating pull server port: %s", this->pullAddr_.c_str());
98 if (zmq_bind(this->zmqPull_, this->pullAddr_.c_str()) < 0)
100 "Failed to bind server to port %" PRIu16
101 " at address %s, another process may be using this port",
105 this->bridgeLog_->debug(
"Creating push server port: %s", this->pushAddr_.c_str());
107 if (zmq_bind(this->zmqPush_, this->pushAddr_.c_str()) < 0)
109 "Failed to bind server to port %" PRIu16
110 " at address %s, another process may be using this port",
// Client side: the port assignment is mirrored (pull uses `port + 1`, push
// uses `port`) so each client socket connects to the opposite server socket.
116 this->pullAddr_.append(std::to_string(
static_cast<int64_t
>(port + 1)));
117 this->pushAddr_.append(std::to_string(
static_cast<int64_t
>(port)));
119 this->bridgeLog_->debug(
"Creating pull client port: %s", this->pullAddr_.c_str());
121 if (zmq_connect(this->zmqPull_, this->pullAddr_.c_str()) < 0)
123 "Failed to connect to remote port %" PRIu16
" at address %s",
127 this->bridgeLog_->debug(
"Creating push client port: %s", this->pushAddr_.c_str());
129 if (zmq_connect(this->zmqPush_, this->pushAddr_.c_str()) < 0)
131 "Failed to connect to remote port %" PRIu16
" at address %s",
// Start the background thread that executes runThread() on this object.
138 this->thread_ =
new std::thread(&ris::TcpCore::runThread,
this);
140 this->bridgeLog_->debug(
"TCP stream bridge ready. pull=%s push=%s",
141 this->pullAddr_.c_str(),
142 this->pushAddr_.c_str());
// Give the thread the name "TcpCore" via its native pthread handle.
146 pthread_setname_np(thread_->native_handle(),
"TcpCore");
// Fragment of the outbound path (the enclosing function header is not visible
// in this excerpt): packs `frame` into a four-part ZeroMQ message and sends it
// on the PUSH socket.
// The lock is held until the end of the enclosing scope.
185 std::lock_guard<std::mutex> lock(bridgeMtx_);
// Header parts: msg[0] = 2 bytes (flags), msg[1] = 1 byte (channel),
// msg[2] = 1 byte (error).
187 if ((zmq_msg_init_size(&(msg[0]), 2) < 0) ||
188 (zmq_msg_init_size(&(msg[1]), 1) < 0) ||
189 (zmq_msg_init_size(&(msg[2]), 1) < 0)) {
190 bridgeLog_->warning(
"Failed to init message header");
// msg[3] is sized to hold the frame payload.
194 if (zmq_msg_init_size(&(msg[3]), frame->getPayload()) < 0) {
195 bridgeLog_->warning(
"Failed to init message with size %" PRIu32, frame->getPayload());
// Copy the frame metadata into the three header parts.
199 flags = frame->getFlags();
200 std::memcpy(zmq_msg_data(&(msg[0])), &flags, 2);
202 chan = frame->getChannel();
203 std::memcpy(zmq_msg_data(&(msg[1])), &chan, 1);
205 err = frame->getError();
206 std::memcpy(zmq_msg_data(&(msg[2])), &err, 1);
// Destination buffer for the payload bytes; the copy itself is not visible in
// this excerpt.
210 data =
reinterpret_cast<uint8_t*
>(zmq_msg_data(&(msg[3])));
// Send all four parts; every part except the last carries ZMQ_SNDMORE so they
// are delivered as one multipart message.
214 for (x = 0; x < 4; x++) {
215 if (zmq_sendmsg(this->zmqPush_, &(msg[x]), (x == 3) ? 0 : ZMQ_SNDMORE) < 0)
216 bridgeLog_->warning(
"Failed to push message with size %" PRIu32
" on %s: %s",
218 this->pushAddr_.c_str(),
219 zmq_strerror(zmq_errno()));
221 bridgeLog_->debug(
"Pushed TCP frame with size %" PRIu32
" on %s", frame->getPayload(), this->pushAddr_.c_str());
// Background receive thread body: pulls multipart messages from the PULL
// socket and turns each well-formed four-part message into a frame.
// NOTE(review): local declarations, the outer loop header and the statements
// that advance `x` / `msgCnt` are not visible in this excerpt.
225void ris::TcpCore::runThread() {
238 bridgeLog_->logThreadId();
// Start with four empty message objects, one per expected part.
241 for (x = 0; x < 4; x++) zmq_msg_init(&(msg[x]));
// Receive one part; only a strictly positive return value is treated as a
// received part here.
248 if (zmq_recvmsg(this->zmqPull_, &(msg[x]), 0) > 0) {
// Ask the socket whether more parts of the same message follow.
255 zmq_getsockopt(this->zmqPull_, ZMQ_RCVMORE, &more, &moreSize);
259 }
// Keep receiving while the thread is enabled and more parts are pending.
while (threadEn_ && more);
// Only messages with exactly four parts are processed.
262 if (threadEn_ && (msgCnt == 4)) {
// Header parts must be 2 (flags), 1 (channel) and 1 (error) bytes.
264 if ((zmq_msg_size(&(msg[0])) != 2) || (zmq_msg_size(&(msg[1])) != 1) || (zmq_msg_size(&(msg[2])) != 1)) {
266 "Bad message sizes. flags=%zu channel=%zu error=%zu",
267 zmq_msg_size(&(msg[0])),
268 zmq_msg_size(&(msg[1])),
269 zmq_msg_size(&(msg[2])));
// Release the received parts of the rejected message.
270 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// Decode the header fields.
275 std::memcpy(&flags, zmq_msg_data(&(msg[0])), 2);
276 std::memcpy(&chan, zmq_msg_data(&(msg[1])), 1);
277 std::memcpy(&err, zmq_msg_data(&(msg[2])), 1);
// Payload location and length come from the fourth part.
280 data =
reinterpret_cast<uint8_t*
>(zmq_msg_data(&(msg[3])));
281 size = zmq_msg_size(&(msg[3]));
// Request a frame of `size` bytes and mark the whole of it as payload; the
// copy of `data` into the frame is not visible in this excerpt.
284 frame = reqLocalFrame(size,
false);
285 frame->setPayload(size);
292 frame->setFlags(flags);
293 frame->setChannel(chan);
294 frame->setError(err);
296 bridgeLog_->debug(
"Pulled frame with size %" PRIu32, frame->getPayload());
// Release the received message parts.
300 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// Registers the Boost.Python binding for TcpCore.
304void ris::TcpCore::setup_python() {
// Exposed as Python class "TcpCore", held by ris::TcpCorePtr, deriving from
// the Master and Slave bindings and marked non-copyable. The init/constructor
// argument of class_ is not visible in this excerpt.
307 bp::class_<ris::TcpCore, ris::TcpCorePtr, bp::bases<ris::Master, ris::Slave>, boost::noncopyable>(
"TcpCore",
// Python-visible method "close" maps to TcpCore::close.
309 .def(
"close", &ris::TcpCore::close);
// Allow a TcpCorePtr to be passed where a MasterPtr or SlavePtr is expected.
311 bp::implicitly_convertible<ris::TcpCorePtr, ris::MasterPtr>();
312 bp::implicitly_convertible<ris::TcpCorePtr, ris::SlavePtr>();