rogue
Loading...
Searching...
No Matches
Client.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <inttypes.h>
22#include <unistd.h>
23
24#include <cstdlib>
25#include <cstring>
26#include <memory>
27#include <string>
28
29#include "rogue/GeneralError.h"
30#include "rogue/GilRelease.h"
31#include "rogue/Logging.h"
36
37namespace rpu = rogue::protocols::udp;
39
40#ifndef NO_PYTHON
41 #include <boost/python.hpp>
42namespace bp = boost::python;
43#endif
44
46rpu::ClientPtr rpu::Client::create(std::string host, uint16_t port, bool jumbo) {
47 rpu::ClientPtr r = std::make_shared<rpu::Client>(host, port, jumbo);
48 return (r);
49}
50
52rpu::Client::Client(std::string host, uint16_t port, bool jumbo) : rpu::Core(jumbo) {
53 struct addrinfo aiHints;
54 struct addrinfo* aiList = 0;
55 const sockaddr_in* addr;
56 int32_t val;
57 int32_t ret;
58 uint32_t size;
59
60 address_ = host;
61 port_ = port;
62 udpLog_ = rogue::Logging::create("udp.Client");
63
64 // Create a shared pointer to use as a lock for runThread()
65 std::shared_ptr<int> scopePtr = std::make_shared<int>(0);
66
67 // Create socket
68 if ((fd_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
69 throw(rogue::GeneralError::create("Client::Client",
70 "Failed to create socket for port %" PRIu16 " at address %s",
71 port_,
72 address_.c_str()));
73
74 // Lookup host address
75 bzero(&aiHints, sizeof(aiHints));
76 aiHints.ai_flags = AI_CANONNAME;
77 aiHints.ai_family = AF_INET;
78 aiHints.ai_socktype = SOCK_DGRAM;
79 aiHints.ai_protocol = IPPROTO_UDP;
80
81 if (::getaddrinfo(address_.c_str(), 0, &aiHints, &aiList) || !aiList)
82 throw(rogue::GeneralError::create("Client::Client", "Failed to resolve address %s", address_.c_str()));
83
84 addr = (const sockaddr_in*)(aiList->ai_addr);
85
86 // Setup Remote Address
87 memset(&remAddr_, 0, sizeof(struct sockaddr_in));
88 ((struct sockaddr_in*)(&remAddr_))->sin_family = AF_INET;
89 ((struct sockaddr_in*)(&remAddr_))->sin_addr.s_addr = addr->sin_addr.s_addr;
90 ((struct sockaddr_in*)(&remAddr_))->sin_port = htons(port_);
91
92 // Fixed size buffer pool
94 setPoolSize(10000); // Initial value, 10K frames
95
96 // Start rx thread
97 threadEn_ = true;
98 thread_ = new std::thread(&rpu::Client::runThread, this, std::weak_ptr<int>(scopePtr));
99
100 // Set a thread name
101#ifndef __MACH__
102 pthread_setname_np(thread_->native_handle(), "UdpClient");
103#endif
104}
105
107rpu::Client::~Client() {
108 this->stop();
109}
110
111void rpu::Client::stop() {
112 if (threadEn_) {
113 threadEn_ = false;
114 thread_->join();
115
116 ::close(fd_);
117 }
118}
119
121void rpu::Client::acceptFrame(ris::FramePtr frame) {
122 ris::Frame::BufferIterator it;
123 int32_t res;
124 fd_set fds;
125 struct timeval tout;
126 uint32_t x;
127 struct msghdr msg;
128 struct iovec msg_iov[1];
129
130 // Setup message header
131 msg.msg_name = &remAddr_;
132 msg.msg_namelen = sizeof(struct sockaddr_in);
133 msg.msg_iov = msg_iov;
134 msg.msg_iovlen = 1;
135 msg.msg_control = NULL;
136 msg.msg_controllen = 0;
137 msg.msg_flags = 0;
138
139 rogue::GilRelease noGil;
140 ris::FrameLockPtr frLock = frame->lock();
141 std::lock_guard<std::mutex> lock(udpMtx_);
142
143 // Drop errored frames
144 if (frame->getError()) {
145 udpLog_->warning("Client::acceptFrame: Dumping errored frame");
146 return;
147 }
148
149 // Go through each buffer in the frame
150 for (it = frame->beginBuffer(); it != frame->endBuffer(); ++it) {
151 if ((*it)->getPayload() == 0) break;
152
153 // Setup IOVs
154 msg_iov[0].iov_base = (*it)->begin();
155 msg_iov[0].iov_len = (*it)->getPayload();
156
157 // Keep trying since select call can fire
158 // but write fails because we did not win the (*it)er lock
159 do {
160 // Setup fds for select call
161 FD_ZERO(&fds);
162 FD_SET(fd_, &fds);
163
164 // Setup select timeout
165 tout = timeout_;
166
167 if (select(fd_ + 1, NULL, &fds, NULL, &tout) <= 0) {
168 udpLog_->critical("Client::acceptFrame: Timeout waiting for outbound transmit after %" PRIu32
169 ".%" PRIu32 " seconds! May be caused by outbound backpressure.",
170 timeout_.tv_sec,
171 timeout_.tv_usec);
172 res = 0;
173 } else if ((res = sendmsg(fd_, &msg, 0)) < 0) {
174 udpLog_->warning("UDP Write Call Failed");
175 }
176 } while (res == 0); // Continue while write result was zero
177 }
178}
179
181void rpu::Client::runThread(std::weak_ptr<int> lockPtr) {
182 ris::BufferPtr buff;
183 ris::FramePtr frame;
184 fd_set fds;
185 int32_t res;
186 struct timeval tout;
187 uint32_t avail;
188
189 // Wait until constructor completes
190 while (!lockPtr.expired()) continue;
191
192 udpLog_->logThreadId();
193
194 // Preallocate frame
195 frame = reqLocalFrame(maxPayload(), false);
196
197 while (threadEn_) {
198 // Attempt receive
199 buff = *(frame->beginBuffer());
200 avail = buff->getAvailable();
201 res = recvfrom(fd_, buff->begin(), avail, MSG_TRUNC, NULL, 0);
202
203 if (res > 0) {
204 // Message was too big
205 if (res > avail) {
206 udpLog_->warning("Receive data was too large. Rx=%i, avail=%i Dropping.", res, avail);
207 } else {
208 buff->setPayload(res);
209 sendFrame(frame);
210 }
211
212 // Get new frame
213 frame = reqLocalFrame(maxPayload(), false);
214 } else {
215 // Setup fds for select call
216 FD_ZERO(&fds);
217 FD_SET(fd_, &fds);
218
219 // Setup select timeout
220 tout.tv_sec = 0;
221 tout.tv_usec = 100;
222
223 // Select returns with available buffer
224 select(fd_ + 1, &fds, NULL, NULL, &tout);
225 }
226 }
227}
228
229void rpu::Client::setup_python() {
230#ifndef NO_PYTHON
231
232 bp::class_<rpu::Client, rpu::ClientPtr, bp::bases<rpu::Core, ris::Master, ris::Slave>, boost::noncopyable>(
233 "Client",
234 bp::init<std::string, uint16_t, bool>());
235
236 bp::implicitly_convertible<rpu::ClientPtr, rpu::CorePtr>();
237 bp::implicitly_convertible<rpu::ClientPtr, ris::MasterPtr>();
238 bp::implicitly_convertible<rpu::ClientPtr, ris::SlavePtr>();
239#endif
240}
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:60
void setFixedSize(uint32_t size)
Sets fixed-size mode.
Definition Pool.cpp:115
void setPoolSize(uint32_t size)
Sets buffer pool size.
Definition Pool.cpp:128
Shared UDP transport base for stream client/server endpoints.
Definition Core.h:61
std::shared_ptr< rogue::Logging > udpLog_
Definition Core.h:63
struct sockaddr_in remAddr_
Definition Core.h:72
uint32_t maxPayload()
Returns maximum UDP payload size in bytes.
Definition Core.cpp:45
std::thread * thread_
Definition Core.h:77
std::shared_ptr< rogue::interfaces::stream::Buffer > BufferPtr
Shared pointer alias for Buffer.
Definition Buffer.h:270
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110
std::shared_ptr< rogue::protocols::udp::Client > ClientPtr
Definition Client.h:124