# Source code for pyrogue._DataReceiver

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
#  Description:
#       PyRogue base module - Data Receiver Device
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
from __future__ import annotations

from typing import Any

import numpy
import pyrogue as pr
import rogue.interfaces.stream as ris


class DataReceiver(pr.Device, ris.Slave):
    """Data receiver device.

    Accepts stream frames, keeps frame / error / byte counters in local
    variables and, by default, stores each accepted frame's payload in the
    ``Data`` variable and raises the ``Updated`` flag.

    Parameters
    ----------
    typeStr : str, optional (default = 'UInt8[np]')
        Type string for the data variable.
    hideData : bool, optional (default = True)
        If True, hide the data variable in the GUI.
    value : object, optional (default = None)
        Initial data value. When omitted (``None``), a new
        ``numpy.zeros(shape=1, dtype=numpy.uint8, order='C')`` array is
        created for this instance.
    enableOnStart : bool, optional (default = True)
        If True, enable Rx on start.
    **kwargs : Any
        Additional arguments forwarded to ``Device``.
    """

    def __init__(self,
                 typeStr: str = 'UInt8[np]',
                 hideData: bool = True,
                 value: Any = None,
                 enableOnStart: bool = True,
                 **kwargs: Any) -> None:
        """Initialize the data receiver and register its local variables."""
        pr.Device.__init__(self, **kwargs)
        ris.Slave.__init__(self)
        self._enableOnStart = enableOnStart

        # Build the default buffer here rather than in the signature: a
        # default evaluated at definition time would be one array object
        # shared by every receiver that did not pass ``value``.
        if value is None:
            value = numpy.zeros(shape=1, dtype=numpy.uint8, order='C')

        self.add(pr.LocalVariable(name='RxEnable',
                                  value=True,
                                  description='Frame Rx Enable'))

        self.add(pr.LocalVariable(name='FrameCount',
                                  value=0,
                                  mode='RO',
                                  pollInterval=1,
                                  description='Frame Rx Counter'))

        self.add(pr.LocalVariable(name='ErrorCount',
                                  value=0,
                                  mode='RO',
                                  pollInterval=1,
                                  description='Frame Error Counter'))

        self.add(pr.LocalVariable(name='ByteCount',
                                  value=0,
                                  mode='RO',
                                  pollInterval=1,
                                  description='Byte Rx Counter'))

        self.add(pr.LocalVariable(name='Updated',
                                  value=False,
                                  mode='RW',
                                  description='Data has been updated flag. Set in the TRUE in DataReceiver and reset to zero by application'))

        self.add(pr.LocalVariable(name='Data',
                                  typeStr=typeStr,
                                  disp='',
                                  groups=['NoState', 'NoStream', 'NoConfig'],
                                  value=value,
                                  hidden=hideData,
                                  description='Data Frame Container'))

    def countReset(self) -> None:
        """Reset receiver counters.

        Zeroes ``FrameCount``, ``ErrorCount`` and ``ByteCount``, then
        delegates to the parent class ``countReset``.
        """
        self.FrameCount.set(0)
        self.ErrorCount.set(0)
        self.ByteCount.set(0)
        super().countReset()

    def _acceptFrame(self, frame: ris.Frame) -> None:
        """Count an incoming frame and pass it to :meth:`process`.

        The frame is ignored when ``self.running`` is ``False`` or when
        ``RxEnable`` is not set. A frame whose error field is non-zero only
        increments ``ErrorCount`` and is dropped. Otherwise ``FrameCount``
        and ``ByteCount`` are updated and :meth:`process` is called, all
        while the frame lock is held.

        Parameters
        ----------
        frame : rogue.interfaces.stream.Frame
            Incoming frame.
        """
        # Do nothing if not yet started or enabled
        if self.running is False or not self.RxEnable.value():
            return

        # Lock frame
        with frame.lock():

            # Drop errored frames
            if frame.getError() != 0:
                with self.ErrorCount.lock:
                    self.ErrorCount.set(self.ErrorCount.value() + 1, write=False)
                return

            # Each read-modify-write happens under that variable's own lock
            with self.FrameCount.lock:
                self.FrameCount.set(self.FrameCount.value() + 1, write=False)

            with self.ByteCount.lock:
                self.ByteCount.set(self.ByteCount.value() + frame.getPayload(), write=False)

            # User overridable method for data restructuring
            self.process(frame)

    def process(self, frame: ris.Frame) -> None:
        """Process an accepted frame.

        The user can override this method to process the data; by default a
        byte numpy array is generated from the whole payload and stored in
        ``Data``, and ``Updated`` is set to True. Overrides may separate
        data, header and other payload sub-fields. This all occurs with the
        frame lock held.

        Parameters
        ----------
        frame : rogue.interfaces.stream.Frame
            Incoming frame to process.
        """
        # Get data from frame
        fl = frame.getPayload()
        dat = frame.getNumpy(0, fl)  # uint8

        # Update data
        self.Data.set(dat, write=True)
        self.Updated.set(True, write=True)

    def _start(self) -> None:
        """Run the parent start hook, then set ``RxEnable`` to ``enableOnStart``."""
        super()._start()
        self.RxEnable.set(value=self._enableOnStart)

    def _stop(self) -> None:
        """Clear ``RxEnable``, then run the parent stop hook."""
        self.RxEnable.set(value=False)
        super()._stop()

    # source >> destination
    def __rshift__(self, other: Any) -> Any:
        """Connect this receiver to a stream destination.

        Returns ``other`` so connections can be chained.
        """
        pr.streamConnect(self, other)
        return other

    # destination << source
    def __lshift__(self, other: Any) -> Any:
        """Connect a stream source to this receiver.

        Returns ``other`` so connections can be chained.
        """
        pr.streamConnect(other, self)
        return other