rogue
Loading...
Searching...
No Matches
XvcConnection.cpp
Go to the documentation of this file.
1
19
20#include "rogue/GilRelease.h"
21
22#include <netinet/tcp.h>
23#include <sys/select.h>
24#include <sys/socket.h>
25#include <unistd.h>
26
27#include <cerrno>
28#include <cinttypes>
29#include <cstdio>
30#include <cstring>
31
33
34rpx::XvcConnection::XvcConnection(int sd, int wakeFd, JtagDriver* drv, uint64_t maxVecLen)
35 : sd_(-1),
36 wakeFd_(wakeFd),
37 drv_(drv),
38 maxVecLen_(maxVecLen),
39 supVecLen_(0),
40 lastErrno_(0) {
41 socklen_t sz = sizeof(peer_);
42
43 // RAII for the sd_
44 if ((sd_ = ::accept(sd, reinterpret_cast<struct sockaddr*>(&peer_), &sz)) < 0)
45 throw(rogue::GeneralError::create("XvcConnection::XvcConnection()", "Unable to accept connection"));
46
47 // XVC protocol is synchronous / not pipelined :-(
48 // use TCP_NODELAY to make sure our messages (many of which
49 // are small) are sent ASAP
50 int yes = 1;
51 if (setsockopt(sd_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes))) {
52 ::close(sd_);
53 sd_ = -1;
54 throw(rogue::GeneralError::create("XvcConnection::XvcConnection()", "Unable to set TCP_NODELAY"));
55 }
56
57#if defined(__APPLE__) && defined(SO_NOSIGPIPE)
58 // macOS fallback: no MSG_NOSIGNAL on BSD send(); use per-socket
59 // SO_NOSIGPIPE instead so writes to a closed peer surface EPIPE rather
60 // than SIGPIPE-ing the process.
61 int nosigpipe = 1;
62 if (setsockopt(sd_, SOL_SOCKET, SO_NOSIGPIPE, &nosigpipe, sizeof(nosigpipe))) {
63 ::close(sd_);
64 sd_ = -1;
65 throw(rogue::GeneralError::create("XvcConnection::XvcConnection()",
66 "setsockopt(SO_NOSIGPIPE) failed"));
67 }
68#endif
69}
70
71rpx::XvcConnection::~XvcConnection() {
72 if (sd_ >= 0) ::close(sd_);
73}
74
75ssize_t rpx::XvcConnection::readTo(void* buf, size_t count) {
76 if (sd_ < 0 || sd_ >= FD_SETSIZE || (wakeFd_ >= 0 && wakeFd_ >= FD_SETSIZE)) {
77 lastErrno_ = EBADF;
78 return -2;
79 }
80 int maxFd = sd_ + 1;
81 if (wakeFd_ >= 0 && wakeFd_ >= sd_) maxFd = wakeFd_ + 1;
82
83 // NO timeout — block until data, EOF, or shutdown signal.
84 // Release the GIL across the blocking select() so that the outer
85 // rogue::ScopedGil held by Xvc::runThread does not starve Python worker
86 // threads while this connection is parked waiting for client bytes.
87 // Retry on EINTR (matches XvcServer::run): a benign signal must not tear
88 // down the client connection.
89 int nready;
90 fd_set rset;
91 while (true) {
92 FD_ZERO(&rset);
93 FD_SET(sd_, &rset);
94 if (wakeFd_ >= 0) FD_SET(wakeFd_, &rset);
95 {
97 nready = ::select(maxFd, &rset, nullptr, nullptr, nullptr);
98 }
99 if (nready >= 0) break;
100 if (errno == EINTR) continue;
101 lastErrno_ = errno;
102 return -2; // unrecoverable select error
103 }
104 if (wakeFd_ >= 0 && FD_ISSET(wakeFd_, &rset)) {
105 return -1; // shutdown requested via self-pipe
106 }
107 if (FD_ISSET(sd_, &rset)) {
108 ssize_t r;
109 do {
110 r = ::read(sd_, buf, count);
111 } while (r < 0 && errno == EINTR);
112 // >0 = bytes; 0 = peer EOF; <0 = socket error (mapped to -2 so
113 // callers don't confuse it with the -1 shutdown sentinel).
114 if (r < 0) {
115 lastErrno_ = errno;
116 return -2;
117 }
118 return r;
119 }
120 lastErrno_ = ENODATA;
121 return -2; // unexpected: neither wake-fd nor data-fd ready
122}
123
124// fill rx buffer to 'n' octets
125void rpx::XvcConnection::fill(uint64_t n) {
126 uint8_t* p = rp_ + static_cast<ptrdiff_t>(rl_);
127 uint64_t k = n;
128
129 if (n <= rl_) return;
130
131 k -= rl_;
132 while (k > 0) {
133 ssize_t got = readTo(p, static_cast<size_t>(k));
134
135 if (got > 0) {
136 k -= static_cast<uint64_t>(got);
137 p += got;
138 continue;
139 }
140 if (got == 0) {
141 throw(rogue::GeneralError::create("XvcConnection::fill()", "peer closed connection"));
142 }
143 // got == -1: shutdown requested via wakeFd.
144 if (got == -1)
145 throw(rogue::GeneralError::create("XvcConnection::fill()", "shutdown requested"));
146 // got <= -2: select() or unexpected error.
147 throw(rogue::GeneralError::create("XvcConnection::fill()",
148 "socket/select error: %s (errno %d)", strerror(lastErrno_), lastErrno_));
149 }
150 rl_ = n;
151}
152
153// mark 'n' octets as 'consumed'
154void rpx::XvcConnection::bump(uint64_t n) {
155 if (n > rl_)
156 throw(rogue::GeneralError::create("XvcConnection::bump()", "consuming %" PRIu64 " bytes but only %" PRIu64 " available", n, rl_));
157 rp_ += static_cast<ptrdiff_t>(n);
158 rl_ -= n;
159 if (rl_ == 0) {
160 rp_ = &rxb_[0];
161 }
162}
163
164void rpx::XvcConnection::allocBufs() {
165 uint64_t overhead = 128; // headers and such;
166
167 // Determine the vector size supported by the target
168 uint64_t tgtVecLen = drv_->query();
169
170 if (0 == tgtVecLen) {
171 // target can stream
172 tgtVecLen = maxVecLen_;
173 }
174
175 // What can the driver support?
176 supVecLen_ = drv_->getMaxVectorSize();
177
178 if (supVecLen_ == 0) {
179 // supports any size
180 supVecLen_ = tgtVecLen;
181 } else if (tgtVecLen < supVecLen_) {
182 supVecLen_ = tgtVecLen;
183 }
184
185 chunk_ = (2 * maxVecLen_ + overhead);
186
187 rxb_.resize(static_cast<size_t>(2 * chunk_));
188 txb_.resize(static_cast<size_t>(maxVecLen_ + overhead));
189
190 rp_ = &rxb_[0];
191 rl_ = 0;
192 tl_ = 0;
193}
194
195void rpx::XvcConnection::flush() {
196 uint8_t* p = &txb_[0];
197
198 while (tl_ > 0) {
199 size_t toSend = static_cast<size_t>(tl_);
200 ssize_t put;
201 do {
202#if defined(__APPLE__)
203 put = ::send(sd_, p, toSend, 0);
204#else
205 put = ::send(sd_, p, toSend, MSG_NOSIGNAL);
206#endif
207 } while (put < 0 && errno == EINTR);
208 if (put <= 0) {
209 int e = errno;
210 throw(rogue::GeneralError::create("XvcConnection::flush()",
211 "send() failed: %s (errno %d)", strerror(e), e));
212 }
213
214 p += put;
215 tl_ -= static_cast<uint64_t>(put);
216 }
217}
218
219void rpx::XvcConnection::run() {
220 uint32_t bits = 0;
221 uint64_t bitsLeft = 0, bitsSent = 0;
222 uint64_t bytes = 0;
223 uint64_t vecLen = 0;
224 uint64_t off = 0;
225
226 allocBufs();
227
228 while (!drv_->isDone()) {
229 // read stuff;
230 ssize_t got = readTo(rp_, static_cast<size_t>(chunk_));
231
232 if (got == 0) throw(rogue::GeneralError::create("XvcConnection::run()", "peer closed connection"));
233 if (got == -1) throw(rogue::GeneralError::create("XvcConnection::run()", "shutdown requested"));
234 if (got < 0) throw(rogue::GeneralError::create("XvcConnection::run()",
235 "socket/select error: %s (errno %d)", strerror(lastErrno_), lastErrno_));
236
237 rl_ = static_cast<uint64_t>(got);
238
239 do {
240 fill(2);
241
242 if (0 == ::memcmp(rp_, "ge", 2)) {
243 fill(8);
244
245 drv_->query(); // informs the driver that there is a new connection
246
247 int slen = snprintf(
248 reinterpret_cast<char*>(&txb_[0]), txb_.size(), "xvcServer_v1.0:%" PRIu64 "\n", maxVecLen_);
249 if (slen < 0)
250 throw(rogue::GeneralError::create("XvcConnection::run()", "snprintf failed for getinfo reply"));
251 tl_ = (static_cast<uint64_t>(slen) < txb_.size())
252 ? static_cast<uint64_t>(slen)
253 : txb_.size() - 1;
254
255 bump(8);
256 } else if (0 == ::memcmp(rp_, "se", 2)) {
257 fill(11);
258
259 uint32_t requestedPeriod = (static_cast<uint32_t>(rp_[10]) << 24) |
260 (static_cast<uint32_t>(rp_[9]) << 16) |
261 (static_cast<uint32_t>(rp_[8]) << 8) |
262 static_cast<uint32_t>(rp_[7]);
263
264 uint32_t newPeriod = drv_->setPeriodNs(requestedPeriod);
265
266 for (size_t u = 0; u < sizeof(newPeriod); u++) {
267 txb_[u] = static_cast<uint8_t>(newPeriod);
268 newPeriod = newPeriod >> 8;
269 }
270
271 tl_ = 4;
272
273 bump(11);
274 } else if (0 == ::memcmp(rp_, "sh", 2)) {
275 fill(10);
276
277 bits = (static_cast<uint32_t>(rp_[9]) << 24) |
278 (static_cast<uint32_t>(rp_[8]) << 16) |
279 (static_cast<uint32_t>(rp_[7]) << 8) |
280 static_cast<uint32_t>(rp_[6]);
281 bytes = (static_cast<uint64_t>(bits) + 7) / 8;
282
283 if (bytes > maxVecLen_)
284 throw(rogue::GeneralError::create("XvcConnection::run()", "Requested bit vector length too big"));
285
286 bump(10);
287 fill(2 * bytes);
288
289 vecLen = bytes > supVecLen_ ? supVecLen_ : bytes;
290
291 if (vecLen == 0)
292 throw(rogue::GeneralError::create("XvcConnection::run()",
293 "supported vector length is zero — cannot chunk shift"));
294
295 // break into chunks the driver can handle; due to the xvc layout we can't efficiently
296 // start working on a chunk while still waiting for more data to come in (well - we could
297 // but had to have the full TDI vector plus a chunk of the TMS vector in. Thus, we don't
298 // bother...).
299 for (off = 0, bitsLeft = bits; bitsLeft > 0; bitsLeft -= bitsSent, off += vecLen) {
300 bitsSent = 8 * vecLen;
301 if (bitsLeft < bitsSent) {
302 bitsSent = bitsLeft;
303 }
304
305 drv_->sendVectors(bitsSent,
306 rp_ + static_cast<ptrdiff_t>(off),
307 rp_ + static_cast<ptrdiff_t>(bytes + off),
308 &txb_[0] + static_cast<ptrdiff_t>(off));
309 }
310 tl_ = bytes;
311
312 bump(2 * bytes);
313 } else {
314 throw(rogue::GeneralError::create("XvcConnection::run()", "Unsupported message received"));
315 }
316 flush();
317
318 /* Repeat until all bytes from the current chunk are exhausted.
319 * Most chunks contain a single vector shift message and are
320 * consumed in the first iteration. If not, the spill-over area
321 * provides room for a second iteration which terminates the loop.
322 */
323 } while (rl_ > 0);
324 }
325}
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
Base transport driver for the AxisToJtag firmware protocol.
Definition JtagDriver.h:60