41 #include <boost/python.hpp>
42namespace bp = boost::python;
52rpu::Server::Server(uint16_t port,
bool jumbo) :
rpu::
Core(jumbo) {
61 std::shared_ptr<int> scopePtr = std::make_shared<int>(0);
64 if ((
fd_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
68 memset(&locAddr_, 0,
sizeof(
struct sockaddr_in));
69 locAddr_.sin_family = AF_INET;
70 locAddr_.sin_addr.s_addr = htonl(INADDR_ANY);
71 locAddr_.sin_port = htons(port_);
73 memset(&
remAddr_, 0,
sizeof(
struct sockaddr_in));
75 if (bind(
fd_, (
struct sockaddr*)&locAddr_,
sizeof(locAddr_)) < 0)
77 "Failed to bind to local port %" PRIu16
". Another process may be using it",
82 len =
sizeof(locAddr_);
83 if (getsockname(
fd_, (
struct sockaddr*)&locAddr_, &len) < 0)
85 port_ = ntohs(locAddr_.sin_port);
94 thread_ =
new std::thread(&rpu::Server::runThread,
this, std::weak_ptr<int>(scopePtr));
98 pthread_setname_np(
thread_->native_handle(),
"UdpServer");
103rpu::Server::~Server() {
107void rpu::Server::stop() {
117uint32_t rpu::Server::getPort() {
123 ris::Frame::BufferIterator it;
129 struct iovec msg_iov[1];
133 std::lock_guard<std::mutex> lock(udpMtx_);
136 if (frame->getError()) {
137 udpLog_->warning(
"Server::acceptFrame: Dumping errored frame");
142 msg.msg_name = &remAddr_;
143 msg.msg_namelen =
sizeof(
struct sockaddr_in);
144 msg.msg_iov = msg_iov;
146 msg.msg_control = NULL;
147 msg.msg_controllen = 0;
151 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) {
152 if ((*it)->getPayload() == 0)
break;
155 msg_iov[0].iov_base = (*it)->begin();
156 msg_iov[0].iov_len = (*it)->getPayload();
168 if (select(fd_ + 1, NULL, &fds, NULL, &tout) <= 0) {
169 udpLog_->critical(
"Server::acceptFrame: Timeout waiting for outbound transmit after %" PRIuLEAST32
170 ".%" PRIuLEAST32
" seconds! May be caused by outbound backpressure.",
174 }
else if ((res = sendmsg(fd_, &msg, 0)) < 0) {
175 udpLog_->warning(
"UDP Write Call Failed");
182void rpu::Server::runThread(std::weak_ptr<int> lockPtr) {
188 struct sockaddr_in tmpAddr;
193 while (!lockPtr.expired())
continue;
195 udpLog_->logThreadId();
198 frame = reqLocalFrame(maxPayload(),
false);
202 buff = *(frame->beginBuffer());
203 avail = buff->getAvailable();
204 tmpLen =
sizeof(
struct sockaddr_in);
205 res = recvfrom(fd_, buff->begin(), avail, MSG_TRUNC, (
struct sockaddr*)&tmpAddr, &tmpLen);
210 udpLog_->warning(
"Receive data was too large. Dropping.");
212 buff->setPayload(res);
217 frame = reqLocalFrame(maxPayload(),
false);
220 if (memcmp(&remAddr_, &tmpAddr,
sizeof(remAddr_)) != 0) {
221 std::lock_guard<std::mutex> lock(udpMtx_);
234 select(fd_ + 1, &fds, NULL, NULL, &tout);
239void rpu::Server::setup_python() {
242 bp::class_<rpu::Server, rpu::ServerPtr, bp::bases<rpu::Core, ris::Master, ris::Slave>, boost::noncopyable>(
244 bp::init<uint16_t, bool>())
245 .def(
"getPort", &rpu::Server::getPort);
247 bp::implicitly_convertible<rpu::ServerPtr, rpu::CorePtr>();
248 bp::implicitly_convertible<rpu::ServerPtr, ris::MasterPtr>();
249 bp::implicitly_convertible<rpu::ServerPtr, ris::SlavePtr>();
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
void setFixedSize(uint32_t size)
Sets fixed-size mode.
void setPoolSize(uint32_t size)
Sets buffer pool size.
Shared UDP transport base for stream client/server endpoints.
std::shared_ptr< rogue::Logging > udpLog_
struct sockaddr_in remAddr_
uint32_t maxPayload()
Returns maximum UDP payload size in bytes.
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
std::shared_ptr< rogue::protocols::udp::Server > ServerPtr