Source code for pyrogue.utilities.prbs

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# Module containing the utilities module class and methods
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
from __future__ import annotations

from typing import Any, TYPE_CHECKING

import pyrogue
import rogue.utilities

if TYPE_CHECKING:
    import rogue.interfaces.stream as ris


[docs] class PrbsRx(pyrogue.Device): """PRBS RX wrapper. Parameters ---------- width : int, optional PRBS word width in bits. The underlying Rogue PRBS engine requires a multiple of 32 bits. If ``None``, the C++ default is 32 bits. checkPayload : bool, optional (default = True) Enable payload checking when ``True``. taps : bytes-like, optional LFSR feedback tap bit positions used by the PRBS generator/checker. Provide a bytes-like sequence (for example ``bytes([1, 2, 6, 31])``) where each entry is a tapped bit index in the PRBS state. If ``None``, Rogue uses the C++ default taps ``[1, 2, 6, 31]``. For a 32-bit state in Rogue's left-shift Fibonacci form, this means ``new_bit0 = bit1 ^ bit2 ^ bit6 ^ bit31``, which corresponds to the polynomial ``x^32 + x^7 + x^3 + x^2 + 1`` (equivalent forms may use reciprocal indexing conventions). stream : rogue.interfaces.stream.Master, optional Optional stream endpoint to connect as the RX source upon initialization. **kwargs : Any Additional arguments forwarded to ``pyrogue.Device``. """ def __init__( self, *, width: int | None = None, checkPayload: bool = True, taps: bytes | bytearray | memoryview | None = None, stream: ris.Master | None = None, **kwargs: Any, ) -> None: """Initialize a software PRBS receiver.""" pyrogue.Device.__init__(self, description='PRBS Software Receiver', **kwargs) self._prbs = rogue.utilities.Prbs() if width is not None: self._prbs.setWidth(width) if taps is not None: self._prbs.setTaps(taps) if stream is not None: pyrogue.streamConnect(stream, self) self.add(pyrogue.LocalVariable(name='rxEnable', description='RX Enable', mode='RW', value=True, localGet=lambda : self._prbs.getRxEnable(), localSet=lambda value: self._prbs.setRxEnable(value))) self.add(pyrogue.LocalVariable(name='rxErrors', description='RX Error Count', mode='RO', pollInterval=1, value=0, typeStr='UInt32', localGet=self._prbs.getRxErrors)) self.add(pyrogue.LocalVariable(name='rxCount', description='RX Count', mode='RO', pollInterval=1, value=0, typeStr='UInt32', localGet=self._prbs.getRxCount)) self.add(pyrogue.LocalVariable(name='rxBytes', description='RX Bytes', mode='RO', pollInterval=1, value=0, typeStr='UInt32', localGet=self._prbs.getRxBytes)) self.add(pyrogue.LocalVariable(name='rxRate', description='RX Rate', disp="{:.3e}", mode='RO', pollInterval=1, value=0.0, units='Frames/s', localGet=self._prbs.getRxRate)) self.add(pyrogue.LocalVariable(name='rxBw', description='RX BW', disp="{:.3e}", mode='RO', pollInterval=1, value=0.0, units='Bytes/s', localGet=self._prbs.getRxBw)) self.add(pyrogue.LocalVariable(name='checkPayload', description='Payload Check Enable', mode='RW', value=checkPayload, localSet=self._plEnable)) def _plEnable(self, value: bool, changed: bool) -> None: """Enable or disable payload checking.""" self._prbs.checkPayload(value)
[docs] def countReset(self) -> None: """Reset PRBS counters and delegate to parent reset.""" self._prbs.resetCount() super().countReset()
def _getStreamSlave(self) -> rogue.utilities.Prbs: """Return the Rogue PRBS receiver stream endpoint.""" return self._prbs def __lshift__(self, other: Any) -> Any: """Connect ``other`` as a stream source.""" pyrogue.streamConnect(other, self) return other
[docs] def setWidth(self, width: int) -> None: """Set PRBS word width in bits (must be a multiple of 32).""" self._prbs.setWidth(width)
[docs] def setTaps(self, taps: bytes | bytearray | memoryview) -> None: """Set LFSR feedback tap bit positions.""" self._prbs.setTaps(taps)
[docs] class PrbsTx(pyrogue.Device): """PRBS TX wrapper. Parameters ---------- sendCount : bool, optional (default = False) Enable count transmission when ``True``. width : int, optional PRBS word width in bits. The underlying Rogue PRBS engine requires a multiple of 32 bits. If ``None``, the C++ default is 32 bits. taps : bytes-like, optional LFSR feedback tap bit positions used by the PRBS generator/checker. Provide a bytes-like sequence (for example ``bytes([1, 2, 6, 31])``) where each entry is a tapped bit index in the PRBS state. If ``None``, Rogue uses the C++ default taps ``[1, 2, 6, 31]``. For a 32-bit state in Rogue's left-shift Fibonacci form, this means ``new_bit0 = bit1 ^ bit2 ^ bit6 ^ bit31``, which corresponds to the polynomial ``x^32 + x^7 + x^3 + x^2 + 1`` (equivalent forms may use reciprocal indexing conventions). stream : rogue.interfaces.stream.Slave, optional Optional stream endpoint to connect as the TX destination. **kwargs : Any Additional arguments forwarded to ``pyrogue.Device``. """ def __init__( self, *, sendCount: bool = False, width: int | None = None, taps: bytes | bytearray | memoryview | None = None, stream: ris.Slave | None = None, **kwargs: Any, ) -> None: """Initialize a software PRBS transmitter.""" pyrogue.Device.__init__(self, description='PRBS Software Transmitter', **kwargs) self._prbs = rogue.utilities.Prbs() if width is not None: self._prbs.setWidth(width) if taps is not None: self._prbs.setTaps(taps) if stream is not None: pyrogue.streamConnect(self, stream) self._prbs.sendCount(sendCount) self.add(pyrogue.LocalVariable(name='txSize', description='PRBS Frame Size', units='Bytes', localSet=self._txSize, mode='RW', value=1024, typeStr='UInt32')) self.add(pyrogue.LocalVariable(name='txPeriod', description='Tx Period In Microseconds', units='uS', localSet=lambda value: self._prbs.setTxPeriod(value), localGet=lambda: self._prbs.getTxPeriod(), mode='RW', typeStr='UInt32')) self.add(pyrogue.LocalVariable(name='txEnable', description='PRBS Run Enable', mode='RW', value=False, localSet=self._txEnable)) self.add(pyrogue.LocalCommand(name='genFrame',description='Generate n frames',value=1, function=self._genFrame)) self.add(pyrogue.LocalVariable(name='txErrors', description='TX Error Count', mode='RO', pollInterval = 1, value=0, typeStr='UInt32', localGet=self._prbs.getTxErrors)) self.add(pyrogue.LocalVariable(name='txCount', description='TX Count', mode='RO', pollInterval = 1, value=0, typeStr='UInt32', localGet=self._prbs.getTxCount)) self.add(pyrogue.LocalVariable(name='txBytes', description='TX Bytes', mode='RO', pollInterval = 1, value=0, typeStr='UInt32', localGet=self._prbs.getTxBytes)) self.add(pyrogue.LocalVariable(name='genPayload', description='Payload Generate Enable', mode='RW', value=True, localSet=self._plEnable)) self.add(pyrogue.LocalVariable(name='txRate', description='TX Rate', disp="{:.3e}", mode='RO', pollInterval=1, value=0.0, units='Frames/s', localGet=self._prbs.getTxRate)) self.add(pyrogue.LocalVariable(name='txBw', description='TX BW', disp="{:.3e}", mode='RO', pollInterval=1, value=0.0, units='Bytes/s', localGet=self._prbs.getTxBw)) def _plEnable(self, value: bool, changed: bool) -> None: """Enable or disable payload generation.""" self._prbs.genPayload(value)
[docs] def countReset(self) -> None: """Reset PRBS counters and delegate to parent reset.""" self._prbs.resetCount() super().countReset()
def _genFrame(self, arg: int = 1) -> None: """Generate ``arg`` PRBS frames.""" for _ in range(arg): self._prbs.genFrame(self.txSize.value()) def _txSize(self, value: int, changed: bool) -> None: """Update the PRBS frame size, restarting TX if needed.""" if changed and int(self.txEnable.value()) == 1: self._prbs.disable() self._prbs.enable(value) def _txEnable(self, value: int, changed: bool) -> None: """Enable or disable PRBS transmission.""" if changed: if int(value) == 0: self._prbs.disable() else: self._prbs.enable(self.txSize.value()) def _getStreamMaster(self) -> rogue.utilities.Prbs: """Return the Rogue PRBS transmitter stream endpoint.""" return self._prbs def __rshift__(self, other: Any) -> Any: """Connect the transmitter to ``other``.""" pyrogue.streamConnect(self, other) return other
[docs] def setWidth(self, width: int) -> None: """Set PRBS word width in bits (must be a multiple of 32).""" self._prbs.setWidth(width)
[docs] def setTaps(self, taps: bytes | bytearray | memoryview) -> None: """Set LFSR feedback tap bit positions.""" self._prbs.setTaps(taps)
[docs] def sendCount(self, en: bool) -> None: """Enable or disable count transmission.""" self._prbs.sendCount(en)
def _stop(self) -> None: """Stop PRBS transmission.""" self._prbs.disable()
[docs] class PrbsPair(pyrogue.Device): """Paired PRBS transmitter and receiver. Parameters ---------- width : int, optional PRBS word width in bits. The underlying Rogue PRBS engine requires a multiple of 32 bits. If ``None``, the C++ default is 32 bits. taps : bytes-like, optional LFSR feedback tap bit positions used by the PRBS generator/checker. Provide a bytes-like sequence (for example ``bytes([1, 2, 6, 31])``) where each entry is a tapped bit index in the PRBS state. If ``None``, Rogue uses the C++ default taps ``[1, 2, 6, 31]``. For a 32-bit state in Rogue's left-shift Fibonacci form, this means ``new_bit0 = bit1 ^ bit2 ^ bit6 ^ bit31``, which corresponds to the polynomial ``x^32 + x^7 + x^3 + x^2 + 1`` (equivalent forms may use reciprocal indexing conventions). sendCount : bool, optional (default = False) Forward the TX count through the stream. txStream : rogue.interfaces.stream.Master or rogue.interfaces.stream.Slave, optional Optional TX stream endpoint. rxStream : rogue.interfaces.stream.Master or rogue.interfaces.stream.Slave, optional Optional RX stream endpoint. **kwargs : Any Additional arguments forwarded to ``pyrogue.Device``. """ def __init__( self, width: int | None = None, taps: bytes | bytearray | memoryview | None = None, sendCount: bool = False, txStream: ris.Master | ris.Slave | None = None, rxStream: ris.Master | ris.Slave | None = None, **kwargs: Any, ) -> None: super().__init__(**kwargs) self.add(PrbsTx(sendCount=sendCount, width=width, taps=taps, stream=txStream)) self.add(PrbsRx(width=width, taps=taps, stream=rxStream))
[docs] def setWidth(self, width: int) -> None: """Set PRBS word width in bits on both devices.""" self.PrbsTx.setWidth(width) self.PrbsRx.setWidth(width)
[docs] def setTaps(self, taps: bytes | bytearray | memoryview) -> None: """Set LFSR feedback tap bit positions on both devices.""" self.PrbsTx.setTaps(taps) self.PrbsRx.setTaps(taps)
[docs] def setCount(self, count: int) -> None: """Set RX count threshold.""" self.PrbsRx.setCount(count)
def _getStreamMaster(self) -> rogue.utilities.Prbs: """Return the stream master endpoint.""" return self.PrbsRx._getStreamMaster() def _getStreamSlave(self) -> rogue.utilities.Prbs: """Return the stream slave endpoint.""" return self.PrbsTx._getStreamSlave() def __rshift__(self, other: Any) -> Any: """Connect this pair output to ``other``.""" pyrogue.streamConnect(self, other) return other def __lshift__(self, other: Any) -> Any: """Connect ``other`` output to this pair input.""" pyrogue.streamConnect(other, self) return other def __eq__(self, other: Any) -> None: """Create a bidirectional stream connection to ``other``.""" pyrogue.streamConnectBiDir(other, self)