rogue
Loading...
Searching...
No Matches
TcpCore.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <inttypes.h>
22#include <zmq.h>
23
24#include <cstring>
25#include <memory>
26#include <string>
27
28#include "rogue/GeneralError.h"
29#include "rogue/GilRelease.h"
30#include "rogue/Logging.h"
35
37
38#ifndef NO_PYTHON
39 #include <boost/python.hpp>
40namespace bp = boost::python;
41#endif
42
44ris::TcpCorePtr ris::TcpCore::create(const std::string& addr, uint16_t port, bool server) {
45 ris::TcpCorePtr r = std::make_shared<ris::TcpCore>(addr, port, server);
46 return (r);
47}
48
50ris::TcpCore::TcpCore(const std::string& addr, uint16_t port, bool server) {
51 int32_t opt;
52 std::string logstr;
53
54 logstr = "stream.TcpCore.";
55 logstr.append(addr);
56 logstr.append(".");
57 if (server)
58 logstr.append("Server.");
59 else
60 logstr.append("Client.");
61 logstr.append(std::to_string(port));
62
63 this->bridgeLog_ = rogue::Logging::create(logstr);
64
65 // Format address
66 this->pullAddr_ = "tcp://";
67 this->pullAddr_.append(addr);
68 this->pullAddr_.append(":");
69 this->pushAddr_ = this->pullAddr_;
70
71 this->zmqCtx_ = zmq_ctx_new();
72 this->zmqPull_ = zmq_socket(this->zmqCtx_, ZMQ_PULL);
73 this->zmqPush_ = zmq_socket(this->zmqCtx_, ZMQ_PUSH);
74
75 // Don't buffer when no connection
76 opt = 1;
77 if (zmq_setsockopt(this->zmqPush_, ZMQ_IMMEDIATE, &opt, sizeof(int32_t)) != 0)
78 throw(rogue::GeneralError("stream::TcpCore::TcpCore", "Failed to set socket immediate"));
79
80 opt = 0;
81 if (zmq_setsockopt(this->zmqPush_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
82 throw(rogue::GeneralError("stream::TcpCore::TcpCore", "Failed to set socket linger"));
83
84 if (zmq_setsockopt(this->zmqPull_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
85 throw(rogue::GeneralError("stream::TcpCore::TcpCore", "Failed to set socket linger"));
86
87 opt = 100;
88 if (zmq_setsockopt(this->zmqPull_, ZMQ_RCVTIMEO, &opt, sizeof(int32_t)) != 0)
89 throw(rogue::GeneralError("stream::TcpCore::TcpCore", "Failed to set socket receive timeout"));
90
91 // Server mode
92 if (server) {
93 this->pullAddr_.append(std::to_string(static_cast<int64_t>(port)));
94 this->pushAddr_.append(std::to_string(static_cast<int64_t>(port + 1)));
95
96 this->bridgeLog_->debug("Creating pull server port: %s", this->pullAddr_.c_str());
97
98 if (zmq_bind(this->zmqPull_, this->pullAddr_.c_str()) < 0)
99 throw(rogue::GeneralError::create("stream::TcpCore::TcpCore",
100 "Failed to bind server to port %" PRIu16
101 " at address %s, another process may be using this port",
102 port,
103 addr.c_str()));
104
105 this->bridgeLog_->debug("Creating push server port: %s", this->pushAddr_.c_str());
106
107 if (zmq_bind(this->zmqPush_, this->pushAddr_.c_str()) < 0)
108 throw(rogue::GeneralError::create("stream::TcpCore::TcpCore",
109 "Failed to bind server to port %" PRIu16
110 " at address %s, another process may be using this port",
111 port + 1,
112 addr.c_str()));
113
114 // Client mode
115 } else {
116 this->pullAddr_.append(std::to_string(static_cast<int64_t>(port + 1)));
117 this->pushAddr_.append(std::to_string(static_cast<int64_t>(port)));
118
119 this->bridgeLog_->debug("Creating pull client port: %s", this->pullAddr_.c_str());
120
121 if (zmq_connect(this->zmqPull_, this->pullAddr_.c_str()) < 0)
122 throw(rogue::GeneralError::create("stream::TcpCore::TcpCore",
123 "Failed to connect to remote port %" PRIu16 " at address %s",
124 port + 1,
125 addr.c_str()));
126
127 this->bridgeLog_->debug("Creating push client port: %s", this->pushAddr_.c_str());
128
129 if (zmq_connect(this->zmqPush_, this->pushAddr_.c_str()) < 0)
130 throw(rogue::GeneralError::create("stream::TcpCore::TcpCore",
131 "Failed to connect to remote port %" PRIu16 " at address %s",
132 port,
133 addr.c_str()));
134 }
135
136 // Start rx thread
137 threadEn_ = true;
138 this->thread_ = new std::thread(&ris::TcpCore::runThread, this);
139
140 // Set a thread name
141#ifndef __MACH__
142 pthread_setname_np(thread_->native_handle(), "TcpCore");
143#endif
144}
145
147ris::TcpCore::~TcpCore() {
148 this->stop();
149}
150
151// deprecated
152void ris::TcpCore::close() {
153 this->stop();
154}
155
156void ris::TcpCore::stop() {
157 if (threadEn_) {
158 rogue::GilRelease noGil;
159 threadEn_ = false;
160 thread_->join();
161 zmq_close(this->zmqPull_);
162 zmq_close(this->zmqPush_);
163 zmq_ctx_destroy(this->zmqCtx_);
164 }
165}
166
168void ris::TcpCore::acceptFrame(ris::FramePtr frame) {
169 uint32_t x;
170 uint8_t* data;
171 uint16_t flags;
172 uint8_t chan;
173 uint8_t err;
174 zmq_msg_t msg[4];
175
176 rogue::GilRelease noGil;
177 ris::FrameLockPtr frLock = frame->lock();
178 std::lock_guard<std::mutex> lock(bridgeMtx_);
179
180 if ((zmq_msg_init_size(&(msg[0]), 2) < 0) || // Flags
181 (zmq_msg_init_size(&(msg[1]), 1) < 0) || // Channel
182 (zmq_msg_init_size(&(msg[2]), 1) < 0)) { // Error
183 bridgeLog_->warning("Failed to init message header");
184 return;
185 }
186
187 if (zmq_msg_init_size(&(msg[3]), frame->getPayload()) < 0) {
188 bridgeLog_->warning("Failed to init message with size %" PRIu32, frame->getPayload());
189 return;
190 }
191
192 flags = frame->getFlags();
193 std::memcpy(zmq_msg_data(&(msg[0])), &flags, 2);
194
195 chan = frame->getChannel();
196 std::memcpy(zmq_msg_data(&(msg[1])), &chan, 1);
197
198 err = frame->getError();
199 std::memcpy(zmq_msg_data(&(msg[2])), &err, 1);
200
201 // Copy data
202 ris::FrameIterator iter = frame->begin();
203 data = reinterpret_cast<uint8_t*>(zmq_msg_data(&(msg[3])));
204 ris::fromFrame(iter, frame->getPayload(), data);
205
206 // Send data
207 for (x = 0; x < 4; x++) {
208 if (zmq_sendmsg(this->zmqPush_, &(msg[x]), (x == 3) ? 0 : ZMQ_SNDMORE) < 0)
209 bridgeLog_->warning("Failed to push message with size %" PRIu32 " on %s",
210 frame->getPayload(),
211 this->pushAddr_.c_str());
212 }
213 bridgeLog_->debug("Pushed TCP frame with size %" PRIu32 " on %s", frame->getPayload(), this->pushAddr_.c_str());
214}
215
217void ris::TcpCore::runThread() {
218 ris::FramePtr frame;
219 uint64_t more;
220 size_t moreSize;
221 uint8_t* data;
222 uint32_t size;
223 uint32_t msgCnt;
224 uint32_t x;
225 zmq_msg_t msg[4];
226 uint16_t flags;
227 uint8_t chan;
228 uint8_t err;
229
230 bridgeLog_->logThreadId();
231
232 while (threadEn_) {
233 for (x = 0; x < 4; x++) zmq_msg_init(&(msg[x]));
234 msgCnt = 0;
235 x = 0;
236
237 // Get message
238 do {
239 // Get the message
240 if (zmq_recvmsg(this->zmqPull_, &(msg[x]), 0) > 0) {
241 if (x != 3) x++;
242 msgCnt++;
243
244 // Is there more data?
245 more = 0;
246 moreSize = 8;
247 zmq_getsockopt(this->zmqPull_, ZMQ_RCVMORE, &more, &moreSize);
248 } else {
249 more = 1;
250 }
251 } while (threadEn_ && more);
252
253 // Proper message received
254 if (threadEn_ && (msgCnt == 4)) {
255 // Check sizes
256 if ((zmq_msg_size(&(msg[0])) != 2) || (zmq_msg_size(&(msg[1])) != 1) || (zmq_msg_size(&(msg[2])) != 1)) {
257 bridgeLog_->warning("Bad message sizes");
258 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
259 continue; // while (1)
260 }
261
262 // Get fields
263 std::memcpy(&flags, zmq_msg_data(&(msg[0])), 2);
264 std::memcpy(&chan, zmq_msg_data(&(msg[1])), 1);
265 std::memcpy(&err, zmq_msg_data(&(msg[2])), 1);
266
267 // Get message info
268 data = reinterpret_cast<uint8_t*>(zmq_msg_data(&(msg[3])));
269 size = zmq_msg_size(&(msg[3]));
270
271 // Generate frame
272 frame = reqLocalFrame(size, false);
273 frame->setPayload(size);
274
275 // Copy data
276 ris::FrameIterator iter = frame->begin();
277 ris::toFrame(iter, size, data);
278
279 // Set frame meta data and send
280 frame->setFlags(flags);
281 frame->setChannel(chan);
282 frame->setError(err);
283
284 bridgeLog_->debug("Pulled frame with size %" PRIu32, frame->getPayload());
285 sendFrame(frame);
286 }
287
288 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
289 }
290}
291
292void ris::TcpCore::setup_python() {
293#ifndef NO_PYTHON
294
295 bp::class_<ris::TcpCore, ris::TcpCorePtr, bp::bases<ris::Master, ris::Slave>, boost::noncopyable>("TcpCore",
296 bp::no_init)
297 .def("close", &ris::TcpCore::close);
298
299 bp::implicitly_convertible<ris::TcpCorePtr, ris::MasterPtr>();
300 bp::implicitly_convertible<ris::TcpCorePtr, ris::SlavePtr>();
301#endif
302}
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:60
Random-access byte iterator across a Frame payload.
static void fromFrame(rogue::interfaces::stream::FrameIterator &iter, uint32_t size, void *dst)
Copies bytes from a frame iterator to a destination pointer.
std::shared_ptr< rogue::interfaces::stream::TcpCore > TcpCorePtr
Shared pointer alias for TcpCore.
Definition TcpCore.h:141
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
static void toFrame(rogue::interfaces::stream::FrameIterator &iter, uint32_t size, void *src)
Copies bytes from a source pointer into a frame iterator.
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110