rogue
Loading...
Searching...
No Matches
Server.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <inttypes.h>
22#include <arpa/inet.h>
23#include <unistd.h>
24
25#include <cerrno>
26#include <cstdlib>
27#include <cstring>
28#include <iostream>
29#include <memory>
30
31#include "rogue/GeneralError.h"
32#include "rogue/GilRelease.h"
33#include "rogue/Logging.h"
38
39namespace rpu = rogue::protocols::udp;
41
42#ifndef NO_PYTHON
43 #include <boost/python.hpp>
44namespace bp = boost::python;
45#endif
46
48rpu::ServerPtr rpu::Server::create(uint16_t port, bool jumbo) {
49 rpu::ServerPtr r = std::make_shared<rpu::Server>(port, jumbo);
50 return (r);
51}
52
54rpu::Server::Server(uint16_t port, bool jumbo) : rpu::Core(jumbo) {
55 uint32_t len;
56 uint32_t size;
57
58 port_ = port;
59 udpLog_ = rogue::Logging::create("udp.Server");
60
61 // Create a shared pointer to use as a lock for runThread()
62 std::shared_ptr<int> scopePtr = std::make_shared<int>(0);
63
64 // Create socket
65 if ((fd_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
66 throw(rogue::GeneralError::create("Server::Server", "Failed to create socket for port %" PRIu16, port_));
67
68 // Setup Remote Address
69 memset(&locAddr_, 0, sizeof(struct sockaddr_in));
70 locAddr_.sin_family = AF_INET;
71 locAddr_.sin_addr.s_addr = htonl(INADDR_ANY);
72 locAddr_.sin_port = htons(port_);
73
74 memset(&remAddr_, 0, sizeof(struct sockaddr_in));
75
76 // Intentionally no SO_REUSEADDR / SO_REUSEPORT here: the kernel default
77 // EADDRINUSE on duplicate bind is the contract we want, so two Servers
78 // on the same port can't silently coexist with non-deterministic packet
79 // routing. Drafted then reverted in a7d5bc533 — see PR #1193 "Reverted".
80 if (bind(fd_, (struct sockaddr*)&locAddr_, sizeof(locAddr_)) < 0) {
81 ::close(fd_);
82 fd_ = -1;
83 throw(rogue::GeneralError::create("Server::Server",
84 "Failed to bind to local port %" PRIu16 ". Another process may be using it",
85 port_));
86 }
87
88 // Kernel assigns port
89 if (port_ == 0) {
90 len = sizeof(locAddr_);
91 if (getsockname(fd_, (struct sockaddr*)&locAddr_, &len) < 0) {
92 ::close(fd_);
93 fd_ = -1;
94 throw(rogue::GeneralError::create("Server::Server", "Failed to dynamically assign local port"));
95 }
96 port_ = ntohs(locAddr_.sin_port);
97 }
98
99 // Fixed size buffer pool
101 setPoolSize(10000); // Initial value, 10K frames
102
103 udpLog_->debug("UDP server ready. localPort=%" PRIu16 ", maxPayload=%" PRIu32, port_, maxPayload());
104
105 threadEn_ = true;
106 try {
107 thread_ = new std::thread(&rpu::Server::runThread, this, std::weak_ptr<int>(scopePtr));
108 } catch (...) {
109 threadEn_ = false;
110 ::close(fd_);
111 fd_ = -1;
112 throw;
113 }
114
115 // Set a thread name
116#ifndef __MACH__
117 pthread_setname_np(thread_->native_handle(), "UdpServer");
118#endif
119}
120
122rpu::Server::~Server() {
123 this->stop();
124}
125
126void rpu::Server::stop() {
127 if (threadEn_) {
128 threadEn_ = false;
129 // close() before join() unblocks the worker's recvfrom(). Defer fd_ = -1
130 // until after join so a final FD_SET(fd_) does not see -1.
131 rogue::GilRelease noGil;
132 ::close(fd_);
133 thread_->join();
134 delete thread_;
135 thread_ = nullptr;
136 fd_ = -1;
137 udpLog_->debug("Stopping UDP server on local port %" PRIu16, port_);
138 }
139}
140
142uint32_t rpu::Server::getPort() {
143 return (port_);
144}
145
147void rpu::Server::acceptFrame(ris::FramePtr frame) {
148 ris::Frame::BufferIterator it;
149 int32_t res;
150 fd_set fds;
151 struct timeval tout;
152 uint32_t x;
153 struct msghdr msg;
154 struct iovec msg_iov[1];
155
156 rogue::GilRelease noGil;
157 ris::FrameLockPtr frLock = frame->lock();
158 std::lock_guard<std::mutex> lock(udpMtx_);
159
160 // Drop errored frames
161 if (frame->getError()) {
162 udpLog_->warning("Dropping errored outbound frame on local port %" PRIu16 ", error=0x%" PRIx8,
163 port_,
164 frame->getError());
165 return;
166 }
167
168 // Setup message header
169 msg.msg_name = &remAddr_;
170 msg.msg_namelen = sizeof(struct sockaddr_in);
171 msg.msg_iov = msg_iov;
172 msg.msg_iovlen = 1;
173 msg.msg_control = NULL;
174 msg.msg_controllen = 0;
175 msg.msg_flags = 0;
176
177 // Go through each buffer in the frame
178 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) {
179 if ((*it)->getPayload() == 0) break;
180
181 // Setup IOVs
182 msg_iov[0].iov_base = (*it)->begin();
183 msg_iov[0].iov_len = (*it)->getPayload();
184
185 // Keep trying since select call can fire
186 // but write fails because we did not win the buffer lock
187 do {
188 // Setup fds for select call
189 FD_ZERO(&fds);
190 FD_SET(fd_, &fds);
191
192 // Setup select timeout
193 tout = timeout_;
194
195 if (select(fd_ + 1, NULL, &fds, NULL, &tout) <= 0) {
196 udpLog_->critical("Server::acceptFrame: Timeout waiting for outbound transmit after %" PRIuLEAST32
197 ".%" PRIuLEAST32 " seconds! May be caused by outbound backpressure.",
198 timeout_.tv_sec,
199 timeout_.tv_usec);
200 res = 0;
201 } else if ((res = sendmsg(fd_, &msg, 0)) < 0) {
202 udpLog_->warning("UDP write call failed on server port %" PRIu16 ": %s",
203 port_,
204 std::strerror(errno));
205 }
206 } while (res == 0); // Continue while write result was zero
207 }
208}
209
211void rpu::Server::runThread(std::weak_ptr<int> lockPtr) {
212 ris::BufferPtr buff;
213 ris::FramePtr frame;
214 fd_set fds;
215 int32_t res;
216 struct timeval tout;
217 struct sockaddr_in tmpAddr;
218 uint32_t tmpLen;
219 uint32_t avail;
220
221 // Wait until constructor completes
222 while (!lockPtr.expired()) continue;
223
224 udpLog_->logThreadId();
225
226 // Preallocate frame
227 frame = reqLocalFrame(maxPayload(), false);
228
229 while (threadEn_) {
230 // Attempt receive
231 buff = *(frame->beginBuffer());
232 avail = buff->getAvailable();
233 tmpLen = sizeof(struct sockaddr_in);
234 res = recvfrom(fd_, buff->begin(), avail, MSG_TRUNC, (struct sockaddr*)&tmpAddr, &tmpLen);
235
236 if (res > 0) {
237 // Message was too big
238 if (res > avail) {
239 udpLog_->warning("Receive data was too large on local port %" PRIu16 ". rx=%i, avail=%" PRIu32
240 ". Dropping.",
241 port_,
242 res,
243 avail);
244 } else {
245 buff->setPayload(res);
246 sendFrame(frame);
247 }
248
249 // Get new frame
250 frame = reqLocalFrame(maxPayload(), false);
251
252 // Lock before updating address
253 if (memcmp(&remAddr_, &tmpAddr, sizeof(remAddr_)) != 0) {
254 std::lock_guard<std::mutex> lock(udpMtx_);
255 char tmpIp[INET_ADDRSTRLEN];
256 if (inet_ntop(AF_INET, &(tmpAddr.sin_addr), tmpIp, sizeof(tmpIp)) != NULL) {
257 udpLog_->debug("UDP server peer updated on local port %" PRIu16 " to %s:%" PRIu16,
258 port_,
259 tmpIp,
260 ntohs(tmpAddr.sin_port));
261 } else {
262 udpLog_->debug("UDP server peer updated on local port %" PRIu16, port_);
263 }
264 remAddr_ = tmpAddr;
265 }
266 } else {
267 // Setup fds for select call
268 FD_ZERO(&fds);
269 FD_SET(fd_, &fds);
270
271 // Setup select timeout
272 tout.tv_sec = 0;
273 tout.tv_usec = 100;
274
275 // Select returns with available buffer
276 select(fd_ + 1, &fds, NULL, NULL, &tout);
277 }
278 }
279}
280
281void rpu::Server::setup_python() {
282#ifndef NO_PYTHON
283
284 bp::class_<rpu::Server, rpu::ServerPtr, bp::bases<rpu::Core, ris::Master, ris::Slave>, boost::noncopyable>(
285 "Server",
286 bp::init<uint16_t, bool>())
287 .def("getPort", &rpu::Server::getPort);
288
289 bp::implicitly_convertible<rpu::ServerPtr, rpu::CorePtr>();
290 bp::implicitly_convertible<rpu::ServerPtr, ris::MasterPtr>();
291 bp::implicitly_convertible<rpu::ServerPtr, ris::SlavePtr>();
292#endif
293}
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:95
void setFixedSize(uint32_t size)
Sets fixed-size mode.
Definition Pool.cpp:116
void setPoolSize(uint32_t size)
Sets buffer pool size.
Definition Pool.cpp:129
Shared UDP transport base for stream client/server endpoints.
Definition Core.h:64
std::atomic< bool > threadEn_
Definition Core.h:81
std::shared_ptr< rogue::Logging > udpLog_
Definition Core.h:66
struct sockaddr_in remAddr_
Definition Core.h:75
uint32_t maxPayload()
Returns maximum UDP payload size in bytes.
Definition Core.cpp:45
std::thread * thread_
Definition Core.h:80
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
Definition Buffer.h:270
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110
std::shared_ptr< rogue::protocols::udp::Server > ServerPtr
Definition Server.h:130