rogue
Loading...
Searching...
No Matches
Server.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <inttypes.h>
22#include <unistd.h>
23
24#include <cstdlib>
25#include <cstring>
26#include <iostream>
27#include <memory>
28
29#include "rogue/GeneralError.h"
30#include "rogue/GilRelease.h"
31#include "rogue/Logging.h"
36
37namespace rpu = rogue::protocols::udp;
39
40#ifndef NO_PYTHON
41 #include <boost/python.hpp>
42namespace bp = boost::python;
43#endif
44
46rpu::ServerPtr rpu::Server::create(uint16_t port, bool jumbo) {
47 rpu::ServerPtr r = std::make_shared<rpu::Server>(port, jumbo);
48 return (r);
49}
50
52rpu::Server::Server(uint16_t port, bool jumbo) : rpu::Core(jumbo) {
53 uint32_t len;
54 int32_t val;
55 uint32_t size;
56
57 port_ = port;
58 udpLog_ = rogue::Logging::create("udp.Server");
59
60 // Create a shared pointer to use as a lock for runThread()
61 std::shared_ptr<int> scopePtr = std::make_shared<int>(0);
62
63 // Create socket
64 if ((fd_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
65 throw(rogue::GeneralError::create("Server::Server", "Failed to create socket for port %" PRIu16, port_));
66
67 // Setup Remote Address
68 memset(&locAddr_, 0, sizeof(struct sockaddr_in));
69 locAddr_.sin_family = AF_INET;
70 locAddr_.sin_addr.s_addr = htonl(INADDR_ANY);
71 locAddr_.sin_port = htons(port_);
72
73 memset(&remAddr_, 0, sizeof(struct sockaddr_in));
74
75 if (bind(fd_, (struct sockaddr*)&locAddr_, sizeof(locAddr_)) < 0)
76 throw(rogue::GeneralError::create("Server::Server",
77 "Failed to bind to local port %" PRIu16 ". Another process may be using it",
78 port_));
79
80 // Kernel assigns port
81 if (port_ == 0) {
82 len = sizeof(locAddr_);
83 if (getsockname(fd_, (struct sockaddr*)&locAddr_, &len) < 0)
84 throw(rogue::GeneralError::create("Server::Server", "Failed to dynamically assign local port"));
85 port_ = ntohs(locAddr_.sin_port);
86 }
87
88 // Fixed size buffer pool
90 setPoolSize(10000); // Initial value, 10K frames
91
92 // Start rx thread
93 threadEn_ = true;
94 thread_ = new std::thread(&rpu::Server::runThread, this, std::weak_ptr<int>(scopePtr));
95
96 // Set a thread name
97#ifndef __MACH__
98 pthread_setname_np(thread_->native_handle(), "UdpServer");
99#endif
100}
101
103rpu::Server::~Server() {
104 this->stop();
105}
106
107void rpu::Server::stop() {
108 if (threadEn_) {
109 threadEn_ = false;
110 thread_->join();
111
112 ::close(fd_);
113 }
114}
115
117uint32_t rpu::Server::getPort() {
118 return (port_);
119}
120
122void rpu::Server::acceptFrame(ris::FramePtr frame) {
123 ris::Frame::BufferIterator it;
124 int32_t res;
125 fd_set fds;
126 struct timeval tout;
127 uint32_t x;
128 struct msghdr msg;
129 struct iovec msg_iov[1];
130
131 rogue::GilRelease noGil;
132 ris::FrameLockPtr frLock = frame->lock();
133 std::lock_guard<std::mutex> lock(udpMtx_);
134
135 // Drop errored frames
136 if (frame->getError()) {
137 udpLog_->warning("Server::acceptFrame: Dumping errored frame");
138 return;
139 }
140
141 // Setup message header
142 msg.msg_name = &remAddr_;
143 msg.msg_namelen = sizeof(struct sockaddr_in);
144 msg.msg_iov = msg_iov;
145 msg.msg_iovlen = 1;
146 msg.msg_control = NULL;
147 msg.msg_controllen = 0;
148 msg.msg_flags = 0;
149
150 // Go through each buffer in the frame
151 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) {
152 if ((*it)->getPayload() == 0) break;
153
154 // Setup IOVs
155 msg_iov[0].iov_base = (*it)->begin();
156 msg_iov[0].iov_len = (*it)->getPayload();
157
158 // Keep trying since select call can fire
159 // but write fails because we did not win the buffer lock
160 do {
161 // Setup fds for select call
162 FD_ZERO(&fds);
163 FD_SET(fd_, &fds);
164
165 // Setup select timeout
166 tout = timeout_;
167
168 if (select(fd_ + 1, NULL, &fds, NULL, &tout) <= 0) {
169 udpLog_->critical("Server::acceptFrame: Timeout waiting for outbound transmit after %" PRIuLEAST32
170 ".%" PRIuLEAST32 " seconds! May be caused by outbound backpressure.",
171 timeout_.tv_sec,
172 timeout_.tv_usec);
173 res = 0;
174 } else if ((res = sendmsg(fd_, &msg, 0)) < 0) {
175 udpLog_->warning("UDP Write Call Failed");
176 }
177 } while (res == 0); // Continue while write result was zero
178 }
179}
180
182void rpu::Server::runThread(std::weak_ptr<int> lockPtr) {
183 ris::BufferPtr buff;
184 ris::FramePtr frame;
185 fd_set fds;
186 int32_t res;
187 struct timeval tout;
188 struct sockaddr_in tmpAddr;
189 uint32_t tmpLen;
190 uint32_t avail;
191
192 // Wait until constructor completes
193 while (!lockPtr.expired()) continue;
194
195 udpLog_->logThreadId();
196
197 // Preallocate frame
198 frame = reqLocalFrame(maxPayload(), false);
199
200 while (threadEn_) {
201 // Attempt receive
202 buff = *(frame->beginBuffer());
203 avail = buff->getAvailable();
204 tmpLen = sizeof(struct sockaddr_in);
205 res = recvfrom(fd_, buff->begin(), avail, MSG_TRUNC, (struct sockaddr*)&tmpAddr, &tmpLen);
206
207 if (res > 0) {
208 // Message was too big
209 if (res > avail) {
210 udpLog_->warning("Receive data was too large. Dropping.");
211 } else {
212 buff->setPayload(res);
213 sendFrame(frame);
214 }
215
216 // Get new frame
217 frame = reqLocalFrame(maxPayload(), false);
218
219 // Lock before updating address
220 if (memcmp(&remAddr_, &tmpAddr, sizeof(remAddr_)) != 0) {
221 std::lock_guard<std::mutex> lock(udpMtx_);
222 remAddr_ = tmpAddr;
223 }
224 } else {
225 // Setup fds for select call
226 FD_ZERO(&fds);
227 FD_SET(fd_, &fds);
228
229 // Setup select timeout
230 tout.tv_sec = 0;
231 tout.tv_usec = 100;
232
233 // Select returns with available buffer
234 select(fd_ + 1, &fds, NULL, NULL, &tout);
235 }
236 }
237}
238
239void rpu::Server::setup_python() {
240#ifndef NO_PYTHON
241
242 bp::class_<rpu::Server, rpu::ServerPtr, bp::bases<rpu::Core, ris::Master, ris::Slave>, boost::noncopyable>(
243 "Server",
244 bp::init<uint16_t, bool>())
245 .def("getPort", &rpu::Server::getPort);
246
247 bp::implicitly_convertible<rpu::ServerPtr, rpu::CorePtr>();
248 bp::implicitly_convertible<rpu::ServerPtr, ris::MasterPtr>();
249 bp::implicitly_convertible<rpu::ServerPtr, ris::SlavePtr>();
250#endif
251}
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:60
void setFixedSize(uint32_t size)
Sets fixed-size mode.
Definition Pool.cpp:115
void setPoolSize(uint32_t size)
Sets buffer pool size.
Definition Pool.cpp:128
Shared UDP transport base for stream client/server endpoints.
Definition Core.h:61
std::shared_ptr< rogue::Logging > udpLog_
Definition Core.h:63
struct sockaddr_in remAddr_
Definition Core.h:72
uint32_t maxPayload()
Returns maximum UDP payload size in bytes.
Definition Core.cpp:45
std::thread * thread_
Definition Core.h:77
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
Definition Buffer.h:270
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110
std::shared_ptr< rogue::protocols::udp::Server > ServerPtr
Definition Server.h:130