rogue
Loading...
Searching...
No Matches
TcpCore.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <inttypes.h>
22#include <zmq.h>
23
24#include <cstring>
25#include <memory>
26#include <string>
27
28#include "rogue/GeneralError.h"
29#include "rogue/GilRelease.h"
30#include "rogue/Logging.h"
35
37
38#ifndef NO_PYTHON
39 #include <boost/python.hpp>
40namespace bp = boost::python;
41#endif
42
44ris::TcpCorePtr ris::TcpCore::create(const std::string& addr, uint16_t port, bool server) {
45 ris::TcpCorePtr r = std::make_shared<ris::TcpCore>(addr, port, server);
46 return (r);
47}
48
50ris::TcpCore::TcpCore(const std::string& addr, uint16_t port, bool server) {
51 int32_t opt;
52 std::string logstr;
53
54 logstr = "stream.TcpCore.";
55 logstr.append(addr);
56 logstr.append(".");
57 if (server)
58 logstr.append("Server.");
59 else
60 logstr.append("Client.");
61 logstr.append(std::to_string(port));
62
63 this->bridgeLog_ = rogue::Logging::create(logstr);
64
65 // Format address
66 this->pullAddr_ = "tcp://";
67 this->pullAddr_.append(addr);
68 this->pullAddr_.append(":");
69 this->pushAddr_ = this->pullAddr_;
70
71 this->zmqCtx_ = zmq_ctx_new();
72 this->zmqPull_ = zmq_socket(this->zmqCtx_, ZMQ_PULL);
73 this->zmqPush_ = zmq_socket(this->zmqCtx_, ZMQ_PUSH);
74
75 // Don't buffer when no connection
76 opt = 1;
77 if (zmq_setsockopt(this->zmqPush_, ZMQ_IMMEDIATE, &opt, sizeof(int32_t)) != 0)
78 throw(rogue::GeneralError("stream::TcpCore::TcpCore", "Failed to set socket immediate"));
79
80 opt = 0;
81 if (zmq_setsockopt(this->zmqPush_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
82 throw(rogue::GeneralError("stream::TcpCore::TcpCore", "Failed to set socket linger"));
83
84 if (zmq_setsockopt(this->zmqPull_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
85 throw(rogue::GeneralError("stream::TcpCore::TcpCore", "Failed to set socket linger"));
86
87 opt = 100;
88 if (zmq_setsockopt(this->zmqPull_, ZMQ_RCVTIMEO, &opt, sizeof(int32_t)) != 0)
89 throw(rogue::GeneralError("stream::TcpCore::TcpCore", "Failed to set socket receive timeout"));
90
91 // Server mode
92 if (server) {
93 this->pullAddr_.append(std::to_string(static_cast<int64_t>(port)));
94 this->pushAddr_.append(std::to_string(static_cast<int64_t>(port + 1)));
95
96 this->bridgeLog_->debug("Creating pull server port: %s", this->pullAddr_.c_str());
97
98 if (zmq_bind(this->zmqPull_, this->pullAddr_.c_str()) < 0)
99 throw(rogue::GeneralError::create("stream::TcpCore::TcpCore",
100 "Failed to bind server to port %" PRIu16
101 " at address %s, another process may be using this port",
102 port,
103 addr.c_str()));
104
105 this->bridgeLog_->debug("Creating push server port: %s", this->pushAddr_.c_str());
106
107 if (zmq_bind(this->zmqPush_, this->pushAddr_.c_str()) < 0)
108 throw(rogue::GeneralError::create("stream::TcpCore::TcpCore",
109 "Failed to bind server to port %" PRIu16
110 " at address %s, another process may be using this port",
111 port + 1,
112 addr.c_str()));
113
114 // Client mode
115 } else {
116 this->pullAddr_.append(std::to_string(static_cast<int64_t>(port + 1)));
117 this->pushAddr_.append(std::to_string(static_cast<int64_t>(port)));
118
119 this->bridgeLog_->debug("Creating pull client port: %s", this->pullAddr_.c_str());
120
121 if (zmq_connect(this->zmqPull_, this->pullAddr_.c_str()) < 0)
122 throw(rogue::GeneralError::create("stream::TcpCore::TcpCore",
123 "Failed to connect to remote port %" PRIu16 " at address %s",
124 port + 1,
125 addr.c_str()));
126
127 this->bridgeLog_->debug("Creating push client port: %s", this->pushAddr_.c_str());
128
129 if (zmq_connect(this->zmqPush_, this->pushAddr_.c_str()) < 0)
130 throw(rogue::GeneralError::create("stream::TcpCore::TcpCore",
131 "Failed to connect to remote port %" PRIu16 " at address %s",
132 port,
133 addr.c_str()));
134 }
135
136 // Start rx thread
137 threadEn_ = true;
138 this->thread_ = new std::thread(&ris::TcpCore::runThread, this);
139
140 this->bridgeLog_->debug("TCP stream bridge ready. pull=%s push=%s",
141 this->pullAddr_.c_str(),
142 this->pushAddr_.c_str());
143
144 // Set a thread name
145#ifndef __MACH__
146 pthread_setname_np(thread_->native_handle(), "TcpCore");
147#endif
148}
149
151ris::TcpCore::~TcpCore() {
152 this->stop();
153}
154
155// deprecated
156void ris::TcpCore::close() {
157 this->stop();
158}
159
160void ris::TcpCore::stop() {
161 if (threadEn_) {
162 rogue::GilRelease noGil;
163 threadEn_ = false;
164 thread_->join();
165 this->bridgeLog_->debug("Stopping TCP stream bridge. pull=%s push=%s",
166 this->pullAddr_.c_str(),
167 this->pushAddr_.c_str());
168 zmq_close(this->zmqPull_);
169 zmq_close(this->zmqPush_);
170 zmq_ctx_destroy(this->zmqCtx_);
171 }
172}
173
175void ris::TcpCore::acceptFrame(ris::FramePtr frame) {
176 uint32_t x;
177 uint8_t* data;
178 uint16_t flags;
179 uint8_t chan;
180 uint8_t err;
181 zmq_msg_t msg[4];
182
183 rogue::GilRelease noGil;
184 ris::FrameLockPtr frLock = frame->lock();
185 std::lock_guard<std::mutex> lock(bridgeMtx_);
186
187 if ((zmq_msg_init_size(&(msg[0]), 2) < 0) || // Flags
188 (zmq_msg_init_size(&(msg[1]), 1) < 0) || // Channel
189 (zmq_msg_init_size(&(msg[2]), 1) < 0)) { // Error
190 bridgeLog_->warning("Failed to init message header");
191 return;
192 }
193
194 if (zmq_msg_init_size(&(msg[3]), frame->getPayload()) < 0) {
195 bridgeLog_->warning("Failed to init message with size %" PRIu32, frame->getPayload());
196 return;
197 }
198
199 flags = frame->getFlags();
200 std::memcpy(zmq_msg_data(&(msg[0])), &flags, 2);
201
202 chan = frame->getChannel();
203 std::memcpy(zmq_msg_data(&(msg[1])), &chan, 1);
204
205 err = frame->getError();
206 std::memcpy(zmq_msg_data(&(msg[2])), &err, 1);
207
208 // Copy data
209 ris::FrameIterator iter = frame->begin();
210 data = reinterpret_cast<uint8_t*>(zmq_msg_data(&(msg[3])));
211 ris::fromFrame(iter, frame->getPayload(), data);
212
213 // Send data
214 for (x = 0; x < 4; x++) {
215 if (zmq_sendmsg(this->zmqPush_, &(msg[x]), (x == 3) ? 0 : ZMQ_SNDMORE) < 0)
216 bridgeLog_->warning("Failed to push message with size %" PRIu32 " on %s: %s",
217 frame->getPayload(),
218 this->pushAddr_.c_str(),
219 zmq_strerror(zmq_errno()));
220 }
221 bridgeLog_->debug("Pushed TCP frame with size %" PRIu32 " on %s", frame->getPayload(), this->pushAddr_.c_str());
222}
223
225void ris::TcpCore::runThread() {
226 ris::FramePtr frame;
227 uint64_t more;
228 size_t moreSize;
229 uint8_t* data;
230 uint32_t size;
231 uint32_t msgCnt;
232 uint32_t x;
233 zmq_msg_t msg[4];
234 uint16_t flags;
235 uint8_t chan;
236 uint8_t err;
237
238 bridgeLog_->logThreadId();
239
240 while (threadEn_) {
241 for (x = 0; x < 4; x++) zmq_msg_init(&(msg[x]));
242 msgCnt = 0;
243 x = 0;
244
245 // Get message
246 do {
247 // Get the message
248 if (zmq_recvmsg(this->zmqPull_, &(msg[x]), 0) > 0) {
249 if (x != 3) x++;
250 msgCnt++;
251
252 // Is there more data?
253 more = 0;
254 moreSize = 8;
255 zmq_getsockopt(this->zmqPull_, ZMQ_RCVMORE, &more, &moreSize);
256 } else {
257 more = 1;
258 }
259 } while (threadEn_ && more);
260
261 // Proper message received
262 if (threadEn_ && (msgCnt == 4)) {
263 // Check sizes
264 if ((zmq_msg_size(&(msg[0])) != 2) || (zmq_msg_size(&(msg[1])) != 1) || (zmq_msg_size(&(msg[2])) != 1)) {
265 bridgeLog_->warning(
266 "Bad message sizes. flags=%zu channel=%zu error=%zu",
267 zmq_msg_size(&(msg[0])),
268 zmq_msg_size(&(msg[1])),
269 zmq_msg_size(&(msg[2])));
270 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
271 continue; // while (1)
272 }
273
274 // Get fields
275 std::memcpy(&flags, zmq_msg_data(&(msg[0])), 2);
276 std::memcpy(&chan, zmq_msg_data(&(msg[1])), 1);
277 std::memcpy(&err, zmq_msg_data(&(msg[2])), 1);
278
279 // Get message info
280 data = reinterpret_cast<uint8_t*>(zmq_msg_data(&(msg[3])));
281 size = zmq_msg_size(&(msg[3]));
282
283 // Generate frame
284 frame = reqLocalFrame(size, false);
285 frame->setPayload(size);
286
287 // Copy data
288 ris::FrameIterator iter = frame->begin();
289 ris::toFrame(iter, size, data);
290
291 // Set frame meta data and send
292 frame->setFlags(flags);
293 frame->setChannel(chan);
294 frame->setError(err);
295
296 bridgeLog_->debug("Pulled frame with size %" PRIu32, frame->getPayload());
297 sendFrame(frame);
298 }
299
300 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
301 }
302}
303
304void ris::TcpCore::setup_python() {
305#ifndef NO_PYTHON
306
307 bp::class_<ris::TcpCore, ris::TcpCorePtr, bp::bases<ris::Master, ris::Slave>, boost::noncopyable>("TcpCore",
308 bp::no_init)
309 .def("close", &ris::TcpCore::close);
310
311 bp::implicitly_convertible<ris::TcpCorePtr, ris::MasterPtr>();
312 bp::implicitly_convertible<ris::TcpCorePtr, ris::SlavePtr>();
313#endif
314}
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:95
Random-access byte iterator across a Frame payload.
static void fromFrame(rogue::interfaces::stream::FrameIterator &iter, uint32_t size, void *dst)
Copies bytes from a frame iterator to a destination pointer.
std::shared_ptr< rogue::interfaces::stream::TcpCore > TcpCorePtr
Shared pointer alias for TcpCore.
Definition TcpCore.h:141
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
static void toFrame(rogue::interfaces::stream::FrameIterator &iter, uint32_t size, void *src)
Copies bytes from a source pointer into a frame iterator.
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110