43 #include <boost/python.hpp>
44namespace bp = boost::python;
48rpu::ClientPtr rpu::Client::create(std::string host, uint16_t port,
bool jumbo) {
49 rpu::ClientPtr r = std::make_shared<rpu::Client>(host, port, jumbo);
54rpu::Client::Client(std::string host, uint16_t port,
bool jumbo) :
rpu::
Core(jumbo) {
55 struct addrinfo aiHints;
56 struct addrinfo* aiList = 0;
57 const sockaddr_in* addr;
67 std::shared_ptr<int> scopePtr = std::make_shared<int>(0);
70 if ((
fd_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
72 "Failed to create socket for port %" PRIu16
" at address %s",
77 bzero(&aiHints,
sizeof(aiHints));
78 aiHints.ai_flags = AI_CANONNAME;
79 aiHints.ai_family = AF_INET;
80 aiHints.ai_socktype = SOCK_DGRAM;
81 aiHints.ai_protocol = IPPROTO_UDP;
83 if (::getaddrinfo(address_.c_str(), 0, &aiHints, &aiList) || !aiList)
86 addr = (
const sockaddr_in*)(aiList->ai_addr);
89 memset(&
remAddr_, 0,
sizeof(
struct sockaddr_in));
90 ((
struct sockaddr_in*)(&
remAddr_))->sin_family = AF_INET;
91 ((
struct sockaddr_in*)(&
remAddr_))->sin_addr.s_addr = addr->sin_addr.s_addr;
92 ((
struct sockaddr_in*)(&
remAddr_))->sin_port = htons(port_);
100 thread_ =
new std::thread(&rpu::Client::runThread,
this, std::weak_ptr<int>(scopePtr));
102 udpLog_->debug(
"UDP client ready. remote=%s:%" PRIu16
", maxPayload=%" PRIu32,
109 pthread_setname_np(
thread_->native_handle(),
"UdpClient");
114rpu::Client::~Client() {
118void rpu::Client::stop() {
122 udpLog_->debug(
"Stopping UDP client for remote %s:%" PRIu16, address_.c_str(), port_);
130 ris::Frame::BufferIterator it;
136 struct iovec msg_iov[1];
139 msg.msg_name = &remAddr_;
140 msg.msg_namelen =
sizeof(
struct sockaddr_in);
141 msg.msg_iov = msg_iov;
143 msg.msg_control = NULL;
144 msg.msg_controllen = 0;
149 std::lock_guard<std::mutex> lock(udpMtx_);
152 if (frame->getError()) {
153 udpLog_->warning(
"Dropping errored outbound frame. remote=%s:%" PRIu16
", error=0x%" PRIx8,
161 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) {
162 if ((*it)->getPayload() == 0)
break;
165 msg_iov[0].iov_base = (*it)->begin();
166 msg_iov[0].iov_len = (*it)->getPayload();
178 if (select(fd_ + 1, NULL, &fds, NULL, &tout) <= 0) {
179 udpLog_->critical(
"Client::acceptFrame: Timeout waiting for outbound transmit after %" PRIu32
180 ".%" PRIu32
" seconds! May be caused by outbound backpressure.",
184 }
else if ((res = sendmsg(fd_, &msg, 0)) < 0) {
185 udpLog_->warning(
"UDP write call failed for %s: %s", address_.c_str(), std::strerror(errno));
192void rpu::Client::runThread(std::weak_ptr<int> lockPtr) {
201 while (!lockPtr.expired())
continue;
203 udpLog_->logThreadId();
206 frame = reqLocalFrame(maxPayload(),
false);
210 buff = *(frame->beginBuffer());
211 avail = buff->getAvailable();
212 res = recvfrom(fd_, buff->begin(), avail, MSG_TRUNC, NULL, 0);
217 udpLog_->warning(
"Receive data was too large. remote=%s:%" PRIu16
", rx=%i, avail=%" PRIu32
224 buff->setPayload(res);
229 frame = reqLocalFrame(maxPayload(),
false);
240 select(fd_ + 1, &fds, NULL, NULL, &tout);
245void rpu::Client::setup_python() {
248 bp::class_<rpu::Client, rpu::ClientPtr, bp::bases<rpu::Core, ris::Master, ris::Slave>, boost::noncopyable>(
250 bp::init<std::string, uint16_t, bool>());
252 bp::implicitly_convertible<rpu::ClientPtr, rpu::CorePtr>();
253 bp::implicitly_convertible<rpu::ClientPtr, ris::MasterPtr>();
254 bp::implicitly_convertible<rpu::ClientPtr, ris::SlavePtr>();
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
void setFixedSize(uint32_t size)
Sets fixed-size mode.
void setPoolSize(uint32_t size)
Sets buffer pool size.
Shared UDP transport base for stream client/server endpoints.
std::shared_ptr< rogue::Logging > udpLog_
struct sockaddr_in remAddr_
uint32_t maxPayload()
Returns maximum UDP payload size in bytes.
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
std::shared_ptr< rogue::protocols::udp::Client > ClientPtr