rogue
Loading...
Searching...
No Matches
Server.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <inttypes.h>
22#include <arpa/inet.h>
23#include <unistd.h>
24
25#include <cerrno>
26#include <cstdlib>
27#include <cstring>
28#include <iostream>
29#include <memory>
30
31#include "rogue/GeneralError.h"
32#include "rogue/GilRelease.h"
33#include "rogue/Logging.h"
38
39namespace rpu = rogue::protocols::udp;
41
42#ifndef NO_PYTHON
43 #include <boost/python.hpp>
44namespace bp = boost::python;
45#endif
46
48rpu::ServerPtr rpu::Server::create(uint16_t port, bool jumbo) {
49 rpu::ServerPtr r = std::make_shared<rpu::Server>(port, jumbo);
50 return (r);
51}
52
54rpu::Server::Server(uint16_t port, bool jumbo) : rpu::Core(jumbo) {
55 uint32_t len;
56 int32_t val;
57 uint32_t size;
58
59 port_ = port;
60 udpLog_ = rogue::Logging::create("udp.Server");
61
62 // Create a shared pointer to use as a lock for runThread()
63 std::shared_ptr<int> scopePtr = std::make_shared<int>(0);
64
65 // Create socket
66 if ((fd_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
67 throw(rogue::GeneralError::create("Server::Server", "Failed to create socket for port %" PRIu16, port_));
68
69 // Setup Remote Address
70 memset(&locAddr_, 0, sizeof(struct sockaddr_in));
71 locAddr_.sin_family = AF_INET;
72 locAddr_.sin_addr.s_addr = htonl(INADDR_ANY);
73 locAddr_.sin_port = htons(port_);
74
75 memset(&remAddr_, 0, sizeof(struct sockaddr_in));
76
77 if (bind(fd_, (struct sockaddr*)&locAddr_, sizeof(locAddr_)) < 0)
78 throw(rogue::GeneralError::create("Server::Server",
79 "Failed to bind to local port %" PRIu16 ". Another process may be using it",
80 port_));
81
82 // Kernel assigns port
83 if (port_ == 0) {
84 len = sizeof(locAddr_);
85 if (getsockname(fd_, (struct sockaddr*)&locAddr_, &len) < 0)
86 throw(rogue::GeneralError::create("Server::Server", "Failed to dynamically assign local port"));
87 port_ = ntohs(locAddr_.sin_port);
88 }
89
90 // Fixed size buffer pool
92 setPoolSize(10000); // Initial value, 10K frames
93
94 // Start rx thread
95 threadEn_ = true;
96 thread_ = new std::thread(&rpu::Server::runThread, this, std::weak_ptr<int>(scopePtr));
97
98 udpLog_->debug("UDP server ready. localPort=%" PRIu16 ", maxPayload=%" PRIu32, port_, maxPayload());
99
100 // Set a thread name
101#ifndef __MACH__
102 pthread_setname_np(thread_->native_handle(), "UdpServer");
103#endif
104}
105
107rpu::Server::~Server() {
108 this->stop();
109}
110
111void rpu::Server::stop() {
112 if (threadEn_) {
113 threadEn_ = false;
114 thread_->join();
115 udpLog_->debug("Stopping UDP server on local port %" PRIu16, port_);
116
117 ::close(fd_);
118 }
119}
120
122uint32_t rpu::Server::getPort() {
123 return (port_);
124}
125
127void rpu::Server::acceptFrame(ris::FramePtr frame) {
128 ris::Frame::BufferIterator it;
129 int32_t res;
130 fd_set fds;
131 struct timeval tout;
132 uint32_t x;
133 struct msghdr msg;
134 struct iovec msg_iov[1];
135
136 rogue::GilRelease noGil;
137 ris::FrameLockPtr frLock = frame->lock();
138 std::lock_guard<std::mutex> lock(udpMtx_);
139
140 // Drop errored frames
141 if (frame->getError()) {
142 udpLog_->warning("Dropping errored outbound frame on local port %" PRIu16 ", error=0x%" PRIx8,
143 port_,
144 frame->getError());
145 return;
146 }
147
148 // Setup message header
149 msg.msg_name = &remAddr_;
150 msg.msg_namelen = sizeof(struct sockaddr_in);
151 msg.msg_iov = msg_iov;
152 msg.msg_iovlen = 1;
153 msg.msg_control = NULL;
154 msg.msg_controllen = 0;
155 msg.msg_flags = 0;
156
157 // Go through each buffer in the frame
158 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) {
159 if ((*it)->getPayload() == 0) break;
160
161 // Setup IOVs
162 msg_iov[0].iov_base = (*it)->begin();
163 msg_iov[0].iov_len = (*it)->getPayload();
164
165 // Keep trying since select call can fire
166 // but write fails because we did not win the buffer lock
167 do {
168 // Setup fds for select call
169 FD_ZERO(&fds);
170 FD_SET(fd_, &fds);
171
172 // Setup select timeout
173 tout = timeout_;
174
175 if (select(fd_ + 1, NULL, &fds, NULL, &tout) <= 0) {
176 udpLog_->critical("Server::acceptFrame: Timeout waiting for outbound transmit after %" PRIuLEAST32
177 ".%" PRIuLEAST32 " seconds! May be caused by outbound backpressure.",
178 timeout_.tv_sec,
179 timeout_.tv_usec);
180 res = 0;
181 } else if ((res = sendmsg(fd_, &msg, 0)) < 0) {
182 udpLog_->warning("UDP write call failed on server port %" PRIu16 ": %s",
183 port_,
184 std::strerror(errno));
185 }
186 } while (res == 0); // Continue while write result was zero
187 }
188}
189
191void rpu::Server::runThread(std::weak_ptr<int> lockPtr) {
192 ris::BufferPtr buff;
193 ris::FramePtr frame;
194 fd_set fds;
195 int32_t res;
196 struct timeval tout;
197 struct sockaddr_in tmpAddr;
198 uint32_t tmpLen;
199 uint32_t avail;
200
201 // Wait until constructor completes
202 while (!lockPtr.expired()) continue;
203
204 udpLog_->logThreadId();
205
206 // Preallocate frame
207 frame = reqLocalFrame(maxPayload(), false);
208
209 while (threadEn_) {
210 // Attempt receive
211 buff = *(frame->beginBuffer());
212 avail = buff->getAvailable();
213 tmpLen = sizeof(struct sockaddr_in);
214 res = recvfrom(fd_, buff->begin(), avail, MSG_TRUNC, (struct sockaddr*)&tmpAddr, &tmpLen);
215
216 if (res > 0) {
217 // Message was too big
218 if (res > avail) {
219 udpLog_->warning("Receive data was too large on local port %" PRIu16 ". rx=%i, avail=%" PRIu32
220 ". Dropping.",
221 port_,
222 res,
223 avail);
224 } else {
225 buff->setPayload(res);
226 sendFrame(frame);
227 }
228
229 // Get new frame
230 frame = reqLocalFrame(maxPayload(), false);
231
232 // Lock before updating address
233 if (memcmp(&remAddr_, &tmpAddr, sizeof(remAddr_)) != 0) {
234 std::lock_guard<std::mutex> lock(udpMtx_);
235 char tmpIp[INET_ADDRSTRLEN];
236 if (inet_ntop(AF_INET, &(tmpAddr.sin_addr), tmpIp, sizeof(tmpIp)) != NULL) {
237 udpLog_->debug("UDP server peer updated on local port %" PRIu16 " to %s:%" PRIu16,
238 port_,
239 tmpIp,
240 ntohs(tmpAddr.sin_port));
241 } else {
242 udpLog_->debug("UDP server peer updated on local port %" PRIu16, port_);
243 }
244 remAddr_ = tmpAddr;
245 }
246 } else {
247 // Setup fds for select call
248 FD_ZERO(&fds);
249 FD_SET(fd_, &fds);
250
251 // Setup select timeout
252 tout.tv_sec = 0;
253 tout.tv_usec = 100;
254
255 // Select returns with available buffer
256 select(fd_ + 1, &fds, NULL, NULL, &tout);
257 }
258 }
259}
260
261void rpu::Server::setup_python() {
262#ifndef NO_PYTHON
263
264 bp::class_<rpu::Server, rpu::ServerPtr, bp::bases<rpu::Core, ris::Master, ris::Slave>, boost::noncopyable>(
265 "Server",
266 bp::init<uint16_t, bool>())
267 .def("getPort", &rpu::Server::getPort);
268
269 bp::implicitly_convertible<rpu::ServerPtr, rpu::CorePtr>();
270 bp::implicitly_convertible<rpu::ServerPtr, ris::MasterPtr>();
271 bp::implicitly_convertible<rpu::ServerPtr, ris::SlavePtr>();
272#endif
273}
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:95
void setFixedSize(uint32_t size)
Sets fixed-size mode.
Definition Pool.cpp:115
void setPoolSize(uint32_t size)
Sets buffer pool size.
Definition Pool.cpp:128
Shared UDP transport base for stream client/server endpoints.
Definition Core.h:61
std::shared_ptr< rogue::Logging > udpLog_
Definition Core.h:63
struct sockaddr_in remAddr_
Definition Core.h:72
uint32_t maxPayload()
Returns maximum UDP payload size in bytes.
Definition Core.cpp:45
std::thread * thread_
Definition Core.h:77
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
Definition Buffer.h:270
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110
std::shared_ptr< rogue::protocols::udp::Server > ServerPtr
Definition Server.h:130