#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# Module containing the utilities module class and methods
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
import rogue.utilities
import pyrogue
[docs]
class PrbsRx(pyrogue.Device):
"""PRBS RX Wrapper"""
def __init__(self, *, width=None, checkPayload=True, taps=None, stream=None, **kwargs ):
pyrogue.Device.__init__(self, description='PRBS Software Receiver', **kwargs)
self._prbs = rogue.utilities.Prbs()
if width is not None:
self._prbs.setWidth(width)
if taps is not None:
self._prbs.setTaps(taps)
if stream is not None:
pyrogue.streamConnect(stream, self)
self.add(pyrogue.LocalVariable(name='rxEnable', description='RX Enable',
mode='RW', value=True,
localGet=lambda : self._prbs.getRxEnable(),
localSet=lambda value: self._prbs.setRxEnable(value)))
self.add(pyrogue.LocalVariable(name='rxErrors', description='RX Error Count',
mode='RO', pollInterval=1, value=0, typeStr='UInt32',
localGet=self._prbs.getRxErrors))
self.add(pyrogue.LocalVariable(name='rxCount', description='RX Count',
mode='RO', pollInterval=1, value=0, typeStr='UInt32',
localGet=self._prbs.getRxCount))
self.add(pyrogue.LocalVariable(name='rxBytes', description='RX Bytes',
mode='RO', pollInterval=1, value=0, typeStr='UInt32',
localGet=self._prbs.getRxBytes))
self.add(pyrogue.LocalVariable(name='rxRate', description='RX Rate', disp="{:.3e}",
mode='RO', pollInterval=1, value=0.0, units='Frames/s',
localGet=self._prbs.getRxRate))
self.add(pyrogue.LocalVariable(name='rxBw', description='RX BW', disp="{:.3e}",
mode='RO', pollInterval=1, value=0.0, units='Bytes/s',
localGet=self._prbs.getRxBw))
self.add(pyrogue.LocalVariable(name='checkPayload', description='Payload Check Enable',
mode='RW', value=checkPayload, localSet=self._plEnable))
def _plEnable(self,value,changed):
self._prbs.checkPayload(value)
def countReset(self):
self._prbs.resetCount()
super().countReset()
def _getStreamSlave(self):
return self._prbs
def __lshift__(self,other):
pyrogue.streamConnect(other,self)
return other
def setWidth(self,width):
self._prbs.setWidth(width)
def setTaps(self,taps):
self._prbs.setTaps(taps)
[docs]
class PrbsTx(pyrogue.Device):
"""PRBS TX Wrapper"""
def __init__(self, *, sendCount=False, width=None, taps=None, stream=None, **kwargs ):
pyrogue.Device.__init__(self, description='PRBS Software Transmitter', **kwargs)
self._prbs = rogue.utilities.Prbs()
if width is not None:
self._prbs.setWidth(width)
if taps is not None:
self._prbs.setTaps(taps)
if stream is not None:
pyrogue.streamConnect(self, stream)
self._prbs.sendCount(sendCount)
self.add(pyrogue.LocalVariable(name='txSize', description='PRBS Frame Size', units='Bytes',
localSet=self._txSize, mode='RW', value=1024, typeStr='UInt32'))
self.add(pyrogue.LocalVariable(name='txPeriod', description='Tx Period In Microseconds', units='uS',
localSet=lambda value: self._prbs.setTxPeriod(value), localGet=lambda: self._prbs.getTxPeriod(), mode='RW', typeStr='UInt32'))
self.add(pyrogue.LocalVariable(name='txEnable', description='PRBS Run Enable', mode='RW',
value=False, localSet=self._txEnable))
self.add(pyrogue.LocalCommand(name='genFrame',description='Generate n frames',value=1,
function=self._genFrame))
self.add(pyrogue.LocalVariable(name='txErrors', description='TX Error Count', mode='RO', pollInterval = 1,
value=0, typeStr='UInt32', localGet=self._prbs.getTxErrors))
self.add(pyrogue.LocalVariable(name='txCount', description='TX Count', mode='RO', pollInterval = 1,
value=0, typeStr='UInt32', localGet=self._prbs.getTxCount))
self.add(pyrogue.LocalVariable(name='txBytes', description='TX Bytes', mode='RO', pollInterval = 1,
value=0, typeStr='UInt32', localGet=self._prbs.getTxBytes))
self.add(pyrogue.LocalVariable(name='genPayload', description='Payload Generate Enable',
mode='RW', value=True, localSet=self._plEnable))
self.add(pyrogue.LocalVariable(name='txRate', description='TX Rate', disp="{:.3e}",
mode='RO', pollInterval=1, value=0.0, units='Frames/s',
localGet=self._prbs.getTxRate))
self.add(pyrogue.LocalVariable(name='txBw', description='TX BW', disp="{:.3e}",
mode='RO', pollInterval=1, value=0.0, units='Bytes/s',
localGet=self._prbs.getTxBw))
def _plEnable(self,value,changed):
self._prbs.genPayload(value)
def countReset(self):
self._prbs.resetCount()
super().countReset()
def _genFrame(self,arg=1):
for i in range(arg):
self._prbs.genFrame(self.txSize.value())
def _txSize(self,value,changed):
if changed and int(self.txEnable.value()) == 1:
self._prbs.disable()
self._prbs.enable(value)
def _txEnable(self,value,changed):
if changed:
if int(value) == 0:
self._prbs.disable()
else:
self._prbs.enable(self.txSize.value())
def _getStreamMaster(self):
return self._prbs
def __rshift__(self,other):
pyrogue.streamConnect(self,other)
return other
def setWidth(self,width):
self._prbs.setWidth(width)
def setTaps(self,taps):
self._prbs.setTaps(taps)
def sendCount(self,en):
self._prbs.sendCount(en)
def _stop(self):
self._prbs.disable()
[docs]
class PrbsPair(pyrogue.Device):
def __init__(self, width=None, taps=None, sendCount=False, txStream=None, rxStream=None, **kwargs):
super().__init__(self, **kwargs)
self.add(PrbsTx(width=width, taps=taps, stream=txStream))
self.add(PrbsRx(sendCount=sendCount, width=width, taps=taps, stream=rxStream))
def setWidth(self, width):
self.PrbsTx.setWidth(width)
self.PrbsRx.setWidth(width)
def setTaps(self, taps):
self.PrbsTx.setTaps(taps)
self.PrbsRx.setTaps(taps)
def setCount(self, count):
self.PrbsRx.setCount(count)
def _getStreamMaster(self):
return self.PrbsRx._getStreamMaster()
def _getStreamSlave(self):
return self.PrbsTx._getStreamSlave()
def __rshift__(self,other):
pyrogue.streamConnect(self,other)
return other
def __lshift__(self,other):
pyrogue.streamConnect(other,self)
return other
def __eq__(self,other):
pyrogue.streamConnectBiDir(other,self)