rogue
Loading...
Searching...
No Matches
Client.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <inttypes.h>
22#include <arpa/inet.h>
23#include <unistd.h>
24
25#include <cerrno>
26#include <cstdlib>
27#include <cstring>
28#include <memory>
29#include <string>
30
31#include "rogue/GeneralError.h"
32#include "rogue/GilRelease.h"
33#include "rogue/Logging.h"
38
39namespace rpu = rogue::protocols::udp;
41
42#ifndef NO_PYTHON
43 #include <boost/python.hpp>
44namespace bp = boost::python;
45#endif
46
48rpu::ClientPtr rpu::Client::create(std::string host, uint16_t port, bool jumbo) {
49 rpu::ClientPtr r = std::make_shared<rpu::Client>(host, port, jumbo);
50 return (r);
51}
52
54rpu::Client::Client(std::string host, uint16_t port, bool jumbo) : rpu::Core(jumbo) {
55 struct addrinfo aiHints;
56 struct addrinfo* aiList = 0;
57 const sockaddr_in* addr;
58 int32_t val;
59 int32_t ret;
60 uint32_t size;
61
62 address_ = host;
63 port_ = port;
64 udpLog_ = rogue::Logging::create("udp.Client");
65
66 // Create a shared pointer to use as a lock for runThread()
67 std::shared_ptr<int> scopePtr = std::make_shared<int>(0);
68
69 // Create socket
70 if ((fd_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
71 throw(rogue::GeneralError::create("Client::Client",
72 "Failed to create socket for port %" PRIu16 " at address %s",
73 port_,
74 address_.c_str()));
75
76 // Lookup host address
77 bzero(&aiHints, sizeof(aiHints));
78 aiHints.ai_flags = AI_CANONNAME;
79 aiHints.ai_family = AF_INET;
80 aiHints.ai_socktype = SOCK_DGRAM;
81 aiHints.ai_protocol = IPPROTO_UDP;
82
83 // POSIX leaves *aiList undefined on failure; do not freeaddrinfo() it.
84 if (::getaddrinfo(address_.c_str(), 0, &aiHints, &aiList) != 0 || aiList == nullptr) {
85 ::close(fd_);
86 fd_ = -1;
87 throw(rogue::GeneralError::create("Client::Client", "Failed to resolve address %s", address_.c_str()));
88 }
89
90 addr = (const sockaddr_in*)(aiList->ai_addr);
91
92 // Setup Remote Address
93 memset(&remAddr_, 0, sizeof(struct sockaddr_in));
94 ((struct sockaddr_in*)(&remAddr_))->sin_family = AF_INET;
95 ((struct sockaddr_in*)(&remAddr_))->sin_addr.s_addr = addr->sin_addr.s_addr;
96 ((struct sockaddr_in*)(&remAddr_))->sin_port = htons(port_);
97
98 ::freeaddrinfo(aiList);
99 aiList = nullptr;
100
101 // Fixed size buffer pool
103 setPoolSize(10000); // Initial value, 10K frames
104
105 udpLog_->debug("UDP client ready. remote=%s:%" PRIu16 ", maxPayload=%" PRIu32,
106 address_.c_str(),
107 port_,
108 maxPayload());
109
110 threadEn_ = true;
111 try {
112 thread_ = new std::thread(&rpu::Client::runThread, this, std::weak_ptr<int>(scopePtr));
113 } catch (...) {
114 threadEn_ = false;
115 ::close(fd_);
116 fd_ = -1;
117 throw;
118 }
119
120 // Set a thread name
121#ifndef __MACH__
122 pthread_setname_np(thread_->native_handle(), "UdpClient");
123#endif
124}
125
127rpu::Client::~Client() {
128 this->stop();
129}
130
131void rpu::Client::stop() {
132 if (threadEn_) {
133 threadEn_ = false;
134 // close() before join() unblocks the worker's recvfrom(). Defer fd_ = -1
135 // until after join so a final FD_SET(fd_) does not see -1.
136 rogue::GilRelease noGil;
137 ::close(fd_);
138 thread_->join();
139 delete thread_;
140 thread_ = nullptr;
141 fd_ = -1;
142 udpLog_->debug("Stopping UDP client for remote %s:%" PRIu16, address_.c_str(), port_);
143 }
144}
145
147void rpu::Client::acceptFrame(ris::FramePtr frame) {
148 ris::Frame::BufferIterator it;
149 int32_t res;
150 fd_set fds;
151 struct timeval tout;
152 uint32_t x;
153 struct msghdr msg;
154 struct iovec msg_iov[1];
155
156 // Setup message header
157 msg.msg_name = &remAddr_;
158 msg.msg_namelen = sizeof(struct sockaddr_in);
159 msg.msg_iov = msg_iov;
160 msg.msg_iovlen = 1;
161 msg.msg_control = NULL;
162 msg.msg_controllen = 0;
163 msg.msg_flags = 0;
164
165 rogue::GilRelease noGil;
166 ris::FrameLockPtr frLock = frame->lock();
167 std::lock_guard<std::mutex> lock(udpMtx_);
168
169 // Drop errored frames
170 if (frame->getError()) {
171 udpLog_->warning("Dropping errored outbound frame. remote=%s:%" PRIu16 ", error=0x%" PRIx8,
172 address_.c_str(),
173 port_,
174 frame->getError());
175 return;
176 }
177
178 // Go through each buffer in the frame
179 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) {
180 if ((*it)->getPayload() == 0) break;
181
182 // Setup IOVs
183 msg_iov[0].iov_base = (*it)->begin();
184 msg_iov[0].iov_len = (*it)->getPayload();
185
186 // Keep trying since select call can fire
187 // but write fails because we did not win the (*it)er lock
188 do {
189 // Setup fds for select call
190 FD_ZERO(&fds);
191 FD_SET(fd_, &fds);
192
193 // Setup select timeout
194 tout = timeout_;
195
196 if (select(fd_ + 1, NULL, &fds, NULL, &tout) <= 0) {
197 udpLog_->critical("Client::acceptFrame: Timeout waiting for outbound transmit after %" PRIu32
198 ".%" PRIu32 " seconds! May be caused by outbound backpressure.",
199 timeout_.tv_sec,
200 timeout_.tv_usec);
201 res = 0;
202 } else if ((res = sendmsg(fd_, &msg, 0)) < 0) {
203 udpLog_->warning("UDP write call failed for %s: %s", address_.c_str(), std::strerror(errno));
204 }
205 } while (res == 0); // Continue while write result was zero
206 }
207}
208
210void rpu::Client::runThread(std::weak_ptr<int> lockPtr) {
211 ris::BufferPtr buff;
212 ris::FramePtr frame;
213 fd_set fds;
214 int32_t res;
215 struct timeval tout;
216 uint32_t avail;
217
218 // Wait until constructor completes
219 while (!lockPtr.expired()) continue;
220
221 udpLog_->logThreadId();
222
223 // Preallocate frame
224 frame = reqLocalFrame(maxPayload(), false);
225
226 while (threadEn_) {
227 // Attempt receive
228 buff = *(frame->beginBuffer());
229 avail = buff->getAvailable();
230 res = recvfrom(fd_, buff->begin(), avail, MSG_TRUNC, NULL, 0);
231
232 if (res > 0) {
233 // Message was too big
234 if (res > avail) {
235 udpLog_->warning("Receive data was too large. remote=%s:%" PRIu16 ", rx=%i, avail=%" PRIu32
236 ". Dropping.",
237 address_.c_str(),
238 port_,
239 res,
240 avail);
241 } else {
242 buff->setPayload(res);
243 sendFrame(frame);
244 }
245
246 // Get new frame
247 frame = reqLocalFrame(maxPayload(), false);
248 } else {
249 // Setup fds for select call
250 FD_ZERO(&fds);
251 FD_SET(fd_, &fds);
252
253 // Setup select timeout
254 tout.tv_sec = 0;
255 tout.tv_usec = 100;
256
257 // Select returns with available buffer
258 select(fd_ + 1, &fds, NULL, NULL, &tout);
259 }
260 }
261}
262
263void rpu::Client::setup_python() {
264#ifndef NO_PYTHON
265
266 bp::class_<rpu::Client, rpu::ClientPtr, bp::bases<rpu::Core, ris::Master, ris::Slave>, boost::noncopyable>(
267 "Client",
268 bp::init<std::string, uint16_t, bool>());
269
270 bp::implicitly_convertible<rpu::ClientPtr, rpu::CorePtr>();
271 bp::implicitly_convertible<rpu::ClientPtr, ris::MasterPtr>();
272 bp::implicitly_convertible<rpu::ClientPtr, ris::SlavePtr>();
273#endif
274}
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:95
void setFixedSize(uint32_t size)
Sets fixed-size mode.
Definition Pool.cpp:116
void setPoolSize(uint32_t size)
Sets buffer pool size.
Definition Pool.cpp:129
Shared UDP transport base for stream client/server endpoints.
Definition Core.h:64
std::atomic< bool > threadEn_
Definition Core.h:81
std::shared_ptr< rogue::Logging > udpLog_
Definition Core.h:66
struct sockaddr_in remAddr_
Definition Core.h:75
uint32_t maxPayload()
Returns maximum UDP payload size in bytes.
Definition Core.cpp:45
std::thread * thread_
Definition Core.h:80
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
Definition Buffer.h:270
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110
std::shared_ptr< rogue::protocols::udp::Client > ClientPtr
Definition Client.h:124