rogue
Loading...
Searching...
No Matches
TcpServer.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <inttypes.h>
22#include <zmq.h>
23
24#include <cstring>
25#include <memory>
26#include <string>
27
28#include "rogue/GeneralError.h"
29#include "rogue/GilRelease.h"
30#include "rogue/Logging.h"
32
34
35#ifndef NO_PYTHON
36 #include <boost/python.hpp>
37namespace bp = boost::python;
38#endif
39
41rim::TcpServerPtr rim::TcpServer::create(std::string addr, uint16_t port) {
42 rim::TcpServerPtr r = std::make_shared<rim::TcpServer>(addr, port);
43 return (r);
44}
45
47rim::TcpServer::TcpServer(std::string addr, uint16_t port) {
48 std::string logstr;
49 uint32_t opt;
50
51 logstr = "memory.TcpServer.";
52 logstr.append(addr);
53 logstr.append(".");
54 logstr.append(std::to_string(port));
55
56 this->bridgeLog_ = rogue::Logging::create(logstr);
57
58 // Format address
59 this->respAddr_ = "tcp://";
60 this->respAddr_.append(addr);
61 this->respAddr_.append(":");
62 this->reqAddr_ = this->respAddr_;
63
64 this->zmqCtx_ = zmq_ctx_new();
65 this->zmqResp_ = zmq_socket(this->zmqCtx_, ZMQ_PUSH);
66 this->zmqReq_ = zmq_socket(this->zmqCtx_, ZMQ_PULL);
67
68 this->respAddr_.append(std::to_string(static_cast<int64_t>(port + 1)));
69 this->reqAddr_.append(std::to_string(static_cast<int64_t>(port)));
70
71 this->bridgeLog_->debug("Creating response client port: %s", this->respAddr_.c_str());
72
73 opt = 0;
74 if (zmq_setsockopt(this->zmqResp_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
75 throw(rogue::GeneralError("memory::TcpServer::TcpServer", "Failed to set socket linger"));
76
77 if (zmq_setsockopt(this->zmqReq_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
78 throw(rogue::GeneralError("memory::TcpServer::TcpServer", "Failed to set socket linger"));
79
80 opt = 100;
81 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &opt, sizeof(int32_t)) != 0)
82 throw(rogue::GeneralError("memory::TcpServer::TcpServer", "Failed to set socket receive timeout"));
83
84 if (zmq_bind(this->zmqResp_, this->respAddr_.c_str()) < 0)
85 throw(rogue::GeneralError::create("memory::TcpServer::TcpServer",
86 "Failed to bind server to port %" PRIu16
87 " at address %s, another process may be using this port",
88 port + 1,
89 addr.c_str()));
90
91 this->bridgeLog_->debug("Creating request client port: %s", this->reqAddr_.c_str());
92
93 if (zmq_bind(this->zmqReq_, this->reqAddr_.c_str()) < 0)
94 throw(rogue::GeneralError::create("memory::TcpServer::TcpServer",
95 "Failed to bind server to port %" PRIu16
96 " at address %s, another process may be using this port",
97 port,
98 addr.c_str()));
99
100 // Start rx thread
101 threadEn_ = true;
102 this->thread_ = new std::thread(&rim::TcpServer::runThread, this);
103
104 // Set a thread name
105#ifndef __MACH__
106 pthread_setname_np(thread_->native_handle(), "TcpServer");
107#endif
108}
109
111rim::TcpServer::~TcpServer() {
112 this->stop();
113}
114
115void rim::TcpServer::close() {
116 this->stop();
117}
118
119void rim::TcpServer::start() {
120 // The bridge is fully bound and the worker thread is already running by
121 // the time the constructor returns. This hook exists for managed-lifecycle
122 // symmetry with TcpClient.
123}
124
125void rim::TcpServer::stop() {
126 if (threadEn_) {
127 rogue::GilRelease noGil;
128 threadEn_ = false;
129 thread_->join();
130 zmq_close(this->zmqResp_);
131 zmq_close(this->zmqReq_);
132 zmq_ctx_destroy(this->zmqCtx_);
133 }
134}
135
137void rim::TcpServer::runThread() {
138 uint8_t* data;
139 uint64_t more;
140 size_t moreSize;
141 uint32_t x;
142 uint32_t msgCnt;
143 zmq_msg_t msg[6];
144 uint32_t id;
145 uint64_t addr;
146 uint32_t size;
147 uint32_t type;
148 std::string result;
149
150 bridgeLog_->logThreadId();
151
152 while (threadEn_) {
153 for (x = 0; x < 6; x++) zmq_msg_init(&(msg[x]));
154 msgCnt = 0;
155 x = 0;
156
157 // Get message
158 do {
159 // Get the message
160 if (zmq_recvmsg(this->zmqReq_, &(msg[x]), 0) >= 0) {
161 if (x != 4) x++;
162 msgCnt++;
163
164 // Is there more data?
165 more = 0;
166 moreSize = 8;
167 zmq_getsockopt(this->zmqReq_, ZMQ_RCVMORE, &more, &moreSize);
168 } else {
169 more = 1;
170 }
171 } while (threadEn_ && more);
172
173 // Proper message received
174 if (threadEn_ && (msgCnt == 4 || msgCnt == 5)) {
175 // Check sizes
176 if ((zmq_msg_size(&(msg[0])) != 4) || (zmq_msg_size(&(msg[1])) != 8) || (zmq_msg_size(&(msg[2])) != 4) ||
177 (zmq_msg_size(&(msg[3])) != 4)) {
178 bridgeLog_->warning("Bad message sizes");
179 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
180 continue; // while (1)
181 }
182
183 // Get return fields
184 std::memcpy(&id, zmq_msg_data(&(msg[0])), 4);
185 std::memcpy(&addr, zmq_msg_data(&(msg[1])), 8);
186 std::memcpy(&size, zmq_msg_data(&(msg[2])), 4);
187 std::memcpy(&type, zmq_msg_data(&(msg[3])), 4);
188
189 // Bridge readiness probe is handled locally.
190 if (type == rim::TcpBridgeProbe) {
191 if ((msgCnt != 4) || (size != 0)) {
192 bridgeLog_->warning("Malformed readiness probe. Id=%" PRIu32, id);
193 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
194 continue; // while (1)
195 }
196 zmq_msg_init_size(&(msg[4]), 0);
197 result = "OK";
198
199 // Write data is expected
200 } else if ((type == rim::Write) || (type == rim::Post)) {
201 if ((msgCnt != 5) || (zmq_msg_size(&(msg[4])) != size)) {
202 bridgeLog_->warning("Transaction write data error. Id=%" PRIu32, id);
203 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
204 continue; // while (1)
205 }
206 } else {
207 zmq_msg_init_size(&(msg[4]), size);
208 }
209
210 if (type != rim::TcpBridgeProbe) {
211 // Data pointer
212 data = reinterpret_cast<uint8_t*>(zmq_msg_data(&(msg[4])));
213
214 bridgeLog_->debug("Starting transaction id=%" PRIu32 ", addr=0x%" PRIx64 ", size=%" PRIu32
215 ", type=%" PRIu32,
216 id,
217 addr,
218 size,
219 type);
220
221 // Execute transaction and wait for result
222 this->clearError();
223 reqTransaction(addr, size, data, type);
224 waitTransaction(0);
225 result = getError();
226
227 bridgeLog_->debug("Done transaction id=%" PRIu32 ", addr=0x%" PRIx64 ", size=%" PRIu32
228 ", type=%" PRIu32 ", result=(%s)",
229 id,
230 addr,
231 size,
232 type,
233 result.c_str());
234 }
235
236 // Result message, at least one char needs to be sent
237 if (result.length() == 0) result = "OK";
238 zmq_msg_init_size(&(msg[5]), result.length());
239 std::memcpy(zmq_msg_data(&(msg[5])), result.c_str(), result.length());
240
241 // Send message
242 for (x = 0; x < 6; x++) zmq_sendmsg(this->zmqResp_, &(msg[x]), (x == 5) ? 0 : ZMQ_SNDMORE);
243 } else {
244 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
245 }
246 }
247}
248
249void rim::TcpServer::setup_python() {
250#ifndef NO_PYTHON
251
252 bp::class_<rim::TcpServer, rim::TcpServerPtr, bp::bases<rim::Master>, boost::noncopyable>(
253 "TcpServer",
254 bp::init<std::string, uint16_t>())
255 .def("close", &rim::TcpServer::close)
256 .def("_start", &rim::TcpServer::start)
257 .def("_stop", &rim::TcpServer::stop);
258
259 bp::implicitly_convertible<rim::TcpServerPtr, rim::MasterPtr>();
260#endif
261}
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:60
std::shared_ptr< rogue::interfaces::memory::TcpServer > TcpServerPtr
Shared pointer alias for TcpServer.
Definition TcpServer.h:150
static const uint32_t TcpBridgeProbe
Internal TCP bridge readiness probe transaction type.
Definition Constants.h:67
static const uint32_t Write
Memory write transaction type.
Definition Constants.h:43
static const uint32_t Post
Memory posted write transaction type.
Definition Constants.h:50