43 #include <boost/python.hpp>
44namespace bp = boost::python;
// Factory helper: builds a Client through std::make_shared, forwarding host,
// port and jumbo unchanged to the Client constructor, and holds the result in
// a ClientPtr.
48rpu::ClientPtr rpu::Client::create(std::string host, uint16_t port,
bool jumbo) {
49 rpu::ClientPtr r = std::make_shared<rpu::Client>(host, port, jumbo);
// Constructor: opens an IPv4 UDP socket, resolves address_ into remAddr_, and
// starts the background thread that executes runThread().
//
// host  - NOTE(review): presumably the remote host name or address; the log
//         messages below print address_ as the remote, confirm where it is set.
// port  - NOTE(review): presumably the remote UDP port (port_ is what is
//         written into remAddr_ below); confirm where port_ is assigned.
// jumbo - forwarded to the rpu::Core base-class constructor.
54rpu::Client::Client(std::string host, uint16_t port,
bool jumbo) :
rpu::
Core(jumbo) {
// Scratch variables for the getaddrinfo() lookup performed below.
55 struct addrinfo aiHints;
56 struct addrinfo* aiList = 0;
57 const sockaddr_in* addr;
// Token whose weak_ptr is handed to the receive thread; runThread() spins
// until that weak_ptr reports expired before it begins its work.
67 std::shared_ptr<int> scopePtr = std::make_shared<int>(0);
// Create the IPv4 datagram socket; a negative descriptor is the failure case
// described by the message text that follows.
70 if ((
fd_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
72 "Failed to create socket for port %" PRIu16
" at address %s",
// Zero the lookup hints, then restrict the lookup to IPv4 / UDP datagram
// results and request the canonical name.
77 bzero(&aiHints,
sizeof(aiHints));
78 aiHints.ai_flags = AI_CANONNAME;
79 aiHints.ai_family = AF_INET;
80 aiHints.ai_socktype = SOCK_DGRAM;
81 aiHints.ai_protocol = IPPROTO_UDP;
// Resolve address_; both a non-zero return code and an empty result list
// take the failure branch.
84 if (::getaddrinfo(address_.c_str(), 0, &aiHints, &aiList) != 0 || aiList ==
nullptr) {
// Only the first entry of the result list is used.
90 addr = (
const sockaddr_in*)(aiList->ai_addr);
// Build the remote socket address: clear it, then set the family, the
// resolved IPv4 address, and port_ converted to network byte order.
93 memset(&
remAddr_, 0,
sizeof(
struct sockaddr_in));
94 ((
struct sockaddr_in*)(&
remAddr_))->sin_family = AF_INET;
95 ((
struct sockaddr_in*)(&
remAddr_))->sin_addr.s_addr = addr->sin_addr.s_addr;
96 ((
struct sockaddr_in*)(&
remAddr_))->sin_port = htons(port_);
// The resolved address has been copied into remAddr_, so the list can go.
98 ::freeaddrinfo(aiList);
105 udpLog_->debug(
"UDP client ready. remote=%s:%" PRIu16
", maxPayload=%" PRIu32,
// Launch the receive thread on runThread(), passing only a weak reference to
// scopePtr so the thread can observe when scopePtr is released.
112 thread_ =
new std::thread(&rpu::Client::runThread,
this, std::weak_ptr<int>(scopePtr));
// Give the thread a recognizable name.
122 pthread_setname_np(
thread_->native_handle(),
"UdpClient");
// Destructor for the UDP client.
// NOTE(review): presumably delegates to stop() so the socket and receive
// thread are released; confirm against the destructor body.
127rpu::Client::~Client() {
// Stops the UDP client. Emits a debug log entry naming the remote
// address_ / port_ pair being stopped.
131void rpu::Client::stop() {
142 udpLog_->debug(
"Stopping UDP client for remote %s:%" PRIu16, address_.c_str(), port_);
// NOTE(review): the log text further down identifies this code as
// Client::acceptFrame, i.e. the outbound path that transmits a frame's
// buffers to remAddr_ over the UDP socket.
148 ris::Frame::BufferIterator it;
// Single-entry scatter/gather array: slot 0 is re-pointed at each buffer in
// the loop below.
154 struct iovec msg_iov[1];
// Message header: the destination is remAddr_, the payload comes from
// msg_iov, and no ancillary (control) data is attached.
157 msg.msg_name = &remAddr_;
158 msg.msg_namelen =
sizeof(
struct sockaddr_in);
159 msg.msg_iov = msg_iov;
161 msg.msg_control = NULL;
162 msg.msg_controllen = 0;
// Hold udpMtx_ for the remainder of the enclosing scope.
167 std::lock_guard<std::mutex> lock(udpMtx_);
// A frame carrying an error is reported with a warning ("Dropping errored
// outbound frame").
170 if (frame->getError()) {
171 udpLog_->warning(
"Dropping errored outbound frame. remote=%s:%" PRIu16
", error=0x%" PRIx8,
// Walk the frame's buffers in order; the first buffer with a zero payload
// ends the walk.
179 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) {
180 if ((*it)->getPayload() == 0)
break;
// Point the iovec at this buffer's data and payload length.
183 msg_iov[0].iov_base = (*it)->begin();
184 msg_iov[0].iov_len = (*it)->getPayload();
// Wait for the socket to become writable. A select() result <= 0 is logged
// as critical; otherwise the buffer is sent with sendmsg(), and a negative
// return is logged as a warning together with strerror(errno).
196 if (select(fd_ + 1, NULL, &fds, NULL, &tout) <= 0) {
197 udpLog_->critical(
"Client::acceptFrame: Timeout waiting for outbound transmit after %" PRIu32
198 ".%" PRIu32
" seconds! May be caused by outbound backpressure.",
202 }
else if ((res = sendmsg(fd_, &msg, 0)) < 0) {
203 udpLog_->warning(
"UDP write call failed for %s: %s", address_.c_str(), std::strerror(errno));
// Receive-thread body: reads datagrams from fd_ into locally requested frames.
//
// lockPtr - weak reference to a token owned by the code that started the
//           thread; nothing below runs until that token has been released.
210void rpu::Client::runThread(std::weak_ptr<int> lockPtr) {
// Busy-wait until the token behind lockPtr is gone.
219 while (!lockPtr.expired())
continue;
221 udpLog_->logThreadId();
// Request a frame sized for one maximum-size payload.
224 frame = reqLocalFrame(maxPayload(),
false);
// Receive directly into the frame's first buffer, limited to the space the
// buffer reports as available. The sender address is not requested (NULL).
// NOTE(review): MSG_TRUNC is expected to make recvfrom() return the real
// datagram length even when it exceeds avail, which is what the "too large"
// warning below reports; confirm against recv(2) for the target platform.
228 buff = *(frame->beginBuffer());
229 avail = buff->getAvailable();
230 res = recvfrom(fd_, buff->begin(), avail, MSG_TRUNC, NULL, 0);
235 udpLog_->warning(
"Receive data was too large. remote=%s:%" PRIu16
", rx=%i, avail=%" PRIu32
// Record the received byte count as the buffer's payload size.
242 buff->setPayload(res);
// Request a fresh frame for the next datagram.
247 frame = reqLocalFrame(maxPayload(),
false);
// Block until fd_ is readable or the timeout in tout elapses; the return
// value of select() is not examined here.
258 select(fd_ + 1, &fds, NULL, NULL, &tout);
// Registers the Client class with Boost.Python.
263void rpu::Client::setup_python() {
// Expose Client as a non-copyable class held by ClientPtr, with rpu::Core,
// ris::Master and ris::Slave declared as its Python-visible bases.
266 bp::class_<rpu::Client, rpu::ClientPtr, bp::bases<rpu::Core, ris::Master, ris::Slave>, boost::noncopyable>(
// Python constructor signature: (std::string, uint16_t, bool).
268 bp::init<std::string, uint16_t, bool>());
// Allow a ClientPtr to be passed wherever a pointer to one of its bases is
// expected.
270 bp::implicitly_convertible<rpu::ClientPtr, rpu::CorePtr>();
271 bp::implicitly_convertible<rpu::ClientPtr, ris::MasterPtr>();
272 bp::implicitly_convertible<rpu::ClientPtr, ris::SlavePtr>();
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
void setFixedSize(uint32_t size)
Sets fixed-size mode.
void setPoolSize(uint32_t size)
Sets buffer pool size.
Shared UDP transport base for stream client/server endpoints.
std::atomic< bool > threadEn_
std::shared_ptr< rogue::Logging > udpLog_
struct sockaddr_in remAddr_
uint32_t maxPayload()
Returns maximum UDP payload size in bytes.
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
std::shared_ptr< rogue::protocols::udp::Client > ClientPtr