39 #include <boost/python.hpp>
40namespace bp = boost::python;
// Default total time, in seconds, that start() lets waitReady() keep probing
// (waitReady() wraps the value in std::chrono::duration<double>).
44static constexpr double DefaultReadyTimeout = 10.0;
// Default time, in seconds, that start() lets waitReady() wait for the reply
// to each individual probe before trying again.
45static constexpr double DefaultReadyPeriod = 0.1;
// Constructs a TCP memory-bridge client for the given address and base port.
//
// Visible steps: build a port-specific log name, reset the readiness-probe
// state, derive the request endpoint "tcp://<addr>:<port>" and the response
// endpoint "tcp://<addr>:<port + 1>", create a ZeroMQ context with a PULL
// socket (responses) and a PUSH socket (requests), apply socket options,
// connect both sockets and launch the receive thread.
//
// NOTE(review): this listing omits several source lines (the declaration and
// values of `opt`, the bodies of some error branches, and whatever consumes
// the `waitReady` argument), so the comments below cover only what is shown.
55rim::TcpClient::TcpClient(std::string addr, uint16_t port,
bool waitReady) :
rim::
Slave(4, 0xFFFFFFFF) {
// Log name has the form "memory.TcpClient.<port>".
59 logstr =
"memory.TcpClient.";
62 logstr.append(std::to_string(port));
// Start with no probe outstanding and no stored probe result.
67 this->probeDone_ =
false;
68 this->probeResult_.clear();
// Both endpoints share the "tcp://<addr>:" prefix; the port numbers are
// appended further down.
72 this->respAddr_ =
"tcp://";
73 this->respAddr_.append(addr);
74 this->respAddr_.append(
":");
75 this->reqAddr_ = this->respAddr_;
// One context, a PULL socket for responses and a PUSH socket for requests.
77 this->zmqCtx_ = zmq_ctx_new();
78 this->zmqResp_ = zmq_socket(this->zmqCtx_, ZMQ_PULL);
79 this->zmqReq_ = zmq_socket(this->zmqCtx_, ZMQ_PUSH);
// Set ZMQ_IMMEDIATE on the request socket; failure is fatal.
// NOTE(review): the value assigned to `opt` is not visible here.
83 if (zmq_setsockopt(this->zmqReq_, ZMQ_IMMEDIATE, &opt,
sizeof(int32_t)) != 0)
84 throw(
rogue::GeneralError(
"memory::TcpClient::TcpClient",
"Failed to set socket immediate"));
// Responses arrive on port + 1, requests are sent to port.
86 this->respAddr_.append(std::to_string(
static_cast<int64_t
>(port + 1)));
87 this->reqAddr_.append(std::to_string(
static_cast<int64_t
>(port)));
89 this->bridgeLog_->debug(
"Creating response client port: %s", this->respAddr_.c_str());
// Set ZMQ_LINGER on both sockets.
// NOTE(review): the statements executed when these calls fail are missing
// from this listing.
92 if (zmq_setsockopt(this->zmqResp_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
95 if (zmq_setsockopt(this->zmqReq_, ZMQ_LINGER, &opt,
sizeof(int32_t)) != 0)
// Set a receive timeout on the response socket; failure is fatal.
// Presumably this lets runThread() wake up periodically to re-check its
// run flag - TODO confirm against the omitted lines.
99 if (zmq_setsockopt(this->zmqResp_, ZMQ_RCVTIMEO, &opt,
sizeof(int32_t)) != 0)
100 throw(
rogue::GeneralError(
"memory::TcpClient::TcpClient",
"Failed to set socket receive timeout"));
// Connect the response socket; only the error-message fragment of the
// failure branch is visible.
102 if (zmq_connect(this->zmqResp_, this->respAddr_.c_str()) < 0)
104 "Failed to connect to remote port %" PRIu16
" at address %s",
108 this->bridgeLog_->debug(
"Creating request client port: %s", this->reqAddr_.c_str());
// Connect the request socket; same visible error-message fragment.
110 if (zmq_connect(this->zmqReq_, this->reqAddr_.c_str()) < 0)
112 "Failed to connect to remote port %" PRIu16
" at address %s",
// Launch the receive loop on its own thread. The std::thread is allocated
// with `new` and held through the raw pointer thread_.
118 this->thread_ =
new std::thread(&rim::TcpClient::runThread,
this);
// Name the thread "TcpClient" so it can be identified in debuggers/tools.
122 pthread_setname_np(thread_->native_handle(),
"TcpClient");
// Destructor. NOTE(review): body is not visible in this listing.
127rim::TcpClient::~TcpClient() {
// close(): exposed to Python as "close" by setup_python().
// NOTE(review): body is not visible in this listing.
132void rim::TcpClient::close() {
// stop(): releases the ZeroMQ resources created by the constructor.
// NOTE(review): the lines preceding the socket teardown are missing here;
// presumably they stop and join the receive thread first - TODO confirm.
136void rim::TcpClient::stop() {
// Close both sockets before destroying the context that owns them.
141 zmq_close(this->zmqResp_);
142 zmq_close(this->zmqReq_);
143 zmq_ctx_destroy(this->zmqCtx_);
// Repeatedly sends a four-frame readiness probe on the request socket and
// waits for runThread() to report a matching "OK" reply, until `timeout`
// elapses.
//
// timeout: total time budget in seconds (converted with
//          std::chrono::duration<double>).
// period:  time in seconds to wait for the reply to each probe before the
//          loop tries again.
//
// NOTE(review): the return statements are missing from this listing;
// presumably the function returns true after the "succeeded" log and false
// after the "Timed out" warning - TODO confirm.
147bool rim::TcpClient::waitReady(
double timeout,
double period) {
// Absolute deadline on the monotonic clock.
153 auto deadline = std::chrono::steady_clock::now() + std::chrono::duration<double>(timeout);
157 while (std::chrono::steady_clock::now() < deadline) {
// Under probeMtx_: allocate a fresh probe id and discard the previous
// result so a stale reply cannot satisfy this attempt.
159 std::lock_guard<std::mutex> probeLock(probeMtx_);
162 probeId_ = ++probeSeq_;
164 probeResult_.clear();
// Build the probe as four frames: id (4 bytes), addr (8), size (4),
// type (4). NOTE(review): the values assigned to id/addr/size/type are on
// lines omitted from this listing.
171 zmq_msg_init_size(&(msg[0]), 4);
172 std::memcpy(zmq_msg_data(&(msg[0])), &
id, 4);
173 zmq_msg_init_size(&(msg[1]), 8);
174 std::memcpy(zmq_msg_data(&(msg[1])), &addr, 8);
175 zmq_msg_init_size(&(msg[2]), 4);
176 std::memcpy(zmq_msg_data(&(msg[2])), &size, 4);
177 zmq_msg_init_size(&(msg[3]), 4);
178 std::memcpy(zmq_msg_data(&(msg[3])), &type, 4);
// bridgeMtx_ is the same mutex held around the transaction send path, so
// probe frames and transaction frames are not interleaved on the socket.
181 std::lock_guard<std::mutex> block(bridgeMtx_);
// Send without blocking; every frame except the last carries ZMQ_SNDMORE.
// A send failure is only logged at debug level.
185 for (uint32_t x = 0; x < 4; ++x) {
186 if (zmq_sendmsg(this->zmqReq_, &(msg[x]), (x == 3 ? 0 : ZMQ_SNDMORE) | ZMQ_DONTWAIT) < 0) {
187 bridgeLog_->debug(
"Readiness probe send failed for port %s", this->reqAddr_.c_str());
193 for (uint32_t x = 0; x < 4; ++x) zmq_msg_close(&(msg[x]));
// Wait up to `period` for the receive thread to flag completion of this
// specific probe id.
195 std::unique_lock<std::mutex> probeLock(probeMtx_);
199 probeCond_.wait_for(probeLock, std::chrono::duration<double>(period), [&]() {
return probeDone_ && probeId_ == id; });
// Only a completed probe with the matching id and an "OK" result counts
// as success.
201 if (probeDone_ && probeId_ ==
id && probeResult_ ==
"OK") {
202 bridgeLog_->debug(
"Readiness probe succeeded for port %s", this->reqAddr_.c_str());
// Deadline passed without a successful probe.
207 bridgeLog_->warning(
"Timed out waiting for bridge readiness on port %s", this->reqAddr_.c_str());
// start(): exposed to Python as "_start" by setup_python(). Does nothing
// unless waitReadyOnStart_ is set; otherwise runs waitReady() with the
// default timeout and period.
211void rim::TcpClient::start() {
212 if (!waitReadyOnStart_)
return;
// NOTE(review): the statement that consumes the message below is cut from
// this listing; presumably a rogue::GeneralError is thrown when the probe
// times out - TODO confirm.
214 if (!waitReady(DefaultReadyTimeout, DefaultReadyPeriod)) {
216 "Timed out waiting for remote TcpServer readiness on %s",
217 this->reqAddr_.c_str()));
// NOTE(review): the header of this function is not present in the listing;
// from the visible body it is the path that serializes a transaction `tran`
// and pushes it to the request socket (presumably TcpClient::doTransaction -
// TODO confirm).
//
// bridgeMtx_ is also taken by waitReady() around its probe send, so frames
// from the two senders cannot interleave.
232 std::lock_guard<std::mutex> block(bridgeMtx_);
// Frame 0: transaction id (4 bytes).
237 zmq_msg_init_size(&(msg[0]), 4);
238 std::memcpy(zmq_msg_data(&(msg[0])), &
id, 4);
// Frame 1: transaction address (8 bytes).
241 addr = tran->address();
242 zmq_msg_init_size(&(msg[1]), 8);
243 std::memcpy(zmq_msg_data(&(msg[1])), &addr, 8);
// Frame 2: transaction size (4 bytes).
247 zmq_msg_init_size(&(msg[2]), 4);
248 std::memcpy(zmq_msg_data(&(msg[2])), &size, 4);
// Frame 3: transaction type (4 bytes).
252 zmq_msg_init_size(&(msg[3]), 4);
253 std::memcpy(zmq_msg_data(&(msg[3])), &type, 4);
// Frame 4: `size` bytes of payload copied from the transaction buffer.
// NOTE(review): any condition guarding this frame, and the computation of
// msgCnt, are on lines omitted from this listing.
258 zmq_msg_init_size(&(msg[4]), size);
259 std::memcpy(zmq_msg_data(&(msg[4])), tran->begin(), size);
266 bridgeLog_->debug(
"Requested transaction id=%" PRIu32
", addr=0x%" PRIx64
", size=%" PRIu32
", type=%" PRIu32
267 ", cnt=%" PRIu32
", port: %s",
273 this->reqAddr_.c_str());
// Register the transaction before sending; runThread() later looks
// responses up with getTransaction(id).
279 addTransaction(tran);
// Send all frames without blocking; every frame except the last carries
// ZMQ_SNDMORE. A failed send is logged as a warning.
282 for (x = 0; x < msgCnt; x++) {
283 if (zmq_sendmsg(this->zmqReq_, &(msg[x]), ((x == (msgCnt - 1) ? 0 : ZMQ_SNDMORE)) | ZMQ_DONTWAIT) < 0) {
284 bridgeLog_->warning(
"Failed to send transaction %" PRIu32
", msg %" PRIu32,
id, x);
// Receive-thread body (started by the constructor). Reads multipart
// messages from the response socket, validates six-frame replies, and either
// hands a readiness-probe result to waitReady() or completes the matching
// pending transaction.
//
// Visible frame layout of a reply: [0] id (4 bytes), [1] address (8),
// [2] size (4), [3] type (4), [4] data, [5] result string.
//
// NOTE(review): the outer loop header, several branch/`continue` lines and
// the final transaction-completion calls are missing from this listing.
290void rim::TcpClient::runThread() {
304 bridgeLog_->logThreadId();
// Prepare the six message slots reused for every received reply.
307 for (x = 0; x < 6; x++) zmq_msg_init(&(msg[x]));
// Receive frames until ZeroMQ reports no more parts or the thread is told
// to stop (threadEn_ cleared).
314 if (zmq_recvmsg(this->zmqResp_, &(msg[x]), 0) >= 0) {
321 zmq_getsockopt(this->zmqResp_, ZMQ_RCVMORE, &more, &moreSize);
325 }
while (threadEn_ && more);
// Only complete six-frame replies are processed.
328 if (threadEn_ && (msgCnt == 6)) {
// Header frames must have their exact fixed sizes, and the result string
// must be at most 999 bytes so it fits in `result` (zeroed over 1000 bytes
// below) with a terminating NUL.
330 if ((zmq_msg_size(&(msg[0])) != 4) || (zmq_msg_size(&(msg[1])) != 8) || (zmq_msg_size(&(msg[2])) != 4) ||
331 (zmq_msg_size(&(msg[3])) != 4) || (zmq_msg_size(&(msg[5])) > 999)) {
332 bridgeLog_->warning(
"Bad message sizes");
333 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// Decode the fixed-size header fields.
338 std::memcpy(&
id, zmq_msg_data(&(msg[0])), 4);
339 std::memcpy(&addr, zmq_msg_data(&(msg[1])), 8);
340 std::memcpy(&size, zmq_msg_data(&(msg[2])), 4);
341 std::memcpy(&type, zmq_msg_data(&(msg[3])), 4);
// strncpy does not append a NUL when the source fills the count, so the
// buffer is zeroed first; together with the 999-byte limit above this keeps
// `result` NUL-terminated.
343 memset(result, 0, 1000);
344 std::strncpy(result,
reinterpret_cast<char*
>(zmq_msg_data(&(msg[5]))), zmq_msg_size(&(msg[5])));
// Readiness-probe reply: if the id matches the outstanding probe, store
// the result and wake waitReady().
// NOTE(review): the condition selecting this branch and the line that
// sets probeDone_ are not visible here.
347 std::lock_guard<std::mutex> probeLock(probeMtx_);
348 if (
id == probeId_) {
349 probeResult_ = result;
351 probeCond_.notify_all();
353 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// Ordinary reply: find the pending transaction registered under this id.
358 if ((tran = getTransaction(
id)) == NULL) {
359 bridgeLog_->warning(
"Failed to find transaction id=%" PRIu32,
id);
360 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// Drop replies for transactions that have already expired.
368 if (tran->expired()) {
369 bridgeLog_->warning(
"Transaction expired. Id=%" PRIu32,
id);
370 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// The echoed header must match the pending transaction; otherwise the
// transaction is failed with an error.
375 if ((addr != tran->address()) || (size != tran->size()) || (type != tran->type())) {
376 bridgeLog_->warning(
"Transaction data mismatch. Id=%" PRIu32,
id);
377 tran->error(
"Transaction data mismatch in TcpClient");
378 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// The data frame must be exactly `size` bytes before it is copied into
// the transaction buffer.
// NOTE(review): any condition guarding this check/copy is on lines
// omitted from this listing.
384 if (zmq_msg_size(&(msg[4])) != size) {
385 bridgeLog_->warning(
"Transaction size mismatch. Id=%" PRIu32,
id);
386 tran->error(
"Received transaction response did not match header size");
387 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
390 std::memcpy(tran->begin(), zmq_msg_data(&(msg[4])), size);
// Any result other than "OK" takes the branch below.
// NOTE(review): the statements of both branches are missing from this
// listing; presumably error vs. normal completion of the transaction.
392 if (strcmp(result,
"OK") != 0)
396 bridgeLog_->debug(
"Response for transaction id=%" PRIu32
", addr=0x%" PRIx64
", size=%" PRIu32
397 ", type=%" PRIu32
", cnt=%" PRIu32
", port: %s, Result: (%s)",
403 this->respAddr_.c_str(),
// Release the received frames.
406 for (x = 0; x < msgCnt; x++) zmq_msg_close(&(msg[x]));
// Registers TcpClient with Boost.Python: a non-copyable class derived from
// rim::Slave and held by rim::TcpClientPtr, constructible from
// (std::string, uint16_t[, bool]), exposing close(), waitReady() and start()
// (the latter under the Python name "_start").
// NOTE(review): the line carrying the Python class name is missing from this
// listing.
410void rim::TcpClient::setup_python() {
413 bp::class_<rim::TcpClient, rim::TcpClientPtr, bp::bases<rim::Slave>, boost::noncopyable>(
415 bp::init<std::string, uint16_t, bp::optional<bool> >())
416 .def(
"close", &rim::TcpClient::close)
417 .def(
"waitReady", &rim::TcpClient::waitReady)
418 .def(
"_start", &rim::TcpClient::start);
// Allow a TcpClientPtr to be passed wherever Python code supplies a SlavePtr.
420 bp::implicitly_convertible<rim::TcpClientPtr, rim::SlavePtr>();
Generic Rogue exception type.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
bool waitReady(double timeout, double period)
Wait for the remote TcpServer path to respond to a bridge probe.
std::shared_ptr< rogue::interfaces::memory::TransactionLock > TransactionLockPtr
Shared pointer alias for TransactionLock.
static const uint32_t TcpBridgeProbe
Internal TCP bridge readiness probe transaction type.
static const uint32_t Write
Memory write transaction type.
std::shared_ptr< rogue::interfaces::memory::Transaction > TransactionPtr
Shared pointer alias for Transaction.
static const uint32_t Post
Memory posted write transaction type.
std::shared_ptr< rogue::interfaces::memory::TcpClient > TcpClientPtr
Shared pointer alias for TcpClient.