Advanced Stream Pattern Recipes

Recipe entry points for advanced stream implementations:

Recipe 1: Stabilize A Bursty Producer

Problem

Input bursts overwhelm downstream consumers and cause unstable latency.

Procedure

  1. Insert a FIFO stage between the producer and the consumer to absorb bursts.

  2. Add a RateDrop stage if bounded frame loss is acceptable.

  3. Use debug stream instrumentation to verify queue depth and drop behavior.
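The buffering-and-drop policy behind the steps above can be sketched with the standard library alone. Fifo and RateDrop are the actual rogue stages; the `BurstSmoother` class, its method names, and its drop-newest policy below are purely illustrative, not rogue API:

```python
from collections import deque

class BurstSmoother:
    """Bounded FIFO that drops the newest items when full (illustrative, not rogue API)."""
    def __init__(self, depth: int):
        self._q = deque()
        self._depth = depth
        self.drops = 0  # bounded loss, made observable for instrumentation (step 3)

    def push(self, item) -> bool:
        if len(self._q) >= self._depth:
            self.drops += 1  # RateDrop-style bounded loss instead of unbounded latency
            return False
        self._q.append(item)
        return True

    def pop(self):
        # Consumer drains at its own pace; latency is bounded by queue depth
        return self._q.popleft() if self._q else None

# A burst of 10 frames into a depth-4 queue: 4 buffered, 6 dropped.
s = BurstSmoother(depth=4)
results = [s.push(i) for i in range(10)]
```

The key property to verify with instrumentation is that worst-case latency is now bounded by queue depth, with overload converted into a measurable drop count.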

Deep Dive

Recipe 2: Prototype Then Harden A Custom Stage

Problem

Need custom transformation logic without committing to C++ immediately.

Procedure

  1. Build the topology and logic with the Python bindings first.

  2. Validate behavior on representative payloads and rates.

  3. Migrate the bottleneck stage to C++ while preserving its external interfaces.
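The workflow above hinges on freezing the stage's external interface before hardening it. A minimal stdlib sketch of that idea, where `Stage`, `PyStage`, and `run_pipeline` are illustrative names (not rogue API): callers bind to the interface, so the Python body can later be swapped for a C++ implementation without touching them.

```python
from typing import Protocol

class Stage(Protocol):
    """Stable external interface; callers depend only on this."""
    def transform(self, payload: bytes) -> bytes: ...

class PyStage:
    """Python prototype: easy to iterate on while validating behavior (step 2)."""
    def transform(self, payload: bytes) -> bytes:
        # Placeholder logic (byte-wise increment), standing in for real processing
        return bytes((b + 1) & 0xFF for b in payload)

def run_pipeline(stage: Stage, frames: list[bytes]) -> list[bytes]:
    # Exercise the stage on representative payloads before migrating to C++
    return [stage.transform(f) for f in frames]

out = run_pipeline(PyStage(), [b"\x00\x01", b"\xff"])
```

When the bottleneck is later rewritten in C++, only the `PyStage` body changes; the validation harness and every caller remain untouched.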

Deep Dive

Recipe 3: Decouple Receive Callback With A Worker Thread

Problem

Frame processing in _acceptFrame is too heavy and introduces backpressure.

Procedure

  1. Keep _acceptFrame minimal: copy frame data + metadata, then enqueue.

  2. Run a worker thread that dequeues and performs expensive processing.

  3. Bound queue depth and drop/flag when overloaded.

Python Pattern

import queue
import threading
import numpy as np
import rogue.interfaces.stream as ris

class ThreadedRx(ris.Slave):
    def __init__(self, depth: int = 1024):
        super().__init__()
        self._q = queue.Queue(maxsize=depth)
        self._run = True
        self._thr = threading.Thread(target=self._worker, daemon=True)
        self._thr.start()

    def _acceptFrame(self, frame):
        with frame.lock():
            pkt = (
                frame.getNumpy(0, frame.getPayload()),  # payload copy
                frame.getChannel(),
                frame.getError(),
                frame.getFlags(),
            )

        try:
            self._q.put_nowait(pkt)
        except queue.Full:
            # Overloaded: drop this packet (optionally count drops / log / raise an alarm)
            pass

    def _worker(self):
        while self._run:
            try:
                data, chan, err, flags = self._q.get(timeout=0.1)
            except queue.Empty:
                continue
            self.process(data, chan, err, flags)
            self._q.task_done()

    def process(self, data: np.ndarray, chan: int, err: int, flags: int):
        # Expensive decode/analysis here
        pass

    def _stop(self):
        self._run = False
        self._thr.join(timeout=1.0)

Deep Dive