Connecting Streams

A stream topology connects sources, processors, and sinks through stream master/slave links. In practical systems, traffic from a hardware ingress source (for example AxiStreamDma) is often shaped by intermediate stream modules, such as a FIFO or a rate limiter, and then routed to an output sink such as a file writer.

Each stream Master can connect to one or more Slave endpoints. The first slave attached becomes the primary slave, which matters because it usually services the master's frame-allocation requests. The primary slave is also the last endpoint to receive a transmitted frame. A diagnostic tap or monitor branch should therefore usually be connected after the main processing path has been connected, so that the processing path remains primary.

Operator Syntax And Helper Functions

Rogue supports connection operators directly on stream objects:

  • >> for one-way source-to-destination links

  • << for one-way destination-from-source links

  • == for bi-directional links between dual-role endpoints

Python Usage

In Python, operator syntax is generally preferred because it is concise and reads like a pipeline.

src >> stage1 >> stage2 >> dst
dst << src
ep_a == ep_b

Equivalent helper functions are also available:

import pyrogue as pr

pr.streamConnect(src, dst)
pr.streamConnectBiDir(ep_a, ep_b)
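
The chained form works because each >> expression evaluates to its right-hand operand, which then acts as the source for the next link. The following is a minimal pure-Python sketch of that idea. ToyNode and toy_stream_connect are hypothetical names made up for this illustration; they are not Rogue classes and do not reflect how Rogue implements its operators internally.

```python
class ToyNode:
    """Hypothetical stand-in for a stream endpoint (not a Rogue class)."""

    def __init__(self, name):
        self.name = name
        self.slaves = []  # downstream nodes, in attachment order

    def __rshift__(self, other):
        # self >> other: attach `other` downstream and return it,
        # so the next >> in a chain starts from `other`.
        self.slaves.append(other)
        return other


def toy_stream_connect(src, dst):
    """Hypothetical helper mirroring the function-call style of connection."""
    src.slaves.append(dst)


src, stage1, stage2, dst = (ToyNode(n) for n in ("src", "stage1", "stage2", "dst"))

# Evaluated left to right as ((src >> stage1) >> stage2) >> dst
src >> stage1 >> stage2 >> dst

links = [(n.name, s.name) for n in (src, stage1, stage2, dst) for s in n.slaves]
print(links)

# The helper form builds the same kind of link, one pair at a time.
a, b = ToyNode("a"), ToyNode("b")
toy_stream_connect(a, b)
helper_links = [(a.name, s.name) for s in a.slaves]
print(helper_links)
```

In this toy model the operator chain and three separate helper calls would produce the same three links; the operator form simply saves repeating the intermediate names.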

C++ Usage

In C++, operator syntax works the same way, but chained expressions require pointer dereferencing and can become harder to read.

*(*(*src >> fifo) >> rate) >> writer->getChannel(0);
*epA == epB;

For readability, many users prefer the helper macros provided by "rogue/Helpers.h", which connect one pair of endpoints per statement:

rogueStreamConnect(src, fifo);
rogueStreamConnect(fifo, rate);
rogueStreamConnect(rate, writer->getChannel(0));
rogueStreamConnectBiDir(epA, epB);

Connection Semantics And Ordering

  • master >> slave and slave << master create one-way links.

  • endpointA == endpointB creates links both ways.

  • The first slave attached to a master is the primary slave.

  • reqFrame(size, zeroCopyEn) allocation requests go to the primary slave.

  • sendFrame(frame) delivers to secondary slaves first, then the primary.

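This delivery order matters when the primary path consumes or empties a zero-copy frame: because secondary slaves receive the frame first, they see its contents before the primary path has a chance to modify them.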
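
The rules listed above can be sketched with a small pure-Python model. ToyMaster and ToySlave are hypothetical classes written for this illustration only; they imitate the described behaviour (allocation served by the primary slave, delivery to secondaries before the primary) and are not the Rogue implementation. The order among several secondary slaves is not stated in the text, so the toy simply uses attachment order.

```python
class ToySlave:
    """Hypothetical receiver that logs what it sees (not a Rogue class)."""

    def __init__(self, name, log, consume=False):
        self.name = name
        self.log = log
        self.consume = consume  # if True, empty the frame after reading it

    def accept_req(self, size):
        self.log.append((self.name, "alloc", size))
        return bytearray(size)

    def accept_frame(self, frame):
        self.log.append((self.name, "recv", bytes(frame)))
        if self.consume:
            frame.clear()  # simulate a primary path that empties the frame


class ToyMaster:
    """Hypothetical sender following the ordering rules described above."""

    def __init__(self):
        self.slaves = []

    def add_slave(self, slave):
        self.slaves.append(slave)  # first one attached is the primary

    def req_frame(self, size):
        # Allocation requests go to the primary slave only.
        return self.slaves[0].accept_req(size)

    def send_frame(self, frame):
        # Secondary slaves first (attachment order is an assumption here),
        # then the primary slave last.
        for slave in self.slaves[1:]:
            slave.accept_frame(frame)
        self.slaves[0].accept_frame(frame)


log = []
master = ToyMaster()
master.add_slave(ToySlave("primary", log, consume=True))  # attached first
master.add_slave(ToySlave("debug", log))                  # attached second

frame = master.req_frame(4)
frame[:] = b"\x01\x02\x03\x04"
master.send_frame(frame)

for entry in log:
    print(entry)
```

In this model the debug slave records the full four-byte payload even though the primary slave empties the frame, because the primary is served last. Reversing the two add_slave calls would make the debug slave the primary, and it would then also be the one asked to allocate frames.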

Fan-Out Topology: Primary Processing + Diagnostic Debug

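A common pattern is to attach the main processing chain first, so that it becomes the primary path, and then add a secondary branch to the same master for diagnostics. In the example below, the primary path buffers, rate-limits, and records the DMA frames, while the secondary branch prints the raw DMA traffic.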

Python

import rogue.hardware.axi as rha
import rogue.interfaces.stream as ris
import rogue.utilities.fileio as ruf

# Open a DMA stream
dma = rha.AxiStreamDma('/dev/datadev_0', 1, True)

# Primary processing path: fifo + rate limit + file writer
fifo = ris.Fifo(200, 64, False)
rate = ris.RateDrop(True, 0.2)  # period mode: 0.2 s between kept frames, about 5 Hz
writer = ruf.StreamWriter()
writer.open('diagnostic.dat')

# Dump raw DMA traffic to terminal
debug = ris.Slave()
debug.setDebug(64, 'DMA Debug')

# Connect the streams
# Attach processing chain first (primary), then add debug branch
dma >> fifo >> rate >> writer.getChannel(1)
dma >> debug
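
The "about 5 Hz" figure follows from the 0.2 second period: 1 / 0.2 s = 5 frames per second. The sketch below checks that arithmetic with a toy period-based filter. ToyPeriodDrop is a hypothetical class written for this illustration; it assumes a period-mode limiter keeps a frame only when at least one full period has elapsed since the last kept frame, which may differ in detail from what RateDrop actually does. Timestamps are integer milliseconds to avoid floating-point rounding.

```python
class ToyPeriodDrop:
    """Hypothetical period-based frame filter (not Rogue's RateDrop)."""

    def __init__(self, period_ms):
        self.period_ms = period_ms
        self.next_ms = None  # earliest time the next frame may pass

    def keep(self, now_ms):
        # Pass the frame if the period has elapsed, otherwise drop it.
        if self.next_ms is None or now_ms >= self.next_ms:
            self.next_ms = now_ms + self.period_ms
            return True
        return False


drop = ToyPeriodDrop(200)  # 0.2 s period

# Simulated 100 Hz input for 2 seconds: one frame every 10 ms.
stamps_ms = range(0, 2000, 10)
kept = [t for t in stamps_ms if drop.keep(t)]

duration_s = 2.0
rate_hz = len(kept) / duration_s
print(len(kept), "frames kept,", rate_hz, "Hz")
```

With a 100 Hz input, the toy filter passes 10 of 200 frames over two seconds, which matches the 5 Hz estimate in the comment.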

C++

#include "rogue/Helpers.h"
#include "rogue/hardware/axi/AxiStreamDma.h"
#include "rogue/interfaces/stream/Fifo.h"
#include "rogue/interfaces/stream/RateDrop.h"
#include "rogue/utilities/fileio/StreamWriter.h"

namespace rha = rogue::hardware::axi;
namespace ris = rogue::interfaces::stream;
namespace ruf = rogue::utilities::fileio;

int main() {
   auto dma  = rha::AxiStreamDma::create("/dev/datadev_0", 1, true);
   auto fifo = ris::Fifo::create(200, 64, false);
   auto rate = ris::RateDrop::create(true, 0.2);
   auto debug = ris::Slave::create();
   debug->setDebug(64, "DMA Debug");

   auto writer = ruf::StreamWriter::create();
   writer->open("diagnostic.dat");

   // Attach processing chain first (primary), then add debug branch
   rogueStreamConnect(dma, fifo);
   rogueStreamConnect(fifo, rate);
   rogueStreamConnect(rate, writer->getChannel(1));
   rogueStreamConnect(dma, debug);

   // Equivalent operator-style form:
   // *(*(*dma >> fifo) >> rate) >> writer->getChannel(1);
   // *dma >> debug;

   return 0;
}

What To Explore Next

API Reference