rogue
Loading...
Searching...
No Matches
TcpClient.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <inttypes.h>
22#include <zmq.h>
23
24#include <cstring>
25#include <memory>
26#include <string>
27#include <chrono>
28
29#include "rogue/GeneralError.h"
30#include "rogue/GilRelease.h"
31#include "rogue/Logging.h"
35
37
38#ifndef NO_PYTHON
39 #include <boost/python.hpp>
40namespace bp = boost::python;
41#endif
42
namespace {
// Overall time, in seconds, that start() allows waitReady() to spend
// probing the remote bridge before giving up.
constexpr double DefaultReadyTimeout{10.0};

// Interval, in seconds, between readiness probe retries issued by start().
constexpr double DefaultReadyPeriod{0.1};
}  // namespace
47
49rim::TcpClientPtr rim::TcpClient::create(std::string addr, uint16_t port, bool waitReady) {
50 rim::TcpClientPtr r = std::make_shared<rim::TcpClient>(addr, port, waitReady);
51 return (r);
52}
53
55rim::TcpClient::TcpClient(std::string addr, uint16_t port, bool waitReady) : rim::Slave(4, 0xFFFFFFFF) {
56 int32_t opt;
57 std::string logstr;
58
59 logstr = "memory.TcpClient.";
60 logstr.append(addr);
61 logstr.append(".");
62 logstr.append(std::to_string(port));
63
64 this->bridgeLog_ = rogue::Logging::create(logstr);
65 this->probeSeq_ = 0;
66 this->probeId_ = 0;
67 this->probeDone_ = false;
68 this->probeResult_.clear();
69 this->waitReadyOnStart_ = waitReady;
70
71 // Format address
72 this->respAddr_ = "tcp://";
73 this->respAddr_.append(addr);
74 this->respAddr_.append(":");
75 this->reqAddr_ = this->respAddr_;
76
77 this->zmqCtx_ = zmq_ctx_new();
78 this->zmqResp_ = zmq_socket(this->zmqCtx_, ZMQ_PULL);
79 this->zmqReq_ = zmq_socket(this->zmqCtx_, ZMQ_PUSH);
80
81 // Don't buffer when no connection
82 opt = 1;
83 if (zmq_setsockopt(this->zmqReq_, ZMQ_IMMEDIATE, &opt, sizeof(int32_t)) != 0)
84 throw(rogue::GeneralError("memory::TcpClient::TcpClient", "Failed to set socket immediate"));
85
86 this->respAddr_.append(std::to_string(static_cast<int64_t>(port + 1)));
87 this->reqAddr_.append(std::to_string(static_cast<int64_t>(port)));
88
89 this->bridgeLog_->debug("Creating response client port: %s", this->respAddr_.c_str());
90
91 opt = 0;
92 if (zmq_setsockopt(this->zmqResp_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
93 throw(rogue::GeneralError("memory::TcpClient::TcpClient", "Failed to set socket linger"));
94
95 if (zmq_setsockopt(this->zmqReq_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
96 throw(rogue::GeneralError("memory::TcpClient::TcpClient", "Failed to set socket linger"));
97
98 opt = 100;
99 if (zmq_setsockopt(this->zmqResp_, ZMQ_RCVTIMEO, &opt, sizeof(int32_t)) != 0)
100 throw(rogue::GeneralError("memory::TcpClient::TcpClient", "Failed to set socket receive timeout"));
101
102 if (zmq_connect(this->zmqResp_, this->respAddr_.c_str()) < 0)
103 throw(rogue::GeneralError::create("memory::TcpClient::TcpClient",
104 "Failed to connect to remote port %" PRIu16 " at address %s",
105 port + 1,
106 addr.c_str()));
107
108 this->bridgeLog_->debug("Creating request client port: %s", this->reqAddr_.c_str());
109
110 if (zmq_connect(this->zmqReq_, this->reqAddr_.c_str()) < 0)
111 throw(rogue::GeneralError::create("memory::TcpClient::TcpClient",
112 "Failed to connect to remote port %" PRIu16 " at address %s",
113 port,
114 addr.c_str()));
115
116 // Start rx thread
117 threadEn_ = true;
118 this->thread_ = new std::thread(&rim::TcpClient::runThread, this);
119
120 // Set a thread name
121#ifndef __MACH__
122 pthread_setname_np(thread_->native_handle(), "TcpClient");
123#endif
124}
125
127rim::TcpClient::~TcpClient() {
128 this->stop();
129}
130
131// deprecated
132void rim::TcpClient::close() {
133 this->stop();
134}
135
136void rim::TcpClient::stop() {
137 if (threadEn_) {
138 rogue::GilRelease noGil;
139 threadEn_ = false;
140 thread_->join();
141 zmq_close(this->zmqResp_);
142 zmq_close(this->zmqReq_);
143 zmq_ctx_destroy(this->zmqCtx_);
144 }
145}
146
147bool rim::TcpClient::waitReady(double timeout, double period) {
148 uint32_t id;
149 uint64_t addr = 0;
150 uint32_t size = 0;
151 uint32_t type = rim::TcpBridgeProbe;
152 zmq_msg_t msg[4];
153 auto deadline = std::chrono::steady_clock::now() + std::chrono::duration<double>(timeout);
154
155 // Retry the probe until the overall timeout expires. This verifies the
156 // full request/response bridge path, not just local socket setup.
157 while (std::chrono::steady_clock::now() < deadline) {
158 {
159 std::lock_guard<std::mutex> probeLock(probeMtx_);
160 // Track the current probe ID so the response thread can match the
161 // incoming bridge reply back to this wait operation.
162 probeId_ = ++probeSeq_;
163 probeDone_ = false;
164 probeResult_.clear();
165 id = probeId_;
166 }
167
168 // Build a minimal internal bridge-control request. The server handles
169 // this locally and responds with the normal six-part completion
170 // message, but without touching downstream memory.
171 zmq_msg_init_size(&(msg[0]), 4);
172 std::memcpy(zmq_msg_data(&(msg[0])), &id, 4);
173 zmq_msg_init_size(&(msg[1]), 8);
174 std::memcpy(zmq_msg_data(&(msg[1])), &addr, 8);
175 zmq_msg_init_size(&(msg[2]), 4);
176 std::memcpy(zmq_msg_data(&(msg[2])), &size, 4);
177 zmq_msg_init_size(&(msg[3]), 4);
178 std::memcpy(zmq_msg_data(&(msg[3])), &type, 4);
179
180 {
181 std::lock_guard<std::mutex> block(bridgeMtx_);
182 // Send through the same request socket and multipart framing used
183 // by normal memory transactions so this exercises the real bridge
184 // path end-to-end.
185 for (uint32_t x = 0; x < 4; ++x) {
186 if (zmq_sendmsg(this->zmqReq_, &(msg[x]), (x == 3 ? 0 : ZMQ_SNDMORE) | ZMQ_DONTWAIT) < 0) {
187 bridgeLog_->debug("Readiness probe send failed for port %s", this->reqAddr_.c_str());
188 break;
189 }
190 }
191 }
192
193 for (uint32_t x = 0; x < 4; ++x) zmq_msg_close(&(msg[x]));
194
195 std::unique_lock<std::mutex> probeLock(probeMtx_);
196 // The receive thread sets probeDone_/probeResult_ when it sees a probe
197 // response with the matching ID. We only wait for one retry period
198 // here so we can re-send if the server is not up yet.
199 probeCond_.wait_for(probeLock, std::chrono::duration<double>(period), [&]() { return probeDone_ && probeId_ == id; });
200
201 if (probeDone_ && probeId_ == id && probeResult_ == "OK") {
202 bridgeLog_->debug("Readiness probe succeeded for port %s", this->reqAddr_.c_str());
203 return true;
204 }
205 }
206
207 bridgeLog_->warning("Timed out waiting for bridge readiness on port %s", this->reqAddr_.c_str());
208 return false;
209}
210
211void rim::TcpClient::start() {
212 if (!waitReadyOnStart_) return;
213
214 if (!waitReady(DefaultReadyTimeout, DefaultReadyPeriod)) {
215 throw(rogue::GeneralError::create("memory::TcpClient::start",
216 "Timed out waiting for remote TcpServer readiness on %s",
217 this->reqAddr_.c_str()));
218 }
219}
220
222void rim::TcpClient::doTransaction(rim::TransactionPtr tran) {
223 uint32_t x;
224 uint32_t msgCnt;
225 zmq_msg_t msg[5];
226 uint32_t id;
227 uint64_t addr;
228 uint32_t size;
229 uint32_t type;
230
231 rogue::GilRelease noGil;
232 std::lock_guard<std::mutex> block(bridgeMtx_);
233 rim::TransactionLockPtr lock = tran->lock();
234
235 // ID message
236 id = tran->id();
237 zmq_msg_init_size(&(msg[0]), 4);
238 std::memcpy(zmq_msg_data(&(msg[0])), &id, 4);
239
240 // Addr message
241 addr = tran->address();
242 zmq_msg_init_size(&(msg[1]), 8);
243 std::memcpy(zmq_msg_data(&(msg[1])), &addr, 8);
244
245 // Size message
246 size = tran->size();
247 zmq_msg_init_size(&(msg[2]), 4);
248 std::memcpy(zmq_msg_data(&(msg[2])), &size, 4);
249
250 // Type message
251 type = tran->type();
252 zmq_msg_init_size(&(msg[3]), 4);
253 std::memcpy(zmq_msg_data(&(msg[3])), &type, 4);
254
255 // Write transaction
256 if (type == rim::Write || type == rim::Post) {
257 msgCnt = 5;
258 zmq_msg_init_size(&(msg[4]), size);
259 std::memcpy(zmq_msg_data(&(msg[4])), tran->begin(), size);
260
261 // Read transaction
262 } else {
263 msgCnt = 4;
264 }
265
266 bridgeLog_->debug("Requested transaction id=%" PRIu32 ", addr=0x%" PRIx64 ", size=%" PRIu32 ", type=%" PRIu32
267 ", cnt=%" PRIu32 ", port: %s",
268 id,
269 addr,
270 size,
271 type,
272 msgCnt,
273 this->reqAddr_.c_str());
274
275 // Add transaction
276 if (type == rim::Post)
277 tran->done();
278 else
279 addTransaction(tran);
280
281 // Send message
282 for (x = 0; x < msgCnt; x++) {
283 if (zmq_sendmsg(this->zmqReq_, &(msg[x]), ((x == (msgCnt - 1) ? 0 : ZMQ_SNDMORE)) | ZMQ_DONTWAIT) < 0) {
284 bridgeLog_->warning("Failed to send transaction %" PRIu32 ", msg %" PRIu32, id, x);
285 }
286 }
287}
288
290void rim::TcpClient::runThread() {
292 bool err;
293 uint64_t more;
294 size_t moreSize;
295 uint32_t x;
296 uint32_t msgCnt;
297 zmq_msg_t msg[6];
298 uint32_t id;
299 uint64_t addr;
300 uint32_t size;
301 uint32_t type;
302 char result[1000];
303
304 bridgeLog_->logThreadId();
305
306 while (threadEn_) {
307 for (x = 0; x < 6; x++) zmq_msg_init(&(msg[x]));
308 msgCnt = 0;
309 x = 0;
310
311 // Get message
312 do {
313 // Get the message
314 if (zmq_recvmsg(this->zmqResp_, &(msg[x]), 0) >= 0) {
315 if (x != 5) x++;
316 msgCnt++;
317
318 // Is there more data?
319 more = 0;
320 moreSize = 8;
321 zmq_getsockopt(this->zmqResp_, ZMQ_RCVMORE, &more, &moreSize);
322 } else {
323 more = 1;
324 }
325 } while (threadEn_ && more);
326
327 // Proper message received
328 if (threadEn_ && (msgCnt == 6)) {
329 // Check sizes
330 if ((zmq_msg_size(&(msg[0])) != 4) || (zmq_msg_size(&(msg[1])) != 8) || (zmq_msg_size(&(msg[2])) != 4) ||
331 (zmq_msg_size(&(msg[3])) != 4) || (zmq_msg_size(&(msg[5])) > 999)) {
332 bridgeLog_->warning("Bad message sizes");
333 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
334 continue; // while (1)
335 }
336
337 // Get return fields
338 std::memcpy(&id, zmq_msg_data(&(msg[0])), 4);
339 std::memcpy(&addr, zmq_msg_data(&(msg[1])), 8);
340 std::memcpy(&size, zmq_msg_data(&(msg[2])), 4);
341 std::memcpy(&type, zmq_msg_data(&(msg[3])), 4);
342
343 memset(result, 0, 1000);
344 std::strncpy(result, reinterpret_cast<char*>(zmq_msg_data(&(msg[5]))), zmq_msg_size(&(msg[5])));
345
346 if (type == rim::TcpBridgeProbe) {
347 std::lock_guard<std::mutex> probeLock(probeMtx_);
348 if (id == probeId_) {
349 probeResult_ = result;
350 probeDone_ = true;
351 probeCond_.notify_all();
352 }
353 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
354 continue; // while (1)
355 }
356
357 // Find Transaction
358 if ((tran = getTransaction(id)) == NULL) {
359 bridgeLog_->warning("Failed to find transaction id=%" PRIu32, id);
360 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
361 continue; // while (1)
362 }
363
364 // Lock transaction
365 rim::TransactionLockPtr lock = tran->lock();
366
367 // Transaction expired
368 if (tran->expired()) {
369 bridgeLog_->warning("Transaction expired. Id=%" PRIu32, id);
370 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
371 continue; // while (1)
372 }
373
374 // Double check transaction
375 if ((addr != tran->address()) || (size != tran->size()) || (type != tran->type())) {
376 bridgeLog_->warning("Transaction data mismatch. Id=%" PRIu32, id);
377 tran->error("Transaction data mismatch in TcpClient");
378 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
379 continue; // while (1)
380 }
381
382 // Copy data if read
383 if (type != rim::Write) {
384 if (zmq_msg_size(&(msg[4])) != size) {
385 bridgeLog_->warning("Transaction size mismatch. Id=%" PRIu32, id);
386 tran->error("Received transaction response did not match header size");
387 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
388 continue; // while (1)
389 }
390 std::memcpy(tran->begin(), zmq_msg_data(&(msg[4])), size);
391 }
392 if (strcmp(result, "OK") != 0)
393 tran->error(result);
394 else
395 tran->done();
396 bridgeLog_->debug("Response for transaction id=%" PRIu32 ", addr=0x%" PRIx64 ", size=%" PRIu32
397 ", type=%" PRIu32 ", cnt=%" PRIu32 ", port: %s, Result: (%s)",
398 id,
399 addr,
400 size,
401 type,
402 msgCnt,
403 this->respAddr_.c_str(),
404 result);
405 }
406 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
407 }
408}
409
410void rim::TcpClient::setup_python() {
411#ifndef NO_PYTHON
412
413 bp::class_<rim::TcpClient, rim::TcpClientPtr, bp::bases<rim::Slave>, boost::noncopyable>(
414 "TcpClient",
415 bp::init<std::string, uint16_t, bp::optional<bool> >())
416 .def("close", &rim::TcpClient::close)
417 .def("waitReady", &rim::TcpClient::waitReady)
418 .def("_start", &rim::TcpClient::start);
419
420 bp::implicitly_convertible<rim::TcpClientPtr, rim::SlavePtr>();
421#endif
422}
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:60
Memory slave device.
Definition Slave.h:54
bool waitReady(double timeout, double period)
Wait for the remote TcpServer path to respond to a bridge probe.
std::shared_ptr< rogue::interfaces::memory::TransactionLock > TransactionLockPtr
Shared pointer alias for TransactionLock.
static const uint32_t TcpBridgeProbe
Internal TCP bridge readiness probe transaction type.
Definition Constants.h:67
static const uint32_t Write
Memory write transaction type.
Definition Constants.h:43
std::shared_ptr< rogue::interfaces::memory::Transaction > TransactionPtr
Shared pointer alias for Transaction.
static const uint32_t Post
Memory posted write transaction type.
Definition Constants.h:50
std::shared_ptr< rogue::interfaces::memory::TcpClient > TcpClientPtr
Shared pointer alias for TcpClient.
Definition TcpClient.h:190