Source code for pyrogue._DataReceiver

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
#  Description:
#       PyRogue base module - Data Receiver Device
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
import rogue.interfaces.stream as ris
import pyrogue as pr
import numpy


[docs] class DataReceiver(pr.Device,ris.Slave): """Data Receiver Devicer.""" def __init__(self, typeStr='UInt8[np]', hideData=True, value=numpy.zeros(shape=1, dtype=numpy.uint8, order='C'), enableOnStart=True, **kwargs): pr.Device.__init__(self, **kwargs) ris.Slave.__init__(self) self._enableOnStart = enableOnStart self.add(pr.LocalVariable(name='RxEnable', value=True, description='Frame Rx Enable')) self.add(pr.LocalVariable(name='FrameCount', value=0, mode = 'RO', pollInterval=1, description='Frame Rx Counter')) self.add(pr.LocalVariable(name='ErrorCount', value=0, mode = 'RO', pollInterval=1, description='Frame Error Counter')) self.add(pr.LocalVariable(name='ByteCount', value=0, mode = 'RO', pollInterval=1, description='Byte Rx Counter')) self.add(pr.LocalVariable(name='Updated', value=False, mode = 'RW', description='Data has been updated flag. Set in the TRUE in DataReceiver and reset to zero by application')) self.add(pr.LocalVariable(name='Data', typeStr=typeStr, disp='', groups=['NoState','NoStream', 'NoConfig'], value=value, hidden=hideData, description='Data Frame Container')) def countReset(self): """ """ self.FrameCount.set(0) self.ErrorCount.set(0) self.ByteCount.set(0) super().countReset() def _acceptFrame(self, frame): """ Parameters ---------- frame : Returns ------- """ # Do nothing if not yet started or enabled if self.running is False or not self.RxEnable.value(): return # Lock frame with frame.lock(): # Drop errored frames if frame.getError() != 0: with self.ErrorCount.lock: self.ErrorCount.set(self.ErrorCount.value() + 1, write=False) return with self.FrameCount.lock: self.FrameCount.set(self.FrameCount.value() + 1, write=False) with self.ByteCount.lock: self.ByteCount.set(self.ByteCount.value() + frame.getPayload(), write=False) # User overridable method for data restructuring self.process(frame)
[docs] def process(self,frame): """ The user can use this method to process the data, by default a byte numpy array is generated This may include separating data, header and other payload sub-fields This all occurs with the frame lock held Parameters ---------- frame : Returns ------- """ # Get data from frame fl = frame.getPayload() dat = frame.getNumpy(0,fl) # uint8 # Update data self.Data.set(dat,write=True) self.Updated.set(True,write=True)
def _start(self): """ """ super()._start() self.RxEnable.set(value=self._enableOnStart) def _stop(self): """ """ self.RxEnable.set(value=False) super()._stop() # source >> destination def __rshift__(self,other): pr.streamConnect(self,other) return other # destination << source def __lshift__(self,other): pr.streamConnect(other,self) return other