rogue
Loading...
Searching...
No Matches
TcpClient.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <inttypes.h>
22#include <zmq.h>
23
24#include <cstring>
25#include <memory>
26#include <string>
27#include <chrono>
28
29#include "rogue/GeneralError.h"
30#include "rogue/GilRelease.h"
31#include "rogue/Logging.h"
35
37
38#ifndef NO_PYTHON
39 #include <boost/python.hpp>
40namespace bp = boost::python;
41#endif
42
namespace {
// Readiness-probe defaults applied by start() when the client was built with
// waitReady=true: the total probing budget in seconds, and the interval in
// seconds between successive probes.
constexpr double DefaultReadyTimeout{10.0};
constexpr double DefaultReadyPeriod{0.1};
}  // namespace
47
49rim::TcpClientPtr rim::TcpClient::create(std::string addr, uint16_t port, bool waitReady) {
50 rim::TcpClientPtr r = std::make_shared<rim::TcpClient>(addr, port, waitReady);
51 return (r);
52}
53
55rim::TcpClient::TcpClient(std::string addr, uint16_t port, bool waitReady) : rim::Slave(4, 0xFFFFFFFF) {
56 int32_t opt;
57 std::string logstr;
58
59 logstr = "memory.TcpClient.";
60 logstr.append(addr);
61 logstr.append(".");
62 logstr.append(std::to_string(port));
63
64 this->bridgeLog_ = rogue::Logging::create(logstr);
65 this->probeSeq_ = 0;
66 this->probeId_ = 0;
67 this->probeDone_ = false;
68 this->probeResult_.clear();
69 this->waitReadyOnStart_ = waitReady;
70
71 // Format address
72 this->respAddr_ = "tcp://";
73 this->respAddr_.append(addr);
74 this->respAddr_.append(":");
75 this->reqAddr_ = this->respAddr_;
76
77 this->zmqCtx_ = zmq_ctx_new();
78 this->zmqResp_ = zmq_socket(this->zmqCtx_, ZMQ_PULL);
79 this->zmqReq_ = zmq_socket(this->zmqCtx_, ZMQ_PUSH);
80
81 // Throws before threadEn_=true skip the dtor's stop(); free the context here.
82 try {
83 // Don't buffer when no connection
84 opt = 1;
85 if (zmq_setsockopt(this->zmqReq_, ZMQ_IMMEDIATE, &opt, sizeof(int32_t)) != 0)
86 throw(rogue::GeneralError("memory::TcpClient::TcpClient", "Failed to set socket immediate"));
87
88 this->respAddr_.append(std::to_string(static_cast<int64_t>(port + 1)));
89 this->reqAddr_.append(std::to_string(static_cast<int64_t>(port)));
90
91 this->bridgeLog_->debug("Creating response client port: %s", this->respAddr_.c_str());
92
93 opt = 0;
94 if (zmq_setsockopt(this->zmqResp_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
95 throw(rogue::GeneralError("memory::TcpClient::TcpClient", "Failed to set socket linger"));
96
97 if (zmq_setsockopt(this->zmqReq_, ZMQ_LINGER, &opt, sizeof(int32_t)) != 0)
98 throw(rogue::GeneralError("memory::TcpClient::TcpClient", "Failed to set socket linger"));
99
100 opt = 100;
101 if (zmq_setsockopt(this->zmqResp_, ZMQ_RCVTIMEO, &opt, sizeof(int32_t)) != 0)
102 throw(rogue::GeneralError("memory::TcpClient::TcpClient", "Failed to set socket receive timeout"));
103
104 if (zmq_connect(this->zmqResp_, this->respAddr_.c_str()) < 0)
105 throw(rogue::GeneralError::create("memory::TcpClient::TcpClient",
106 "Failed to connect to remote port %" PRIu16 " at address %s",
107 port + 1,
108 addr.c_str()));
109
110 this->bridgeLog_->debug("Creating request client port: %s", this->reqAddr_.c_str());
111
112 if (zmq_connect(this->zmqReq_, this->reqAddr_.c_str()) < 0)
113 throw(rogue::GeneralError::create("memory::TcpClient::TcpClient",
114 "Failed to connect to remote port %" PRIu16 " at address %s",
115 port,
116 addr.c_str()));
117
118 // Start rx thread
119 threadEn_ = true;
120 this->thread_ = std::make_unique<std::thread>(&rim::TcpClient::runThread, this);
121
122 // Set a thread name
123#ifndef __MACH__
124 pthread_setname_np(thread_->native_handle(), "TcpClient");
125#endif
126 } catch (...) {
127 // ~std::thread on a joinable thread calls std::terminate(); join first.
128 // RCVTIMEO=100 was already set on zmqResp_ above so the worker exits
129 // within ~100 ms of threadEn_=false. Release the GIL around join() to
130 // match stop(): the worker may be blocked acquiring the GIL to deliver
131 // a transaction into Python, and joining while holding the GIL would
132 // deadlock.
133 threadEn_ = false;
134 if (thread_) {
135 {
136 rogue::GilRelease noGil;
137 thread_->join();
138 }
139 thread_.reset();
140 }
141 if (zmqResp_ != nullptr) {
142 zmq_close(zmqResp_);
143 zmqResp_ = nullptr;
144 }
145 if (zmqReq_ != nullptr) {
146 zmq_close(zmqReq_);
147 zmqReq_ = nullptr;
148 }
149 if (zmqCtx_ != nullptr) {
150 zmq_ctx_destroy(zmqCtx_);
151 zmqCtx_ = nullptr;
152 }
153 throw;
154 }
155}
156
158rim::TcpClient::~TcpClient() {
159 this->stop();
160}
161
162// deprecated
163void rim::TcpClient::close() {
164 this->stop();
165}
166
167void rim::TcpClient::stop() {
168 if (threadEn_) {
169 rogue::GilRelease noGil;
170 threadEn_ = false;
171 thread_->join();
172 thread_.reset();
173 zmq_close(this->zmqResp_);
174 zmqResp_ = nullptr;
175 zmq_close(this->zmqReq_);
176 zmqReq_ = nullptr;
177 zmq_ctx_destroy(this->zmqCtx_);
178 zmqCtx_ = nullptr;
179 }
180}
181
182bool rim::TcpClient::waitReady(double timeout, double period) {
183 uint32_t id;
184 uint64_t addr = 0;
185 uint32_t size = 0;
186 uint32_t type = rim::TcpBridgeProbe;
187 zmq_msg_t msg[4];
188 auto deadline = std::chrono::steady_clock::now() + std::chrono::duration<double>(timeout);
189
190 // Retry the probe until the overall timeout expires. This verifies the
191 // full request/response bridge path, not just local socket setup.
192 while (std::chrono::steady_clock::now() < deadline) {
193 {
194 std::lock_guard<std::mutex> probeLock(probeMtx_);
195 // Track the current probe ID so the response thread can match the
196 // incoming bridge reply back to this wait operation.
197 probeId_ = ++probeSeq_;
198 probeDone_ = false;
199 probeResult_.clear();
200 id = probeId_;
201 }
202
203 // Build a minimal internal bridge-control request. The server handles
204 // this locally and responds with the normal six-part completion
205 // message, but without touching downstream memory.
206 zmq_msg_init_size(&(msg[0]), 4);
207 std::memcpy(zmq_msg_data(&(msg[0])), &id, 4);
208 zmq_msg_init_size(&(msg[1]), 8);
209 std::memcpy(zmq_msg_data(&(msg[1])), &addr, 8);
210 zmq_msg_init_size(&(msg[2]), 4);
211 std::memcpy(zmq_msg_data(&(msg[2])), &size, 4);
212 zmq_msg_init_size(&(msg[3]), 4);
213 std::memcpy(zmq_msg_data(&(msg[3])), &type, 4);
214
215 {
216 std::lock_guard<std::mutex> block(bridgeMtx_);
217 // Send through the same request socket and multipart framing used
218 // by normal memory transactions so this exercises the real bridge
219 // path end-to-end.
220 for (uint32_t x = 0; x < 4; ++x) {
221 if (zmq_sendmsg(this->zmqReq_, &(msg[x]), (x == 3 ? 0 : ZMQ_SNDMORE) | ZMQ_DONTWAIT) < 0) {
222 bridgeLog_->debug("Readiness probe send failed for port %s", this->reqAddr_.c_str());
223 break;
224 }
225 }
226 }
227
228 for (uint32_t x = 0; x < 4; ++x) zmq_msg_close(&(msg[x]));
229
230 std::unique_lock<std::mutex> probeLock(probeMtx_);
231 // The receive thread sets probeDone_/probeResult_ when it sees a probe
232 // response with the matching ID. We only wait for one retry period
233 // here so we can re-send if the server is not up yet.
234 probeCond_.wait_for(probeLock, std::chrono::duration<double>(period), [&]() { return probeDone_ && probeId_ == id; });
235
236 if (probeDone_ && probeId_ == id && probeResult_ == "OK") {
237 bridgeLog_->debug("Readiness probe succeeded for port %s", this->reqAddr_.c_str());
238 return true;
239 }
240 }
241
242 bridgeLog_->warning("Timed out waiting for bridge readiness on port %s", this->reqAddr_.c_str());
243 return false;
244}
245
246void rim::TcpClient::start() {
247 if (!waitReadyOnStart_) return;
248
249 if (!waitReady(DefaultReadyTimeout, DefaultReadyPeriod)) {
250 throw(rogue::GeneralError::create("memory::TcpClient::start",
251 "Timed out waiting for remote TcpServer readiness on %s",
252 this->reqAddr_.c_str()));
253 }
254}
255
257void rim::TcpClient::doTransaction(rim::TransactionPtr tran) {
258 uint32_t x;
259 uint32_t msgCnt;
260 zmq_msg_t msg[5];
261 uint32_t id;
262 uint64_t addr;
263 uint32_t size;
264 uint32_t type;
265
266 rogue::GilRelease noGil;
267 std::lock_guard<std::mutex> block(bridgeMtx_);
268 rim::TransactionLockPtr lock = tran->lock();
269
270 // ID message
271 id = tran->id();
272 zmq_msg_init_size(&(msg[0]), 4);
273 std::memcpy(zmq_msg_data(&(msg[0])), &id, 4);
274
275 // Addr message
276 addr = tran->address();
277 zmq_msg_init_size(&(msg[1]), 8);
278 std::memcpy(zmq_msg_data(&(msg[1])), &addr, 8);
279
280 // Size message
281 size = tran->size();
282 zmq_msg_init_size(&(msg[2]), 4);
283 std::memcpy(zmq_msg_data(&(msg[2])), &size, 4);
284
285 // Type message
286 type = tran->type();
287 zmq_msg_init_size(&(msg[3]), 4);
288 std::memcpy(zmq_msg_data(&(msg[3])), &type, 4);
289
290 // Write transaction
291 if (type == rim::Write || type == rim::Post) {
292 msgCnt = 5;
293 zmq_msg_init_size(&(msg[4]), size);
294 std::memcpy(zmq_msg_data(&(msg[4])), tran->begin(), size);
295
296 // Read transaction
297 } else {
298 msgCnt = 4;
299 }
300
301 bridgeLog_->debug("Requested transaction id=%" PRIu32 ", addr=0x%" PRIx64 ", size=%" PRIu32 ", type=%" PRIu32
302 ", cnt=%" PRIu32 ", port: %s",
303 id,
304 addr,
305 size,
306 type,
307 msgCnt,
308 this->reqAddr_.c_str());
309
310 // Add transaction
311 if (type == rim::Post)
312 tran->done();
313 else
314 addTransaction(tran);
315
316 // Send message
317 for (x = 0; x < msgCnt; x++) {
318 if (zmq_sendmsg(this->zmqReq_, &(msg[x]), ((x == (msgCnt - 1) ? 0 : ZMQ_SNDMORE)) | ZMQ_DONTWAIT) < 0) {
319 bridgeLog_->warning("Failed to send transaction %" PRIu32 ", msg %" PRIu32 " on %s: %s",
320 id,
321 x,
322 this->reqAddr_.c_str(),
323 zmq_strerror(zmq_errno()));
324 }
325 }
326}
327
329void rim::TcpClient::runThread() {
331 bool err;
332 uint64_t more;
333 size_t moreSize;
334 uint32_t x;
335 uint32_t msgCnt;
336 zmq_msg_t msg[6];
337 uint32_t id;
338 uint64_t addr;
339 uint32_t size;
340 uint32_t type;
341 char result[1000];
342
343 bridgeLog_->logThreadId();
344
345 while (threadEn_) {
346 for (x = 0; x < 6; x++) zmq_msg_init(&(msg[x]));
347 msgCnt = 0;
348 x = 0;
349
350 // Get message
351 do {
352 // Get the message
353 if (zmq_recvmsg(this->zmqResp_, &(msg[x]), 0) >= 0) {
354 if (x != 5) x++;
355 msgCnt++;
356
357 // Is there more data?
358 more = 0;
359 moreSize = 8;
360 zmq_getsockopt(this->zmqResp_, ZMQ_RCVMORE, &more, &moreSize);
361 } else {
362 more = 1;
363 }
364 } while (threadEn_ && more);
365
366 // Proper message received
367 if (threadEn_ && (msgCnt == 6)) {
368 // Check sizes
369 if ((zmq_msg_size(&(msg[0])) != 4) || (zmq_msg_size(&(msg[1])) != 8) || (zmq_msg_size(&(msg[2])) != 4) ||
370 (zmq_msg_size(&(msg[3])) != 4) || (zmq_msg_size(&(msg[5])) > 999)) {
371 bridgeLog_->warning(
372 "Bad message sizes. id=%zu addr=%zu size=%zu type=%zu result=%zu",
373 zmq_msg_size(&(msg[0])),
374 zmq_msg_size(&(msg[1])),
375 zmq_msg_size(&(msg[2])),
376 zmq_msg_size(&(msg[3])),
377 zmq_msg_size(&(msg[5])));
378 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
379 continue; // while (1)
380 }
381
382 // Get return fields
383 std::memcpy(&id, zmq_msg_data(&(msg[0])), 4);
384 std::memcpy(&addr, zmq_msg_data(&(msg[1])), 8);
385 std::memcpy(&size, zmq_msg_data(&(msg[2])), 4);
386 std::memcpy(&type, zmq_msg_data(&(msg[3])), 4);
387
388 memset(result, 0, 1000);
389 std::strncpy(result, reinterpret_cast<char*>(zmq_msg_data(&(msg[5]))), zmq_msg_size(&(msg[5])));
390
391 if (type == rim::TcpBridgeProbe) {
392 std::lock_guard<std::mutex> probeLock(probeMtx_);
393 if (id == probeId_) {
394 probeResult_ = result;
395 probeDone_ = true;
396 probeCond_.notify_all();
397 }
398 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
399 continue; // while (1)
400 }
401
402 // Find Transaction
403 if ((tran = getTransaction(id)) == NULL) {
404 bridgeLog_->warning("Failed to find transaction id=%" PRIu32, id);
405 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
406 continue; // while (1)
407 }
408
409 // Lock transaction
410 rim::TransactionLockPtr lock = tran->lock();
411
412 if (tran->expired()) {
413 bridgeLog_->warning("Dropping late response for expired transaction. Id=%" PRIu32, id);
414 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
415 continue; // while (1)
416 }
417
418 // Double check transaction
419 if ((addr != tran->address()) || (size != tran->size()) || (type != tran->type())) {
420 bridgeLog_->warning("Transaction data mismatch. Id=%" PRIu32, id);
421 tran->error("Transaction data mismatch in TcpClient");
422 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
423 continue; // while (1)
424 }
425
426 // Copy data if read
427 if (type != rim::Write) {
428 if (zmq_msg_size(&(msg[4])) != size) {
429 bridgeLog_->warning("Transaction size mismatch. Id=%" PRIu32, id);
430 tran->error("Received transaction response did not match header size");
431 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
432 continue; // while (1)
433 }
434 std::memcpy(tran->begin(), zmq_msg_data(&(msg[4])), size);
435 }
436 if (strcmp(result, "OK") != 0)
437 tran->error(result);
438 else
439 tran->done();
440 bridgeLog_->debug("Response for transaction id=%" PRIu32 ", addr=0x%" PRIx64 ", size=%" PRIu32
441 ", type=%" PRIu32 ", cnt=%" PRIu32 ", port: %s, Result: (%s)",
442 id,
443 addr,
444 size,
445 type,
446 msgCnt,
447 this->respAddr_.c_str(),
448 result);
449 }
450 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
451 }
452}
453
454void rim::TcpClient::setup_python() {
455#ifndef NO_PYTHON
456
457#pragma GCC diagnostic push
458#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
459 bp::class_<rim::TcpClient, rim::TcpClientPtr, bp::bases<rim::Slave>, boost::noncopyable>(
460 "TcpClient",
461 bp::init<std::string, uint16_t, bp::optional<bool> >())
462 .def("close", &rim::TcpClient::close)
463 .def("waitReady", &rim::TcpClient::waitReady)
464 .def("_start", &rim::TcpClient::start)
465 .def("_stop", &rim::TcpClient::stop);
466#pragma GCC diagnostic pop
467
468 bp::implicitly_convertible<rim::TcpClientPtr, rim::SlavePtr>();
469#endif
470}
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:95
Memory slave device.
Definition Slave.h:54
bool waitReady(double timeout, double period)
Wait for the remote TcpServer path to respond to a bridge probe.
std::shared_ptr< rogue::interfaces::memory::TransactionLock > TransactionLockPtr
Shared pointer alias for TransactionLock.
static const uint32_t TcpBridgeProbe
Internal TCP bridge readiness probe transaction type.
Definition Constants.h:67
static const uint32_t Write
Memory write transaction type.
Definition Constants.h:43
std::shared_ptr< rogue::interfaces::memory::Transaction > TransactionPtr
Shared pointer alias for Transaction.
static const uint32_t Post
Memory posted write transaction type.
Definition Constants.h:50
std::shared_ptr< rogue::interfaces::memory::TcpClient > TcpClientPtr
Shared pointer alias for TcpClient.
Definition TcpClient.h:189