rogue
Loading...
Searching...
No Matches
Xvc.cpp
Go to the documentation of this file.
1
17#include "rogue/Directives.h"
18
20
21#include <fcntl.h>
22#include <pthread.h>
23#include <unistd.h>
24
25#include <algorithm>
26#include <cerrno>
27#include <cinttypes>
28#include <climits>
29#include <cstddef>
30#include <cstring>
31#include <memory>
32#include <thread>
33
34#include "rogue/GeneralError.h"
35#include "rogue/GilRelease.h"
36#include "rogue/Logging.h"
37#include "rogue/ScopedGil.h"
42
45
46#ifndef NO_PYTHON
47 #include <boost/python.hpp>
48namespace bp = boost::python;
49#endif
50
52rpx::XvcPtr rpx::Xvc::create(uint16_t port) {
53 rpx::XvcPtr r = std::make_shared<rpx::Xvc>(port);
54 return (r);
55}
56
58rpx::Xvc::Xvc(uint16_t port) : JtagDriver(port), mtu_(1450), threadEn_{false}, started_{false} {
59 // set queue capacity
60 queue_.setThold(100);
61
62 // create logger
63 xvcLog_ = rogue::Logging::create("xilinx.xvc");
64
65 // Self-pipe for cross-thread shutdown wake. One byte is written on the
66 // write end in stop(); the read end is consumed by XvcServer::run and
67 // XvcConnection::readTo to break select() promptly.
68#if defined(__APPLE__)
69 if (::pipe(wakeFd_) < 0)
70 throw(rogue::GeneralError::create("Xvc::Xvc()", "pipe() failed: %s (errno %d)",
71 strerror(errno), errno));
72 if (::fcntl(wakeFd_[0], F_SETFL, O_NONBLOCK) < 0 ||
73 ::fcntl(wakeFd_[1], F_SETFL, O_NONBLOCK) < 0 ||
74 ::fcntl(wakeFd_[0], F_SETFD, FD_CLOEXEC) < 0 ||
75 ::fcntl(wakeFd_[1], F_SETFD, FD_CLOEXEC) < 0) {
76 int e = errno;
77 ::close(wakeFd_[0]);
78 ::close(wakeFd_[1]);
79 wakeFd_[0] = wakeFd_[1] = -1;
80 throw(rogue::GeneralError::create("Xvc::Xvc()", "fcntl() on wake pipe failed: %s (errno %d)",
81 strerror(e), e));
82 }
83#else
84 if (::pipe2(wakeFd_, O_NONBLOCK | O_CLOEXEC) < 0)
85 throw(rogue::GeneralError::create("Xvc::Xvc()", "pipe2() failed: %s (errno %d)",
86 strerror(errno), errno));
87#endif
88}
89
91rpx::Xvc::~Xvc() {
93 // Ensure stop() was called; if not, best-effort cleanup.
94 // Wrapped in try/catch because destructors are noexcept by default and
95 // an escaping exception would std::terminate() the process.
96 if (threadEn_.load(std::memory_order_acquire)) {
97 try {
98 stop();
99 } catch (...) {}
100 }
101 if (wakeFd_[0] >= 0) {
102 ::close(wakeFd_[0]);
103 wakeFd_[0] = -1;
104 }
105 if (wakeFd_[1] >= 0) {
106 ::close(wakeFd_[1]);
107 wakeFd_[1] = -1;
108 }
109}
110
112void rpx::Xvc::start() {
113 // Release the GIL while spawning runThread so that the thread's
114 // rogue::ScopedGil (used to attach a Python tstate for Python ris.Slave
115 // callbacks — see runThread()) can actually acquire it. Without this, a
116 // Python caller invoking xvc._start() would hold the GIL, runThread's
117 // PyGILState_Ensure() would block forever, and the server never starts.
118 rogue::GilRelease noGil;
119
120 // Log starting the xvc server thread
121 xvcLog_->debug("Starting the XVC server thread");
122
123 // One-shot guard. rogue::Queue::stop() (called from Xvc::stop) is
124 // irreversible, so restart on the same instance would silently break
125 // (queue_.pop() returns nullptr immediately, done_ stays true). Fail
126 // loudly instead.
127 if (started_.load(std::memory_order_acquire))
128 throw(rogue::GeneralError::create("Xvc::start()", "Xvc instance already started; restart is unsupported"));
129
130 // Construct the server synchronously — this binds the TCP socket so that
131 // (a) any bind errors surface immediately, (b) getPort() is usable as
132 // soon as start() returns (kernel-assigned when port_ == 0).
133 // JtagDriver::init() remains in runThread() so any of its side effects
134 // still happen in the server thread context.
135 unsigned maxMsg = 32768;
136 server_ = std::make_unique<rpx::XvcServer>(port_, wakeFd_[0], this, maxMsg);
137 boundPort_.store(server_->getPort(), std::memory_order_release);
138
139 // Start the thread — only flip flags after all throwable ops succeed so
140 // a failed start() leaves the instance in a retryable state.
141 threadEn_.store(true, std::memory_order_release);
142 try {
143 thread_ = std::make_unique<std::thread>(&rpx::Xvc::runThread, this);
144 } catch (...) {
145 threadEn_.store(false, std::memory_order_release);
146 boundPort_.store(0, std::memory_order_release);
147 server_.reset();
148 throw;
149 }
150
151 // Mark as started only after everything succeeded.
152 started_.store(true, std::memory_order_release);
153
154 // Tag the native thread for OS-level diagnostics (htop, perf top,
155 // gdb `info threads`). Note: this does NOT make the thread appear with
156 // this name in Python's threading.enumerate() — Python only tracks its
157 // own threads (and "Dummy-N" wrappers for PyGILState_Ensure'd natives),
158 // which is why the Python E2E teardown test asserts on socket
159 // unreachability instead of thread-name leaks.
160#if defined(__linux__)
161 pthread_setname_np(thread_->native_handle(), "XvcServer");
162#elif defined(__APPLE__)
163 // macOS pthread_setname_np takes only one arg and renames the calling
164 // thread, so naming happens at the top of runThread() instead.
165#endif
166}
167
169void rpx::Xvc::stop() {
170 // Release the GIL across stop() so that runThread's rogue::ScopedGil
171 // destructor (PyGILState_Release → may reacquire briefly for auto-tstate
172 // teardown) can complete while this caller is parked in thread_->join().
173 // Without this, a Python caller invoking xvc._stop() would hold the GIL
174 // and deadlock against runThread's shutdown. Matches the existing
175 // GilRelease pattern in ~Xvc().
176 rogue::GilRelease noGil;
177
178 // No-op when the instance was never started (or already stopped).
179 // Queue::stop() is irreversible, so running it before start() would
180 // poison the queue and make a subsequent start() silently broken.
181 if (!started_.load(std::memory_order_acquire)) return;
182
183 // Log stopping the xvc server thread
184 xvcLog_->debug("Stopping the XVC server thread");
185
186 // Shutdown ordering:
187 // 1. done_ = true → JtagDriver::xferRel retry loop exits.
188 // 2. queue_.stop() → any xfer() blocked in pop() wakes immediately (returns nullptr).
189 // 3. wakeFd_ write → unblocks select() in XvcServer::run / XvcConnection::readTo.
190 // 4. threadEn_.exchange(false) → server loop sees flag and exits on next iteration.
191 done_.store(true, std::memory_order_release);
192
193 // Stop the queue — any xfer() blocked in pop() wakes with nullptr.
194 queue_.stop();
195
196 // Clear any stale frames remaining after stop().
197 queue_.reset();
198
199 // Wake any select() blocked in XvcServer::run / XvcConnection::readTo.
200 // One byte is sufficient to make the wake fd readable and break select().
201 // EAGAIN is fine — the wake fd may already be pending readable.
202 if (wakeFd_[1] >= 0) {
203 ssize_t w;
204 do {
205 w = ::write(wakeFd_[1], "x", 1);
206 } while (w < 0 && errno == EINTR);
207 (void)w;
208 }
209
210 // Stop the XVC server thread
211 if (threadEn_.exchange(false, std::memory_order_acq_rel)) {
212 thread_->join();
213 thread_.reset();
214 }
215
216 // Server destructs here — closes listening socket.
217 server_.reset();
218
219 // Clear cached port so getPort() reflects the stopped state.
220 boundPort_.store(0, std::memory_order_release);
221}
222
224void rpx::Xvc::runThread() {
225#if defined(__APPLE__)
226 // macOS pthread_setname_np renames the *calling* thread; do it here
227 // (Linux uses the native_handle() form in start()).
228 pthread_setname_np("XvcServer");
229#endif
230
231 // Attach a Python thread state to this native std::thread so that Python
232 // ris.Slave subclasses (invoked via Master::sendFrame → SlaveWrap::acceptFrame)
233 // can safely PyGILState_Ensure / PyEval_SaveThread without corrupting
234 // tstate ownership. Without this, a nested GilRelease inside Master::sendFrame
235 // (triggered when a Python _acceptFrame calls self._sendFrame(...))
236 // PyEval_SaveThread's the tstate that the outer PyGILState_Ensure was
237 // managing → fatal "PyThreadState_Get: the function must be called with
238 // the GIL held" crash in any other Python thread.
239 //
240 // Blocking waits inside xfer() (queue_.pop) are already wrapped in
241 // rogue::GilRelease at the xfer() top; and select() inside XvcServer::run /
242 // XvcConnection::readTo is wrapped in rogue::GilRelease at those sites — so
243 // this outer ScopedGil does not starve other Python threads. It only
244 // establishes tstate identity for this thread.
245 //
246 // Guarded by Py_IsInitialized(): the C++ doctest smoke binary does NOT
247 // initialize Python, so PyGILState_Ensure() would block forever on an
248 // uninitialized interpreter. Py_IsInitialized() lets the same binary skip
249 // the attach in that path — no Python callbacks can fire without the
250 // interpreter, so tstate identity is moot.
251 //
252 // Note: Xvc::start() releases the GIL before spawning this thread (see
253 // start()), which is what lets PyGILState_Ensure() actually acquire it
254 // when Python IS running and the caller was holding the GIL across
255 // xvc._start().
256#ifndef NO_PYTHON
257 std::unique_ptr<rogue::ScopedGil> threadGil;
258 if (Py_IsInitialized()) threadGil = std::make_unique<rogue::ScopedGil>();
259#endif
260
261 // Guard the whole thread body so any exception out of init() / server_->run()
262 // (e.g. JtagDriver::init() throwing when no backend is connected, as in
263 // the smoke tests) cannot std::terminate() the process. On failure, mark
264 // the instance done and close the listener so xfer() callers unblock and
265 // new TCP clients are refused; threadEn_ is left for stop() to flip + join.
266 try {
267 // Driver initialization (kept here so init() side effects remain in the
268 // server-thread context — start() only constructs the XvcServer).
269 this->init();
270
271 // Pass the atomic run-control flag directly; XvcServer::run wakes via the
272 // self-pipe wakeFd (Xvc::stop writes one byte) in addition to observing
273 // threadEn_ going false.
274 if (server_) server_->run(threadEn_, xvcLog_);
275 } catch (const std::exception& e) {
276 if (xvcLog_) xvcLog_->warning("XVC server thread exiting on exception: %s", e.what());
277 done_.store(true, std::memory_order_release);
278 queue_.stop();
279 server_.reset();
280 } catch (...) {
281 if (xvcLog_) xvcLog_->warning("XVC server thread exiting on unknown exception");
282 done_.store(true, std::memory_order_release);
283 queue_.stop();
284 server_.reset();
285 }
286}
287
289void rpx::Xvc::acceptFrame(ris::FramePtr frame) {
290 // Save off frame
291 if (!queue_.busy()) queue_.push(frame);
292
293 // The XvcConnection class manages the TCP connection to Vivado.
294 // After a Vivado request is issued and forwarded to the FPGA, we wait for the response.
295 // XvcConnection will call the xfer() below to do the transfer and checks for a response.
296 // All we need to do is ensure that as soon as the new frame comes in, it's stored in the queue.
297}
298
299uint64_t rpx::Xvc::getMaxVectorSize() {
300 // MTU lim; 2*vector size + header must fit!
301 unsigned ws = getWordSize();
302 if (ws >= mtu_) return 0;
303 uint64_t mtuLim = (mtu_ - ws) / 2;
304
305 return mtuLim;
306}
307
308uint32_t rpx::Xvc::getPort() const {
309 uint32_t p = boundPort_.load(std::memory_order_acquire);
310 return p ? p : static_cast<uint32_t>(port_);
311}
312
313int rpx::Xvc::xfer(uint8_t* txBuffer,
314 unsigned txBytes,
315 uint8_t* hdBuffer,
316 unsigned hdBytes,
317 uint8_t* rxBuffer,
318 unsigned rxBytes) {
319 // Early-exit when the server is not running (atomic load).
320 if (!threadEn_.load(std::memory_order_acquire)) return 0;
321
322 // Destructor-order requirement: declare both frame shared_ptrs (txFrame /
323 // rxFrame) in the OUTER scope, BEFORE the rogue::GilRelease inner scope.
324 // Local destructors run in reverse construction order, so with this layout
325 // the sequence at function exit is:
326 // 1. inner-scope end → ~GilRelease() reacquires the GIL
327 // 2. outer-scope end → rxFrame.~shared_ptr() then txFrame.~shared_ptr()
328 // both run WITH THE GIL HELD
329 // This matters because when a frame was handed to a Python ris.Slave
330 // subclass (via Master::sendFrame → SlaveWrap::acceptFrame → _acceptFrame),
331 // Boost.Python attaches a `shared_ptr_deleter` that holds a PyObject
332 // reference. That deleter's `operator()` performs `Py_DECREF` without
333 // itself acquiring the GIL — so if a shared_ptr is released while the
334 // thread has no GIL, PyThreadState_Get fires a fatal error.
335 //
336 // Use SEPARATE variables for TX and RX frames so the TX shared_ptr is
337 // never overwritten (and therefore never decremented) inside the
338 // GilRelease scope. An overwriting `frame = queue_.pop()` would release
339 // the prior TX shared_ptr while the GIL is dropped — re-introducing the
340 // exact crash this destructor-order dance exists to prevent.
341 ris::FramePtr txFrame;
342 ris::FramePtr rxFrame;
343
344 {
345 // Release the GIL across the entire blocking section.
346 // MUST be the first statement of this inner scope — before any blocking
347 // call (queue_.pop()) and any mutex acquisition. Do NOT nest inside any
348 // std::unique_lock / std::lock_guard — that inverts acquire order on
349 // scope exit and can deadlock with Python callbacks.
350 rogue::GilRelease noGil;
351
352 // --- send request frame ---
353 xvcLog_->debug("Tx buffer has %" PRIu32 " bytes to send", static_cast<uint32_t>(txBytes));
354
355 txFrame = reqFrame(static_cast<uint32_t>(txBytes), true);
356 txFrame->setPayload(static_cast<uint32_t>(txBytes));
357 ris::FrameIterator iter = txFrame->begin();
358 ris::toFrame(iter, static_cast<uint32_t>(txBytes), txBuffer);
359
360 xvcLog_->debug("Sending new frame of size %" PRIu32, txFrame->getSize());
361 if (txBytes) sendFrame(txFrame);
362
363 // --- Block on condvar-backed queue ---
364 // queue_.pop() waits on popCond_ under its own mutex; wakes on:
365 // (a) push() from acceptFrame() — normal reply path,
366 // (b) queue_.stop() from Xvc::stop() — shutdown path; returns nullptr.
367 rxFrame = queue_.pop();
368 if (!rxFrame) return 0; // shutdown path — ~GilRelease then frame ~dtors (txFrame released with GIL held)
369
370 xvcLog_->debug("Receiving new frame of size %" PRIu32, rxFrame->getSize());
371
372 // Read received data into hdbuf/rxb. Frame lock + Xvc mtx_ preserve
373 // existing serialization of reply parsing (unchanged behavior).
374 ris::FrameLockPtr frLock = rxFrame->lock();
375 std::lock_guard<std::mutex> lock(mtx_);
376
377 const uint32_t payload = rxFrame->getPayload();
378 if (payload < static_cast<uint32_t>(hdBytes)) {
379 if (hdBuffer && hdBytes) std::memset(hdBuffer, 0, hdBytes);
380 xvcLog_->error("Rx frame payload %" PRIu32 " smaller than header size %u", payload, hdBytes);
381 throw rogue::GeneralError::create("Xvc::xfer",
382 "Rx frame payload smaller than header size");
383 }
384 const uint32_t rxPayload = payload - static_cast<uint32_t>(hdBytes);
385 const uint32_t rxCopy = (rxPayload > static_cast<uint32_t>(rxBytes))
386 ? static_cast<uint32_t>(rxBytes)
387 : rxPayload;
388
389 iter = rxFrame->begin();
390 if (hdBuffer && hdBytes) std::copy(iter, iter + static_cast<ptrdiff_t>(hdBytes), hdBuffer);
391 iter += static_cast<ptrdiff_t>(hdBytes);
392 if (rxBuffer && rxCopy) ris::fromFrame(iter, rxCopy, rxBuffer);
393
394 if (rxCopy > static_cast<uint32_t>(INT_MAX))
395 throw rogue::GeneralError::create("Xvc::xfer",
396 "Rx payload size exceeds int return range");
397 return static_cast<int>(rxCopy);
398 } // inner scope ends: ~lock_guard → ~frLock → ~GilRelease (reacquires GIL)
399 // rxFrame, then txFrame (and any Boost.Python shared_ptr_deleter they
400 // carry) destruct here with the GIL held.
401}
402
403void rpx::Xvc::setup_python() {
404#ifndef NO_PYTHON
405
406 bp::class_<rpx::Xvc, rpx::XvcPtr, bp::bases<ris::Master, ris::Slave, rpx::JtagDriver>, boost::noncopyable>(
407 "Xvc",
408 bp::init<uint16_t>())
409 .def("_start", &rpx::Xvc::start)
410 .def("_stop", &rpx::Xvc::stop)
411 .def("getPort", &rpx::Xvc::getPort);
412 bp::implicitly_convertible<rpx::XvcPtr, ris::MasterPtr>();
413 bp::implicitly_convertible<rpx::XvcPtr, ris::SlavePtr>();
414 bp::implicitly_convertible<rpx::XvcPtr, rpx::JtagDriverPtr>();
415#endif
416}
static GeneralError create(std::string src, const char *fmt,...)
Creates a formatted error instance.
RAII helper that releases the Python GIL for a scope.
Definition GilRelease.h:36
static std::shared_ptr< rogue::Logging > create(const std::string &name, bool quiet=false)
Creates a logger instance.
Definition Logging.cpp:95
void setThold(uint32_t thold)
Sets busy-threshold depth used by busy().
Definition Queue.h:80
Random-access byte iterator across a Frame payload.
Base transport driver for the AxisToJtag firmware protocol.
Definition JtagDriver.h:60
rogue::Queue< std::shared_ptr< rogue::interfaces::stream::Frame > > queue_
Definition Xvc.h:58
std::shared_ptr< rogue::Logging > xvcLog_
Definition Xvc.h:61
static void fromFrame(rogue::interfaces::stream::FrameIterator &iter, uint32_t size, void *dst)
Copies bytes from a frame iterator to a destination pointer.
std::shared_ptr< rogue::interfaces::stream::Frame > FramePtr
Shared pointer alias for Frame.
Definition Frame.h:549
static void toFrame(rogue::interfaces::stream::FrameIterator &iter, uint32_t size, void *src)
Copies bytes from a source pointer into a frame iterator.
std::shared_ptr< rogue::interfaces::stream::FrameLock > FrameLockPtr
Shared pointer alias for FrameLock.
Definition FrameLock.h:110
std::shared_ptr< rogue::protocols::xilinx::Xvc > XvcPtr
Definition Xvc.h:191