#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# PyRogue base module - Device Class
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
from __future__ import annotations
import collections
import datetime
import functools as ft
import glob
import os
import threading
import zipfile
from typing import Any, Callable, Iterable, Literal
import pyrogue as pr
import rogue.interfaces.memory as rim
class EnableVariable(pr.BaseVariable):
"""Enable flag variable with dependency-aware reporting.
Parameters
----------
enabled : bool
Initial enable state.
deps : iterable, optional
Dependency variables that can disable this one.
"""
def __init__(self, *, enabled: bool, deps: Iterable[pr.BaseVariable] | None = None) -> None:
"""Initialize the enable-state variable."""
pr.BaseVariable.__init__(
self,
description='Determines if device is enabled for hardware access',
name='enable',
mode='RW',
value=enabled,
groups='Enable',
disp={False: 'False', True: 'True', 'parent': 'ParentFalse', 'deps': 'ExtDepFalse'})
if deps is None:
self._deps = []
self._depDis = False
else:
self._deps = deps
self._depDis = True
for d in self._deps:
d.addListener(self)
self._value = enabled
self._lock = threading.Lock()
@pr.expose
def get(self, read: bool = False, index: int = -1) -> Any:
"""Return the effective enable state.
Parameters
----------
read : bool, optional (default = False)
Unused for enable evaluation.
index : int, optional (default = -1)
Unused for enable evaluation.
Returns
-------
Any
``True``/``False`` or a dependency status string.
"""
ret = self._value
with self._lock:
if self._value is False:
ret = False
elif self._depDis:
ret = 'deps'
elif self._parent is self._root:
#print("Root enable = {}".format(self._value))
ret = self._value
else:
if self._parent._parent.enable.value() is not True:
ret = 'parent'
else:
ret = True
return ret
@pr.expose
def set(
self,
value: bool | Literal['parent', 'deps'],
write: bool = True,
index: int = -1,
) -> None:
"""Set the enable value.
Parameters
----------
value : bool | 'parent' | 'deps'
New enable value.
write : bool, optional (default = True)
Unused for enable evaluation.
index : int, optional (default = -1)
Unused for enable evaluation.
"""
if value != 'parent' and value != 'deps':
old = self.value()
with self._lock:
self._value = value
if old != value and old != 'parent' and old != 'deps':
self.parent._updateBlockEnable()
self.parent.enableChanged(value)
with self.parent.root.updateGroup():
self._queueUpdate()
# The following concept will trigger enable listeners
# directly. This is causing lock contentions in practice
# (epics4 as an example)
#self._doUpdate()
#for var in self._listeners:
# var._doUpdate()
def _doUpdate(self) -> Any:
"""Update dependency state before notifying listeners."""
if len(self._deps) != 0:
oldEn = (self.value() is True)
with self._lock:
self._depDis = not all(x.value() for x in self._deps)
newEn = (self.value() is True)
if oldEn != newEn:
self.parent._updateBlockEnable()
self.parent.enableChanged(newEn)
return super()._doUpdate()
def _rootAttached(self, parent: Any, root: Any) -> None:
"""Attach enable listeners when rooted."""
pr.Node._rootAttached(self,parent,root)
if parent is not root:
parent._parent.enable.addListener(self)
class DeviceError(Exception):
"""Raised for device-level errors."""
pass
[docs]
class Device(pr.Node,rim.Hub):
"""Base class for PyRogue devices.
Parameters
----------
name : str, optional
Device name, defaults to the class name.
description : str, optional (default = "")
Human-readable description.
memBase : rim.MemorySlave, optional
Optional memory interface base, inherited from parent if not provided.
offset : int, optional (default = 0)
Memory offset for the device.
hidden : bool, optional (default = False)
If True, add the device to the ``Hidden`` group.
groups : list[str], optional
Groups to assign.
expand : bool, optional (default = False)
Default GUI expand state.
enabled : bool, optional (default = True)
Initial enable state.
defaults : dict, optional
Default variable values keyed by name.
enableDeps : iterable, optional
Enable dependency variables.
hubMin : int, optional (default = 0)
Hub minimum access size.
hubMax : int, optional (default = 0)
Hub maximum access size.
guiGroup : str, optional
GUI grouping label.
"""
def __init__(
self,
*,
name: str | None = None,
description: str = '',
memBase: rim.MemorySlave | None = None,
offset: int = 0,
hidden: bool = False,
groups: list[str] | None = None,
expand: bool = False,
enabled: bool = True,
defaults: dict | None = None,
enableDeps: Iterable[pr.BaseVariable] | None = None,
hubMin: int = 0,
hubMax: int = 0,
guiGroup: str | None = None,
) -> None:
"""Initialize the device node and associated interfaces."""
if name is None:
name = self.__class__.__name__
# Hub.__init__ must be called first for _setSlave to work below
rim.Hub.__init__(self,offset,hubMin,hubMax)
# Blocks
self._blocks = []
self._custBlocks = []
self._memBase = memBase
self._memLock = threading.RLock()
self._defaults = defaults if defaults is not None else {}
self._ifAndProto = []
self.forceCheckEach = False
self._preWriteListeners = []
# Connect to memory slave
if memBase:
self._setSlave(memBase)
# Node.__init__ can't be called until after self._memBase is created
pr.Node.__init__(self, name=name, hidden=hidden, groups=groups, description=description, expand=expand, guiGroup=guiGroup)
self._log.info("Making device %s", name)
# Convenience methods
self.addRemoteCommands = ft.partial(self.addNodes, pr.RemoteCommand)
# Variable interface to enable flag
self.add(EnableVariable(enabled=enabled, deps=enableDeps))
self.add(pr.LocalCommand(name='ReadDevice', value=False, hidden=True,
function=lambda arg: self.readAndCheckBlocks(recurse=arg),
description='Force read of device without recursion'))
self.add(pr.LocalCommand(name='WriteDevice', value='', hidden=True,
function=lambda arg: self.writeAndVerifyBlocks(force=True,recurse=arg),
description='Force write of device without recursion'))
@pr.expose
@property
def address(self) -> Any:
"""Return the device base address."""
return self._getAddress()
@pr.expose
@property
def offset(self) -> Any:
"""Return the device offset."""
return self._getOffset()
[docs]
def addCustomBlock(self, block: Any) -> None:
"""Add a pre-defined memory block to the device."""
self._custBlocks.append(block)
self._custBlocks.sort(key=lambda x: (x.offset, x.size))
[docs]
def add(self, node: Any) -> None:
"""Add a child node to the device."""
# Call node add
pr.Node.add(self,node)
# Adding device
if isinstance(node,Device):
# Device does not have a membase
if node._memBase is None:
node._setSlave(self)
[docs]
def addInterface(self, *interfaces: Any | Iterable[Any]) -> None:
"""Add stream or memory interfaces to manage.
Parameters
----------
*interfaces : Any
One or more interfaces or iterables of interfaces.
"""
for interface in interfaces:
if isinstance(interface, collections.abc.Iterable):
for item in interface:
self._log.debug("Managing interface/protocol %s", item)
self._ifAndProto.append(item)
else:
self._log.debug("Managing interface/protocol %s", interface)
self._ifAndProto.append(interface)
[docs]
def addProtocol(self, *protocols: Any) -> None:
"""Add protocol entities (alias of ``addInterface``)."""
self.addInterface(protocols)
[docs]
def manage(self, *interfaces: Any) -> None:
"""Manage additional interfaces for start/stop."""
self._ifAndProto.extend(interfaces)
def _start(self) -> None:
""" Called recursively from Root.start when starting """
for intf in self._ifAndProto:
if hasattr(intf,"_start"):
self._log.debug("Starting managed interface/protocol %s", intf)
intf._start()
for d in self.devices.values():
d._start()
def _stop(self) -> None:
"""Called recursively from Root.stop when exiting"""
for intf in self._ifAndProto:
if hasattr(intf,"_stop"):
self._log.debug("Stopping managed interface/protocol %s", intf)
intf._stop()
for d in self.devices.values():
d._stop()
@property
def running(self) -> bool:
"""Return True if the device is running."""
return self.root is not None and self.root.running
[docs]
def addRemoteVariables(
self,
number: int,
stride: int,
pack: bool = False,
**kwargs: Any,
) -> None:
"""Add a repeating block of remote variables.
Parameters
----------
number : int
Number of variables to add.
stride : int
Byte stride between instances.
pack : bool, optional (default = False)
If True, also add a ``LinkVariable`` named ``<name>_All`` which
combines all entries into one underscore-delimited display string.
The indexed ``RemoteVariable`` container remains available at the
original ``name``. The packed alias uses each element variable's
``getDisp()`` and ``setDisp()`` behavior, so formatting follows the
underlying variable type. Values are joined in reverse index order:
for a variable array ``Field[0]``, ``Field[1]``, ``Field[2]``, the
packed value ``Field_All`` reads like
``Field[2]_Field[1]_Field[0]`` and writing ``"1_2_3"`` updates
``Field[2] = 1``, ``Field[1] = 2``, and ``Field[0] = 3``.
This is intended for orthogonal register fields that are convenient
to expose both as individual variables and as one combined value.
**kwargs : Any
Arguments forwarded to ``RemoteVariable``.
"""
if pack:
hidden=True
else:
hidden=kwargs.pop('hidden', False)
self.addNodes(pr.RemoteVariable, number, stride, hidden=hidden, **kwargs)
# If pack specified, create a linked variable to combine everything
if pack:
varList = getattr(self, kwargs['name']).values()
def linkedSet(dev: Any, var: pr.LinkVariable, value: str, write: bool) -> None:
"""Split a packed display string and write each element variable."""
if value == '':
return
values = reversed(value.split('_'))
for variable, item in zip(varList, values):
variable.setDisp(item, write=write)
def linkedGet(dev: Any, var: pr.LinkVariable, read: bool) -> str:
"""Join element display values into one packed underscore string."""
values = [v.getDisp(read=read) for v in varList]
return '_'.join(reversed(values))
name = kwargs.pop('name')
# Preserve the indexed array container at ``name`` and expose the
# packed LinkVariable using the historical ``<name>_All`` alias.
name += '_All'
kwargs.pop('value', None)
lv = pr.LinkVariable(name=name, value='', dependencies=varList, linkedGet=linkedGet, linkedSet=linkedSet, **kwargs)
self.add(lv)
[docs]
def setPollInterval(
self,
interval: float,
variables: Iterable[Any] | None = None,
) -> None:
"""Set the poll interval for a group of variables.
Parameters
----------
interval : float
Polling interval in seconds.
variables : iterable, optional
Iterable of variable names or variables. If ``None``, all
variables with nonzero poll interval are updated.
"""
if variables is None:
variables = [k for k,v in self.variables.items() if v.pollInterval != 0]
for x in variables:
self.node(x).setPollInterval(interval)
[docs]
def hideVariables(
self,
hidden: bool,
variables: Iterable[Any] | None = None,
) -> None:
"""Hide a list of variables or variable names.
Parameters
----------
hidden : bool
True to hide variables, False to show them.
variables : iterable, optional
Iterable of variable names or variables.
"""
if variables is None:
variables=self.variables.values()
for v in variables:
if isinstance(v, pr.BaseVariable):
v.hidden = hidden
elif isinstance(variables[0], str):
self.variables[v].hidden = hidden
[docs]
def initialize(self) -> None:
"""Call ``initialize`` on all child devices."""
for key,value in self.devices.items():
value.initialize()
[docs]
def hardReset(self) -> None:
"""Call ``hardReset`` on all child devices."""
for key,value in self.devices.items():
value.hardReset()
[docs]
def countReset(self) -> None:
"""Call ``countReset`` on all child devices."""
for key,value in self.devices.items():
value.countReset()
[docs]
def enableChanged(self, value: Any) -> None:
"""Hook for reacting to enable state changes."""
pass
#if value is True:
# self.writeAndVerifyBlocks(force=True, recurse=True, variable=None)
[docs]
def addPreWriteListener(
self,
listener: Callable[[str, Any, dict[str, Any]], Any | None],
stateVars: list[pr.BaseVariable] | None = None,
) -> None:
"""Register a device-level pre-write listener.
Fires before any child variable write on this device.
See BaseVariable.addPreWriteListener for callback semantics.
Parameters
----------
listener : callable
Callback: func(path, value, state) -> value | None
stateVars : list of BaseVariable, optional
Variables whose current values are captured into the state dict.
"""
entry = (listener, stateVars or [])
if entry not in self._preWriteListeners:
self._preWriteListeners.append(entry)
[docs]
def delPreWriteListener(
self,
listener: Callable[[str, Any, dict[str, Any]], Any | None],
) -> None:
"""Remove a device-level pre-write listener.
Parameters
----------
listener : callable
The callback to remove.
"""
self._preWriteListeners = [
(cb, sv) for (cb, sv) in self._preWriteListeners if cb is not listener
]
[docs]
def writeBlocks(
self,
*,
force: bool = False,
recurse: bool = True,
variable: Any | None = None,
checkEach: bool = False,
index: int = -1,
**kwargs: Any,
) -> None:
"""Write all blocks held by this device.
Parameters
----------
force : bool, optional (default = False)
Force the write even if values are unchanged.
recurse : bool, optional (default = True)
If True, recurse into child devices.
variable : object, optional
Optional variable to write.
checkEach : bool, optional (default = False)
Perform per-variable verification checks.
index : int, optional (default = -1)
Optional index for array variables.
**kwargs : Any
Additional arguments passed through to the transaction.
"""
checkEach = checkEach or self.forceCheckEach
if variable is not None:
pr.startTransaction(variable._block, type=rim.Write, forceWr=force, checkEach=checkEach, variable=variable, index=index, **kwargs)
else:
for block in self._blocks:
if block.bulkOpEn:
pr.startTransaction(block, type=rim.Write, forceWr=force, checkEach=checkEach, **kwargs)
if recurse:
for key,value in self.devices.items():
value.writeBlocks(force=force, recurse=True, checkEach=checkEach, **kwargs)
[docs]
def verifyBlocks(
self,
*,
recurse: bool = True,
variable: Any | None = None,
checkEach: bool = False,
**kwargs: Any,
) -> None:
"""Verify blocks in the background.
Parameters
----------
recurse : bool, optional (default = True)
If True, recurse into child devices.
variable : object, optional
Optional variable to verify.
checkEach : bool, optional (default = False)
Perform per-variable verification checks.
**kwargs : Any
Additional arguments passed through to the transaction.
"""
checkEach = checkEach or self.forceCheckEach
if variable is not None:
pr.startTransaction(variable._block, type=rim.Verify, checkEach=checkEach, **kwargs) # Verify range is set by previous write
else:
for block in self._blocks:
if block.bulkOpEn:
pr.startTransaction(block, type=rim.Verify, checkEach=checkEach, **kwargs)
if recurse:
for key,value in self.devices.items():
value.verifyBlocks(recurse=True, checkEach=checkEach, **kwargs)
[docs]
def readBlocks(
self,
*,
recurse: bool = True,
variable: Any | None = None,
checkEach: bool = False,
index: int = -1,
**kwargs: Any,
) -> None:
"""Read blocks in the background.
Parameters
----------
recurse : bool, optional (default = True)
If True, recurse into child devices.
variable : object, optional
Optional variable to read.
checkEach : bool, optional (default = False)
Perform per-variable verification checks.
index : int, optional (default = -1)
Optional index for array variables.
**kwargs : Any
Additional arguments passed through to the transaction.
"""
checkEach = checkEach or self.forceCheckEach
if variable is not None:
pr.startTransaction(variable._block, type=rim.Read, checkEach=checkEach, variable=variable, index=index, **kwargs)
else:
for block in self._blocks:
if block.bulkOpEn:
pr.startTransaction(block, type=rim.Read, checkEach=checkEach, **kwargs)
if recurse:
for key,value in self.devices.items():
value.readBlocks(recurse=True, checkEach=checkEach, **kwargs)
[docs]
def checkBlocks(
self,
*,
recurse: bool = True,
variable: Any | None = None,
**kwargs: Any,
) -> None:
"""Check block transactions and notify variable listeners.
Parameters
----------
recurse : bool, optional (default = True)
If True, recurse into child devices.
variable : object, optional
Optional variable to check.
**kwargs : Any
Additional arguments passed through to the transaction.
"""
if variable is not None:
pr.checkTransaction(variable._block, **kwargs)
else:
for block in self._blocks:
pr.checkTransaction(block, **kwargs)
if recurse:
for key,value in self.devices.items():
value.checkBlocks(recurse=True, **kwargs)
[docs]
def writeAndVerifyBlocks(
self,
force: bool = False,
recurse: bool = True,
variable: Any | None = None,
checkEach: bool = False,
) -> None:
"""Write, verify, and check all blocks.
Parameters
----------
force : bool, optional (default = False)
Force the write even if values are unchanged.
recurse : bool, optional (default = True)
If True, recurse into child devices.
variable : object, optional
Optional variable to write/verify.
checkEach : bool, optional (default = False)
Perform per-variable verification checks.
"""
self.writeBlocks(force=force, recurse=recurse, variable=variable, checkEach=checkEach)
self.verifyBlocks(recurse=recurse, variable=variable, checkEach=checkEach)
self.checkBlocks(recurse=recurse, variable=variable)
[docs]
def readAndCheckBlocks(
self,
recurse: bool = True,
variable: Any | None = None,
checkEach: bool = False,
) -> None:
"""Read and check all blocks.
Parameters
----------
recurse : bool, optional (default = True)
If True, recurse into child devices.
variable : object, optional
Optional variable to read.
checkEach : bool, optional (default = False)
Perform per-variable verification checks.
"""
self.readBlocks(recurse=recurse, variable=variable, checkEach=checkEach)
self.checkBlocks(recurse=recurse, variable=variable)
[docs]
@pr.expose
def saveYaml(
self,
name: str | None,
readFirst: bool,
modes: list[str] = ['RW', 'RO', 'WO'],
incGroups: str | list[str] | None = None,
excGroups: str | list[str] | None = None,
autoPrefix: str = '',
autoCompress: bool = False,
) -> bool:
"""Save YAML configuration or status to a file.
Parameters
----------
name : str, optional
Destination file path. If empty, a timestamped name is generated.
readFirst : bool
Read values from hardware before exporting.
modes : list['RW' | 'WO' | 'RO'], optional (default = ['RW', 'RO', 'WO'])
Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``.
incGroups : str or list[str], optional
Group name or group names to include.
excGroups : str or list[str], optional
Group name or group names to exclude.
autoPrefix : str, optional
Prefix for auto-generated filenames.
autoCompress : bool, optional
Generate a ``.zip`` file when auto-generating names. Default False
Returns
-------
bool
Returns ``True`` when export completes.
"""
# Auto generate name if no arg
if name is None or name == '':
name = datetime.datetime.now().strftime(autoPrefix + "_%Y%m%d_%H%M%S.yml")
if autoCompress:
name += '.zip'
yml = self.getYaml(readFirst=readFirst,modes=modes,incGroups=incGroups,excGroups=excGroups, recurse=True)
if name.split('.')[-1] == 'zip':
with zipfile.ZipFile(name, 'w', compression=zipfile.ZIP_LZMA) as zf:
with zf.open(os.path.basename(name[:-4]),'w') as f:
f.write(yml.encode('utf-8'))
else:
with open(name,'w') as f:
f.write(yml)
return True
[docs]
def loadYaml(
self,
name: str | list[str],
writeEach: bool,
modes: list[str],
incGroups: str | list[str] | None = None,
excGroups: str | list[str] | None = None,
) -> bool:
"""Load YAML configuration from files or directories.
Parameters
----------
name : str or list[str]
Input file, directory, zip-path, or list of those entries.
writeEach : bool
Write each variable as it is applied.
modes : list['RW' | 'WO' | 'RO']
Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``.
incGroups : str or list[str], optional
Group name or group names to include.
excGroups : str or list[str], optional
Group name or group names to exclude.
Returns
-------
bool
Returns ``True`` when load completes.
"""
# Pass arg is a python list
if isinstance(name,list):
rawlst = name
# Passed arg is a comma separated list of files
elif ',' in name:
rawlst = name.split(',')
# Not a list
else:
rawlst = [name]
# Init final list
lst = []
# Iterate through raw list and look for directories
for rl in rawlst:
# Name ends with .yml or .yaml
if rl[-4:] == '.yml' or rl[-5:] == '.yaml':
lst.append(rl)
# Entry is a zip file directory
elif '.zip' in rl:
base = rl.split('.zip')[0] + '.zip'
sub = rl.split('.zip')[1][1:]
# Open zipfile
with zipfile.ZipFile(base, 'r', compression=zipfile.ZIP_LZMA) as myzip:
# Check if passed name is a directory, otherwise generate an error
if not any(x.startswith("%s/" % sub.rstrip("/")) for x in myzip.namelist()):
raise Exception("loadYaml: Invalid load file: {}, must be a directory or end in .yml or .yaml".format(rl))
else:
zip_yaml = []
# Iterate through directory contents
for zfn in myzip.namelist():
# Filter by base directory
if zfn.find(sub) == 0:
spt = zfn.split('%s/' % sub.rstrip('/'))[1]
# Entry ends in .yml or *.yml and is in current directory
if '/' not in spt and (spt[-4:] == '.yml' or spt[-5:] == '.yaml'):
zip_yaml.append(base + '/' + zfn)
# Keep zip-directory loads aligned with normal directory
# loads by applying a lexicographic pathname sort.
lst.extend(sorted(zip_yaml))
# Entry is a directory
elif os.path.isdir(rl):
dlst = glob.glob('{}/*.yml'.format(rl))
dlst.extend(glob.glob('{}/*.yaml'.format(rl)))
lst.extend(sorted(dlst))
# Not a zipfile, not a directory and does not end in .yml
else:
raise Exception("loadYaml: Invalid load file: {}, must be a directory or end in .yml or .yaml".format(rl))
self._log.info(
"Loading YAML config from %s file(s), writeEach=%s, modes=%s, incGroups=%s, excGroups=%s",
len(lst),
writeEach,
modes,
incGroups,
excGroups,
)
# Read each file
with self.root.pollBlock(), self.root.updateGroup():
for fn in lst:
self._log.debug("Applying YAML config file %s", fn)
d = pr.yamlToData(fName=fn)
self._applyYamlDict(d=d,writeEach=writeEach,modes=modes,incGroups=incGroups,excGroups=excGroups)
if not writeEach:
self._log.info("Committing staged YAML config to hardware")
self._writeConfig()
return True
[docs]
def setYaml(
self,
yml: str,
writeEach: bool,
modes: list[str],
incGroups: str | list[str] | None = None,
excGroups: str | list[str] | None = None,
) -> None:
"""Set variable values from YAML text.
Parameters
----------
yml : str
YAML text containing values to apply.
writeEach : bool
Write each variable as it is applied.
modes : list['RW' | 'WO' | 'RO']
Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``.
incGroups : str or list[str], optional
Group name or group names to include.
excGroups : str or list[str], optional
Group name or group names to exclude.
"""
d = pr.yamlToData(yml)
self._log.info(
"Applying YAML text config, writeEach=%s, modes=%s, incGroups=%s, excGroups=%s",
writeEach,
modes,
incGroups,
excGroups,
)
with self.root.pollBlock(), self.root.updateGroup():
self._applyYamlDict(d=d,writeEach=writeEach,modes=modes,incGroups=incGroups,excGroups=excGroups)
if not writeEach:
self._log.info("Committing staged YAML text config to hardware")
self._writeConfig()
def _applyYamlDict(
self,
d: dict[str, Any],
writeEach: bool,
modes: list[str],
incGroups: str | list[str] | None = None,
excGroups: str | list[str] | None = None,
) -> None:
"""Apply a parsed YAML dictionary relative to this device.
For each top-level key in the dictionary, if the key matches this
device's name the value is treated as the device's children.
Otherwise the key is resolved as a direct child via ``nodeMatch``.
Root overrides this method to resolve absolute dotted paths.
Parameters
----------
d : dict[str, object]
Parsed YAML dictionary to apply.
writeEach : bool
Write each variable as it is applied.
modes : list['RW' | 'WO' | 'RO']
Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``.
incGroups : str or list[str], optional
Group name or group names to include.
excGroups : str or list[str], optional
Group name or group names to exclude.
"""
for key, value in d.items():
if key == self.name:
self._setDict(d=value,writeEach=writeEach,modes=modes,incGroups=incGroups,excGroups=excGroups,keys=None)
else:
nodes, keys = self.nodeMatch(key)
if len(nodes) == 0:
self._log.error("Entry %s not found", key)
else:
for n in nodes:
if n.filterByGroup(incGroups, excGroups):
n._setDict(value,writeEach,modes,incGroups,excGroups,keys)
def _writeConfig(self) -> None:
"""Write staged configuration to hardware for this device subtree.
Root overrides this method to use the full-tree write path.
"""
force = self.root.ForceWrite.value() if hasattr(self.root, 'ForceWrite') else False
self._log.info("Start device config write (forceWrite=%s)", force)
self.writeBlocks(force=force, recurse=True)
self.verifyBlocks(recurse=True)
self.checkBlocks(recurse=True)
self._log.info("Done device config write")
def _updateBlockEnable(self) -> None:
"""Propagate effective enable state to this device's blocks."""
for block in self._blocks:
block.setEnable(self.enable.value() is True)
for key,value in self.devices.items():
value._updateBlockEnable()
def _validateMemBase(self, remVars: list) -> None:
"""Raise DeviceError if this device has remote variables/commands but no memBase in its ancestor chain.
A device added directly to Root without an explicit memBase will have Root
as its memory slave (via _setSlave), but Root has no upstream memory path.
This produces silent timeouts; raise early instead.
Parameters
----------
remVars : list
List of RemoteVariable/RemoteCommand nodes already collected in _buildBlocks.
"""
if not remVars:
return
# Walk up the parent chain looking for an ancestor with an explicit memBase.
# If we reach Root without finding one, the device has no memory path.
# Root sets self._parent = self, so detect that self-cycle to avoid infinite loops.
node = self
while node is not None:
if node._memBase is not None:
return # Found a memBase — memory path is valid
if node._parent is node:
raise DeviceError(
f"Device '{self.path}' has RemoteVariables or RemoteCommands but no memBase. "
f"Pass memBase= when creating this device or when adding it to the tree."
)
node = node._parent
def _buildBlocks(self) -> None:
"""Build and attach memory blocks for local/remote variables."""
remVars = []
# Use min block size, larger blocks can be pre-created
blkSize = self._blkMinAccess()
# Process all of the variables
for k,n in self.nodes.items():
# Local variables have a 1:1 block association
if isinstance(n,pr.LocalVariable):
self._blocks.append(n._block)
# Align to min access, create list of remote variables
elif isinstance(n,pr.RemoteVariable) and n.offset is not None:
self._log.info(
"Before shift variable %s offset=%s bitSize=%s bytes=%s",
n.name,
n.offset,
n.bitSize,
n.varBytes,
)
n._updatePath(n.path)
n._shiftOffsetDown(n.offset % blkSize, blkSize)
remVars += [n]
self._log.info(
"Creating variable %s offset=%s bitSize=%s bytes=%s",
n.name,
n.offset,
n.bitSize,
n.varBytes,
)
self._validateMemBase(remVars)
# Sort var list by offset, size
remVars.sort(key=lambda x: (x.offset, x.varBytes))
blocks = []
blk = None
# Go through sorted variable list, look for overlaps, group into blocks
for n in remVars:
if blk is not None and ( (blk['offset'] + blk['size']) > n.offset):
self._log.info(
"Overlap detected var offset=%s block offset=%s block bytes=%s",
n.offset,
blk['offset'],
blk['size'],
)
n._shiftOffsetDown(n.offset - blk['offset'], blkSize)
blk['vars'].append(n)
if n.varBytes > blk['size']:
blk['size'] = n.varBytes
# We need a new block for this variable
else:
blk = None
# Look for pre-made block which overlaps
for b in self._custBlocks:
if ( (n.offset >= b.offset) and ((b.offset + b.size) > n.offset)):
# Just in case a variable extends past the end of pre-made block, user mistake
if n.varBytes > b.size:
msg = 'Failed to add variable {n.name} to pre-made block with offset {b.offset} and size {b.size}'
raise pr.MemoryError(name=self.path, address=self.address, msg=msg)
blk = {'offset':b.offset, 'size':b.size, 'vars':[n], 'block':b}
break
# Block not found
if blk is None:
blk = {'offset':n.offset, 'size':n.varBytes, 'vars':[n], 'block':None}
blocks.append(blk)
# Clear pre-made list
self._custBlocks = []
# Create new blocks and add new and pre-made blocks to device
# Add variables to the block
for b in blocks:
# Create new block
if b['block'] is None:
newBlock = rim.Block(b['offset'], b['size'])
self._log.debug("Adding new block at offset %#02x, size %s", b['offset'], b['size'])
else:
newBlock = b['block']
# Set memory slave
newBlock._setSlave(self)
# Verify the block is not too small or large for the memory interface
if newBlock.size > self._blkMaxAccess() or newBlock.size < self._blkMinAccess():
msg = f'Block size {newBlock.size} is not in the range: {self._blkMinAccess()} - {self._blkMaxAccess()}'
raise pr.MemoryError(name=self.path, address=self.address, msg=msg)
# Add variables to the block
newBlock.addVariables(b['vars'])
# Set varible block links
for v in b['vars']:
v._block = newBlock
# Add to device
self._blocks.append(newBlock)
newBlock.setEnable(self.enable.value() is True)
def _rootAttached(self, parent: pr.Node, root: pr.Root) -> None:
"""Attach this device into the rooted tree and build variable blocks."""
pr.Node._rootAttached(self, parent, root)
for key,value in self._nodes.items():
value._rootAttached(self,root)
self._buildBlocks()
# Override defaults as dictated by the _defaults dict
for varName, defValue in self._defaults.items():
nodes,keys = self.nodeMatch(varName)
if keys is None:
for var in nodes:
var._default = defValue
def _setTimeout(self, timeout: float) -> None:
"""Set transaction timeout on this device, blocks, and child devices.
Parameters
----------
timeout : float
Timeout in seconds.
"""
for block in self._blocks:
block._setTimeout(int(timeout*1000000))
rim.Master._setTimeout(self, int(timeout*1000000))
for key,value in self._nodes.items():
if isinstance(value,Device):
value._setTimeout(timeout)
[docs]
def command(self, **kwargs: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""Decorator to add inline methods as commands.
The decorated function is used as ``LocalCommand(function=...)``.
The command wrapper supplies keyword arguments ``root``, ``dev``,
``cmd``, and ``arg``; the function may accept any subset.
"""
def _decorator(func: Callable[..., Any]) -> Callable[..., Any]:
"""Wrap and register a method as a LocalCommand."""
if 'name' not in kwargs:
kwargs['name'] = func.__name__
self.add(pr.LocalCommand(function=func, **kwargs))
return func
return _decorator
[docs]
def linkVariableGet(self, **kwargs: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""Decorator to add inline ``linkedGet`` functions.
The decorated function is used as ``LinkVariable(linkedGet=...)``.
The linked-get wrapper supplies keyword arguments ``dev``, ``var``,
``read``, ``index``, and ``check``; the function may accept any subset.
"""
def _decorator(func: Callable[..., Any]) -> Callable[..., Any]:
"""Wrap and register a function as LinkVariable linkedGet."""
if 'name' not in kwargs:
kwargs['name'] = func.__name__
self.add(pr.LinkVariable(linkedGet=func, **kwargs))
return func
return _decorator
[docs]
def genDocuments(
self,
path: str,
incGroups: str | list[str] | None = None,
excGroups: str | list[str] | None = None,
) -> None:
"""Generate Sphinx documentation pages for this device.
Parameters
----------
path : str
Output directory path.
incGroups : str or list[str], optional
Group name or group names to include.
excGroups : str or list[str], optional
Group name or group names to exclude.
"""
with open(path + '/' + self.path.replace('.','_') + '.rst','w') as file:
print("****************************",file=file)
print(self.name,file=file)
print("****************************",file=file)
print('',file=file)
print(pr.genDocDesc(self.description,0),file=file)
print('',file=file)
plist = self.getNodes(typ=pr.Process,incGroups=incGroups,excGroups=excGroups)
dlist = self.devicesByGroup(incGroups=incGroups,excGroups=excGroups)
vlist = self.variablesByGroup(incGroups=incGroups,excGroups=excGroups)
clist = self.commandsByGroup(incGroups=incGroups,excGroups=excGroups)
tlist = {k:v for k,v in dlist.items() if k not in plist}
if len(tlist) > 0:
print(".. toctree::",file=file)
print(" :maxdepth: 1",file=file)
print(" :caption: Sub Devices:",file=file)
print('',file=file)
for k,v in tlist.items():
print(' ' + v.path.replace('.','_'),file=file)
v.genDocuments(path,incGroups,excGroups)
print('',file=file)
if len(plist) > 0:
print(".. toctree::",file=file)
print(" :maxdepth: 1",file=file)
print(" :caption: Processes:",file=file)
print('',file=file)
for k,v in plist.items():
print(' ' + v.path.replace('.','_'),file=file)
v.genDocuments(path,incGroups,excGroups)
print('',file=file)
print("Summary",file=file)
print("#######",file=file)
print('',file=file)
if len(vlist):
print("Variable List",file=file)
print("*************",file=file)
print('',file=file)
for k,v in vlist.items():
print(f"* {k}",file=file)
print('',file=file)
if len(clist):
print("Command List",file=file)
print("*************",file=file)
print('',file=file)
for k,v in clist.items():
print(f"* {k}",file=file)
print('',file=file)
print("Details",file=file)
print("#######",file=file)
print('',file=file)
if len(vlist):
print("Variables",file=file)
print("*********",file=file)
print('',file=file)
for k,v in vlist.items():
v._genDocs(file=file)
print('',file=file)
if len(clist):
print("Commands",file=file)
print("********",file=file)
print('',file=file)
for k,v in clist.items():
v._genDocs(file=file)
print('',file=file)
class ArrayDevice(Device):
"""Device that instantiates an array of sub-devices.
Parameters
----------
arrayClass : object
Class to instantiate for each element.
number : int
Number of devices in the array.
stride : int, optional (default = 0)
Address stride between array elements.
arrayArgs : object, optional
Per-element argument overrides.
**kwargs : Any
Additional arguments forwarded to ``Device``.
"""
def __init__(
self,
*,
arrayClass: Any,
number: int,
stride: int = 0,
arrayArgs: Any | None = None,
**kwargs: Any,
) -> None:
"""Initialize and populate an array-style device container."""
if 'name' not in kwargs:
kwargs['name'] = f'{arrayClass.__name__}Array'
super().__init__(**kwargs)
if arrayArgs is None:
arrayArgs = [{} for x in range(number)]
elif isinstance(arrayArgs, dict):
arrayArgs = [arrayArgs.copy() for x in range(number)]
for i in range(number):
args = arrayArgs[i]
if 'name' in args:
name = args.pop('name')
else:
name = f'{arrayClass.__name__}[{i}]'
self.add(arrayClass(
name=name,
offset=i*stride,
**args))