39 #include <boost/python.hpp>
40namespace bp = boost::python;
// Overall readiness timeout that start() passes to waitReady(). waitReady()
// wraps the value in std::chrono::duration<double>, so the unit is seconds.
44static constexpr double DefaultReadyTimeout = 10.0;
// Per-attempt wait that start() passes to waitReady() as `period`; also
// interpreted through std::chrono::duration<double>, i.e. seconds.
45static constexpr double DefaultReadyPeriod = 0.1;
// Constructs the client: derives the request/response ZeroMQ endpoints from
// addr and port, creates and configures both sockets, connects them and
// launches the receive thread.
//   addr      - remote address, embedded into the "tcp://<addr>:<port>" endpoints
//   port      - request port; the response endpoint uses port + 1
//   waitReady - NOTE(review): not referenced in the visible lines; presumably
//               stored in waitReadyOnStart_, which start() reads - confirm
// Throws rogue::GeneralError when ZMQ_IMMEDIATE or ZMQ_RCVTIMEO cannot be set.
// NOTE(review): the meaning of the Slave(4, 0xFFFFFFFF) arguments, the values
// assigned to `opt`, the bridgeLog_ creation and the bodies of several failure
// branches are not visible here - confirm before relying on them.
55rim::TcpClient::TcpClient(std::string addr, uint16_t port,
bool waitReady) :
rim::
Slave(4, 0xFFFFFFFF) {
// Logger name is the fixed prefix followed by the decimal port number.
59 logstr =
"memory.TcpClient.";
62 logstr.append(std::to_string(port));
// Start with no probe result pending.
67 this->probeDone_ =
false;
68 this->probeResult_.clear();
// Both endpoints share the "tcp://<addr>:" prefix; the port is appended later.
72 this->respAddr_ =
"tcp://";
73 this->respAddr_.append(addr);
74 this->respAddr_.append(
":");
75 this->reqAddr_ = this->respAddr_;
// One context, a PULL socket for responses and a PUSH socket for requests.
77 this->zmqCtx_ = zmq_ctx_new();
78 this->zmqResp_ = zmq_socket(this->zmqCtx_, ZMQ_PULL);
79 this->zmqReq_ = zmq_socket(this->zmqCtx_, ZMQ_PUSH);
// Apply ZMQ_IMMEDIATE to the request socket (value of `opt` not visible).
85 if (zmq_setsockopt(this->zmqReq_, ZMQ_IMMEDIATE, &opt,
sizeof(int32_t)) != 0)
86 throw(
rogue::GeneralError(
"memory::TcpClient::TcpClient",
"Failed to set socket immediate"));
// Responses arrive on port + 1, requests are sent to port.
88 this->respAddr_.append(std::to_string(
static_cast<int64_t
>(port + 1)));
89 this->reqAddr_.append(std::to_string(
static_cast<int64_t
>(port)));
91 this->bridgeLog_->debug(
"Creating response client port: %s", this->respAddr_.c_str());
// ZMQ_LINGER is set on both sockets and ZMQ_RCVTIMEO on the response socket.
// NOTE(review): the LINGER failure branches are not visible here.
94 if (zmq_setsockopt(this->zmqResp_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
97 if (zmq_setsockopt(this->zmqReq_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
101 if (zmq_setsockopt(this->zmqResp_, ZMQ_RCVTIMEO, &opt,
sizeof(int32_t)) != 0)
102 throw(
rogue::GeneralError(
"memory::TcpClient::TcpClient",
"Failed to set socket receive timeout"));
// Connect the response socket; a negative return is reported with the
// "Failed to connect" message below (the reporting call itself is not visible).
104 if (zmq_connect(this->zmqResp_, this->respAddr_.c_str()) < 0)
106 "Failed to connect to remote port %" PRIu16
" at address %s",
110 this->bridgeLog_->debug(
"Creating request client port: %s", this->reqAddr_.c_str());
// Connect the request socket, with the same failure reporting as above.
112 if (zmq_connect(this->zmqReq_, this->reqAddr_.c_str()) < 0)
114 "Failed to connect to remote port %" PRIu16
" at address %s",
// Launch runThread() on a new std::thread and label it "TcpClient".
120 this->thread_ = std::make_unique<std::thread>(&rim::TcpClient::runThread,
this);
124 pthread_setname_np(thread_->native_handle(),
"TcpClient");
// Teardown fragment: each handle is tested against nullptr and the context is
// destroyed. NOTE(review): the enclosing control flow and the socket-close
// calls are not visible; presumably this is cleanup on a construction
// failure - confirm.
141 if (zmqResp_ !=
nullptr) {
145 if (zmqReq_ !=
nullptr) {
149 if (zmqCtx_ !=
nullptr) {
150 zmq_ctx_destroy(zmqCtx_);
158rim::TcpClient::~TcpClient() {
163void rim::TcpClient::close() {
// Stops the client. The visible statements close the response socket, then the
// request socket, and finally destroy the ZeroMQ context.
// NOTE(review): the statements between the signature and the first zmq_close
// (presumably stopping and joining the receive thread) are not visible here -
// confirm.
167void rim::TcpClient::stop() {
173 zmq_close(this->zmqResp_);
175 zmq_close(this->zmqReq_);
177 zmq_ctx_destroy(this->zmqCtx_);
// Waits for the remote TcpServer path to answer a bridge probe.
// Repeats a send-then-wait cycle until a probe response with result "OK"
// arrives or the deadline passes.
//   timeout - total time to keep trying, in seconds (std::chrono::duration<double>)
//   period  - maximum wait for a response after each probe, in seconds
// Returns bool. NOTE(review): the return statements are not visible here;
// presumably true after the success log and false after the timeout warning -
// confirm.
182bool rim::TcpClient::waitReady(
double timeout,
double period) {
// The deadline uses the monotonic clock, so wall-clock changes do not affect it.
188 auto deadline = std::chrono::steady_clock::now() + std::chrono::duration<double>(timeout);
192 while (std::chrono::steady_clock::now() < deadline) {
// Under probeMtx_: allocate a fresh probe id and discard any previous result.
// NOTE(review): the assignment of the local `id` and the reset of probeDone_
// are not visible here.
194 std::lock_guard<std::mutex> probeLock(probeMtx_);
197 probeId_ = ++probeSeq_;
199 probeResult_.clear();
// Build the four header frames: id (4 bytes), addr (8), size (4), type (4).
206 zmq_msg_init_size(&(msg[0]), 4);
207 std::memcpy(zmq_msg_data(&(msg[0])), &
id, 4);
208 zmq_msg_init_size(&(msg[1]), 8);
209 std::memcpy(zmq_msg_data(&(msg[1])), &addr, 8);
210 zmq_msg_init_size(&(msg[2]), 4);
211 std::memcpy(zmq_msg_data(&(msg[2])), &size, 4);
212 zmq_msg_init_size(&(msg[3]), 4);
213 std::memcpy(zmq_msg_data(&(msg[3])), &type, 4);
// bridgeMtx_ is held while the frames are pushed onto the request socket.
216 std::lock_guard<std::mutex> block(bridgeMtx_);
// Non-blocking multipart send: ZMQ_SNDMORE on every frame except the last.
// A failed send is only logged at debug level.
220 for (uint32_t x = 0; x < 4; ++x) {
221 if (zmq_sendmsg(this->zmqReq_, &(msg[x]), (x == 3 ? 0 : ZMQ_SNDMORE) | ZMQ_DONTWAIT) < 0) {
222 bridgeLog_->debug(
"Readiness probe send failed for port %s", this->reqAddr_.c_str());
228 for (uint32_t x = 0; x < 4; ++x) zmq_msg_close(&(msg[x]));
// Wait up to `period` for probeDone_ to be set for this probe id.
230 std::unique_lock<std::mutex> probeLock(probeMtx_);
234 probeCond_.wait_for(probeLock, std::chrono::duration<double>(period), [&]() {
return probeDone_ && probeId_ == id; });
// Only a completed probe with a matching id and result "OK" counts as ready.
236 if (probeDone_ && probeId_ ==
id && probeResult_ ==
"OK") {
237 bridgeLog_->debug(
"Readiness probe succeeded for port %s", this->reqAddr_.c_str());
242 bridgeLog_->warning(
"Timed out waiting for bridge readiness on port %s", this->reqAddr_.c_str());
// Start hook, exposed to Python as "_start" by setup_python().
// Returns immediately when waitReadyOnStart_ is false. Otherwise it calls
// waitReady() with DefaultReadyTimeout and DefaultReadyPeriod.
// NOTE(review): the statement that consumes the message below is not visible
// here; presumably a rogue::GeneralError is thrown when waitReady() fails -
// confirm.
246void rim::TcpClient::start() {
247 if (!waitReadyOnStart_)
return;
249 if (!waitReady(DefaultReadyTimeout, DefaultReadyPeriod)) {
251 "Timed out waiting for remote TcpServer readiness on %s",
252 this->reqAddr_.c_str()));
// Request-send path: serialises a transaction into ZeroMQ frames and pushes
// them onto the request socket.
// NOTE(review): the signature and the local declarations are not visible
// here; presumably this is the Slave doTransaction override taking `tran` -
// confirm.
// bridgeMtx_ is held for the visible part of the routine.
267 std::lock_guard<std::mutex> block(bridgeMtx_);
// Frame 0: transaction id (4 bytes).
272 zmq_msg_init_size(&(msg[0]), 4);
273 std::memcpy(zmq_msg_data(&(msg[0])), &
id, 4);
// Frame 1: address taken from the transaction (8 bytes).
276 addr = tran->address();
277 zmq_msg_init_size(&(msg[1]), 8);
278 std::memcpy(zmq_msg_data(&(msg[1])), &addr, 8);
// Frame 2: size (4 bytes).
282 zmq_msg_init_size(&(msg[2]), 4);
283 std::memcpy(zmq_msg_data(&(msg[2])), &size, 4);
// Frame 3: transaction type (4 bytes).
287 zmq_msg_init_size(&(msg[3]), 4);
288 std::memcpy(zmq_msg_data(&(msg[3])), &type, 4);
// Frame 4: `size` bytes copied from the transaction buffer.
// NOTE(review): whether this frame is built for every transaction type is not
// visible here.
293 zmq_msg_init_size(&(msg[4]), size);
294 std::memcpy(zmq_msg_data(&(msg[4])), tran->begin(), size);
301 bridgeLog_->debug(
"Requested transaction id=%" PRIu32
", addr=0x%" PRIx64
", size=%" PRIu32
", type=%" PRIu32
302 ", cnt=%" PRIu32
", port: %s",
308 this->reqAddr_.c_str());
// The transaction is registered before the frames are sent. runThread() looks
// transactions up by id with getTransaction() when a response arrives.
314 addTransaction(tran);
// Non-blocking multipart send: ZMQ_SNDMORE on every frame except the last.
// A failed send is logged as a warning together with the ZeroMQ error text.
317 for (x = 0; x < msgCnt; x++) {
318 if (zmq_sendmsg(this->zmqReq_, &(msg[x]), ((x == (msgCnt - 1) ? 0 : ZMQ_SNDMORE)) | ZMQ_DONTWAIT) < 0) {
319 bridgeLog_->warning(
"Failed to send transaction %" PRIu32
", msg %" PRIu32
" on %s: %s",
322 this->reqAddr_.c_str(),
323 zmq_strerror(zmq_errno()));
// Receive thread body; the constructor starts it on thread_.
// Reads multipart messages from the response socket, validates the frame
// layout, and delivers each message either to a waiting readiness probe or to
// the pending transaction with the same id.
// NOTE(review): the outer loop head, the local declarations and the
// statements that skip to the next message after each rejection are not
// visible here.
329void rim::TcpClient::runThread() {
343 bridgeLog_->logThreadId();
// Six frames are expected per response; frames 0-5 are used below as
// id, addr, size, type, payload and result.
346 for (x = 0; x < 6; x++) zmq_msg_init(&(msg[x]));
// Receive frames while ZMQ_RCVMORE reports more parts and the thread is enabled.
353 if (zmq_recvmsg(this->zmqResp_, &(msg[x]), 0) >= 0) {
360 zmq_getsockopt(this->zmqResp_, ZMQ_RCVMORE, &more, &moreSize);
364 }
while (threadEn_ && more);
// Only messages with exactly six frames are processed.
367 if (threadEn_ && (msgCnt == 6)) {
// Header frames must be 4/8/4/4 bytes and the result frame at most 999 bytes.
369 if ((zmq_msg_size(&(msg[0])) != 4) || (zmq_msg_size(&(msg[1])) != 8) || (zmq_msg_size(&(msg[2])) != 4) ||
370 (zmq_msg_size(&(msg[3])) != 4) || (zmq_msg_size(&(msg[5])) > 999)) {
372 "Bad message sizes. id=%zu addr=%zu size=%zu type=%zu result=%zu",
373 zmq_msg_size(&(msg[0])),
374 zmq_msg_size(&(msg[1])),
375 zmq_msg_size(&(msg[2])),
376 zmq_msg_size(&(msg[3])),
377 zmq_msg_size(&(msg[5])));
378 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// Decode the fixed-size header fields.
383 std::memcpy(&
id, zmq_msg_data(&(msg[0])), 4);
384 std::memcpy(&addr, zmq_msg_data(&(msg[1])), 8);
385 std::memcpy(&size, zmq_msg_data(&(msg[2])), 4);
386 std::memcpy(&type, zmq_msg_data(&(msg[3])), 4);
// 1000 bytes of `result` are zeroed before copying at most 999 bytes, so the
// string stays NUL-terminated. NOTE(review): the declaration of `result` is
// not visible here - confirm it holds at least 1000 bytes.
388 memset(result, 0, 1000);
389 std::strncpy(result,
reinterpret_cast<char*
>(zmq_msg_data(&(msg[5]))), zmq_msg_size(&(msg[5])));
// Probe response: under probeMtx_, store the result when the id matches the
// outstanding probe, then wake waiters on probeCond_.
// NOTE(review): the condition selecting this branch (presumably a test for
// the TcpBridgeProbe type) and the probeDone_ update are not visible here.
392 std::lock_guard<std::mutex> probeLock(probeMtx_);
393 if (
id == probeId_) {
394 probeResult_ = result;
396 probeCond_.notify_all();
398 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// Look up the pending transaction for this id; an unknown id is logged and
// its frames are released.
403 if ((tran = getTransaction(
id)) == NULL) {
404 bridgeLog_->warning(
"Failed to find transaction id=%" PRIu32,
id);
405 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// A response for an already expired transaction is logged and dropped.
412 if (tran->expired()) {
413 bridgeLog_->warning(
"Dropping late response for expired transaction. Id=%" PRIu32,
id);
414 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// The echoed header must match the transaction that was sent.
419 if ((addr != tran->address()) || (size != tran->size()) || (type != tran->type())) {
420 bridgeLog_->warning(
"Transaction data mismatch. Id=%" PRIu32,
id);
421 tran->error(
"Transaction data mismatch in TcpClient");
422 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// The payload frame length must equal the size announced in the header.
428 if (zmq_msg_size(&(msg[4])) != size) {
429 bridgeLog_->warning(
"Transaction size mismatch. Id=%" PRIu32,
id);
430 tran->error(
"Received transaction response did not match header size");
431 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// Copy the payload into the transaction buffer.
// NOTE(review): any guard around this copy is not visible here.
434 std::memcpy(tran->begin(), zmq_msg_data(&(msg[4])), size);
// A result other than "OK" takes a separate path (its body is not visible here).
436 if (strcmp(result,
"OK") != 0)
440 bridgeLog_->debug(
"Response for transaction id=%" PRIu32
", addr=0x%" PRIx64
", size=%" PRIu32
441 ", type=%" PRIu32
", cnt=%" PRIu32
", port: %s, Result: (%s)",
447 this->respAddr_.c_str(),
450 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// Registers TcpClient with Boost.Python as a non-copyable class held by
// TcpClientPtr and derived from rim::Slave.
// The constructor takes (std::string, uint16_t) plus an optional bool.
// Exposed methods: close, waitReady, _start (start) and _stop (stop).
// NOTE(review): the Python class-name argument is not visible here.
454void rim::TcpClient::setup_python() {
// Suppress GCC deprecated-declarations warnings for the class_ definition only.
457#pragma GCC diagnostic push
458#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
459 bp::class_<rim::TcpClient, rim::TcpClientPtr, bp::bases<rim::Slave>, boost::noncopyable>(
461 bp::init<std::string, uint16_t, bp::optional<bool> >())
462 .def(
"close", &rim::TcpClient::close)
463 .def(
"waitReady", &rim::TcpClient::waitReady)
464 .def(
"_start", &rim::TcpClient::start)
465 .def(
"_stop", &rim::TcpClient::stop);
466#pragma GCC diagnostic pop
// Register the implicit conversion from TcpClientPtr to SlavePtr.
468 bp::implicitly_convertible<rim::TcpClientPtr, rim::SlavePtr>();
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
bool waitReady(double timeout, double period)
Wait for the remote TcpServer path to respond to a bridge probe.
std::shared_ptr< rogue::interfaces::memory::TransactionLock > TransactionLockPtr
Shared pointer alias for TransactionLock.
static const uint32_t TcpBridgeProbe
Internal TCP bridge readiness probe transaction type.
static const uint32_t Write
Memory write transaction type.
std::shared_ptr< rogue::interfaces::memory::Transaction > TransactionPtr
Shared pointer alias for Transaction.
static const uint32_t Post
Memory posted write transaction type.
std::shared_ptr< rogue::interfaces::memory::TcpClient > TcpClientPtr
Shared pointer alias for TcpClient.