#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# PyRogue base module - Data Receiver Device
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
from __future__ import annotations
from typing import Any
import numpy
import pyrogue as pr
import rogue.interfaces.stream as ris
class DataReceiver(pr.Device, ris.Slave):
    """Data receiver device.

    Accepts stream frames, keeps frame/byte/error counters in local
    variables and hands each good frame to :meth:`process`, which by default
    stores the payload in the ``Data`` variable and sets ``Updated``.

    Parameters
    ----------
    typeStr : str, optional (default = 'UInt8[np]')
        Type string for the data variable.
    hideData : bool, optional (default = True)
        If True, hide the data variable in the GUI.
    value : object, optional (default = None)
        Initial data value. When None, a new
        ``numpy.zeros(shape=1, dtype=numpy.uint8, order='C')`` array is
        created for this instance.
    enableOnStart : bool, optional (default = True)
        If True, enable Rx on start.
    **kwargs : Any
        Additional arguments forwarded to ``Device``.
    """

    def __init__(self,
                 typeStr: str = 'UInt8[np]',
                 hideData: bool = True,
                 value: Any = None,
                 enableOnStart: bool = True,
                 **kwargs: Any) -> None:
        """Initialize the data receiver and register its local variables."""
        pr.Device.__init__(self, **kwargs)
        ris.Slave.__init__(self)
        self._enableOnStart = enableOnStart

        # Build the default array per instance. A default written in the
        # signature is evaluated once, so every receiver created without an
        # explicit value would otherwise share the same ndarray object.
        if value is None:
            value = numpy.zeros(shape=1, dtype=numpy.uint8, order='C')

        self.add(pr.LocalVariable(name='RxEnable',
                                  value=True,
                                  description='Frame Rx Enable'))

        self.add(pr.LocalVariable(name='FrameCount',
                                  value=0,
                                  mode='RO',
                                  pollInterval=1,
                                  description='Frame Rx Counter'))

        self.add(pr.LocalVariable(name='ErrorCount',
                                  value=0,
                                  mode='RO',
                                  pollInterval=1,
                                  description='Frame Error Counter'))

        self.add(pr.LocalVariable(name='ByteCount',
                                  value=0,
                                  mode='RO',
                                  pollInterval=1,
                                  description='Byte Rx Counter'))

        self.add(pr.LocalVariable(name='Updated',
                                  value=False,
                                  mode='RW',
                                  description='Data has been updated flag. Set in the TRUE in DataReceiver and reset to zero by application'))

        self.add(pr.LocalVariable(name='Data',
                                  typeStr=typeStr,
                                  disp='',
                                  groups=['NoState', 'NoStream', 'NoConfig'],
                                  value=value,
                                  hidden=hideData,
                                  description='Data Frame Container'))

    def countReset(self) -> None:
        """Reset receiver counters.

        Zeroes ``FrameCount``, ``ErrorCount`` and ``ByteCount``, then calls
        the parent class ``countReset``.
        """
        self.FrameCount.set(0)
        self.ErrorCount.set(0)
        self.ByteCount.set(0)
        super().countReset()

    def _acceptFrame(self, frame: ris.Frame) -> None:
        """Count an incoming frame and pass it to :meth:`process`.

        The frame is ignored (no counter changes) when ``self.running`` is
        False or ``RxEnable`` is not set. A frame with a non-zero error code
        only increments ``ErrorCount`` and is not processed.

        Parameters
        ----------
        frame : rogue.interfaces.stream.Frame
            Incoming frame.
        """
        # Do nothing if not yet started or enabled
        if self.running is False or not self.RxEnable.value():
            return

        # Lock frame; everything below, including process(), runs with it held
        with frame.lock():

            # Drop errored frames
            if frame.getError() != 0:
                # Hold the variable's lock across the read-modify-write
                with self.ErrorCount.lock:
                    self.ErrorCount.set(self.ErrorCount.value() + 1, write=False)
                return

            with self.FrameCount.lock:
                self.FrameCount.set(self.FrameCount.value() + 1, write=False)

            with self.ByteCount.lock:
                self.ByteCount.set(self.ByteCount.value() + frame.getPayload(), write=False)

            # User overridable method for data restructuring
            self.process(frame)

    def process(self, frame: ris.Frame) -> None:
        """Store the frame payload in ``Data`` and set ``Updated``.

        The user can override this method to process the data; by default a
        byte numpy array is generated. Processing may include separating
        data, header and other payload sub-fields. This all occurs with the
        frame lock held.

        Parameters
        ----------
        frame : rogue.interfaces.stream.Frame
            Incoming frame to process.
        """
        # Get data from frame: the full payload, starting at offset 0
        fl = frame.getPayload()
        dat = frame.getNumpy(0, fl)  # uint8

        # Update data
        self.Data.set(dat, write=True)
        self.Updated.set(True, write=True)

    def _start(self) -> None:
        """Run the parent ``_start``, then set ``RxEnable`` to ``enableOnStart``."""
        super()._start()
        self.RxEnable.set(value=self._enableOnStart)

    def _stop(self) -> None:
        """Clear ``RxEnable``, then run the parent ``_stop``."""
        self.RxEnable.set(value=False)
        super()._stop()

    # source >> destination
    def __rshift__(self, other: Any) -> Any:
        """Connect this receiver to a stream destination.

        Calls ``pr.streamConnect(self, other)`` and returns ``other``.
        """
        pr.streamConnect(self, other)
        return other

    # destination << source
    def __lshift__(self, other: Any) -> Any:
        """Connect a stream source to this receiver.

        Calls ``pr.streamConnect(other, self)`` and returns ``other``.
        """
        pr.streamConnect(other, self)
        return other