// Constructor: builds the ZeroMQ endpoint strings from addr and port, creates
// a context plus one PULL and one PUSH socket, applies socket options, binds
// or connects both sockets, and starts the worker thread.
//   addr   - host part appended after the tcp scheme prefix
//   port   - base port; both port and port + 1 are used below
//   server - stored in server_
// Throws rogue::GeneralError when setting the receive timeout fails (other
// throw sites, if any, are not visible).
// NOTE(review): this listing is non-contiguous (the embedded numbers skip and
// several if statements show no body), so branch conditions, braces and the
// values assigned to opt cannot be confirmed from here.
50ris::TcpCore::TcpCore(
const std::string& addr, uint16_t port,
bool server) {
54 this->server_ = server;
// Logger name is assembled from a fixed prefix, a Server. or Client. tag and
// the port. Both appends are visible but the condition selecting one of them
// is not; presumably it is the server flag - confirm.
56 logstr =
"stream.TcpCore.";
60 logstr.append(
"Server.");
62 logstr.append(
"Client.");
63 logstr.append(std::to_string(port));
// Common endpoint prefix (scheme + addr + colon); the port number is appended
// later, separately for the pull and push addresses.
68 this->pullAddr_ =
"tcp://";
69 this->pullAddr_.append(addr);
70 this->pullAddr_.append(
":");
71 this->pushAddr_ = this->pullAddr_;
// One ZeroMQ context with a PULL socket (receive side) and a PUSH socket
// (send side).
73 this->zmqCtx_ = zmq_ctx_new();
74 this->zmqPull_ = zmq_socket(this->zmqCtx_, ZMQ_PULL);
75 this->zmqPush_ = zmq_socket(this->zmqCtx_, ZMQ_PUSH);
// Socket options: ZMQ_IMMEDIATE and ZMQ_LINGER on the push socket, ZMQ_LINGER
// and ZMQ_RCVTIMEO on the pull socket. Every call passes &opt with
// sizeof(int32_t). Only the failure handler of the last check (the throw
// below) is visible in this listing.
81 if (zmq_setsockopt(this->zmqPush_, ZMQ_IMMEDIATE, &opt,
sizeof(int32_t)) != 0)
85 if (zmq_setsockopt(this->zmqPush_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
88 if (zmq_setsockopt(this->zmqPull_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
92 if (zmq_setsockopt(this->zmqPull_, ZMQ_RCVTIMEO, &opt,
sizeof(int32_t)) != 0)
93 throw(
rogue::GeneralError(
"stream::TcpCore::TcpCore",
"Failed to set socket receive timeout"));
// Server-side addressing (per the log text below): the pull socket binds on
// port and the push socket binds on port + 1.
97 this->pullAddr_.append(std::to_string(
static_cast<int64_t
>(port)));
98 this->pushAddr_.append(std::to_string(
static_cast<int64_t
>(port + 1)));
100 this->bridgeLog_->debug(
"Creating pull server port: %s", this->pullAddr_.c_str());
// A negative zmq_bind result is treated as failure. Only the format-string
// pieces of the resulting error are visible; the call consuming them is not.
102 if (zmq_bind(this->zmqPull_, this->pullAddr_.c_str()) < 0)
104 "Failed to bind server to port %" PRIu16
105 " at address %s, another process may be using this port",
109 this->bridgeLog_->debug(
"Creating push server port: %s", this->pushAddr_.c_str());
111 if (zmq_bind(this->zmqPush_, this->pushAddr_.c_str()) < 0)
113 "Failed to bind server to port %" PRIu16
114 " at address %s, another process may be using this port",
// Client-side addressing mirrors the server: the pull socket connects to
// port + 1 and the push socket connects to port, so each side's PUSH faces
// the peer's PULL.
120 this->pullAddr_.append(std::to_string(
static_cast<int64_t
>(port + 1)));
121 this->pushAddr_.append(std::to_string(
static_cast<int64_t
>(port)));
123 this->bridgeLog_->debug(
"Creating pull client port: %s", this->pullAddr_.c_str());
125 if (zmq_connect(this->zmqPull_, this->pullAddr_.c_str()) < 0)
127 "Failed to connect to remote port %" PRIu16
" at address %s",
131 this->bridgeLog_->debug(
"Creating push client port: %s", this->pushAddr_.c_str());
133 if (zmq_connect(this->zmqPush_, this->pushAddr_.c_str()) < 0)
135 "Failed to connect to remote port %" PRIu16
" at address %s",
// Both endpoints are set up; log the final pull and push addresses.
140 this->bridgeLog_->debug(
"TCP stream bridge ready. pull=%s push=%s",
141 this->pullAddr_.c_str(),
142 this->pushAddr_.c_str());
// Launch the worker thread executing runThread on this object, then label the
// native thread TcpCore via pthread_setname_np.
145 this->thread_ = std::make_unique<std::thread>(&ris::TcpCore::runThread,
this);
149 pthread_setname_np(thread_->native_handle(),
"TcpCore");
// Teardown fragment (the enclosing function header is not visible in this
// listing). Each ZeroMQ handle is tested against nullptr before it is
// released; only the release of the context is shown, the bodies of the two
// socket checks are on lines not included here.
166 if (zmqPull_ !=
nullptr) {
170 if (zmqPush_ !=
nullptr) {
// NOTE(review): libzmq documents zmq_ctx_destroy as deprecated in favour of
// zmq_ctx_term - confirm against the libzmq version this project targets.
174 if (zmqCtx_ !=
nullptr) {
175 zmq_ctx_destroy(zmqCtx_);
// Discards the current PUSH socket (if any) and creates a replacement on the
// existing context, re-applies ZMQ_IMMEDIATE and ZMQ_LINGER, then binds or
// connects it to pushAddr_. Every visible failure path logs an error, closes
// the new socket and leaves zmqPush_ set to nullptr.
// Returns bool; the return statements themselves are on lines not shown in
// this listing - presumably false on each failure path and true at the end.
218bool ris::TcpCore::rebuildPushSocket() {
// Close the old socket first so the handle is never left dangling.
219 if (this->zmqPush_ !=
nullptr) {
220 zmq_close(this->zmqPush_);
221 this->zmqPush_ =
nullptr;
224 this->zmqPush_ = zmq_socket(this->zmqCtx_, ZMQ_PUSH);
225 if (this->zmqPush_ ==
nullptr) {
226 bridgeLog_->error(
"Failed to create replacement push socket on %s: %s",
227 this->pushAddr_.c_str(),
228 zmq_strerror(zmq_errno()));
// Same two options the constructor applies to the push socket. The values
// written to opt before each call are not visible here.
233 if (zmq_setsockopt(this->zmqPush_, ZMQ_IMMEDIATE, &opt,
sizeof(opt)) != 0) {
234 bridgeLog_->error(
"Failed to set ZMQ_IMMEDIATE on rebuilt push socket: %s",
235 zmq_strerror(zmq_errno()));
236 zmq_close(this->zmqPush_);
237 this->zmqPush_ =
nullptr;
242 if (zmq_setsockopt(this->zmqPush_, ZMQ_LINGER, &opt,
sizeof(opt)) != 0) {
243 bridgeLog_->error(
"Failed to set ZMQ_LINGER on rebuilt push socket: %s",
244 zmq_strerror(zmq_errno()));
245 zmq_close(this->zmqPush_);
246 this->zmqPush_ =
nullptr;
// Re-attach to the same endpoint: one path binds, the other connects. The
// condition selecting between them is not visible; presumably server_
// (bind for server, connect for client, as in the constructor) - confirm.
251 if (zmq_bind(this->zmqPush_, this->pushAddr_.c_str()) < 0) {
252 bridgeLog_->error(
"Failed to rebind push socket to %s: %s",
253 this->pushAddr_.c_str(),
254 zmq_strerror(zmq_errno()));
255 zmq_close(this->zmqPush_);
256 this->zmqPush_ =
nullptr;
260 if (zmq_connect(this->zmqPush_, this->pushAddr_.c_str()) < 0) {
261 bridgeLog_->error(
"Failed to reconnect push socket to %s: %s",
262 this->pushAddr_.c_str(),
263 zmq_strerror(zmq_errno()));
264 zmq_close(this->zmqPush_);
265 this->zmqPush_ =
nullptr;
// Frame-send fragment (the function header is not visible; a log string below
// refers to acceptFrame(), so this is presumably its body - confirm).
// Flow shown here: take bridgeMtx_, drop the frame if the bridge is stopped or
// the push socket cannot be rebuilt, build a four-part ZeroMQ message
// (flags, channel, error, payload), send the parts, and rebuild the push
// socket after a send failure.
// The lock is held for the rest of the enclosing scope.
284 std::lock_guard<std::mutex> lock(bridgeMtx_);
// Stopped-bridge path; the condition guarding it is on a line not shown.
287 bridgeLog_->debug(
"Dropping frame on stopped bridge %s (payload=%" PRIu32
")",
288 this->pushAddr_.c_str(),
289 frame->getPayload());
// A null push socket means an earlier rebuild failed; retry it now and drop
// this frame if the retry fails as well.
294 if (this->zmqPush_ ==
nullptr) {
295 if (!rebuildPushSocket()) {
296 bridgeLog_->warning(
"Push socket unavailable on %s; dropping frame "
297 "(payload=%" PRIu32
")",
298 this->pushAddr_.c_str(),
299 frame->getPayload());
// Allocate the four parts: 2 bytes, 1 byte, 1 byte, then the payload size.
// Each failure path closes the parts that were already initialised.
304 if (zmq_msg_init_size(&(msg[0]), 2) < 0) {
305 bridgeLog_->warning(
"Failed to init message header");
308 if (zmq_msg_init_size(&(msg[1]), 1) < 0) {
309 bridgeLog_->warning(
"Failed to init message header");
310 zmq_msg_close(&(msg[0]));
313 if (zmq_msg_init_size(&(msg[2]), 1) < 0) {
314 bridgeLog_->warning(
"Failed to init message header");
315 zmq_msg_close(&(msg[0]));
316 zmq_msg_close(&(msg[1]));
320 if (zmq_msg_init_size(&(msg[3]), frame->getPayload()) < 0) {
321 bridgeLog_->warning(
"Failed to init message with size %" PRIu32, frame->getPayload());
322 zmq_msg_close(&(msg[0]));
323 zmq_msg_close(&(msg[1]));
324 zmq_msg_close(&(msg[2]));
// Header parts: frame flags (2 bytes), channel (1 byte), error (1 byte),
// copied in host byte order via memcpy.
328 flags = frame->getFlags();
329 std::memcpy(zmq_msg_data(&(msg[0])), &flags, 2);
331 chan = frame->getChannel();
332 std::memcpy(zmq_msg_data(&(msg[1])), &chan, 1);
334 err = frame->getError();
335 std::memcpy(zmq_msg_data(&(msg[2])), &err, 1);
// data points at the payload part buffer; the copy of the frame contents
// into it is on lines not shown.
339 data =
reinterpret_cast<uint8_t*
>(zmq_msg_data(&(msg[3])));
// Send all four parts; every part except the last carries ZMQ_SNDMORE.
342 bool sendFailed =
false;
343 for (x = 0; x < 4; x++) {
344 if (zmq_sendmsg(this->zmqPush_, &(msg[x]), (x == 3) ? 0 : ZMQ_SNDMORE) < 0) {
345 bridgeLog_->warning(
"Failed to push message part %" PRIu32
" (frame size %" PRIu32
") on %s: %s",
348 this->pushAddr_.c_str(),
349 zmq_strerror(zmq_errno()));
// Close the part that failed and every part after it; parts before x are not
// closed here.
351 zmq_msg_close(&(msg[x]));
352 for (uint32_t y = x + 1; y < 4; y++) zmq_msg_close(&(msg[y]));
// Mid-stream failure report; the guarding condition is not visible
// (presumably a failure on a part other than the first - confirm).
358 bridgeLog_->error(
"Multi-part frame failed mid-stream on part %" PRIu32
359 "; peer may have received a torso-only frame. "
360 "Resetting push socket to clear PUSH multipart FSM.",
// If the rebuild fails, zmqPush_ is left nullptr and the null check near the
// top of this fragment retries on the next call.
363 if (!rebuildPushSocket()) {
364 bridgeLog_->error(
"Unable to recover TcpCore push socket on %s; "
365 "next acceptFrame() will retry the rebuild",
366 this->pushAddr_.c_str());
371 bridgeLog_->debug(
"Pushed TCP frame with size %" PRIu32
" on %s", frame->getPayload(), this->pushAddr_.c_str());
// Worker thread body (started from the constructor). Receives multi-part
// messages from the PULL socket, validates the three header parts and turns a
// well-formed four-part message into a local frame.
// NOTE(review): this listing is non-contiguous; the outer loop, the counting
// of parts into msgCnt, the payload copy into the frame and the hand-off of
// the finished frame are on lines not shown.
375void ris::TcpCore::runThread() {
388 bridgeLog_->logThreadId();
// Start with four empty message objects, one per expected part.
391 for (x = 0; x < 4; x++) zmq_msg_init(&(msg[x]));
// Receive one part (flags 0) and, when it arrives, ask the socket through
// ZMQ_RCVMORE whether another part of the same message follows.
// NOTE(review): the > 0 test is false for a zero-length part as well as for
// an error or timeout return - confirm empty parts are handled as intended.
398 if (zmq_recvmsg(this->zmqPull_, &(msg[x]), 0) > 0) {
405 zmq_getsockopt(this->zmqPull_, ZMQ_RCVMORE, &more, &moreSize);
// The closing brace followed by while(...) indicates a do/while whose opening
// is not shown; it repeats while the thread is enabled and more parts remain.
409 }
while (threadEn_ && more);
// Only a message of exactly four parts is converted into a frame.
412 if (threadEn_ && (msgCnt == 4)) {
// Header parts must be 2, 1 and 1 bytes (flags, channel, error), matching the
// sizes copied out below; otherwise the sizes are reported and the received
// parts are closed.
414 if ((zmq_msg_size(&(msg[0])) != 2) || (zmq_msg_size(&(msg[1])) != 1) || (zmq_msg_size(&(msg[2])) != 1)) {
416 "Bad message sizes. flags=%zu channel=%zu error=%zu",
417 zmq_msg_size(&(msg[0])),
418 zmq_msg_size(&(msg[1])),
419 zmq_msg_size(&(msg[2])));
420 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// Decode the header fields with memcpy (host byte order, no conversion).
425 std::memcpy(&flags, zmq_msg_data(&(msg[0])), 2);
426 std::memcpy(&chan, zmq_msg_data(&(msg[1])), 1);
427 std::memcpy(&err, zmq_msg_data(&(msg[2])), 1);
// Fourth part is the payload: data is its buffer, size its length.
430 data =
reinterpret_cast<uint8_t*
>(zmq_msg_data(&(msg[3])));
431 size = zmq_msg_size(&(msg[3]));
// Request a local frame for size bytes and mark the whole size as payload.
434 frame = reqLocalFrame(size,
false);
435 frame->setPayload(size);
// Carry the decoded header fields over to the frame.
442 frame->setFlags(flags);
443 frame->setChannel(chan);
444 frame->setError(err);
446 bridgeLog_->debug(
"Pulled frame with size %" PRIu32, frame->getPayload());
// Release the parts received in this pass.
450 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// Registers TcpCore with the Python binding layer: declares the class under
// the name TcpCore with Master and Slave as bases, held through TcpCorePtr
// and marked noncopyable, and exposes its close method. bp is presumably an
// alias for boost::python - confirm against the file-level declarations.
// NOTE(review): the class_ constructor call is truncated in this listing
// (the arguments after the class name are not visible).
454void ris::TcpCore::setup_python() {
// Deprecation warnings are silenced only around the class registration and
// restored by the matching pop below.
457#pragma GCC diagnostic push
458#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
459 bp::class_<ris::TcpCore, ris::TcpCorePtr, bp::bases<ris::Master, ris::Slave>, boost::noncopyable>(
"TcpCore",
461 .def(
"close", &ris::TcpCore::close);
462#pragma GCC diagnostic pop
// Allow a TcpCorePtr to be passed where a MasterPtr or SlavePtr is expected.
464 bp::implicitly_convertible<ris::TcpCorePtr, ris::MasterPtr>();
465 bp::implicitly_convertible<ris::TcpCorePtr, ris::SlavePtr>();