rogue
Loading...
Searching...
No Matches
Client.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <inttypes.h>
22#include <arpa/inet.h>
23#include <unistd.h>
24
25#include <cerrno>
26#include <cstdlib>
27#include <cstring>
28#include <memory>
29#include <string>
30
31#include "rogue/GeneralError.h"
32#include "rogue/GilRelease.h"
33#include "rogue/Logging.h"
38
39namespace rpu = rogue::protocols::udp;
41
42#ifndef NO_PYTHON
43 #include <boost/python.hpp>
44namespace bp = boost::python;
45#endif
46
48rpu::ClientPtr rpu::Client::create(std::string host, uint16_t port, bool jumbo) {
49 rpu::ClientPtr r = std::make_shared<rpu::Client>(host, port, jumbo);
50 return (r);
51}
52
54rpu::Client::Client(std::string host, uint16_t port, bool jumbo) : rpu::Core(jumbo) {
55 struct addrinfo aiHints;
56 struct addrinfo* aiList = 0;
57 const sockaddr_in* addr;
58 int32_t val;
59 int32_t ret;
60 uint32_t size;
61
62 address_ = host;
63 port_ = port;
64 udpLog_ = rogue::Logging::create("udp.Client");
65
66 // Create a shared pointer to use as a lock for runThread()
67 std::shared_ptr<int> scopePtr = std::make_shared<int>(0);
68
69 // Create socket
70 if ((fd_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
71 throw(rogue::GeneralError::create("Client::Client",
72 "Failed to create socket for port %" PRIu16 " at address %s",
73 port_,
74 address_.c_str()));
75
76 // Lookup host address
77 bzero(&aiHints, sizeof(aiHints));
78 aiHints.ai_flags = AI_CANONNAME;
79 aiHints.ai_family = AF_INET;
80 aiHints.ai_socktype = SOCK_DGRAM;
81 aiHints.ai_protocol = IPPROTO_UDP;
82
83 if (::getaddrinfo(address_.c_str(), 0, &aiHints, &aiList) || !aiList)
84 throw(rogue::GeneralError::create("Client::Client", "Failed to resolve address %s", address_.c_str()));
85
86 addr = (const sockaddr_in*)(aiList->ai_addr);
87
88 // Setup Remote Address
89 memset(&remAddr_, 0, sizeof(struct sockaddr_in));
90 ((struct sockaddr_in*)(&remAddr_))->sin_family = AF_INET;
91 ((struct sockaddr_in*)(&remAddr_))->sin_addr.s_addr = addr->sin_addr.s_addr;
92 ((struct sockaddr_in*)(&remAddr_))->sin_port = htons(port_);
93
94 // Fixed size buffer pool
96 setPoolSize(10000); // Initial value, 10K frames
97
98 // Start rx thread
99 threadEn_ = true;
100 thread_ = new std::thread(&rpu::Client::runThread, this, std::weak_ptr<int>(scopePtr));
101
102 udpLog_->debug("UDP client ready. remote=%s:%" PRIu16 ", maxPayload=%" PRIu32,
103 address_.c_str(),
104 port_,
105 maxPayload());
106
107 // Set a thread name
108#ifndef __MACH__
109 pthread_setname_np(thread_->native_handle(), "UdpClient");
110#endif
111}
112
114rpu::Client::~Client() {
115 this->stop();
116}
117
118void rpu::Client::stop() {
119 if (threadEn_) {
120 threadEn_ = false;
121 thread_->join();
122 udpLog_->debug("Stopping UDP client for remote %s:%" PRIu16, address_.c_str(), port_);
123
124 ::close(fd_);
125 }
126}
127
129void rpu::Client::acceptFrame(ris::FramePtr frame) {
130 ris::Frame::BufferIterator it;
131 int32_t res;
132 fd_set fds;
133 struct timeval tout;
134 uint32_t x;
135 struct msghdr msg;
136 struct iovec msg_iov[1];
137
138 // Setup message header
139 msg.msg_name = &remAddr_;
140 msg.msg_namelen = sizeof(struct sockaddr_in);
141 msg.msg_iov = msg_iov;
142 msg.msg_iovlen = 1;
143 msg.msg_control = NULL;
144 msg.msg_controllen = 0;
145 msg.msg_flags = 0;
146
147 rogue::GilRelease noGil;
148 ris::FrameLockPtr frLock = frame->lock();
149 std::lock_guard<std::mutex> lock(udpMtx_);
150
151 // Drop errored frames
152 if (frame->getError()) {
153 udpLog_->warning("Dropping errored outbound frame. remote=%s:%" PRIu16 ", error=0x%" PRIx8,
154 address_.c_str(),
155 port_,
156 frame->getError());
157 return;
158 }
159
160 // Go through each buffer in the frame
161 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) {
162 if ((*it)->getPayload() == 0) break;
163
164 // Setup IOVs
165 msg_iov[0].iov_base = (*it)->begin();
166 msg_iov[0].iov_len = (*it)->getPayload();
167
168 // Keep trying since select call can fire
169 // but write fails because we did not win the (*it)er lock
170 do {
171 // Setup fds for select call
172 FD_ZERO(&fds);
173 FD_SET(fd_, &fds);
174
175 // Setup select timeout
176 tout = timeout_;
177
178 if (select(fd_ + 1, NULL, &fds, NULL, &tout) <= 0) {
179 udpLog_->critical("Client::acceptFrame: Timeout waiting for outbound transmit after %" PRIu32
180 ".%" PRIu32 " seconds! May be caused by outbound backpressure.",
181 timeout_.tv_sec,
182 timeout_.tv_usec);
183 res = 0;
184 } else if ((res = sendmsg(fd_, &msg, 0)) < 0) {
185 udpLog_->warning("UDP write call failed for %s: %s", address_.c_str(), std::strerror(errno));
186 }
187 } while (res == 0); // Continue while write result was zero
188 }
189}
190
192void rpu::Client::runThread(std::weak_ptr<int> lockPtr) {
193 ris::BufferPtr buff;
194 ris::FramePtr frame;
195 fd_set fds;
196 int32_t res;
197 struct timeval tout;
198 uint32_t avail;
199
200 // Wait until constructor completes
201 while (!lockPtr.expired()) continue;
202
203 udpLog_->logThreadId();
204
205 // Preallocate frame
206 frame = reqLocalFrame(maxPayload(), false);
207
208 while (threadEn_) {
209 // Attempt receive
210 buff = *(frame->beginBuffer());
211 avail = buff->getAvailable();
212 res = recvfrom(fd_, buff->begin(), avail, MSG_TRUNC, NULL, 0);
213
214 if (res > 0) {
215 // Message was too big
216 if (res > avail) {
217 udpLog_->warning("Receive data was too large. remote=%s:%" PRIu16 ", rx=%i, avail=%" PRIu32
218 ". Dropping.",
219 address_.c_str(),
220 port_,
221 res,
222 avail);
223 } else {
224 buff->setPayload(res);
225 sendFrame(frame);
226 }
227
228 // Get new frame
229 frame = reqLocalFrame(maxPayload(), false);
230 } else {
231 // Setup fds for select call
232 FD_ZERO(&fds);
233 FD_SET(fd_, &fds);
234
235 // Setup select timeout
236 tout.tv_sec = 0;
237 tout.tv_usec = 100;
238
239 // Select returns with available buffer
240 select(fd_ + 1, &fds, NULL, NULL, &tout);
241 }
242 }
243}
244
245void rpu::Client::setup_python() {
246#ifndef NO_PYTHON
247
248 bp::class_<rpu::Client, rpu::ClientPtr, bp::bases<rpu::Core, ris::Master, ris::Slave>, boost::noncopyable>(
249 "Client",
250 bp::init<std::string, uint16_t, bool>());
251
252 bp::implicitly_convertible<rpu::ClientPtr, rpu::CorePtr>();
253 bp::implicitly_convertible<rpu::ClientPtr, ris::MasterPtr>();
254 bp::implicitly_convertible<rpu::ClientPtr, ris::SlavePtr>();
255#endif
256}
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:95
void setFixedSize(uint32_t size)
Sets fixed-size mode.
Definition Pool.cpp:115
void setPoolSize(uint32_t size)
Sets buffer pool size.
Definition Pool.cpp:128
Shared UDP transport base for stream client/server endpoints.
Definition Core.h:61
std::shared_ptr< rogue::Logging > udpLog_
Definition Core.h:63
struct sockaddr_in remAddr_
Definition Core.h:72
uint32_t maxPayload()
Returns maximum UDP payload size in bytes.
Definition Core.cpp:45
std::thread * thread_
Definition Core.h:77
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
Definition Buffer.h:270
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110
std::shared_ptr< rogue::protocols::udp::Client > ClientPtr
Definition Client.h:124