rogue
Loading...
Searching...
No Matches
TcpServer.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <inttypes.h>
22#include <zmq.h>
23
24#include <cstring>
25#include <memory>
26#include <string>
27
28#include "rogue/GeneralError.h"
29#include "rogue/GilRelease.h"
30#include "rogue/Logging.h"
32
34
35#ifndef NO_PYTHON
36 #include <boost/python.hpp>
37namespace bp = boost::python;
38#endif
39
41rim::TcpServerPtr rim::TcpServer::create(std::string addr, uint16_t port) {
42 rim::TcpServerPtr r = std::make_shared<rim::TcpServer>(addr, port);
43 return (r);
44}
45
47rim::TcpServer::TcpServer(std::string addr, uint16_t port) {
48 std::string logstr;
49 uint32_t opt;
50
51 logstr = "memory.TcpServer.";
52 logstr.append(addr);
53 logstr.append(".");
54 logstr.append(std::to_string(port));
55
56 this->bridgeLog_ = rogue::Logging::create(logstr);
57
58 // Format address
59 this->respAddr_ = "tcp://";
60 this->respAddr_.append(addr);
61 this->respAddr_.append(":");
62 this->reqAddr_ = this->respAddr_;
63
64 this->zmqCtx_ = zmq_ctx_new();
65 this->zmqResp_ = zmq_socket(this->zmqCtx_, ZMQ_PUSH);
66 this->zmqReq_ = zmq_socket(this->zmqCtx_, ZMQ_PULL);
67
68 this->respAddr_.append(std::to_string(static_cast<int64_t>(port + 1)));
69 this->reqAddr_.append(std::to_string(static_cast<int64_t>(port)));
70
71 this->bridgeLog_->debug("Creating response client port: %s", this->respAddr_.c_str());
72
73 opt = 0;
74 if (zmq_setsockopt(this->zmqResp_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
75 throw(rogue::GeneralError("memory::TcpServer::TcpServer", "Failed to set socket linger"));
76
77 if (zmq_setsockopt(this->zmqReq_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
78 throw(rogue::GeneralError("memory::TcpServer::TcpServer", "Failed to set socket linger"));
79
80 opt = 100;
81 if (zmq_setsockopt(this->zmqReq_, ZMQ_RCVTIMEO, &opt, sizeof(int32_t)) != 0)
82 throw(rogue::GeneralError("memory::TcpServer::TcpServer", "Failed to set socket receive timeout"));
83
84 if (zmq_bind(this->zmqResp_, this->respAddr_.c_str()) < 0)
85 throw(rogue::GeneralError::create("memory::TcpServer::TcpServer",
86 "Failed to bind server to port %" PRIu16
87 " at address %s, another process may be using this port",
88 port + 1,
89 addr.c_str()));
90
91 this->bridgeLog_->debug("Creating request client port: %s", this->reqAddr_.c_str());
92
93 if (zmq_bind(this->zmqReq_, this->reqAddr_.c_str()) < 0)
94 throw(rogue::GeneralError::create("memory::TcpServer::TcpServer",
95 "Failed to bind server to port %" PRIu16
96 " at address %s, another process may be using this port",
97 port,
98 addr.c_str()));
99
100 // Start rx thread
101 threadEn_ = true;
102 this->thread_ = new std::thread(&rim::TcpServer::runThread, this);
103
104 this->bridgeLog_->debug("TCP memory bridge ready. request=%s response=%s",
105 this->reqAddr_.c_str(),
106 this->respAddr_.c_str());
107
108 // Set a thread name
109#ifndef __MACH__
110 pthread_setname_np(thread_->native_handle(), "TcpServer");
111#endif
112}
113
115rim::TcpServer::~TcpServer() {
116 this->stop();
117}
118
119void rim::TcpServer::close() {
120 this->stop();
121}
122
123void rim::TcpServer::start() {
124 // The bridge is fully bound and the worker thread is already running by
125 // the time the constructor returns. This hook exists for managed-lifecycle
126 // symmetry with TcpClient.
127}
128
129void rim::TcpServer::stop() {
130 if (threadEn_) {
131 rogue::GilRelease noGil;
132 threadEn_ = false;
133 thread_->join();
134 this->bridgeLog_->debug("Stopping TCP memory bridge. request=%s response=%s",
135 this->reqAddr_.c_str(),
136 this->respAddr_.c_str());
137 zmq_close(this->zmqResp_);
138 zmq_close(this->zmqReq_);
139 zmq_ctx_destroy(this->zmqCtx_);
140 }
141}
142
144void rim::TcpServer::runThread() {
145 uint8_t* data;
146 uint64_t more;
147 size_t moreSize;
148 uint32_t x;
149 uint32_t msgCnt;
150 zmq_msg_t msg[6];
151 uint32_t id;
152 uint64_t addr;
153 uint32_t size;
154 uint32_t type;
155 std::string result;
156
157 bridgeLog_->logThreadId();
158
159 while (threadEn_) {
160 for (x = 0; x < 6; x++) zmq_msg_init(&(msg[x]));
161 msgCnt = 0;
162 x = 0;
163
164 // Get message
165 do {
166 // Get the message
167 if (zmq_recvmsg(this->zmqReq_, &(msg[x]), 0) >= 0) {
168 if (x != 4) x++;
169 msgCnt++;
170
171 // Is there more data?
172 more = 0;
173 moreSize = 8;
174 zmq_getsockopt(this->zmqReq_, ZMQ_RCVMORE, &more, &moreSize);
175 } else {
176 more = 1;
177 }
178 } while (threadEn_ && more);
179
180 // Proper message received
181 if (threadEn_ && (msgCnt == 4 || msgCnt == 5)) {
182 // Check sizes
183 if ((zmq_msg_size(&(msg[0])) != 4) || (zmq_msg_size(&(msg[1])) != 8) || (zmq_msg_size(&(msg[2])) != 4) ||
184 (zmq_msg_size(&(msg[3])) != 4)) {
185 bridgeLog_->warning(
186 "Bad message sizes. id=%zu addr=%zu size=%zu type=%zu",
187 zmq_msg_size(&(msg[0])),
188 zmq_msg_size(&(msg[1])),
189 zmq_msg_size(&(msg[2])),
190 zmq_msg_size(&(msg[3])));
191 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
192 continue; // while (1)
193 }
194
195 // Get return fields
196 std::memcpy(&id, zmq_msg_data(&(msg[0])), 4);
197 std::memcpy(&addr, zmq_msg_data(&(msg[1])), 8);
198 std::memcpy(&size, zmq_msg_data(&(msg[2])), 4);
199 std::memcpy(&type, zmq_msg_data(&(msg[3])), 4);
200
201 // Bridge readiness probe is handled locally.
202 if (type == rim::TcpBridgeProbe) {
203 if ((msgCnt != 4) || (size != 0)) {
204 bridgeLog_->warning("Malformed readiness probe. Id=%" PRIu32, id);
205 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
206 continue; // while (1)
207 }
208 zmq_msg_init_size(&(msg[4]), 0);
209 result = "OK";
210
211 // Write data is expected
212 } else if ((type == rim::Write) || (type == rim::Post)) {
213 if ((msgCnt != 5) || (zmq_msg_size(&(msg[4])) != size)) {
214 bridgeLog_->warning("Transaction write data error. Id=%" PRIu32, id);
215 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
216 continue; // while (1)
217 }
218 } else {
219 zmq_msg_init_size(&(msg[4]), size);
220 }
221
222 if (type != rim::TcpBridgeProbe) {
223 // Data pointer
224 data = reinterpret_cast<uint8_t*>(zmq_msg_data(&(msg[4])));
225
226 bridgeLog_->debug("Starting transaction id=%" PRIu32 ", addr=0x%" PRIx64 ", size=%" PRIu32
227 ", type=%" PRIu32,
228 id,
229 addr,
230 size,
231 type);
232
233 // Execute transaction and wait for result
234 this->clearError();
235 reqTransaction(addr, size, data, type);
236 waitTransaction(0);
237 result = getError();
238
239 bridgeLog_->debug("Done transaction id=%" PRIu32 ", addr=0x%" PRIx64 ", size=%" PRIu32
240 ", type=%" PRIu32 ", result=(%s)",
241 id,
242 addr,
243 size,
244 type,
245 result.c_str());
246 }
247
248 // Result message, at least one char needs to be sent
249 if (result.length() == 0) result = "OK";
250 zmq_msg_init_size(&(msg[5]), result.length());
251 std::memcpy(zmq_msg_data(&(msg[5])), result.c_str(), result.length());
252
253 // Send message
254 for (x = 0; x < 6; x++) zmq_sendmsg(this->zmqResp_, &(msg[x]), (x == 5) ? 0 : ZMQ_SNDMORE);
255 } else {
256 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
257 }
258 }
259}
260
261void rim::TcpServer::setup_python() {
262#ifndef NO_PYTHON
263
264 bp::class_<rim::TcpServer, rim::TcpServerPtr, bp::bases<rim::Master>, boost::noncopyable>(
265 "TcpServer",
266 bp::init<std::string, uint16_t>())
267 .def("close", &rim::TcpServer::close)
268 .def("_start", &rim::TcpServer::start)
269 .def("_stop", &rim::TcpServer::stop);
270
271 bp::implicitly_convertible<rim::TcpServerPtr, rim::MasterPtr>();
272#endif
273}
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:95
std::shared_ptr< rogue::interfaces::memory::TcpServer > TcpServerPtr
Shared pointer alias for TcpServer.
Definition TcpServer.h:150
static const uint32_t TcpBridgeProbe
Internal TCP bridge readiness probe transaction type.
Definition Constants.h:67
static const uint32_t Write
Memory write transaction type.
Definition Constants.h:43
static const uint32_t Post
Memory posted write transaction type.
Definition Constants.h:50