21#include <infiniband/verbs.h>
48 #include <boost/python.hpp>
49namespace bp = boost::python;
62 uint32_t rxQueueDepth) {
63 return std::make_shared<rpr::Server>(
64 deviceName, ibPort, gidIndex,
maxPayload, rxQueueDepth);
72rpr::Server::Server(
const std::string& deviceName,
76 uint32_t rxQueueDepth)
77 :
rpr::
Core(deviceName, ibPort, gidIndex, maxPayload),
80 cq_(nullptr), qp_(nullptr), mr_(nullptr),
83 numBufs_(rxQueueDepth),
96 memset(hostGid_, 0, 16);
97 memset(fpgaGid_, 0, 16);
114 uint64_t slabSize64 =
static_cast<uint64_t
>(numBufs_) * bufSize_;
115 if (slabSize64 > UINT32_MAX)
117 "RX slab too large: %u * %u = %" PRIu64
" exceeds 4 GiB",
118 numBufs_, bufSize_, slabSize64));
119 slabSize_ =
static_cast<uint32_t
>(slabSize64);
121 void* slabPtr =
nullptr;
122 if (posix_memalign(&slabPtr, 4096, slabSize_) != 0 || !slabPtr)
124 "Failed to allocate RX slab (%u bytes)",
126 slab_ =
static_cast<uint8_t*
>(slabPtr);
127 memset(slab_, 0, slabSize_);
132 mr_ = ibv_reg_mr(
pd_, slab_, slabSize_,
133 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
136 "ibv_reg_mr failed"));
138 mrAddr_ =
reinterpret_cast<uint64_t
>(slab_);
141 log_->info(
"MR: addr=0x%016" PRIx64
" rkey=0x%08x size=%u",
142 mrAddr_, mrRkey_, slabSize_);
147 cq_ = ibv_create_cq(
ctx_,
148 static_cast<int>(numBufs_),
149 nullptr,
nullptr, 0);
152 "ibv_create_cq failed"));
157 struct ibv_qp_init_attr qpAttr;
158 memset(&qpAttr, 0,
sizeof(qpAttr));
159 qpAttr.qp_type = IBV_QPT_RC;
160 qpAttr.sq_sig_all = 0;
161 qpAttr.send_cq = cq_;
162 qpAttr.recv_cq = cq_;
163 qpAttr.cap.max_recv_wr = numBufs_;
164 qpAttr.cap.max_send_wr = 1;
165 qpAttr.cap.max_recv_sge = 1;
166 qpAttr.cap.max_send_sge = 1;
168 qp_ = ibv_create_qp(
pd_, &qpAttr);
171 "ibv_create_qp (RC) failed"));
173 hostQpn_ = qp_->qp_num;
179 struct ibv_qp_attr attr;
180 memset(&attr, 0,
sizeof(attr));
181 attr.qp_state = IBV_QPS_INIT;
184 attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE |
185 IBV_ACCESS_REMOTE_READ |
186 IBV_ACCESS_LOCAL_WRITE;
188 if (ibv_modify_qp(qp_, &attr,
192 IBV_QP_ACCESS_FLAGS))
194 "QP RESET→INIT failed"));
217 "ibv_query_gid failed"));
219 for (
int i = 0; i < 16; ++i) {
220 if (gid.raw[i] != 0) {
227 "GID query returned all-zero for "
228 "gidIndex=%u on device '%s' "
229 "(likely out-of-range; rdma_rxe "
230 "does not validate gidIndex and "
231 "returns an empty GID for unused "
235 memcpy(hostGid_, gid.raw, 16);
242 std::mt19937 psnRng(std::random_device {}());
243 hostRqPsn_ = psnRng() & 0xFFFFFF;
244 hostSqPsn_ = psnRng() & 0xFFFFFF;
246 log_->info(
"RC QP ready: qpn=0x%06x rqPsn=0x%06x sqPsn=0x%06x",
247 hostQpn_, hostRqPsn_, hostSqPsn_);
259void rpr::Server::cleanupResources() {
281void rpr::Server::setFpgaGid(
const std::string& gidBytes) {
282 if (gidBytes.size() != 16)
284 "GID must be 16 bytes, got %zu",
286 memcpy(fpgaGid_, gidBytes.c_str(), 16);
287 log_->info(
"FPGA GID stored");
293void rpr::Server::completeConnection(uint32_t fpgaQpn, uint32_t fpgaRqPsn,
294 uint32_t pmtu, uint32_t minRnrTimer) {
300 if (thread_ !=
nullptr)
302 "rocev2::Server::completeConnection",
303 "completeConnection already invoked; destroy and recreate "
304 "Server to re-establish the RC connection"));
310 if (pmtu < 1 || pmtu > 5)
312 "rocev2::Server::completeConnection",
313 "pmtu must be in the range 1..5 "
314 "(1=256 2=512 3=1024 4=2048 5=4096); got %u", pmtu));
316 log_->info(
"completeConnection: fpgaQpn=0x%06x fpgaRqPsn=0x%06x minRnrTimer=%u",
317 fpgaQpn, fpgaRqPsn, minRnrTimer);
321 for (uint32_t i = 0; i < numBufs_; ++i) postRecvWr(i);
322 log_->info(
"Pre-posted %u recv WRs", numBufs_);
327 memcpy(dgid.raw, fpgaGid_, 16);
329 struct ibv_qp_attr attr;
330 memset(&attr, 0,
sizeof(attr));
331 attr.qp_state = IBV_QPS_RTR;
332 attr.path_mtu =
static_cast<ibv_mtu
>(pmtu);
333 attr.dest_qp_num = fpgaQpn;
334 attr.rq_psn = fpgaRqPsn;
335 attr.max_dest_rd_atomic = 16;
336 attr.min_rnr_timer = minRnrTimer;
337 attr.ah_attr.is_global = 1;
338 attr.ah_attr.grh.dgid = dgid;
339 attr.ah_attr.grh.sgid_index = gidIndex_;
340 attr.ah_attr.grh.hop_limit = 64;
341 attr.ah_attr.port_num = ibPort_;
344 if (ibv_modify_qp(qp_, &attr,
350 IBV_QP_MAX_DEST_RD_ATOMIC |
351 IBV_QP_MIN_RNR_TIMER))
353 "QP INIT→RTR failed"));
356 log_->info(
"QP → RTR (minRnrTimer=%u)", minRnrTimer);
360 struct ibv_qp_attr attr;
361 memset(&attr, 0,
sizeof(attr));
362 attr.qp_state = IBV_QPS_RTS;
363 attr.sq_psn = hostSqPsn_;
367 attr.max_rd_atomic = 16;
369 if (ibv_modify_qp(qp_, &attr,
375 IBV_QP_MAX_QP_RD_ATOMIC))
377 "QP RTR→RTS failed"));
380 log_->info(
"QP → RTS — ready to receive RDMA WRITEs");
383 std::shared_ptr<int> scopePtr = std::make_shared<int>(0);
384 threadEn_.store(
true);
385 thread_ =
new std::thread(&rpr::Server::runThread,
this,
386 std::weak_ptr<int>(scopePtr));
389 pthread_setname_np(thread_->native_handle(),
"RoCEv2Server");
396std::string rpr::Server::getGid()
const {
397 std::ostringstream oss;
398 for (
int i = 0; i < 16; i += 2) {
400 oss << std::hex << std::setfill(
'0')
401 << std::setw(2) <<
static_cast<int>(hostGid_[i])
402 << std::setw(2) <<
static_cast<int>(hostGid_[i+1]);
411void rpr::Server::postRecvWr(uint32_t slot) {
417 if (slot >= numBufs_)
419 "slot=%u out of range (numBufs=%u)",
422 uint8_t* bufStart = slab_ + (
static_cast<uint64_t
>(slot) * bufSize_);
425 memset(&sge, 0,
sizeof(sge));
426 sge.addr =
reinterpret_cast<uint64_t
>(bufStart);
427 sge.length = bufSize_;
428 sge.lkey = mr_->lkey;
430 struct ibv_recv_wr wr;
431 memset(&wr, 0,
sizeof(wr));
432 wr.wr_id =
static_cast<uint64_t
>(slot);
437 struct ibv_recv_wr* bad =
nullptr;
438 if (ibv_post_recv(qp_, &wr, &bad))
440 "ibv_post_recv failed (slot=%u)", slot));
452void rpr::Server::retBuffer(uint8_t* data, uint32_t meta, uint32_t rawSize) {
453 uint32_t slot = meta & 0x00FFFFFF;
455 log_->debug(
"retBuffer: re-posting slot=%u", slot);
457 if (threadEn_.load() && qp_) {
471void rpr::Server::runThread(std::weak_ptr<int> lockPtr) {
472 while (!lockPtr.expired())
continue;
475 log_->info(
"RoCEv2 receive thread started");
488 while (threadEn_.load()) {
489 int n = ibv_poll_cq(cq_, 1, &wc);
491 std::this_thread::sleep_for(std::chrono::microseconds(100));
495 log_->warning(
"ibv_poll_cq error");
499 uint32_t slot =
static_cast<uint32_t
>(wc.wr_id);
506 if (slot >= numBufs_) {
507 log_->warning(
"CQ returned out-of-range wr_id=%" PRIu64
508 " (numBufs=%u); discarding",
509 static_cast<uint64_t
>(wc.wr_id), numBufs_);
513 if (wc.status != IBV_WC_SUCCESS) {
514 log_->warning(
"CQ error: %s (slot=%u)",
515 ibv_wc_status_str(wc.status), slot);
517 if (wc.status == IBV_WC_WR_FLUSH_ERR) {
521 log_->error(
"QP in ERROR state (WR_FLUSH_ERR); exiting CQ poll thread");
522 threadEn_.store(
false);
530 if (wc.opcode != IBV_WC_RECV_RDMA_WITH_IMM) {
531 log_->warning(
"Unexpected opcode %d (slot=%u), re-posting",
538 uint8_t channel =
static_cast<uint8_t
>(wc.imm_data & 0xFF);
539 uint32_t payloadLen = wc.byte_len;
541 if (payloadLen == 0 || payloadLen > bufSize_) {
542 log_->warning(
"Bad payload len=%u (slot=%u), re-posting",
550 uint8_t* slotPtr = slab_ + (
static_cast<uint64_t
>(slot) * bufSize_);
556 buff->setPayload(payloadLen);
559 frame->appendBuffer(buff);
560 frame->setChannel(channel);
561 frame->setFirstUser(
SsiSof);
562 frame->setLastUser(0);
564 log_->debug(
"RX slot=%u channel=%u len=%u", slot, channel, payloadLen);
568 frameCount_.fetch_add(1, std::memory_order_relaxed);
569 byteCount_.fetch_add(payloadLen, std::memory_order_relaxed);
572 log_->error(
"RoCEv2 receive thread exiting on ibverbs error: %s", e.
what());
573 threadEn_.store(
false);
574 }
catch (
const std::exception& e) {
575 log_->error(
"RoCEv2 receive thread exiting on exception: %s", e.what());
576 threadEn_.store(
false);
578 log_->error(
"RoCEv2 receive thread exiting on unknown exception");
579 threadEn_.store(
false);
582 log_->info(
"RoCEv2 receive thread stopped");
591 log_->warning(
"RoCEv2 Server::acceptFrame: TX not supported, dropping");
597void rpr::Server::stop() {
603 threadEn_.store(
false);
605 if (thread_->joinable()) thread_->join();
// Destructor: delegates to stop(), which clears threadEn_ so the polling
// loop in runThread() exits, and then joins the receive thread.
// NOTE(review): stop() dereferences thread_; confirm it returns early when
// completeConnection() was never called and thread_ is still nullptr.
612rpr::Server::~Server() { this->stop(); }
617void rpr::Server::setup_python() {
621 bp::bases<rpr::Core, ris::Master, ris::Slave>,
624 bp::init<std::string, uint8_t, uint8_t, uint32_t, uint32_t>(
625 (bp::arg(
"deviceName"),
626 bp::arg(
"ibPort") = 1,
627 bp::arg(
"gidIndex") = 0,
630 .def(
"create", &rpr::Server::create)
631 .staticmethod(
"create")
632 .def(
"setFpgaGid", &rpr::Server::setFpgaGid)
633 .def(
"completeConnection", &rpr::Server::completeConnection,
635 bp::arg(
"fpgaRqPsn"),
637 bp::arg(
"minRnrTimer") = 1))
638 .def(
"getQpn", &rpr::Server::getQpn)
639 .def(
"getGid", &rpr::Server::getGid)
640 .def(
"getRqPsn", &rpr::Server::getRqPsn)
641 .def(
"getSqPsn", &rpr::Server::getSqPsn)
642 .def(
"getMrAddr", &rpr::Server::getMrAddr)
643 .def(
"getMrRkey", &rpr::Server::getMrRkey)
644 .def(
"getFrameCount", &rpr::Server::getFrameCount)
645 .def(
"getByteCount", &rpr::Server::getByteCount)
646 .def(
"stop", &rpr::Server::stop);
648 bp::implicitly_convertible<rpr::ServerPtr, rpr::CorePtr>();
649 bp::implicitly_convertible<rpr::ServerPtr, ris::MasterPtr>();
650 bp::implicitly_convertible<rpr::ServerPtr, ris::SlavePtr>();
Generic Rogue exception type.
char const * what() const
Returns exception text for standard exception handling.
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
struct ibv_context * ctx_
uint32_t maxPayload() const
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
std::shared_ptr< rogue::protocols::rocev2::Server > ServerPtr
static const uint32_t DefaultMaxPayload
static const uint32_t DefaultRxQueueDepth
static const uint8_t SsiSof