// Constructor: builds the logger name and the two ZeroMQ endpoint strings,
// creates one PULL and one PUSH socket, binds (server) or connects (client)
// them, and starts the background receive thread.
//
// addr   - host/interface text placed after "tcp://" in both endpoints.
// port   - base port; one socket uses `port`, the other `port + 1` (see below).
// server - presumably selects the bind path vs. the connect path; the
//          branch statements themselves are not visible in this excerpt.
//
// Throws rogue::GeneralError when setting the receive timeout fails. The
// other failure paths are truncated here; their message text suggests they
// raise errors as well - TODO confirm against the full file.
//
// NOTE(review): this excerpt omits lines (braces, if/else, declarations),
// so comments below describe only the statements that are shown.
50ris::TcpCore::TcpCore(
const std::string& addr, uint16_t port,
bool server) {
// Logger name: "stream.TcpCore." + "Server."/"Client." + port. Both appends
// are shown; the condition choosing between them is not visible here.
54 logstr =
"stream.TcpCore.";
58 logstr.append(
"Server.");
60 logstr.append(
"Client.");
61 logstr.append(std::to_string(port));
// Common endpoint prefix "tcp://<addr>:"; the port suffix is appended later
// and differs between the pull and push endpoints.
66 this->pullAddr_ =
"tcp://";
67 this->pullAddr_.append(addr);
68 this->pullAddr_.append(
":");
69 this->pushAddr_ = this->pullAddr_;
// One ZeroMQ context shared by a PULL socket and a PUSH socket.
71 this->zmqCtx_ = zmq_ctx_new();
72 this->zmqPull_ = zmq_socket(this->zmqCtx_, ZMQ_PULL);
73 this->zmqPush_ = zmq_socket(this->zmqCtx_, ZMQ_PUSH);
// Socket options: ZMQ_IMMEDIATE and ZMQ_LINGER on the push socket,
// ZMQ_LINGER and ZMQ_RCVTIMEO on the pull socket. The values assigned to
// `opt` before each call are not visible in this excerpt.
77 if (zmq_setsockopt(this->zmqPush_, ZMQ_IMMEDIATE, &opt,
sizeof(int32_t)) != 0)
81 if (zmq_setsockopt(this->zmqPush_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
84 if (zmq_setsockopt(this->zmqPull_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
88 if (zmq_setsockopt(this->zmqPull_, ZMQ_RCVTIMEO, &opt,
sizeof(int32_t)) != 0)
89 throw(
rogue::GeneralError(
"stream::TcpCore::TcpCore",
"Failed to set socket receive timeout"));
// Bind path (log text says "server"): pull endpoint uses `port`, push
// endpoint uses `port + 1`.
// NOTE(review): `port + 1` is computed in int, so port == 65535 yields the
// text "65536", which is outside the valid TCP port range - confirm callers
// never pass 65535.
93 this->pullAddr_.append(std::to_string(
static_cast<int64_t
>(port)));
94 this->pushAddr_.append(std::to_string(
static_cast<int64_t
>(port + 1)));
96 this->bridgeLog_->debug(
"Creating pull server port: %s", this->pullAddr_.c_str());
98 if (zmq_bind(this->zmqPull_, this->pullAddr_.c_str()) < 0)
100 "Failed to bind server to port %" PRIu16
101 " at address %s, another process may be using this port",
105 this->bridgeLog_->debug(
"Creating push server port: %s", this->pushAddr_.c_str());
107 if (zmq_bind(this->zmqPush_, this->pushAddr_.c_str()) < 0)
109 "Failed to bind server to port %" PRIu16
110 " at address %s, another process may be using this port",
// Connect path (log text says "client"): the port assignment is mirrored
// relative to the bind path - pull uses `port + 1`, push uses `port` - so
// each side's PULL endpoint matches the other side's PUSH endpoint.
116 this->pullAddr_.append(std::to_string(
static_cast<int64_t
>(port + 1)));
117 this->pushAddr_.append(std::to_string(
static_cast<int64_t
>(port)));
119 this->bridgeLog_->debug(
"Creating pull client port: %s", this->pullAddr_.c_str());
121 if (zmq_connect(this->zmqPull_, this->pullAddr_.c_str()) < 0)
123 "Failed to connect to remote port %" PRIu16
" at address %s",
127 this->bridgeLog_->debug(
"Creating push client port: %s", this->pushAddr_.c_str());
129 if (zmq_connect(this->zmqPush_, this->pushAddr_.c_str()) < 0)
131 "Failed to connect to remote port %" PRIu16
" at address %s",
// Start the receive loop (runThread) on a heap-allocated std::thread held
// in thread_; where it is joined/deleted is not visible in this excerpt.
138 this->thread_ =
new std::thread(&ris::TcpCore::runThread,
this);
// Give the thread the name "TcpCore".
142 pthread_setname_np(thread_->native_handle(),
"TcpCore");
// Fragment of the frame-sending path (the enclosing function signature is
// not visible in this excerpt). It serializes `frame` into a four-part
// ZeroMQ message and sends it on the push socket:
//   part 0: flags   (2 bytes)
//   part 1: channel (1 byte)
//   part 2: error   (1 byte)
//   part 3: payload (frame->getPayload() bytes)
// Failures are reported through bridgeLog_->warning; what follows each
// warning (return, cleanup) is not shown here.
// Hold bridgeMtx_ for the remainder of the enclosing scope.
178 std::lock_guard<std::mutex> lock(bridgeMtx_);
// Allocate the three fixed-size header parts.
180 if ((zmq_msg_init_size(&(msg[0]), 2) < 0) ||
181 (zmq_msg_init_size(&(msg[1]), 1) < 0) ||
182 (zmq_msg_init_size(&(msg[2]), 1) < 0)) {
183 bridgeLog_->warning(
"Failed to init message header");
// Allocate the payload part sized to the frame's payload.
187 if (zmq_msg_init_size(&(msg[3]), frame->getPayload()) < 0) {
188 bridgeLog_->warning(
"Failed to init message with size %" PRIu32, frame->getPayload());
// Header fields are copied as raw bytes, so the 2-byte flags value goes
// out in host byte order.
// NOTE(review): confirm both peers share endianness.
192 flags = frame->getFlags();
193 std::memcpy(zmq_msg_data(&(msg[0])), &flags, 2);
195 chan = frame->getChannel();
196 std::memcpy(zmq_msg_data(&(msg[1])), &chan, 1);
198 err = frame->getError();
199 std::memcpy(zmq_msg_data(&(msg[2])), &err, 1);
// Destination buffer for the payload; the copy from the frame into `data`
// is not visible in this excerpt.
203 data =
reinterpret_cast<uint8_t*
>(zmq_msg_data(&(msg[3])));
// Send all four parts; every part except the last carries ZMQ_SNDMORE so
// they are delivered together as one multipart message.
// NOTE(review): the libzmq docs list zmq_sendmsg as deprecated in favour
// of zmq_msg_send - verify against the libzmq version in use.
207 for (x = 0; x < 4; x++) {
208 if (zmq_sendmsg(this->zmqPush_, &(msg[x]), (x == 3) ? 0 : ZMQ_SNDMORE) < 0)
209 bridgeLog_->warning(
"Failed to push message with size %" PRIu32
" on %s",
211 this->pushAddr_.c_str());
213 bridgeLog_->debug(
"Pushed TCP frame with size %" PRIu32
" on %s", frame->getPayload(), this->pushAddr_.c_str());
// Background receive loop. Reads multipart messages from the pull socket,
// and for each message with exactly four parts whose header sizes are
// 2/1/1 bytes, builds a local frame carrying the payload, flags, channel
// and error fields.
//
// NOTE(review): the outer loop, local declarations, payload copy and the
// hand-off of the finished frame are not visible in this excerpt.
217void ris::TcpCore::runThread() {
230 bridgeLog_->logThreadId();
// Start with four empty message objects, one per expected part.
233 for (x = 0; x < 4; x++) zmq_msg_init(&(msg[x]));
// Receive one part into msg[x], then ask the socket whether more parts of
// the same message follow.
// NOTE(review): zmq_recvmsg returns the byte count, so a zero-length part
// returns 0 and does not satisfy `> 0`; how that case (and a receive
// timeout) is handled is not visible here.
240 if (zmq_recvmsg(this->zmqPull_, &(msg[x]), 0) > 0) {
247 zmq_getsockopt(this->zmqPull_, ZMQ_RCVMORE, &more, &moreSize);
// Keep reading parts while the thread is enabled and more parts are pending.
251 }
while (threadEn_ && more);
// Only complete four-part messages are processed.
254 if (threadEn_ && (msgCnt == 4)) {
// Reject messages whose header parts are not 2, 1 and 1 bytes, releasing
// the parts that were received.
256 if ((zmq_msg_size(&(msg[0])) != 2) || (zmq_msg_size(&(msg[1])) != 1) || (zmq_msg_size(&(msg[2])) != 1)) {
257 bridgeLog_->warning(
"Bad message sizes");
258 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// Decode the header fields as raw bytes (mirrors the sender's memcpy layout).
263 std::memcpy(&flags, zmq_msg_data(&(msg[0])), 2);
264 std::memcpy(&chan, zmq_msg_data(&(msg[1])), 1);
265 std::memcpy(&err, zmq_msg_data(&(msg[2])), 1);
// Payload pointer and length come from the fourth part.
268 data =
reinterpret_cast<uint8_t*
>(zmq_msg_data(&(msg[3])));
269 size = zmq_msg_size(&(msg[3]));
// Request a local frame of `size` bytes and mark all of it as payload.
// The meaning of the `false` argument is defined by reqLocalFrame, which
// is not shown here.
272 frame = reqLocalFrame(size,
false);
273 frame->setPayload(size);
280 frame->setFlags(flags);
281 frame->setChannel(chan);
282 frame->setError(err);
284 bridgeLog_->debug(
"Pulled frame with size %" PRIu32, frame->getPayload());
// Release every received part before the next iteration.
288 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// Registers the Python binding for TcpCore (`bp` is presumably an alias for
// boost::python - confirm at the top of the file). The class is exposed as
// "TcpCore", held by ris::TcpCorePtr, declared with ris::Master and
// ris::Slave as bases, and marked boost::noncopyable. Only the "close"
// method is exposed in the lines shown; the constructor/init arguments of
// class_ are not visible in this excerpt.
292void ris::TcpCore::setup_python() {
295 bp::class_<ris::TcpCore, ris::TcpCorePtr, bp::bases<ris::Master, ris::Slave>, boost::noncopyable>(
"TcpCore",
297 .def(
"close", &ris::TcpCore::close);
// Let a TcpCorePtr be passed from Python wherever a MasterPtr or SlavePtr
// is expected.
299 bp::implicitly_convertible<ris::TcpCorePtr, ris::MasterPtr>();
300 bp::implicitly_convertible<ris::TcpCorePtr, ris::SlavePtr>();