41 #include <boost/python.hpp>
42namespace bp = boost::python;
// Factory for rpu::Client: constructs the client through std::make_shared,
// passing host, port and jumbo on to the constructor, and holds the result in
// an rpu::ClientPtr.
46rpu::ClientPtr rpu::Client::create(std::string host, uint16_t port,
bool jumbo) {
47 rpu::ClientPtr r = std::make_shared<rpu::Client>(host, port, jumbo);
// Constructor. Initializes the rpu::Core base with the jumbo flag, then:
//   1. opens an IPv4 datagram socket into fd_,
//   2. resolves address_ with getaddrinfo(),
//   3. fills remAddr_ with the resolved address and port_,
//   4. starts the receive thread on runThread().
// NOTE(review): address_ and port_ are members read here; where they are
// assigned from host/port is not visible in this listing.
52rpu::Client::Client(std::string host, uint16_t port,
bool jumbo) :
rpu::
Core(jumbo) {
53 struct addrinfo aiHints;
54 struct addrinfo* aiList = 0;
55 const sockaddr_in* addr;
// Local token whose weak_ptr is handed to the receive thread below; runThread()
// spins until that weak_ptr reports expired, i.e. until this local is destroyed.
65 std::shared_ptr<int> scopePtr = std::make_shared<int>(0);
// Create the IPv4 (AF_INET) datagram (SOCK_DGRAM) socket; a negative
// descriptor is the failure case reported by the message that follows.
68 if ((
fd_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
70 "Failed to create socket for port %" PRIu16
" at address %s",
// Zero the lookup hints, then restrict resolution to IPv4 / UDP datagram
// results and request the canonical name.
75 bzero(&aiHints,
sizeof(aiHints));
76 aiHints.ai_flags = AI_CANONNAME;
77 aiHints.ai_family = AF_INET;
78 aiHints.ai_socktype = SOCK_DGRAM;
79 aiHints.ai_protocol = IPPROTO_UDP;
// Resolve address_; a non-zero return code or an empty result list is treated
// as failure.
81 if (::getaddrinfo(address_.c_str(), 0, &aiHints, &aiList) || !aiList)
// Use the first entry of the result list, viewed as an IPv4 socket address.
84 addr = (
const sockaddr_in*)(aiList->ai_addr);
// Build the remote address: clear the structure, then set the family, the
// resolved IPv4 address, and port_ converted to network byte order.
87 memset(&
remAddr_, 0,
sizeof(
struct sockaddr_in));
88 ((
struct sockaddr_in*)(&
remAddr_))->sin_family = AF_INET;
89 ((
struct sockaddr_in*)(&
remAddr_))->sin_addr.s_addr = addr->sin_addr.s_addr;
90 ((
struct sockaddr_in*)(&
remAddr_))->sin_port = htons(port_);
// Start the receive thread on runThread(), giving it only a weak reference to
// scopePtr. The std::thread is heap-allocated with new and kept in thread_.
// NOTE(review): the matching join/delete is not visible in this listing.
98 thread_ =
new std::thread(&rpu::Client::runThread,
this, std::weak_ptr<int>(scopePtr));
// Give the thread a name ("UdpClient") via its native pthread handle.
102 pthread_setname_np(
thread_->native_handle(),
"UdpClient");
// Destructor for rpu::Client.
// NOTE(review): only the opening line is present in this listing; the cleanup
// it performs cannot be documented from here.
107rpu::Client::~Client() {
// stop(): takes no arguments and returns nothing.
// NOTE(review): only the opening line is present in this listing; presumably
// this shuts down the receive thread and socket created by the constructor —
// confirm against the full source.
111void rpu::Client::stop() {
// Body of Client::acceptFrame (named in the log messages below; its signature
// line is not present in this listing). Transmits a frame to remAddr_ over the
// UDP socket, one sendmsg() call per buffer of the frame.
// NOTE(review): msg, fds, tout and res are used here but declared on lines not
// present in this listing.
// Iterator over the buffers that make up the frame.
122 ris::Frame::BufferIterator it;
// Single-entry scatter/gather vector: each sendmsg() carries exactly one buffer.
128 struct iovec msg_iov[1];
// Destination of every datagram is the remote address stored in remAddr_.
131 msg.msg_name = &remAddr_;
132 msg.msg_namelen =
sizeof(
struct sockaddr_in);
133 msg.msg_iov = msg_iov;
// No ancillary (control) data is sent.
135 msg.msg_control = NULL;
136 msg.msg_controllen = 0;
// Hold udpMtx_ for the remainder of the enclosing scope.
141 std::lock_guard<std::mutex> lock(udpMtx_);
// Frames flagged with an error are reported with a warning.
144 if (frame->getError()) {
145 udpLog_->warning(
"Client::acceptFrame: Dumping errored frame");
// Walk the frame's buffers in order; the first buffer with a zero payload ends
// the walk.
150 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) {
151 if ((*it)->getPayload() == 0)
break;
// Point the I/O vector at this buffer's start and payload length.
154 msg_iov[0].iov_base = (*it)->begin();
155 msg_iov[0].iov_len = (*it)->getPayload();
// Wait until the socket is writable or the timeout in tout elapses. A result
// <= 0 (timeout or select error) is logged as critical.
167 if (select(fd_ + 1, NULL, &fds, NULL, &tout) <= 0) {
168 udpLog_->critical(
"Client::acceptFrame: Timeout waiting for outbound transmit after %" PRIu32
169 ".%" PRIu32
" seconds! May be caused by outbound backpressure.",
// Socket is writable: send the buffer. A negative sendmsg() result is logged
// as a warning.
173 }
else if ((res = sendmsg(fd_, &msg, 0)) < 0) {
174 udpLog_->warning(
"UDP Write Call Failed");
// Receive-thread entry point. Reads datagrams from fd_ into locally requested
// frames.
// lockPtr: weak reference to a token owned by whoever started the thread.
// NOTE(review): frame, buff, avail, res, fds and tout are used here but
// declared on lines not present in this listing, as is the enclosing loop.
181void rpu::Client::runThread(std::weak_ptr<int> lockPtr) {
// Busy-wait (no sleep or yield) until the token behind lockPtr has been
// destroyed before doing any work.
190 while (!lockPtr.expired())
continue;
192 udpLog_->logThreadId();
// Request a frame sized to the maximum UDP payload to receive into.
195 frame = reqLocalFrame(maxPayload(),
false);
// Receive straight into the frame's first buffer, limited to its available
// space.
199 buff = *(frame->beginBuffer());
200 avail = buff->getAvailable();
// MSG_TRUNC: on Linux, recvfrom() then returns the real datagram length even
// when it exceeds avail. NOTE(review): presumably this is what lets the
// (not visible) check behind the warning below detect oversized datagrams.
201 res = recvfrom(fd_, buff->begin(), avail, MSG_TRUNC, NULL, 0);
206 udpLog_->warning(
"Receive data was too large. Rx=%i, avail=%i Dropping.", res, avail);
// Record the number of received bytes as the buffer's payload.
208 buff->setPayload(res);
// Request a fresh frame for the next receive.
213 frame = reqLocalFrame(maxPayload(),
false);
// Wait until the socket is readable or the timeout in tout elapses; the return
// value of select() is not checked here.
224 select(fd_ + 1, &fds, NULL, NULL, &tout);
// Registers rpu::Client with Boost.Python.
229void rpu::Client::setup_python() {
// Expose the class held by rpu::ClientPtr, with rpu::Core, ris::Master and
// ris::Slave as bases, and marked non-copyable.
// NOTE(review): the line carrying the Python class name is not present in this
// listing.
232 bp::class_<rpu::Client, rpu::ClientPtr, bp::bases<rpu::Core, ris::Master, ris::Slave>, boost::noncopyable>(
// Python-side constructor takes (std::string, uint16_t, bool), matching the C++
// constructor's host, port, jumbo parameters.
234 bp::init<std::string, uint16_t, bool>());
// Allow a ClientPtr to be passed wherever a pointer to one of its bases is
// expected.
236 bp::implicitly_convertible<rpu::ClientPtr, rpu::CorePtr>();
237 bp::implicitly_convertible<rpu::ClientPtr, ris::MasterPtr>();
238 bp::implicitly_convertible<rpu::ClientPtr, ris::SlavePtr>();
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
void setFixedSize(uint32_t size)
Sets fixed-size mode.
void setPoolSize(uint32_t size)
Sets buffer pool size.
Shared UDP transport base for stream client/server endpoints.
std::shared_ptr< rogue::Logging > udpLog_
struct sockaddr_in remAddr_
uint32_t maxPayload()
Returns maximum UDP payload size in bytes.
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
std::shared_ptr< rogue::protocols::udp::Client > ClientPtr