Source code for pyrogue._Device

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
#  Description:
#       PyRogue base module - Device Class
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
from __future__ import annotations

import collections
import datetime
import functools as ft
import glob
import os
import threading
import zipfile
from typing import Any, Callable, Iterable, Literal

import pyrogue as pr
import rogue.interfaces.memory as rim


class EnableVariable(pr.BaseVariable):
    """Enable flag variable with dependency-aware reporting.

    Parameters
    ----------
    enabled : bool
        Initial enable state.
    deps : iterable, optional
        Dependency variables that can disable this one.
    """
    def __init__(self, *, enabled: bool, deps: Iterable[pr.BaseVariable] | None = None) -> None:
        """Initialize the enable-state variable."""
        pr.BaseVariable.__init__(
            self,
            description='Determines if device is enabled for hardware access',
            name='enable',
            mode='RW',
            value=enabled,
            groups='Enable',
            disp={False: 'False', True: 'True', 'parent': 'ParentFalse', 'deps': 'ExtDepFalse'})

        if deps is None:
            self._deps   = []
            self._depDis = False
        else:
            self._deps   = deps
            self._depDis = True

        for d in self._deps:
            d.addListener(self)

        self._value  = enabled
        self._lock   = threading.Lock()

    @pr.expose
    def get(self, read: bool = False, index: int = -1) -> Any:
        """Return the effective enable state.

        Parameters
        ----------
        read : bool, optional (default = False)
            Unused for enable evaluation.
        index : int, optional (default = -1)
            Unused for enable evaluation.

        Returns
        -------
        Any
            ``True``/``False`` or a dependency status string.
        """
        ret = self._value

        with self._lock:
            if self._value is False:
                ret = False
            elif self._depDis:
                ret = 'deps'
            elif self._parent is self._root:
                #print("Root enable = {}".format(self._value))
                ret = self._value
            else:
                if self._parent._parent.enable.value() is not True:
                    ret = 'parent'
                else:
                    ret = True

        return ret

    @pr.expose
    def set(
        self,
        value: bool | Literal['parent', 'deps'],
        write: bool = True,
        index: int = -1,
    ) -> None:
        """Set the enable value.

        Parameters
        ----------
        value : bool | 'parent' | 'deps'
            New enable value.
        write : bool, optional (default = True)
            Unused for enable evaluation.
        index : int, optional (default = -1)
            Unused for enable evaluation.
        """
        if value != 'parent' and value != 'deps':
            old = self.value()

            with self._lock:
                self._value = value

            if old != value and old != 'parent' and old != 'deps':
                self.parent._updateBlockEnable()
                self.parent.enableChanged(value)

            with self.parent.root.updateGroup():
                self._queueUpdate()

            # The following concept will trigger enable listeners
            # directly. This is causing lock contentions in practice
            # (epics4 as an example)

            #self._doUpdate()
            #for var in self._listeners:
            #    var._doUpdate()

    def _doUpdate(self) -> Any:
        """Update dependency state before notifying listeners."""
        if len(self._deps) != 0:
            oldEn = (self.value() is True)

            with self._lock:
                self._depDis = not all(x.value() for x in self._deps)

            newEn = (self.value() is True)

            if oldEn != newEn:
                self.parent._updateBlockEnable()
                self.parent.enableChanged(newEn)

        return super()._doUpdate()

    def _rootAttached(self, parent: Any, root: Any) -> None:
        """Attach enable listeners when rooted."""
        pr.Node._rootAttached(self,parent,root)

        if parent is not root:
            parent._parent.enable.addListener(self)


class DeviceError(Exception):
    """Raised for device-level errors."""
    pass


[docs] class Device(pr.Node,rim.Hub): """Base class for PyRogue devices. Parameters ---------- name : str, optional Device name, defaults to the class name. description : str, optional (default = "") Human-readable description. memBase : rim.MemorySlave, optional Optional memory interface base, inherited from parent if not provided. offset : int, optional (default = 0) Memory offset for the device. hidden : bool, optional (default = False) If True, add the device to the ``Hidden`` group. groups : list[str], optional Groups to assign. expand : bool, optional (default = False) Default GUI expand state. enabled : bool, optional (default = True) Initial enable state. defaults : dict, optional Default variable values keyed by name. enableDeps : iterable, optional Enable dependency variables. hubMin : int, optional (default = 0) Hub minimum access size. hubMax : int, optional (default = 0) Hub maximum access size. guiGroup : str, optional GUI grouping label. """ def __init__( self, *, name: str | None = None, description: str = '', memBase: rim.MemorySlave | None = None, offset: int = 0, hidden: bool = False, groups: list[str] | None = None, expand: bool = False, enabled: bool = True, defaults: dict | None = None, enableDeps: Iterable[pr.BaseVariable] | None = None, hubMin: int = 0, hubMax: int = 0, guiGroup: str | None = None, ) -> None: """Initialize the device node and associated interfaces.""" if name is None: name = self.__class__.__name__ # Hub.__init__ must be called first for _setSlave to work below rim.Hub.__init__(self,offset,hubMin,hubMax) # Blocks self._blocks = [] self._custBlocks = [] self._memBase = memBase self._memLock = threading.RLock() self._defaults = defaults if defaults is not None else {} self._ifAndProto = [] self.forceCheckEach = False self._preWriteListeners = [] # Connect to memory slave if memBase: self._setSlave(memBase) # Node.__init__ can't be called until after self._memBase is created pr.Node.__init__(self, name=name, hidden=hidden, groups=groups, description=description, expand=expand, guiGroup=guiGroup) self._log.info("Making device %s", name) # Convenience methods self.addRemoteCommands = ft.partial(self.addNodes, pr.RemoteCommand) # Variable interface to enable flag self.add(EnableVariable(enabled=enabled, deps=enableDeps)) self.add(pr.LocalCommand(name='ReadDevice', value=False, hidden=True, function=lambda arg: self.readAndCheckBlocks(recurse=arg), description='Force read of device without recursion')) self.add(pr.LocalCommand(name='WriteDevice', value='', hidden=True, function=lambda arg: self.writeAndVerifyBlocks(force=True,recurse=arg), description='Force write of device without recursion')) @pr.expose @property def address(self) -> Any: """Return the device base address.""" return self._getAddress() @pr.expose @property def offset(self) -> Any: """Return the device offset.""" return self._getOffset()
[docs] def addCustomBlock(self, block: Any) -> None: """Add a pre-defined memory block to the device.""" self._custBlocks.append(block) self._custBlocks.sort(key=lambda x: (x.offset, x.size))
[docs] def add(self, node: Any) -> None: """Add a child node to the device.""" # Call node add pr.Node.add(self,node) # Adding device if isinstance(node,Device): # Device does not have a membase if node._memBase is None: node._setSlave(self)
[docs] def addInterface(self, *interfaces: Any | Iterable[Any]) -> None: """Add stream or memory interfaces to manage. Parameters ---------- *interfaces : Any One or more interfaces or iterables of interfaces. """ for interface in interfaces: if isinstance(interface, collections.abc.Iterable): for item in interface: self._log.debug("Managing interface/protocol %s", item) self._ifAndProto.append(item) else: self._log.debug("Managing interface/protocol %s", interface) self._ifAndProto.append(interface)
[docs] def addProtocol(self, *protocols: Any) -> None: """Add protocol entities (alias of ``addInterface``).""" self.addInterface(protocols)
[docs] def manage(self, *interfaces: Any) -> None: """Manage additional interfaces for start/stop.""" self._ifAndProto.extend(interfaces)
def _start(self) -> None: """ Called recursively from Root.start when starting """ for intf in self._ifAndProto: if hasattr(intf,"_start"): self._log.debug("Starting managed interface/protocol %s", intf) intf._start() for d in self.devices.values(): d._start() def _stop(self) -> None: """Called recursively from Root.stop when exiting""" for intf in self._ifAndProto: if hasattr(intf,"_stop"): self._log.debug("Stopping managed interface/protocol %s", intf) intf._stop() for d in self.devices.values(): d._stop() @property def running(self) -> bool: """Return True if the device is running.""" return self.root is not None and self.root.running
[docs] def addRemoteVariables( self, number: int, stride: int, pack: bool = False, **kwargs: Any, ) -> None: """Add a repeating block of remote variables. Parameters ---------- number : int Number of variables to add. stride : int Byte stride between instances. pack : bool, optional (default = False) If True, also add a ``LinkVariable`` named ``<name>_All`` which combines all entries into one underscore-delimited display string. The indexed ``RemoteVariable`` container remains available at the original ``name``. The packed alias uses each element variable's ``getDisp()`` and ``setDisp()`` behavior, so formatting follows the underlying variable type. Values are joined in reverse index order: for a variable array ``Field[0]``, ``Field[1]``, ``Field[2]``, the packed value ``Field_All`` reads like ``Field[2]_Field[1]_Field[0]`` and writing ``"1_2_3"`` updates ``Field[2] = 1``, ``Field[1] = 2``, and ``Field[0] = 3``. This is intended for orthogonal register fields that are convenient to expose both as individual variables and as one combined value. **kwargs : Any Arguments forwarded to ``RemoteVariable``. """ if pack: hidden=True else: hidden=kwargs.pop('hidden', False) self.addNodes(pr.RemoteVariable, number, stride, hidden=hidden, **kwargs) # If pack specified, create a linked variable to combine everything if pack: varList = getattr(self, kwargs['name']).values() def linkedSet(dev: Any, var: pr.LinkVariable, value: str, write: bool) -> None: """Split a packed display string and write each element variable.""" if value == '': return values = reversed(value.split('_')) for variable, item in zip(varList, values): variable.setDisp(item, write=write) def linkedGet(dev: Any, var: pr.LinkVariable, read: bool) -> str: """Join element display values into one packed underscore string.""" values = [v.getDisp(read=read) for v in varList] return '_'.join(reversed(values)) name = kwargs.pop('name') # Preserve the indexed array container at ``name`` and expose the # packed LinkVariable using the historical ``<name>_All`` alias. name += '_All' kwargs.pop('value', None) lv = pr.LinkVariable(name=name, value='', dependencies=varList, linkedGet=linkedGet, linkedSet=linkedSet, **kwargs) self.add(lv)
[docs] def setPollInterval( self, interval: float, variables: Iterable[Any] | None = None, ) -> None: """Set the poll interval for a group of variables. Parameters ---------- interval : float Polling interval in seconds. variables : iterable, optional Iterable of variable names or variables. If ``None``, all variables with nonzero poll interval are updated. """ if variables is None: variables = [k for k,v in self.variables.items() if v.pollInterval != 0] for x in variables: self.node(x).setPollInterval(interval)
[docs] def hideVariables( self, hidden: bool, variables: Iterable[Any] | None = None, ) -> None: """Hide a list of variables or variable names. Parameters ---------- hidden : bool True to hide variables, False to show them. variables : iterable, optional Iterable of variable names or variables. """ if variables is None: variables=self.variables.values() for v in variables: if isinstance(v, pr.BaseVariable): v.hidden = hidden elif isinstance(variables[0], str): self.variables[v].hidden = hidden
[docs] def initialize(self) -> None: """Call ``initialize`` on all child devices.""" for key,value in self.devices.items(): value.initialize()
[docs] def hardReset(self) -> None: """Call ``hardReset`` on all child devices.""" for key,value in self.devices.items(): value.hardReset()
[docs] def countReset(self) -> None: """Call ``countReset`` on all child devices.""" for key,value in self.devices.items(): value.countReset()
[docs] def enableChanged(self, value: Any) -> None: """Hook for reacting to enable state changes.""" pass
#if value is True: # self.writeAndVerifyBlocks(force=True, recurse=True, variable=None)
[docs] def addPreWriteListener( self, listener: Callable[[str, Any, dict[str, Any]], Any | None], stateVars: list[pr.BaseVariable] | None = None, ) -> None: """Register a device-level pre-write listener. Fires before any child variable write on this device. See BaseVariable.addPreWriteListener for callback semantics. Parameters ---------- listener : callable Callback: func(path, value, state) -> value | None stateVars : list of BaseVariable, optional Variables whose current values are captured into the state dict. """ entry = (listener, stateVars or []) if entry not in self._preWriteListeners: self._preWriteListeners.append(entry)
[docs] def delPreWriteListener( self, listener: Callable[[str, Any, dict[str, Any]], Any | None], ) -> None: """Remove a device-level pre-write listener. Parameters ---------- listener : callable The callback to remove. """ self._preWriteListeners = [ (cb, sv) for (cb, sv) in self._preWriteListeners if cb is not listener ]
[docs] def writeBlocks( self, *, force: bool = False, recurse: bool = True, variable: Any | None = None, checkEach: bool = False, index: int = -1, **kwargs: Any, ) -> None: """Write all blocks held by this device. Parameters ---------- force : bool, optional (default = False) Force the write even if values are unchanged. recurse : bool, optional (default = True) If True, recurse into child devices. variable : object, optional Optional variable to write. checkEach : bool, optional (default = False) Perform per-variable verification checks. index : int, optional (default = -1) Optional index for array variables. **kwargs : Any Additional arguments passed through to the transaction. """ checkEach = checkEach or self.forceCheckEach if variable is not None: pr.startTransaction(variable._block, type=rim.Write, forceWr=force, checkEach=checkEach, variable=variable, index=index, **kwargs) else: for block in self._blocks: if block.bulkOpEn: pr.startTransaction(block, type=rim.Write, forceWr=force, checkEach=checkEach, **kwargs) if recurse: for key,value in self.devices.items(): value.writeBlocks(force=force, recurse=True, checkEach=checkEach, **kwargs)
[docs] def verifyBlocks( self, *, recurse: bool = True, variable: Any | None = None, checkEach: bool = False, **kwargs: Any, ) -> None: """Verify blocks in the background. Parameters ---------- recurse : bool, optional (default = True) If True, recurse into child devices. variable : object, optional Optional variable to verify. checkEach : bool, optional (default = False) Perform per-variable verification checks. **kwargs : Any Additional arguments passed through to the transaction. """ checkEach = checkEach or self.forceCheckEach if variable is not None: pr.startTransaction(variable._block, type=rim.Verify, checkEach=checkEach, **kwargs) # Verify range is set by previous write else: for block in self._blocks: if block.bulkOpEn: pr.startTransaction(block, type=rim.Verify, checkEach=checkEach, **kwargs) if recurse: for key,value in self.devices.items(): value.verifyBlocks(recurse=True, checkEach=checkEach, **kwargs)
[docs] def readBlocks( self, *, recurse: bool = True, variable: Any | None = None, checkEach: bool = False, index: int = -1, **kwargs: Any, ) -> None: """Read blocks in the background. Parameters ---------- recurse : bool, optional (default = True) If True, recurse into child devices. variable : object, optional Optional variable to read. checkEach : bool, optional (default = False) Perform per-variable verification checks. index : int, optional (default = -1) Optional index for array variables. **kwargs : Any Additional arguments passed through to the transaction. """ checkEach = checkEach or self.forceCheckEach if variable is not None: pr.startTransaction(variable._block, type=rim.Read, checkEach=checkEach, variable=variable, index=index, **kwargs) else: for block in self._blocks: if block.bulkOpEn: pr.startTransaction(block, type=rim.Read, checkEach=checkEach, **kwargs) if recurse: for key,value in self.devices.items(): value.readBlocks(recurse=True, checkEach=checkEach, **kwargs)
[docs] def checkBlocks( self, *, recurse: bool = True, variable: Any | None = None, **kwargs: Any, ) -> None: """Check block transactions and notify variable listeners. Parameters ---------- recurse : bool, optional (default = True) If True, recurse into child devices. variable : object, optional Optional variable to check. **kwargs : Any Additional arguments passed through to the transaction. """ if variable is not None: pr.checkTransaction(variable._block, **kwargs) else: for block in self._blocks: pr.checkTransaction(block, **kwargs) if recurse: for key,value in self.devices.items(): value.checkBlocks(recurse=True, **kwargs)
[docs] def writeAndVerifyBlocks( self, force: bool = False, recurse: bool = True, variable: Any | None = None, checkEach: bool = False, ) -> None: """Write, verify, and check all blocks. Parameters ---------- force : bool, optional (default = False) Force the write even if values are unchanged. recurse : bool, optional (default = True) If True, recurse into child devices. variable : object, optional Optional variable to write/verify. checkEach : bool, optional (default = False) Perform per-variable verification checks. """ self.writeBlocks(force=force, recurse=recurse, variable=variable, checkEach=checkEach) self.verifyBlocks(recurse=recurse, variable=variable, checkEach=checkEach) self.checkBlocks(recurse=recurse, variable=variable)
[docs] def readAndCheckBlocks( self, recurse: bool = True, variable: Any | None = None, checkEach: bool = False, ) -> None: """Read and check all blocks. Parameters ---------- recurse : bool, optional (default = True) If True, recurse into child devices. variable : object, optional Optional variable to read. checkEach : bool, optional (default = False) Perform per-variable verification checks. """ self.readBlocks(recurse=recurse, variable=variable, checkEach=checkEach) self.checkBlocks(recurse=recurse, variable=variable)
[docs] @pr.expose def saveYaml( self, name: str | None, readFirst: bool, modes: list[str] = ['RW', 'RO', 'WO'], incGroups: str | list[str] | None = None, excGroups: str | list[str] | None = None, autoPrefix: str = '', autoCompress: bool = False, ) -> bool: """Save YAML configuration or status to a file. Parameters ---------- name : str, optional Destination file path. If empty, a timestamped name is generated. readFirst : bool Read values from hardware before exporting. modes : list['RW' | 'WO' | 'RO'], optional (default = ['RW', 'RO', 'WO']) Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``. incGroups : str or list[str], optional Group name or group names to include. excGroups : str or list[str], optional Group name or group names to exclude. autoPrefix : str, optional Prefix for auto-generated filenames. autoCompress : bool, optional Generate a ``.zip`` file when auto-generating names. Default False Returns ------- bool Returns ``True`` when export completes. """ # Auto generate name if no arg if name is None or name == '': name = datetime.datetime.now().strftime(autoPrefix + "_%Y%m%d_%H%M%S.yml") if autoCompress: name += '.zip' yml = self.getYaml(readFirst=readFirst,modes=modes,incGroups=incGroups,excGroups=excGroups, recurse=True) if name.split('.')[-1] == 'zip': with zipfile.ZipFile(name, 'w', compression=zipfile.ZIP_LZMA) as zf: with zf.open(os.path.basename(name[:-4]),'w') as f: f.write(yml.encode('utf-8')) else: with open(name,'w') as f: f.write(yml) return True
[docs] def loadYaml( self, name: str | list[str], writeEach: bool, modes: list[str], incGroups: str | list[str] | None = None, excGroups: str | list[str] | None = None, ) -> bool: """Load YAML configuration from files or directories. Parameters ---------- name : str or list[str] Input file, directory, zip-path, or list of those entries. writeEach : bool Write each variable as it is applied. modes : list['RW' | 'WO' | 'RO'] Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``. incGroups : str or list[str], optional Group name or group names to include. excGroups : str or list[str], optional Group name or group names to exclude. Returns ------- bool Returns ``True`` when load completes. """ # Pass arg is a python list if isinstance(name,list): rawlst = name # Passed arg is a comma separated list of files elif ',' in name: rawlst = name.split(',') # Not a list else: rawlst = [name] # Init final list lst = [] # Iterate through raw list and look for directories for rl in rawlst: # Name ends with .yml or .yaml if rl[-4:] == '.yml' or rl[-5:] == '.yaml': lst.append(rl) # Entry is a zip file directory elif '.zip' in rl: base = rl.split('.zip')[0] + '.zip' sub = rl.split('.zip')[1][1:] # Open zipfile with zipfile.ZipFile(base, 'r', compression=zipfile.ZIP_LZMA) as myzip: # Check if passed name is a directory, otherwise generate an error if not any(x.startswith("%s/" % sub.rstrip("/")) for x in myzip.namelist()): raise Exception("loadYaml: Invalid load file: {}, must be a directory or end in .yml or .yaml".format(rl)) else: zip_yaml = [] # Iterate through directory contents for zfn in myzip.namelist(): # Filter by base directory if zfn.find(sub) == 0: spt = zfn.split('%s/' % sub.rstrip('/'))[1] # Entry ends in .yml or *.yml and is in current directory if '/' not in spt and (spt[-4:] == '.yml' or spt[-5:] == '.yaml'): zip_yaml.append(base + '/' + zfn) # Keep zip-directory loads aligned with normal directory # loads by applying a lexicographic pathname sort. lst.extend(sorted(zip_yaml)) # Entry is a directory elif os.path.isdir(rl): dlst = glob.glob('{}/*.yml'.format(rl)) dlst.extend(glob.glob('{}/*.yaml'.format(rl))) lst.extend(sorted(dlst)) # Not a zipfile, not a directory and does not end in .yml else: raise Exception("loadYaml: Invalid load file: {}, must be a directory or end in .yml or .yaml".format(rl)) self._log.info( "Loading YAML config from %s file(s), writeEach=%s, modes=%s, incGroups=%s, excGroups=%s", len(lst), writeEach, modes, incGroups, excGroups, ) # Read each file with self.root.pollBlock(), self.root.updateGroup(): for fn in lst: self._log.debug("Applying YAML config file %s", fn) d = pr.yamlToData(fName=fn) self._applyYamlDict(d=d,writeEach=writeEach,modes=modes,incGroups=incGroups,excGroups=excGroups) if not writeEach: self._log.info("Committing staged YAML config to hardware") self._writeConfig() return True
[docs] def setYaml( self, yml: str, writeEach: bool, modes: list[str], incGroups: str | list[str] | None = None, excGroups: str | list[str] | None = None, ) -> None: """Set variable values from YAML text. Parameters ---------- yml : str YAML text containing values to apply. writeEach : bool Write each variable as it is applied. modes : list['RW' | 'WO' | 'RO'] Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``. incGroups : str or list[str], optional Group name or group names to include. excGroups : str or list[str], optional Group name or group names to exclude. """ d = pr.yamlToData(yml) self._log.info( "Applying YAML text config, writeEach=%s, modes=%s, incGroups=%s, excGroups=%s", writeEach, modes, incGroups, excGroups, ) with self.root.pollBlock(), self.root.updateGroup(): self._applyYamlDict(d=d,writeEach=writeEach,modes=modes,incGroups=incGroups,excGroups=excGroups) if not writeEach: self._log.info("Committing staged YAML text config to hardware") self._writeConfig()
def _applyYamlDict( self, d: dict[str, Any], writeEach: bool, modes: list[str], incGroups: str | list[str] | None = None, excGroups: str | list[str] | None = None, ) -> None: """Apply a parsed YAML dictionary relative to this device. For each top-level key in the dictionary, if the key matches this device's name the value is treated as the device's children. Otherwise the key is resolved as a direct child via ``nodeMatch``. Root overrides this method to resolve absolute dotted paths. Parameters ---------- d : dict[str, object] Parsed YAML dictionary to apply. writeEach : bool Write each variable as it is applied. modes : list['RW' | 'WO' | 'RO'] Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``. incGroups : str or list[str], optional Group name or group names to include. excGroups : str or list[str], optional Group name or group names to exclude. """ for key, value in d.items(): if key == self.name: self._setDict(d=value,writeEach=writeEach,modes=modes,incGroups=incGroups,excGroups=excGroups,keys=None) else: nodes, keys = self.nodeMatch(key) if len(nodes) == 0: self._log.error("Entry %s not found", key) else: for n in nodes: if n.filterByGroup(incGroups, excGroups): n._setDict(value,writeEach,modes,incGroups,excGroups,keys) def _writeConfig(self) -> None: """Write staged configuration to hardware for this device subtree. Root overrides this method to use the full-tree write path. """ force = self.root.ForceWrite.value() if hasattr(self.root, 'ForceWrite') else False self._log.info("Start device config write (forceWrite=%s)", force) self.writeBlocks(force=force, recurse=True) self.verifyBlocks(recurse=True) self.checkBlocks(recurse=True) self._log.info("Done device config write") def _updateBlockEnable(self) -> None: """Propagate effective enable state to this device's blocks.""" for block in self._blocks: block.setEnable(self.enable.value() is True) for key,value in self.devices.items(): value._updateBlockEnable() def _validateMemBase(self, remVars: list) -> None: """Raise DeviceError if this device has remote variables/commands but no memBase in its ancestor chain. A device added directly to Root without an explicit memBase will have Root as its memory slave (via _setSlave), but Root has no upstream memory path. This produces silent timeouts; raise early instead. Parameters ---------- remVars : list List of RemoteVariable/RemoteCommand nodes already collected in _buildBlocks. """ if not remVars: return # Walk up the parent chain looking for an ancestor with an explicit memBase. # If we reach Root without finding one, the device has no memory path. # Root sets self._parent = self, so detect that self-cycle to avoid infinite loops. node = self while node is not None: if node._memBase is not None: return # Found a memBase — memory path is valid if node._parent is node: raise DeviceError( f"Device '{self.path}' has RemoteVariables or RemoteCommands but no memBase. " f"Pass memBase= when creating this device or when adding it to the tree." ) node = node._parent def _buildBlocks(self) -> None: """Build and attach memory blocks for local/remote variables.""" remVars = [] # Use min block size, larger blocks can be pre-created blkSize = self._blkMinAccess() # Process all of the variables for k,n in self.nodes.items(): # Local variables have a 1:1 block association if isinstance(n,pr.LocalVariable): self._blocks.append(n._block) # Align to min access, create list of remote variables elif isinstance(n,pr.RemoteVariable) and n.offset is not None: self._log.info( "Before shift variable %s offset=%s bitSize=%s bytes=%s", n.name, n.offset, n.bitSize, n.varBytes, ) n._updatePath(n.path) n._shiftOffsetDown(n.offset % blkSize, blkSize) remVars += [n] self._log.info( "Creating variable %s offset=%s bitSize=%s bytes=%s", n.name, n.offset, n.bitSize, n.varBytes, ) self._validateMemBase(remVars) # Sort var list by offset, size remVars.sort(key=lambda x: (x.offset, x.varBytes)) blocks = [] blk = None # Go through sorted variable list, look for overlaps, group into blocks for n in remVars: if blk is not None and ( (blk['offset'] + blk['size']) > n.offset): self._log.info( "Overlap detected var offset=%s block offset=%s block bytes=%s", n.offset, blk['offset'], blk['size'], ) n._shiftOffsetDown(n.offset - blk['offset'], blkSize) blk['vars'].append(n) if n.varBytes > blk['size']: blk['size'] = n.varBytes # We need a new block for this variable else: blk = None # Look for pre-made block which overlaps for b in self._custBlocks: if ( (n.offset >= b.offset) and ((b.offset + b.size) > n.offset)): # Just in case a variable extends past the end of pre-made block, user mistake if n.varBytes > b.size: msg = 'Failed to add variable {n.name} to pre-made block with offset {b.offset} and size {b.size}' raise pr.MemoryError(name=self.path, address=self.address, msg=msg) blk = {'offset':b.offset, 'size':b.size, 'vars':[n], 'block':b} break # Block not found if blk is None: blk = {'offset':n.offset, 'size':n.varBytes, 'vars':[n], 'block':None} blocks.append(blk) # Clear pre-made list self._custBlocks = [] # Create new blocks and add new and pre-made blocks to device # Add variables to the block for b in blocks: # Create new block if b['block'] is None: newBlock = rim.Block(b['offset'], b['size']) self._log.debug("Adding new block at offset %#02x, size %s", b['offset'], b['size']) else: newBlock = b['block'] # Set memory slave newBlock._setSlave(self) # Verify the block is not too small or large for the memory interface if newBlock.size > self._blkMaxAccess() or newBlock.size < self._blkMinAccess(): msg = f'Block size {newBlock.size} is not in the range: {self._blkMinAccess()} - {self._blkMaxAccess()}' raise pr.MemoryError(name=self.path, address=self.address, msg=msg) # Add variables to the block newBlock.addVariables(b['vars']) # Set varible block links for v in b['vars']: v._block = newBlock # Add to device self._blocks.append(newBlock) newBlock.setEnable(self.enable.value() is True) def _rootAttached(self, parent: pr.Node, root: pr.Root) -> None: """Attach this device into the rooted tree and build variable blocks.""" pr.Node._rootAttached(self, parent, root) for key,value in self._nodes.items(): value._rootAttached(self,root) self._buildBlocks() # Override defaults as dictated by the _defaults dict for varName, defValue in self._defaults.items(): nodes,keys = self.nodeMatch(varName) if keys is None: for var in nodes: var._default = defValue def _setTimeout(self, timeout: float) -> None: """Set transaction timeout on this device, blocks, and child devices. Parameters ---------- timeout : float Timeout in seconds. """ for block in self._blocks: block._setTimeout(int(timeout*1000000)) rim.Master._setTimeout(self, int(timeout*1000000)) for key,value in self._nodes.items(): if isinstance(value,Device): value._setTimeout(timeout)
[docs] def command(self, **kwargs: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]: """Decorator to add inline methods as commands. The decorated function is used as ``LocalCommand(function=...)``. The command wrapper supplies keyword arguments ``root``, ``dev``, ``cmd``, and ``arg``; the function may accept any subset. """ def _decorator(func: Callable[..., Any]) -> Callable[..., Any]: """Wrap and register a method as a LocalCommand.""" if 'name' not in kwargs: kwargs['name'] = func.__name__ self.add(pr.LocalCommand(function=func, **kwargs)) return func return _decorator
[docs] def linkVariableGet(self, **kwargs: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]: """Decorator to add inline ``linkedGet`` functions. The decorated function is used as ``LinkVariable(linkedGet=...)``. The linked-get wrapper supplies keyword arguments ``dev``, ``var``, ``read``, ``index``, and ``check``; the function may accept any subset. """ def _decorator(func: Callable[..., Any]) -> Callable[..., Any]: """Wrap and register a function as LinkVariable linkedGet.""" if 'name' not in kwargs: kwargs['name'] = func.__name__ self.add(pr.LinkVariable(linkedGet=func, **kwargs)) return func return _decorator
[docs] def genDocuments( self, path: str, incGroups: str | list[str] | None = None, excGroups: str | list[str] | None = None, ) -> None: """Generate Sphinx documentation pages for this device. Parameters ---------- path : str Output directory path. incGroups : str or list[str], optional Group name or group names to include. excGroups : str or list[str], optional Group name or group names to exclude. """ with open(path + '/' + self.path.replace('.','_') + '.rst','w') as file: print("****************************",file=file) print(self.name,file=file) print("****************************",file=file) print('',file=file) print(pr.genDocDesc(self.description,0),file=file) print('',file=file) plist = self.getNodes(typ=pr.Process,incGroups=incGroups,excGroups=excGroups) dlist = self.devicesByGroup(incGroups=incGroups,excGroups=excGroups) vlist = self.variablesByGroup(incGroups=incGroups,excGroups=excGroups) clist = self.commandsByGroup(incGroups=incGroups,excGroups=excGroups) tlist = {k:v for k,v in dlist.items() if k not in plist} if len(tlist) > 0: print(".. toctree::",file=file) print(" :maxdepth: 1",file=file) print(" :caption: Sub Devices:",file=file) print('',file=file) for k,v in tlist.items(): print(' ' + v.path.replace('.','_'),file=file) v.genDocuments(path,incGroups,excGroups) print('',file=file) if len(plist) > 0: print(".. toctree::",file=file) print(" :maxdepth: 1",file=file) print(" :caption: Processes:",file=file) print('',file=file) for k,v in plist.items(): print(' ' + v.path.replace('.','_'),file=file) v.genDocuments(path,incGroups,excGroups) print('',file=file) print("Summary",file=file) print("#######",file=file) print('',file=file) if len(vlist): print("Variable List",file=file) print("*************",file=file) print('',file=file) for k,v in vlist.items(): print(f"* {k}",file=file) print('',file=file) if len(clist): print("Command List",file=file) print("*************",file=file) print('',file=file) for k,v in clist.items(): print(f"* {k}",file=file) print('',file=file) print("Details",file=file) print("#######",file=file) print('',file=file) if len(vlist): print("Variables",file=file) print("*********",file=file) print('',file=file) for k,v in vlist.items(): v._genDocs(file=file) print('',file=file) if len(clist): print("Commands",file=file) print("********",file=file) print('',file=file) for k,v in clist.items(): v._genDocs(file=file) print('',file=file)
class ArrayDevice(Device): """Device that instantiates an array of sub-devices. Parameters ---------- arrayClass : object Class to instantiate for each element. number : int Number of devices in the array. stride : int, optional (default = 0) Address stride between array elements. arrayArgs : object, optional Per-element argument overrides. **kwargs : Any Additional arguments forwarded to ``Device``. """ def __init__( self, *, arrayClass: Any, number: int, stride: int = 0, arrayArgs: Any | None = None, **kwargs: Any, ) -> None: """Initialize and populate an array-style device container.""" if 'name' not in kwargs: kwargs['name'] = f'{arrayClass.__name__}Array' super().__init__(**kwargs) if arrayArgs is None: arrayArgs = [{} for x in range(number)] elif isinstance(arrayArgs, dict): arrayArgs = [arrayArgs.copy() for x in range(number)] for i in range(number): args = arrayArgs[i] if 'name' in args: name = args.pop('name') else: name = f'{arrayClass.__name__}[{i}]' self.add(arrayClass( name=name, offset=i*stride, **args))