43 #include <boost/python.hpp>
44namespace bp = boost::python;
54rpu::Server::Server(uint16_t port,
bool jumbo) :
rpu::
Core(jumbo) {
63 std::shared_ptr<int> scopePtr = std::make_shared<int>(0);
66 if ((
fd_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
70 memset(&locAddr_, 0,
sizeof(
struct sockaddr_in));
71 locAddr_.sin_family = AF_INET;
72 locAddr_.sin_addr.s_addr = htonl(INADDR_ANY);
73 locAddr_.sin_port = htons(port_);
75 memset(&
remAddr_, 0,
sizeof(
struct sockaddr_in));
77 if (bind(
fd_, (
struct sockaddr*)&locAddr_,
sizeof(locAddr_)) < 0)
79 "Failed to bind to local port %" PRIu16
". Another process may be using it",
84 len =
sizeof(locAddr_);
85 if (getsockname(
fd_, (
struct sockaddr*)&locAddr_, &len) < 0)
87 port_ = ntohs(locAddr_.sin_port);
96 thread_ =
new std::thread(&rpu::Server::runThread,
this, std::weak_ptr<int>(scopePtr));
98 udpLog_->debug(
"UDP server ready. localPort=%" PRIu16
", maxPayload=%" PRIu32, port_,
maxPayload());
102 pthread_setname_np(
thread_->native_handle(),
"UdpServer");
107rpu::Server::~Server() {
111void rpu::Server::stop() {
115 udpLog_->debug(
"Stopping UDP server on local port %" PRIu16, port_);
122uint32_t rpu::Server::getPort() {
128 ris::Frame::BufferIterator it;
134 struct iovec msg_iov[1];
138 std::lock_guard<std::mutex> lock(udpMtx_);
141 if (frame->getError()) {
142 udpLog_->warning(
"Dropping errored outbound frame on local port %" PRIu16
", error=0x%" PRIx8,
149 msg.msg_name = &remAddr_;
150 msg.msg_namelen =
sizeof(
struct sockaddr_in);
151 msg.msg_iov = msg_iov;
153 msg.msg_control = NULL;
154 msg.msg_controllen = 0;
158 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) {
159 if ((*it)->getPayload() == 0)
break;
162 msg_iov[0].iov_base = (*it)->begin();
163 msg_iov[0].iov_len = (*it)->getPayload();
175 if (select(fd_ + 1, NULL, &fds, NULL, &tout) <= 0) {
176 udpLog_->critical(
"Server::acceptFrame: Timeout waiting for outbound transmit after %" PRIuLEAST32
177 ".%" PRIuLEAST32
" seconds! May be caused by outbound backpressure.",
181 }
else if ((res = sendmsg(fd_, &msg, 0)) < 0) {
182 udpLog_->warning(
"UDP write call failed on server port %" PRIu16
": %s",
184 std::strerror(errno));
191void rpu::Server::runThread(std::weak_ptr<int> lockPtr) {
197 struct sockaddr_in tmpAddr;
202 while (!lockPtr.expired())
continue;
204 udpLog_->logThreadId();
207 frame = reqLocalFrame(maxPayload(),
false);
211 buff = *(frame->beginBuffer());
212 avail = buff->getAvailable();
213 tmpLen =
sizeof(
struct sockaddr_in);
214 res = recvfrom(fd_, buff->begin(), avail, MSG_TRUNC, (
struct sockaddr*)&tmpAddr, &tmpLen);
219 udpLog_->warning(
"Receive data was too large on local port %" PRIu16
". rx=%i, avail=%" PRIu32
225 buff->setPayload(res);
230 frame = reqLocalFrame(maxPayload(),
false);
233 if (memcmp(&remAddr_, &tmpAddr,
sizeof(remAddr_)) != 0) {
234 std::lock_guard<std::mutex> lock(udpMtx_);
235 char tmpIp[INET_ADDRSTRLEN];
236 if (inet_ntop(AF_INET, &(tmpAddr.sin_addr), tmpIp,
sizeof(tmpIp)) != NULL) {
237 udpLog_->debug(
"UDP server peer updated on local port %" PRIu16
" to %s:%" PRIu16,
240 ntohs(tmpAddr.sin_port));
242 udpLog_->debug(
"UDP server peer updated on local port %" PRIu16, port_);
256 select(fd_ + 1, &fds, NULL, NULL, &tout);
261void rpu::Server::setup_python() {
264 bp::class_<rpu::Server, rpu::ServerPtr, bp::bases<rpu::Core, ris::Master, ris::Slave>, boost::noncopyable>(
266 bp::init<uint16_t, bool>())
267 .def(
"getPort", &rpu::Server::getPort);
269 bp::implicitly_convertible<rpu::ServerPtr, rpu::CorePtr>();
270 bp::implicitly_convertible<rpu::ServerPtr, ris::MasterPtr>();
271 bp::implicitly_convertible<rpu::ServerPtr, ris::SlavePtr>();
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
void setFixedSize(uint32_t size)
Sets fixed-size mode.
void setPoolSize(uint32_t size)
Sets buffer pool size.
Shared UDP transport base for stream client/server endpoints.
std::shared_ptr< rogue::Logging > udpLog_
struct sockaddr_in remAddr_
uint32_t maxPayload()
Returns maximum UDP payload size in bytes.
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
std::shared_ptr< rogue::protocols::udp::Server > ServerPtr