Advanced Stream Pattern Recipes
Recipe entry points for advanced stream implementations:
Recipe 1: Stabilize A Bursty Producer
Problem
Input bursts overwhelm downstream consumers and cause unstable latency.
Procedure
Insert FIFO between producer and consumer.
Add RateDrop if bounded loss is acceptable.
Use debug stream instrumentation to verify queue and drop behavior.
Deep Dive
Recipe 2: Prototype Then Harden A Custom Stage
Problem
Need custom transformation logic without committing to C++ immediately.
Procedure
Build topology and logic in Python bindings first.
Validate behavior on representative payloads and rates.
Migrate bottleneck stage to C++ while preserving external interfaces.
Deep Dive
Recipe 3: Decouple Receive Callback With A Worker Thread
Problem
Frame processing in _acceptFrame is too heavy and introduces backpressure.
Procedure
Keep
_acceptFrameminimal: copy frame data + metadata, then enqueue.Run a worker thread that dequeues and performs expensive processing.
Bound queue depth and drop/flag when overloaded.
Python Pattern
import queue
import threading
import numpy as np
import rogue.interfaces.stream as ris
class ThreadedRx(ris.Slave):
def __init__(self, depth: int = 1024):
super().__init__()
self._q = queue.Queue(maxsize=depth)
self._run = True
self._thr = threading.Thread(target=self._worker, daemon=True)
self._thr.start()
def _acceptFrame(self, frame):
with frame.lock():
pkt = (
frame.getNumpy(), # payload copy
frame.getChannel(),
frame.getError(),
frame.getFlags(),
)
try:
self._q.put_nowait(pkt)
except queue.Full:
# Optional: count drops / log / set alarm
pass
def _worker(self):
while self._run:
try:
data, chan, err, flags = self._q.get(timeout=0.1)
except queue.Empty:
continue
self.process(data, chan, err, flags)
self._q.task_done()
def process(self, data: np.ndarray, chan: int, err: int, flags: int):
# Expensive decode/analysis here
pass
def _stop(self):
self._run = False
self._thr.join(timeout=1.0)