Source code for pyrogue._Device

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
#  Description:
#       PyRogue base module - Device Class
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
from __future__ import annotations

import collections
import functools as ft
import threading
from typing import Any, Callable, Iterable, Literal

import pyrogue as pr
import rogue.interfaces.memory as rim


class EnableVariable(pr.BaseVariable):
    """Enable flag variable with dependency-aware reporting.

    The stored flag is a plain boolean. :meth:`get` folds in two further
    conditions and reports ``'deps'`` when a dependency variable is not
    truthy, or ``'parent'`` when the enable variable one level up the tree
    is not ``True``.

    Parameters
    ----------
    enabled : bool
        Initial enable state.
    deps : iterable, optional
        Dependency variables that can disable this one.
    """
    def __init__(self, *, enabled: bool, deps: Iterable[pr.BaseVariable] | None = None) -> None:
        """Initialize the enable-state variable.

        Parameters
        ----------
        enabled : bool
            Initial value of the stored enable flag.
        deps : iterable, optional
            Variables whose values must all be truthy for this variable to
            report as enabled. ``None`` or an empty iterable means there are
            no dependencies.
        """
        pr.BaseVariable.__init__(
            self,
            description='Determines if device is enabled for hardware access',
            name='enable',
            mode='RW',
            value=enabled,
            groups='Enable',
            disp={False: 'False', True: 'True', 'parent': 'ParentFalse', 'deps': 'ExtDepFalse'})

        # Materialize the dependencies: they are iterated here and again (with
        # len()) in _doUpdate, so a one-shot iterable must not be stored as-is.
        self._deps = [] if deps is None else list(deps)

        # Dependencies are treated as unmet until the first _doUpdate runs.
        # With no dependencies nothing would ever clear the flag (_doUpdate
        # skips the recomputation for an empty list), so it starts cleared.
        self._depDis = len(self._deps) != 0

        for d in self._deps:
            d.addListener(self)

        self._value  = enabled
        self._lock   = threading.Lock()

    @pr.expose
    def get(self, read: bool = False, index: int = -1) -> Any:
        """Return the effective enable state.

        Parameters
        ----------
        read : bool, optional (default = False)
            Unused for enable evaluation.
        index : int, optional (default = -1)
            Unused for enable evaluation.

        Returns
        -------
        Any
            ``False`` when the stored flag is ``False``, ``'deps'`` when the
            dependency-disable flag is set, the stored flag when the parent is
            the root, ``'parent'`` when the enable variable one level up is
            not ``True``, otherwise ``True``.
        """
        with self._lock:
            if self._value is False:
                return False

            if self._depDis:
                return 'deps'

            if self._parent is self._root:
                return self._value

            # Our parent is a device below the root; consult the enable
            # variable of the device (or root) that contains it.
            if self._parent._parent.enable.value() is not True:
                return 'parent'

            return True

    @pr.expose
    def set(
        self,
        value: bool | Literal['parent', 'deps'],
        write: bool = True,
        index: int = -1,
    ) -> None:
        """Set the enable value.

        The status strings ``'parent'`` and ``'deps'`` are outputs of
        :meth:`get`, not settable values; passing either one is ignored.

        Parameters
        ----------
        value : bool | 'parent' | 'deps'
            New enable value.
        write : bool, optional (default = True)
            Unused for enable evaluation.
        index : int, optional (default = -1)
            Unused for enable evaluation.
        """
        if value == 'parent' or value == 'deps':
            return

        old = self.value()

        with self._lock:
            self._value = value

        # Only propagate when the previous effective state was a real boolean
        # that differs from the new value.
        if old != value and old != 'parent' and old != 'deps':
            self.parent._updateBlockEnable()
            self.parent.enableChanged(value)

        with self.parent.root.updateGroup():
            self._queueUpdate()

        # The following concept will trigger enable listeners
        # directly. This is causing lock contentions in practice
        # (epics4 as an example)

        #self._doUpdate()
        #for var in self._listeners:
        #    var._doUpdate()

    def _doUpdate(self) -> Any:
        """Update dependency state before notifying listeners.

        Returns
        -------
        Any
            Whatever the base class ``_doUpdate`` returns.
        """
        if len(self._deps) != 0:
            oldEn = (self.value() is True)

            with self._lock:
                self._depDis = not all(x.value() for x in self._deps)

            newEn = (self.value() is True)

            # Push the new state to the owning device only on a real change.
            if oldEn != newEn:
                self.parent._updateBlockEnable()
                self.parent.enableChanged(newEn)

        return super()._doUpdate()

    def _rootAttached(self, parent: Any, root: Any) -> None:
        """Attach enable listeners when rooted.

        Parameters
        ----------
        parent : Any
            Node that owns this variable.
        root : Any
            Root of the tree being attached.
        """
        pr.Node._rootAttached(self,parent,root)

        # Listen to the enable variable one level up so this variable is
        # updated when it changes. The root has no such level.
        if parent is not root:
            parent._parent.enable.addListener(self)


class DeviceError(Exception):
    """Exception type used to signal device-level errors."""


class Device(pr.Node,rim.Hub):
    """Base class for PyRogue devices.

    Parameters
    ----------
    name : str, optional
        Device name, defaults to the class name.
    description : str, optional (default = "")
        Human-readable description.
    memBase : rim.MemorySlave, optional
        Optional memory interface base, inherited from parent if not provided.
    offset : int, optional (default = 0)
        Memory offset for the device.
    hidden : bool, optional (default = False)
        If True, add the device to the ``Hidden`` group.
    groups : list[str], optional
        Groups to assign.
    expand : bool, optional (default = False)
        Default GUI expand state.
    enabled : bool, optional (default = True)
        Initial enable state.
    defaults : dict, optional
        Default variable values keyed by name.
    enableDeps : iterable, optional
        Enable dependency variables.
    hubMin : int, optional (default = 0)
        Hub minimum access size.
    hubMax : int, optional (default = 0)
        Hub maximum access size.
    guiGroup : str, optional
        GUI grouping label.
    """

    def __init__(
        self,
        *,
        name: str | None = None,
        description: str = '',
        memBase: rim.MemorySlave | None = None,
        offset: int = 0,
        hidden: bool = False,
        groups: list[str] | None = None,
        expand: bool = False,
        enabled: bool = True,
        defaults: dict | None = None,
        enableDeps: Iterable[pr.BaseVariable] | None = None,
        hubMin: int = 0,
        hubMax: int = 0,
        guiGroup: str | None = None,
    ) -> None:
        """Initialize the device node and associated interfaces."""
        if name is None:
            name = self.__class__.__name__

        # Hub.__init__ must be called first for _setSlave to work below
        rim.Hub.__init__(self,offset,hubMin,hubMax)

        # Blocks
        self._blocks     = []
        self._custBlocks = []
        self._memBase    = memBase
        self._memLock    = threading.RLock()
        self._defaults   = defaults if defaults is not None else {}

        # Interfaces and protocols started/stopped together with this device
        self._ifAndProto = []

        # When True, every block operation behaves as if checkEach=True
        self.forceCheckEach = False

        # Connect to memory slave
        if memBase:
            self._setSlave(memBase)

        # Node.__init__ can't be called until after self._memBase is created
        pr.Node.__init__(self, name=name, hidden=hidden, groups=groups,
                         description=description, expand=expand, guiGroup=guiGroup)

        self._log.info("Making device {:s}".format(name))

        # Convenience methods
        self.addRemoteCommands = ft.partial(self.addNodes, pr.RemoteCommand)

        # Variable interface to enable flag
        self.add(EnableVariable(enabled=enabled, deps=enableDeps))

        self.add(pr.LocalCommand(name='ReadDevice', value=False, hidden=True,
                                 function=lambda arg: self.readAndCheckBlocks(recurse=arg),
                                 description='Force read of device without recursion'))

        self.add(pr.LocalCommand(name='WriteDevice', value='', hidden=True,
                                 function=lambda arg: self.writeAndVerifyBlocks(force=True,recurse=arg),
                                 description='Force write of device without recursion'))

    @pr.expose
    @property
    def address(self) -> Any:
        """Return the device base address."""
        return self._getAddress()

    @pr.expose
    @property
    def offset(self) -> Any:
        """Return the device offset."""
        return self._getOffset()

    def addCustomBlock(self, block: Any) -> None:
        """Add a pre-defined memory block to the device.

        The list of custom blocks is kept sorted by ``(offset, size)``.

        Parameters
        ----------
        block : Any
            Block object exposing ``offset`` and ``size`` attributes.
        """
        self._custBlocks.append(block)
        self._custBlocks.sort(key=lambda x: (x.offset, x.size))

    def add(self, node: Any) -> None:
        """Add a child node to the device.

        Parameters
        ----------
        node : Any
            Node to add. A child ``Device`` created without a ``memBase``
            is connected to this device as its memory slave.
        """
        # Call node add
        pr.Node.add(self,node)

        # Adding device
        if isinstance(node,Device):

            # Device does not have a membase
            if node._memBase is None:
                node._setSlave(self)

    def addInterface(self, *interfaces: Any | Iterable[Any]) -> None:
        """Add stream or memory interfaces to manage.

        Parameters
        ----------
        *interfaces : Any
            One or more interfaces or iterables of interfaces. Iterable
            arguments are flattened by one level.
        """
        for interface in interfaces:
            if isinstance(interface, collections.abc.Iterable):
                self._ifAndProto.extend(interface)
            else:
                self._ifAndProto.append(interface)

    def addProtocol(self, *protocols: Any) -> None:
        """Add protocol entities (alias of ``addInterface``).

        Parameters
        ----------
        *protocols : Any
            One or more protocol entities or iterables of them.
        """
        # Unpack so each argument is handled exactly as addInterface would
        # handle it; forwarding the tuple itself would store an iterable
        # argument un-flattened and it would never be started or stopped.
        self.addInterface(*protocols)

    def manage(self, *interfaces: Any) -> None:
        """Manage additional interfaces for start/stop.

        Unlike ``addInterface`` the arguments are stored as given, without
        flattening iterables.
        """
        self._ifAndProto.extend(interfaces)

    def _start(self) -> None:
        """ Called recursively from Root.start when starting """
        for intf in self._ifAndProto:
            if hasattr(intf,"_start"):
                intf._start()

        for d in self.devices.values():
            d._start()

    def _stop(self) -> None:
        """Called recursively from Root.stop when exiting"""
        for intf in self._ifAndProto:
            if hasattr(intf,"_stop"):
                intf._stop()

        for d in self.devices.values():
            d._stop()

    @property
    def running(self) -> bool:
        """Return True if the device is running."""
        return self.root is not None and self.root.running

    def addRemoteVariables(
        self,
        number: int,
        stride: int,
        pack: bool = False,
        **kwargs: Any,
    ) -> None:
        """Add a repeating block of remote variables.

        Parameters
        ----------
        number : int
            Number of variables to add.
        stride : int
            Byte stride between instances.
        pack : bool, optional (default = False)
            If True, add a packed link variable for all entries. The element
            variables are then hidden and a ``hidden`` keyword, if given,
            applies to the link variable only.
        **kwargs : Any
            Arguments forwarded to ``RemoteVariable``.
        """
        if pack:
            # Element variables are always hidden behind the packed variable.
            # Keep a caller-supplied 'hidden' for the link variable, but do
            # not forward it to addNodes, where it would collide with
            # hidden=True and raise TypeError.
            elemHidden = True
            elemArgs = {k: v for k, v in kwargs.items() if k != 'hidden'}
        else:
            elemHidden = kwargs.pop('hidden', False)
            elemArgs = kwargs

        self.addNodes(pr.RemoteVariable, number, stride, hidden=elemHidden, **elemArgs)

        # If pack specified, create a linked variable to combine everything
        if pack:
            varList = getattr(self, kwargs['name']).values()

            def linkedSet(dev: Any, var: pr.LinkVariable, val: str, write: bool) -> None:
                """Split a packed display string and write each element variable."""
                if val == '':
                    return
                values = reversed(val.split('_'))
                for variable, value in zip(varList, values):
                    variable.setDisp(value, write=write)

            def linkedGet(dev: Any, var: pr.LinkVariable, read: bool) -> str:
                """Join element display values into one packed underscore string."""
                values = [v.getDisp(read=read) for v in varList]
                return '_'.join(reversed(values))

            name = kwargs.pop('name')
            kwargs.pop('value', None)

            lv = pr.LinkVariable(name=name, value='', dependencies=varList,
                                 linkedGet=linkedGet, linkedSet=linkedSet, **kwargs)
            self.add(lv)

    def setPollInterval(
        self,
        interval: float,
        variables: Iterable[Any] | None = None,
    ) -> None:
        """Set the poll interval for a group of variables.

        Parameters
        ----------
        interval : float
            Polling interval in seconds.
        variables : iterable, optional
            Iterable of variable names or variables. If ``None``, all
            variables with nonzero poll interval are updated.
        """
        if variables is None:
            variables = [k for k,v in self.variables.items() if v.pollInterval != 0]

        for x in variables:
            self.node(x).setPollInterval(interval)

    def hideVariables(
        self,
        hidden: bool,
        variables: Iterable[Any] | None = None,
    ) -> None:
        """Hide a list of variables or variable names.

        Parameters
        ----------
        hidden : bool
            True to hide variables, False to show them.
        variables : iterable, optional
            Iterable of variable names or variables; the two may be mixed.
            If ``None``, all variables of this device are updated. Entries
            that are neither a variable nor a string are ignored.
        """
        if variables is None:
            variables = self.variables.values()

        for v in variables:
            if isinstance(v, pr.BaseVariable):
                v.hidden = hidden
            # Test the current element (not variables[0]) so mixed and
            # non-indexable iterables work.
            elif isinstance(v, str):
                self.variables[v].hidden = hidden

    def initialize(self) -> None:
        """Call ``initialize`` on all child devices."""
        for key,value in self.devices.items():
            value.initialize()

    def hardReset(self) -> None:
        """Call ``hardReset`` on all child devices."""
        for key,value in self.devices.items():
            value.hardReset()

    def countReset(self) -> None:
        """Call ``countReset`` on all child devices."""
        for key,value in self.devices.items():
            value.countReset()

    def enableChanged(self, value: Any) -> None:
        """Hook for reacting to enable state changes.

        Called by the device's ``EnableVariable`` when its effective state
        changes. The base implementation does nothing.

        Parameters
        ----------
        value : Any
            New enable value.
        """
        pass
        # The following rewrite-on-enable is disabled:
        #if value is True:
        #    self.writeAndVerifyBlocks(force=True, recurse=True, variable=None)

    def writeBlocks(
        self,
        *,
        force: bool = False,
        recurse: bool = True,
        variable: Any | None = None,
        checkEach: bool = False,
        index: int = -1,
        **kwargs: Any,
    ) -> None:
        """Write all blocks held by this device.

        Parameters
        ----------
        force : bool, optional (default = False)
            Force the write even if values are unchanged.
        recurse : bool, optional (default = True)
            If True, recurse into child devices. Ignored when ``variable``
            is given.
        variable : object, optional
            Optional variable to write. When given, only that variable's
            block is written.
        checkEach : bool, optional (default = False)
            Perform per-variable verification checks.
        index : int, optional (default = -1)
            Optional index for array variables.
        **kwargs : Any
            Additional arguments passed through to the transaction.
        """
        checkEach = checkEach or self.forceCheckEach

        if variable is not None:
            pr.startTransaction(variable._block, type=rim.Write, forceWr=force,
                                checkEach=checkEach, variable=variable, index=index, **kwargs)

        else:
            for block in self._blocks:
                if block.bulkOpEn:
                    pr.startTransaction(block, type=rim.Write, forceWr=force,
                                        checkEach=checkEach, **kwargs)

            if recurse:
                for key,value in self.devices.items():
                    value.writeBlocks(force=force, recurse=True, checkEach=checkEach, **kwargs)

    def verifyBlocks(
        self,
        *,
        recurse: bool = True,
        variable: Any | None = None,
        checkEach: bool = False,
        **kwargs: Any,
    ) -> None:
        """Verify blocks in the background.

        Parameters
        ----------
        recurse : bool, optional (default = True)
            If True, recurse into child devices. Ignored when ``variable``
            is given.
        variable : object, optional
            Optional variable to verify.
        checkEach : bool, optional (default = False)
            Perform per-variable verification checks.
        **kwargs : Any
            Additional arguments passed through to the transaction.
        """
        checkEach = checkEach or self.forceCheckEach

        if variable is not None:
            # Verify range is set by previous write
            pr.startTransaction(variable._block, type=rim.Verify, checkEach=checkEach, **kwargs)

        else:
            for block in self._blocks:
                if block.bulkOpEn:
                    pr.startTransaction(block, type=rim.Verify, checkEach=checkEach, **kwargs)

            if recurse:
                for key,value in self.devices.items():
                    value.verifyBlocks(recurse=True, checkEach=checkEach, **kwargs)

    def readBlocks(
        self,
        *,
        recurse: bool = True,
        variable: Any | None = None,
        checkEach: bool = False,
        index: int = -1,
        **kwargs: Any,
    ) -> None:
        """Read blocks in the background.

        Parameters
        ----------
        recurse : bool, optional (default = True)
            If True, recurse into child devices. Ignored when ``variable``
            is given.
        variable : object, optional
            Optional variable to read.
        checkEach : bool, optional (default = False)
            Perform per-variable verification checks.
        index : int, optional (default = -1)
            Optional index for array variables.
        **kwargs : Any
            Additional arguments passed through to the transaction.
        """
        checkEach = checkEach or self.forceCheckEach

        if variable is not None:
            pr.startTransaction(variable._block, type=rim.Read, checkEach=checkEach,
                                variable=variable, index=index, **kwargs)

        else:
            for block in self._blocks:
                if block.bulkOpEn:
                    pr.startTransaction(block, type=rim.Read, checkEach=checkEach, **kwargs)

            if recurse:
                for key,value in self.devices.items():
                    value.readBlocks(recurse=True, checkEach=checkEach, **kwargs)

    def checkBlocks(
        self,
        *,
        recurse: bool = True,
        variable: Any | None = None,
        **kwargs: Any,
    ) -> None:
        """Check block transactions and notify variable listeners.

        Parameters
        ----------
        recurse : bool, optional (default = True)
            If True, recurse into child devices. Ignored when ``variable``
            is given.
        variable : object, optional
            Optional variable to check.
        **kwargs : Any
            Additional arguments passed through to the transaction.
        """
        if variable is not None:
            pr.checkTransaction(variable._block, **kwargs)

        else:
            for block in self._blocks:
                pr.checkTransaction(block, **kwargs)

            if recurse:
                for key,value in self.devices.items():
                    value.checkBlocks(recurse=True, **kwargs)

    def writeAndVerifyBlocks(
        self,
        force: bool = False,
        recurse: bool = True,
        variable: Any | None = None,
        checkEach: bool = False,
    ) -> None:
        """Write, verify, and check all blocks.

        Parameters
        ----------
        force : bool, optional (default = False)
            Force the write even if values are unchanged.
        recurse : bool, optional (default = True)
            If True, recurse into child devices.
        variable : object, optional
            Optional variable to write/verify.
        checkEach : bool, optional (default = False)
            Perform per-variable verification checks.
        """
        self.writeBlocks(force=force, recurse=recurse, variable=variable, checkEach=checkEach)
        self.verifyBlocks(recurse=recurse, variable=variable, checkEach=checkEach)
        self.checkBlocks(recurse=recurse, variable=variable)

    def readAndCheckBlocks(
        self,
        recurse: bool = True,
        variable: Any | None = None,
        checkEach: bool = False,
    ) -> None:
        """Read and check all blocks.

        Parameters
        ----------
        recurse : bool, optional (default = True)
            If True, recurse into child devices.
        variable : object, optional
            Optional variable to read.
        checkEach : bool, optional (default = False)
            Perform per-variable verification checks.
        """
        self.readBlocks(recurse=recurse, variable=variable, checkEach=checkEach)
        self.checkBlocks(recurse=recurse, variable=variable)

    def _updateBlockEnable(self) -> None:
        """Propagate effective enable state to this device's blocks."""
        for block in self._blocks:
            block.setEnable(self.enable.value() is True)

        for key,value in self.devices.items():
            value._updateBlockEnable()

    def _buildBlocks(self) -> None:
        """Build and attach memory blocks for local/remote variables.

        Raises
        ------
        pr.MemoryError
            If a variable is larger than the pre-made block it falls into, or
            if a resulting block size is outside the min/max access range.
        """
        remVars = []

        # Use min block size, larger blocks can be pre-created
        blkSize = self._blkMinAccess()

        # Process all of the variables
        for k,n in self.nodes.items():

            # Local variables have a 1:1 block association
            if isinstance(n,pr.LocalVariable):
                self._blocks.append(n._block)

            # Align to min access, create list of remote variables
            elif isinstance(n,pr.RemoteVariable) and n.offset is not None:
                self._log.info(f"Before Shift variable {n.name} offset={n.offset} bitSize={n.bitSize} bytes={n.varBytes}")
                n._updatePath(n.path)
                n._shiftOffsetDown(n.offset % blkSize, blkSize)
                remVars += [n]

                self._log.info(f"Creating variable {n.name} offset={n.offset} bitSize={n.bitSize} bytes={n.varBytes}")

        # Sort var list by offset, size
        remVars.sort(key=lambda x: (x.offset, x.varBytes))
        blocks = []
        blk = None

        # Go through sorted variable list, look for overlaps, group into blocks
        for n in remVars:
            if blk is not None and ( (blk['offset'] + blk['size']) > n.offset):
                self._log.info("Overlap detected var offset={} block offset={} block bytes={}".format(n.offset,blk['offset'],blk['size']))
                n._shiftOffsetDown(n.offset - blk['offset'], blkSize)
                blk['vars'].append(n)

                if n.varBytes > blk['size']:
                    blk['size'] = n.varBytes

            # We need a new block for this variable
            else:
                blk = None

                # Look for pre-made block which overlaps
                for b in self._custBlocks:
                    if ( (n.offset >= b.offset) and ((b.offset + b.size) > n.offset)):

                        # Just in case a variable extends past the end of pre-made block, user mistake
                        if n.varBytes > b.size:
                            # f-string so the placeholders are actually filled in
                            msg = f'Failed to add variable {n.name} to pre-made block with offset {b.offset} and size {b.size}'
                            raise pr.MemoryError(name=self.path, address=self.address, msg=msg)

                        blk = {'offset':b.offset, 'size':b.size, 'vars':[n], 'block':b}
                        break

                # Block not found
                if blk is None:
                    blk = {'offset':n.offset, 'size':n.varBytes, 'vars':[n], 'block':None}

                blocks.append(blk)

        # Clear pre-made list
        self._custBlocks = []

        # Create new blocks and add new and pre-made blocks to device
        # Add variables to the block
        for b in blocks:

            # Create new block
            if b['block'] is None:
                newBlock = rim.Block(b['offset'], b['size'])
                self._log.debug("Adding new block at offset {:#02x}, size {}".format(b['offset'], b['size']))
            else:
                newBlock = b['block']

            # Set memory slave
            newBlock._setSlave(self)

            # Verify the block is not too small or large for the memory interface
            if newBlock.size > self._blkMaxAccess() or newBlock.size < self._blkMinAccess():
                msg = f'Block size {newBlock.size} is not in the range: {self._blkMinAccess()} - {self._blkMaxAccess()}'
                raise pr.MemoryError(name=self.path, address=self.address, msg=msg)

            # Add variables to the block
            newBlock.addVariables(b['vars'])

            # Set variable block links
            for v in b['vars']:
                v._block = newBlock

            # Add to device
            self._blocks.append(newBlock)
            newBlock.setEnable(self.enable.value() is True)

    def _rootAttached(self, parent: pr.Node, root: pr.Root) -> None:
        """Attach this device into the rooted tree and build variable blocks."""
        pr.Node._rootAttached(self, parent, root)

        for key,value in self._nodes.items():
            value._rootAttached(self,root)

        self._buildBlocks()

        # Override defaults as dictated by the _defaults dict
        for varName, defValue in self._defaults.items():
            nodes,keys = self.nodeMatch(varName)

            if keys is None:
                for var in nodes:
                    var._default = defValue

    def _setTimeout(self, timeout: float) -> None:
        """Set transaction timeout on this device, blocks, and child devices.

        Parameters
        ----------
        timeout : float
            Timeout in seconds.
        """
        # Blocks and the memory master take the timeout in microseconds
        timeoutUs = int(timeout*1000000)

        for block in self._blocks:
            block._setTimeout(timeoutUs)

        rim.Master._setTimeout(self, timeoutUs)

        for key,value in self._nodes.items():
            if isinstance(value,Device):
                value._setTimeout(timeout)

    def command(self, **kwargs: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        """Decorator to add inline methods as commands.

        The decorated function is used as ``LocalCommand(function=...)``.
        The command wrapper supplies keyword arguments ``root``, ``dev``,
        ``cmd``, and ``arg``; the function may accept any subset.

        Parameters
        ----------
        **kwargs : Any
            Arguments forwarded to ``LocalCommand``. ``name`` defaults to the
            decorated function's name.
        """
        def _decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            """Wrap and register a method as a LocalCommand."""
            # Work on a copy so the default name of one decorated function
            # does not leak into later uses of the same decorator.
            cmdArgs = dict(kwargs)
            cmdArgs.setdefault('name', func.__name__)

            self.add(pr.LocalCommand(function=func, **cmdArgs))

            return func
        return _decorator

    def linkVariableGet(self, **kwargs: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        """Decorator to add inline ``linkedGet`` functions.

        The decorated function is used as ``LinkVariable(linkedGet=...)``.
        The linked-get wrapper supplies keyword arguments ``dev``, ``var``,
        ``read``, ``index``, and ``check``; the function may accept any subset.

        Parameters
        ----------
        **kwargs : Any
            Arguments forwarded to ``LinkVariable``. ``name`` defaults to the
            decorated function's name.
        """
        def _decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            """Wrap and register a function as LinkVariable linkedGet."""
            # Work on a copy so the default name of one decorated function
            # does not leak into later uses of the same decorator.
            varArgs = dict(kwargs)
            varArgs.setdefault('name', func.__name__)

            self.add(pr.LinkVariable(linkedGet=func, **varArgs))

            return func
        return _decorator

    def genDocuments(
        self,
        path: str,
        incGroups: str | list[str] | None = None,
        excGroups: str | list[str] | None = None,
    ) -> None:
        """Generate Sphinx documentation pages for this device.

        Writes ``<path>/<device path with '.' replaced by '_'>.rst`` and
        recurses into sub devices and processes.

        Parameters
        ----------
        path : str
            Output directory path.
        incGroups : str or list[str], optional
            Group name or group names to include.
        excGroups : str or list[str], optional
            Group name or group names to exclude.
        """
        with open(path + '/' + self.path.replace('.','_') + '.rst','w') as file:

            print("****************************",file=file)
            print(self.name,file=file)
            print("****************************",file=file)
            print('',file=file)
            print(pr.genDocDesc(self.description,0),file=file)
            print('',file=file)

            plist = self.getNodes(typ=pr.Process,incGroups=incGroups,excGroups=excGroups)
            dlist = self.devicesByGroup(incGroups=incGroups,excGroups=excGroups)
            vlist = self.variablesByGroup(incGroups=incGroups,excGroups=excGroups)
            clist = self.commandsByGroup(incGroups=incGroups,excGroups=excGroups)

            # Sub devices that are not already listed as processes
            tlist = {k:v for k,v in dlist.items() if k not in plist}

            if len(tlist) > 0:
                print(".. toctree::",file=file)
                print(" :maxdepth: 1",file=file)
                print(" :caption: Sub Devices:",file=file)
                print('',file=file)

                for k,v in tlist.items():
                    print(' ' + v.path.replace('.','_'),file=file)
                    v.genDocuments(path,incGroups,excGroups)

                print('',file=file)

            if len(plist) > 0:
                print(".. toctree::",file=file)
                print(" :maxdepth: 1",file=file)
                print(" :caption: Processes:",file=file)
                print('',file=file)

                for k,v in plist.items():
                    print(' ' + v.path.replace('.','_'),file=file)
                    v.genDocuments(path,incGroups,excGroups)

                print('',file=file)

            print("Summary",file=file)
            print("#######",file=file)
            print('',file=file)

            if len(vlist):
                print("Variable List",file=file)
                print("*************",file=file)
                print('',file=file)

                for k,v in vlist.items():
                    print(f"* {k}",file=file)

                print('',file=file)

            if len(clist):
                print("Command List",file=file)
                print("*************",file=file)
                print('',file=file)

                for k,v in clist.items():
                    print(f"* {k}",file=file)

                print('',file=file)

            print("Details",file=file)
            print("#######",file=file)
            print('',file=file)

            if len(vlist):
                print("Variables",file=file)
                print("*********",file=file)
                print('',file=file)

                for k,v in vlist.items():
                    v._genDocs(file=file)
                    print('',file=file)

            if len(clist):
                print("Commands",file=file)
                print("********",file=file)
                print('',file=file)

                for k,v in clist.items():
                    v._genDocs(file=file)
                    print('',file=file)
class ArrayDevice(Device):
    """Device that instantiates an array of sub-devices.

    Parameters
    ----------
    arrayClass : object
        Class to instantiate for each element.
    number : int
        Number of devices in the array.
    stride : int, optional (default = 0)
        Address stride between array elements.
    arrayArgs : object, optional
        Per-element argument overrides. Either a single dict applied to every
        element, or a sequence with one dict per element. A ``name`` entry
        overrides the default element name ``<arrayClass name>[<index>]``.
        The supplied dicts are not modified.
    **kwargs : Any
        Additional arguments forwarded to ``Device``.
    """

    def __init__(
        self,
        *,
        arrayClass: Any,
        number: int,
        stride: int = 0,
        arrayArgs: Any | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize and populate an array-style device container."""
        if 'name' not in kwargs:
            kwargs['name'] = f'{arrayClass.__name__}Array'

        super().__init__(**kwargs)

        if arrayArgs is None:
            arrayArgs = [{} for x in range(number)]
        elif isinstance(arrayArgs, dict):
            # One shared dict for all elements; each element copies it below.
            arrayArgs = [arrayArgs for x in range(number)]

        for i in range(number):
            # Copy before popping so caller-owned dicts keep their 'name'
            # entry and can be reused for another array.
            args = dict(arrayArgs[i])
            name = args.pop('name', f'{arrayClass.__name__}[{i}]')

            self.add(arrayClass(
                name=name,
                offset=i*stride,
                **args))