#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# PyRogue base module - Variable Class
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
from __future__ import annotations
import ast
import re
import shlex
import sys
import threading
import time
from typing import Any, Callable, Type
import numpy as np
import pyrogue as pr
import rogue.interfaces.memory as rim
from collections import OrderedDict as odict
from collections.abc import Iterable
class VariableError(Exception):
"""Raised when variable configuration or access fails."""
pass
class VariableWaitClass(object):
"""Wait for variable conditions to become true.
Pass a variable or list of variables, and a test function.
The test function is passed a list containing the current
variableValue state indexed by position as passed in the wait list.
Each variableValue entry has an additional field updated which indicates
if the variable has refreshed while waiting. This can be used to trigger
on any update to the variable, regardless of value.
i.e. w = VariableWaitClass([root.device.var1,root.device.var2],
lambda varValues: varValues[0].value >= 10 and \
varValues[1].value >= 20)
w.wait()
i.e. w = VariableWaitClass([root.device.var1,root.device.var2],
lambda varValues: varValues[0].updated and \
varValues[1].updated)
w.wait()
If no function is provided, the class will return when all variables are updated,
regardless of value.
The routine wait class can be used multiple times with subsequent calls using arm() to trigger:
w.wait()
w.arm()
w.wait()
getValues() can be called to get the final version of the tiles after the test passed.
Parameters
----------
varList : object or list
Variable or list of variables to monitor.
testFunction : callable, optional
Predicate callback of the form
``testFunction(varValues: list[VariableValue]) -> bool``.
timeout : int or float, optional (default = 0)
Time in seconds to wait before timing out. ``0`` disables timeout.
"""
def __init__(
self,
varList: pr.BaseVariable | list[pr.BaseVariable],
testFunction: Callable[[list["VariableValue"]], bool] | None = None,
timeout: float = 0,
) -> None:
"""Initialize a wait condition across one or more variables."""
self._values = odict()
self._updated = odict()
self._cv = threading.Condition()
self._testFunc = testFunction
self._timeout = timeout
# Convert single variable to a list
if not isinstance(varList,list):
self._vlist = [varList]
else:
self._vlist = varList
self.arm()
def arm(self) -> None:
"""Arm the wait by registering variable listeners."""
with self._cv:
for v in self._vlist:
v.addListener(self._varUpdate)
self._values[v.path] = v.getVariableValue(read=False)
self._updated[v.path] = False
def wait(self) -> bool:
"""Wait until the condition is met or timeout occurs."""
start = time.time()
with self._cv:
ret = self._check()
# Run until timeout or all conditions have been met
while (not ret) and ((self._timeout == 0) or ((time.time()-start) < self._timeout)):
self._cv.wait(0.5)
ret = self._check()
# Cleanup
for v in self._vlist:
v.delListener(self._varUpdate)
return ret
def get_values(self) -> dict:
"""Return the latest collected variable values."""
return {k: self._values[k] for k in self._vlist}
def _varUpdate(self, path: str, varValue: Any) -> None:
"""Listener callback used to capture variable updates."""
with self._cv:
if path in self._values:
self._values[path] = varValue
self._updated[path] = True
self._cv.notify()
def _check(self) -> bool:
"""Evaluate wait completion state using callback or update flags."""
if self._testFunc is not None:
return (self._testFunc(list(self._values.values())))
else:
return (all(self._updated.values()))
def VariableWait(
varList: Any,
testFunction: Callable[[list["VariableValue"]], bool] | None = None,
timeout: float = 0,
) -> bool:
"""
Wait for a number of variable conditions to be true.
Pass a variable or list of variables, and a test function.
The test function is passed a list containing the current
variableValue state indexed by position as passed in the wait list.
Each variableValue entry has an additional field updated which indicates
if the variable has refreshed while waiting. This can be used to trigger
on any update to the variable, regardless of value.
i.e. w = VariableWait([root.device.var1,root.device.var2],
lambda varValues: varValues[0].value >= 10 and \
varValues[1].value >= 20)
i.e. w = VariableWait([root.device.var1,root.device.var2],
lambda varValues: varValues[0].updated and \
varValues[1].updated)
If no function is provided, the class will return when all variables are updated,
regardless of value.
Parameters
----------
varList :
List of variables to monitor.
testFunction :
Function of the form
``testFunction(varValues: list[VariableValue]) -> bool``.
Use ``None`` to trigger on update only.
timeout : int, optional (default = 0)
Timeout in seconds.
Returns
-------
bool
True if conditions were met, False if there was a timeout.
"""
wc = VariableWaitClass(varList, testFunction, timeout)
return wc.wait()
class VariableValue(object):
"""Snapshot of a variable value with display metadata."""
def __init__(self, var: Any, read: bool = False, index: int = -1) -> None:
"""Capture value, display text, and alarm metadata for a variable."""
self.value = var.get(read=read,index=index)
self.valueDisp = var.genDisp(self.value)
self.disp = var.disp
self.enum = var.enum
self.status, self.severity = var._alarmState(self.value)
def __repr__(self) -> str:
"""Return a debug representation of stored value metadata."""
return f'{str(self.__class__)}({self.__dict__})'
class VariableListData(object):
"""Container describing list-type variable storage."""
def __init__(self, numValues: int, valueBits: int, valueStride: int) -> None:
"""Initialize list-shape metadata for array variables."""
self.numValues = numValues
self.valueBits = valueBits
self.valueStride = valueStride
def __repr__(self) -> str:
"""Return a debug representation of list-shape metadata."""
return f'{self.__class__}({self.__dict__})'
[docs]
class BaseVariable(pr.Node):
"""Base class for variables in the PyRogue tree.
Parameters
----------
name : str
Variable name.
description : str, optional (default = "")
Human-readable description.
mode : str, optional (default = "RW")
Access mode: ``RW``, ``RO``, or ``WO``.
value : object, optional
Default value for the variable.
disp : object, optional (default = "{}")
Display formatter or enumeration mapping.
enum : dict, optional
Mapping from object values to display strings.
units : str, optional
Engineering units.
hidden : bool, optional (default = False)
If True, add the variable to the ``Hidden`` group.
groups : list[str], optional
Groups to assign.
minimum : object, optional
Minimum allowed value.
maximum : object, optional
Maximum allowed value.
lowWarning : object, optional
Low warning threshold.
lowAlarm : object, optional
Low alarm threshold.
highWarning : object, optional
High warning threshold.
highAlarm : object, optional
High alarm threshold.
pollInterval : object, optional (default = 0)
Polling interval in seconds.
updateNotify : bool, optional (default = True)
Enable update notifications.
typeStr : str, optional (default = "Unknown")
Type string for display.
bulkOpEn : bool, optional (default = True)
Enable bulk operations.
offset : int, optional (default = 0)
Offset used by remote variables.
guiGroup : str, optional
GUI grouping label.
**kwargs : Any
Additional attributes.
"""
PROPS = ['name', 'path', 'mode', 'typeStr', 'enum',
'disp', 'precision', 'units', 'minimum', 'maximum',
'nativeType', 'ndType', 'updateNotify',
'hasAlarm', 'lowWarning', 'highWarning', 'lowAlarm',
'highAlarm', 'alarmStatus', 'alarmSeverity', 'pollInterval']
def __init__(
self,
*,
name: str,
description: str = '',
mode: str = 'RW',
value: Any = None,
disp: Any = '{}',
enum: dict[object, str] | None = None,
units: str | None = None,
hidden: bool = False,
groups: list[str] | None = None,
minimum: Any | None = None,
maximum: Any | None = None,
lowWarning: Any | None = None,
lowAlarm: Any | None = None,
highWarning: Any | None = None,
highAlarm: Any | None = None,
pollInterval: Any = 0,
updateNotify: bool = True,
typeStr: str = 'Unknown',
bulkOpEn: bool = True,
offset: int = 0,
guiGroup: str | None = None,
**kwargs: Any,
) -> None:
"""Initialize a base variable."""
# Public Attributes
self._bulkOpEn = bulkOpEn
self._updateNotify = updateNotify
self._mode = mode
self._units = units
self._minimum = minimum
self._maximum = maximum
self._lowWarning = lowWarning
self._lowAlarm = lowAlarm
self._highWarning = highWarning
self._highAlarm = highAlarm
self._default = value
self._typeStr = typeStr
self._block = None
self._pollInterval = pollInterval
self._nativeType = None
self._ndType = None
self._extraAttr = kwargs
self._listeners = []
self.__functions = []
self.__dependencies = []
# Build enum if specified
self._disp = disp
self._enum = enum
if isinstance(disp, dict):
self._enum = disp
elif isinstance(disp, list):
self._enum = {k:str(k) for k in disp}
elif isinstance(value, bool) and enum is None:
self._enum = {False: 'False', True: 'True'}
if self._enum is not None:
self._disp = 'enum'
# Create inverted enum
self._revEnum = None
if self._enum is not None:
self._revEnum = {v:k for k,v in self._enum.items()}
# Auto determine types
if value is not None:
self._nativeType = type(value)
if self._enum is not None and (self._nativeType is np.ndarray or self._nativeType is list or self._nativeType is dict):
raise VariableError(f"Invalid use of enum with value of type {self._nativeType}")
if self._nativeType is np.ndarray:
self._ndType = value.dtype
self._typeStr = f'{value.dtype}{value.shape}'
elif self._typeStr == 'Unknown':
self._typeStr = value.__class__.__name__
# Check modes
if (self._mode != 'RW') and (self._mode != 'RO') and \
(self._mode != 'WO'):
raise VariableError(f'Invalid variable mode {self._mode}. Supported: RW, RO, WO')
# Call super constructor
pr.Node.__init__(self, name=name, description=description, hidden=hidden, groups=groups, guiGroup=guiGroup)
@pr.expose
@property
def enum(self) -> dict[object, str] | None:
"""Return the enum mapping, if any."""
return self._enum
@property
def enumYaml(self) -> str:
"""Return the enum mapping as YAML."""
return pr.dataToYaml(self._enum)
@pr.expose
@property
def revEnum(self) -> dict[str, object] | None:
"""Return the reverse enum mapping, if available."""
return self._revEnum
@pr.expose
@property
def typeStr(self) -> str:
"""Return the type string for this variable."""
if self._typeStr == 'Unknown':
v = self.value()
if self.nativeType is np.ndarray:
self._typeStr = f'{v.dtype}{v.shape}'
else:
self._typeStr = v.__class__.__name__
return self._typeStr
@pr.expose
@property
def disp(self) -> str:
"""Return the display formatter."""
return self._disp
@pr.expose
@property
def precision(self) -> int:
"""Return the display precision for float-like values."""
if self.nativeType is float or self.nativeType is np.ndarray:
res = re.search(r':([0-9])\.([0-9]*)f',self._disp)
try:
return int(res[2])
except Exception:
return 8
else:
return 0
@pr.expose
@property
def mode(self) -> str:
"""Return the access mode (``RW``, ``RO``, ``WO``)."""
return self._mode
@pr.expose
@property
def units(self) -> str | None:
"""Return the engineering units."""
return self._units
@pr.expose
@property
def minimum(self) -> Any | None:
"""Return the minimum allowed value."""
return self._minimum
@pr.expose
@property
def maximum(self) -> Any | None:
"""Return the maximum allowed value."""
return self._maximum
@pr.expose
@property
def hasAlarm(self) -> bool:
"""Return True if any alarm thresholds are configured."""
return (self._lowWarning is not None or self._lowAlarm is not None or self._highWarning is not None or self._highAlarm is not None)
@pr.expose
@property
def lowWarning(self) -> Any | None:
"""Return the low warning threshold."""
return self._lowWarning
@pr.expose
@property
def lowAlarm(self) -> Any | None:
"""Return the low alarm threshold."""
return self._lowAlarm
@pr.expose
@property
def highWarning(self) -> Any | None:
"""Return the high warning threshold."""
return self._highWarning
@pr.expose
@property
def highAlarm(self) -> Any | None:
"""Return the high alarm threshold."""
return self._highAlarm
@pr.expose
@property
def alarmStatus(self) -> str:
"""Return the current alarm status."""
stat,sevr = self._alarmState(self.value())
return stat
@pr.expose
@property
def alarmSeverity(self) -> str:
"""Return the current alarm severity."""
stat,sevr = self._alarmState(self.value())
return sevr
[docs]
def properties(self) -> dict[str, Any]:
"""Return a dictionary of properties and current value."""
d = odict()
d['value'] = self.value()
d['valueDisp'] = self.valueDisp()
d['class'] = self.__class__.__name__
for p in self.PROPS:
d[p] = getattr(self, p)
return d
[docs]
def addDependency(self, dep: BaseVariable) -> None:
"""Add a dependency variable.
When this variable changes, the dependency variable will be notified.
Parameters
----------
dep : BaseVariable
The dependency variable to add.
"""
if dep not in self.__dependencies:
self.__dependencies.append(dep)
dep.addListener(self)
@pr.expose
@property
def pollInterval(self) -> float:
"""Return the poll interval."""
return self._pollInterval
[docs]
@pr.expose
def setPollInterval(self, interval: float) -> None:
"""Set the poll interval.
Parameters
----------
interval : object
Poll interval to use.
"""
self._log.debug(f'{self.path}.setPollInterval({interval}]')
self._pollInterval = interval
self._updatePollInterval()
@pr.expose
@property
def lock(self) -> threading.Lock | None:
"""Return the underlying lock, if available."""
if self._block is not None:
return self._block._lock
else:
return None
@pr.expose
@property
def updateNotify(self) -> bool:
"""Return True if update notifications are enabled."""
return self._updateNotify
@property
def dependencies(self) -> list[BaseVariable]:
"""Return registered dependency variables."""
return self.__dependencies
[docs]
def addListener(self, listener: BaseVariable | Callable[[str, VariableValue], None]) -> None:
"""
Add a listener Variable or function to call when variable changes.
This is useful when chaining variables together. (ADC conversions, etc)
The variable and value class are passed as an arg: func(path,varValue)
Parameters
----------
listener : BaseVariable or Callable[[str, VariableValue], None]
Variable or callback function to notify.
Returns
-------
None
"""
if isinstance(listener, BaseVariable):
if listener not in self._listeners:
self._listeners.append(listener)
else:
if listener not in self.__functions:
self.__functions.append(listener)
def _addListenerCpp(self, func: Callable[[str, str], None]) -> None:
"""Add a C++/string listener callback.
Parameters
----------
func : callable
Callback of the form ``func(path, valueDisp)`` where both
arguments are strings.
"""
self.addListener(lambda path, varValue: func(path, varValue.valueDisp))
[docs]
def delListener(self, listener: BaseVariable | Callable[[str, VariableValue], None]) -> None:
"""
Remove a listener Variable or function
Parameters
----------
listener : BaseVariable or Callable[[str, VariableValue], None]
Variable or callback function to remove.
Returns
-------
None
"""
if isinstance(listener, BaseVariable):
if listener in self._listeners:
self._listeners.remove(listener)
else:
if listener in self.__functions:
self.__functions.remove(listener)
[docs]
@pr.expose
def set(
self,
value: Any,
*,
index: int = -1,
write: bool = True,
verify: bool = True,
check: bool = True,
) -> None:
"""
Set the value and write to hardware if applicable
Writes to hardware are blocking. An error will result in a logged exception.
Parameters
----------
value : object
Value to set.
index : int, optional (default = -1)
Optional index for array variables.
write : bool, optional (default = True)
If True, perform a write transaction.
verify : bool, optional (default = True)
If True, verify after write.
check : bool, optional (default = True)
If True, wait for the transaction to complete.
Returns
-------
"""
pass
[docs]
@pr.expose
def post(self, value: Any, *, index: int = -1) -> None:
"""
Set the value and write to hardware if applicable using a posted write.
This method does not call through parent.writeBlocks(), but rather
calls on self._block directly.
Parameters
----------
value : object
Value to set.
index : int, optional (default = -1)
Optional index for array variables.
Returns
-------
"""
pass
[docs]
@pr.expose
def get(self, *, index: int = -1, read: bool = True, check: bool = True) -> Any:
"""
Return the value after performing a read from hardware if applicable.
Hardware read is blocking. An error will result in a logged exception.
Listeners will be informed of the update.
Parameters
----------
index : int, optional (default = -1)
Optional index for array variables.
read : bool, optional (default = True)
If True, perform a read transaction.
check : bool, optional (default = True)
If True, check transaction completion.
Returns
-------
"""
return None
[docs]
@pr.expose
def write(self, *, verify: bool = True, check: bool = True) -> None:
"""
Force a write of the variable.
Parameters
----------
verify : bool, optional (default = True)
If True, verify after write.
check : bool, optional (default = True)
If True, check transaction completion.
Returns
-------
"""
pass
[docs]
@pr.expose
def getVariableValue(self, read: bool = True, index: int = -1) -> VariableValue:
"""
Return the value after performing a read from hardware if applicable.
Hardware read is blocking. An error will result in a logged exception.
Listeners will be informed of the update.
Parameters
----------
read : bool, optional (default = True)
If True, perform a read transaction.
index : int, optional (default = -1)
Optional index for array variables.
Returns
-------
VariableValue
"""
return VariableValue(self,read=read,index=index)
[docs]
@pr.expose
def value(self, index: int = -1) -> Any:
"""Return the current value without reading.
Parameters
----------
index : int, optional (default = -1)
Optional index for array variables.
"""
return self.get(read=False, index=index)
[docs]
@pr.expose
def genDisp(self, value: Any, *, useDisp: str | None = None) -> str:
"""Generate a display string for a value.
Parameters
----------
value : object
Value to format for display.
useDisp : str, optional
Display formatter override. If None, use the variable's disp formatter.
Returns
-------
String representation of the Variable value for display.
"""
try:
if useDisp is None:
useDisp=self.disp
if useDisp == '':
return ''
elif isinstance(value,np.ndarray):
return np.array2string(value,
separator=', ',
formatter={'all':useDisp.format},
threshold=sys.maxsize,
max_line_width=1000)
elif useDisp == 'enum':
if value in self.enum:
return self.enum[value]
else:
self._log.warning("Invalid enum value {} in variable '{}'".format(value,self.path))
return f'INVALID: {value}'
else:
return useDisp.format(value)
except Exception as e:
pr.logException(self._log,e)
self._log.error(f"Error generating disp for value {value} with type {type(value)} in variable {self.path}")
raise e
[docs]
@pr.expose
def getDisp(self, read: bool = True, index: int = -1) -> str:
"""Perform a get() and return the display string for the value.
Parameters
----------
read : bool, optional (default = True)
If True, perform a read transaction.
index : int, optional (default = -1)
Optional index for array variables.
Returns
-------
String representation of the Variable value for display.
"""
return self.genDisp(self.get(read=read,index=index))
[docs]
@pr.expose
def valueDisp(self, index: int = -1) -> str: #, read=True, index=-1):
"""Return a display string for the current value without reading.
Parameters
----------
index : int, optional (default = -1)
Optional index for array variables.
Returns
-------
String representation of the Variable value for display.
"""
return self.getDisp(read=False, index=index)
[docs]
@pr.expose
def parseDisp(self, sValue: str) -> object:
"""Parse a string representation of a value into a Python object.
Parameters
----------
sValue : object
Display-formatted value to parse.
Returns
-------
Python object representation of the string value.
"""
try:
if not isinstance(sValue,str):
return sValue
elif self.nativeType is np.ndarray:
return np.array(ast.literal_eval(sValue),self._ndType)
elif self.disp == 'enum':
return self.revEnum[sValue]
elif self.nativeType is str:
return sValue
else:
return ast.literal_eval(sValue)
except Exception as e:
msg = "Invalid value {} for variable {} with type {}: {}".format(sValue,self.name,self.nativeType,e)
self._log.error(msg)
raise VariableError(msg)
[docs]
@pr.expose
def setDisp(self, sValue: str, write: bool = True, index: int = -1) -> None:
"""Set the value of the variable using a string representation of the value.
Parameters
----------
sValue : str
Display-formatted value to set.
write : bool, optional (default = True)
If True, write the value to the hardware.
index : int, optional (default = -1)
Optional index for array variables.
Returns
-------
None
"""
self.set(self.parseDisp(sValue), write=write, index=index)
@property
def nativeType(self) -> Type[object]:
"""Return the native Python type."""
return self._nativeType
@property
def ndType(self) -> Type[np.dtype]:
"""Return the numpy dtype for array variables."""
return self._ndType
@property
def ndTypeStr(self) -> str:
"""Return the numpy dtype as a string."""
return str(self.ndType)
def _setDefault(self) -> None:
""" """
if self._default is not None:
self.setDisp(self._default, write=False)
def _updatePollInterval(self) -> None:
""" """
if self.root is not None and self.root._pollQueue is not None:
self.root._pollQueue.updatePollInterval(self)
def _finishInit(self) -> None:
""" """
# Set the default value but dont write
self._setDefault()
self._updatePollInterval()
# Setup native type
if self._nativeType is None:
v = self.value()
self._nativeType = type(v)
if self._nativeType is np.ndarray:
self._ndType = v.dtype
def _setDict(
self,
d: dict[str, str] | object,
writeEach: bool,
modes: list[str],
incGroups: str | list[str] | None = None,
excGroups: str | list[str] | None = None,
keys: list[str] | None = None,
) -> None:
"""
Set variable values from a dictionary.
Invoked recursively from parent Device's _setDict method.
Override in BaseVariable subclasses to set values using setDisp method.
Parameters
----------
d : dict[str, str] | object
If a dictionary, keys are the indexes of the array variable, values are the values to set.
If a single value, it is set using the Variable's setDisp method.
writeEach : bool
If True, wait for each variable write transaction to complete before setting the next variable.
modes : list['RW' | 'WO' | 'RO']
Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``.
incGroups : str or list[str], optional
Group name or group names to include.
excGroups : str or list[str], optional
Group name or group names to exclude.
keys : list[str], optional
Keys to include.
If keys is not none, it should only contain one entry
and the variable should be a list or array variable
Returns
-------
None
"""
if keys is not None:
if len(keys) != 1 or (self.nativeType is not list and self.nativeType is not np.ndarray):
self._log.error(f"Entry {self.name} with key {keys} not found")
elif self._mode in modes:
if keys[0] == '*' or keys[0] == ':':
for i in range(self._numValues()):
self.setDisp(d, write=writeEach, index=i)
else:
idxSlice = eval(f'[i for i in range(self._numValues())][{keys[0]}]')
# Single entry item
if ':' not in keys[0]:
self.setDisp(d, write=writeEach, index=idxSlice[0])
# Multi entry item
else:
if isinstance(d,str):
s = shlex.shlex(" " + d.lstrip('[').rstrip(']') + " ",posix=True)
s.whitespace_split=True
s.whitespace=','
else:
s = d
for val,i in zip(s,idxSlice):
self.setDisp(val.strip(), write=writeEach, index=i)
else:
self._log.warning(f"Skipping set for Entry {self.name} with mode {self._mode}. Enabled Modes={modes}.")
# Standard set
elif self._mode in modes:
# Array variables can be set with a dict of index/value pairs
if isinstance(d, dict):
for k,v in d.items():
self.setDisp(v, index=k, write=writeEach)
else:
self.setDisp(d,writeEach)
else:
self._log.warning(f"Skipping set for Entry {self.name} with mode {self._mode}. Enabled Modes={modes}.")
def _getDict(
self,
modes: list[str] = ['RW', 'RO', 'WO'],
incGroups: str | list[str] | None = None,
excGroups: str | list[str] | None = None,
properties: bool = False,
) -> Any | None:
"""
Parameters
----------
modes : list['RW' | 'WO' | 'RO'], optional (default = ['RW', 'RO', 'WO'])
Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``.
incGroups : str or list[str], optional
Group name or group names to include.
excGroups : str or list[str], optional
Group name or group names to exclude.
Returns
-------
"""
if self._mode in modes:
if properties is False:
return VariableValue(self)
else:
return self.properties()
else:
return None
def _queueUpdate(self) -> None:
""" """
self._root._queueUpdates(self)
def _doUpdate(self) -> None:
""" """
val = VariableValue(self)
for func in self.__functions:
func(self.path,val)
return val
def _alarmState(self, value: Any) -> tuple[str, str]:
"""
Parameters
----------
value :
Returns
-------
type
"""
if isinstance(value,list) or isinstance(value,dict):
return 'None','None'
if (self.hasAlarm is False):
return "None", "None"
elif (self._lowAlarm is not None and value < self._lowAlarm):
return 'AlarmLoLo', 'AlarmMajor'
elif (self._highAlarm is not None and value > self._highAlarm):
return 'AlarmHiHi', 'AlarmMajor'
elif (self._lowWarning is not None and value < self._lowWarning):
return 'AlarmLow', 'AlarmMinor'
elif (self._highWarning is not None and value > self._highWarning):
return 'AlarmHigh', 'AlarmMinor'
else:
return 'Good','Good'
def _genDocs(self, file: Any) -> None:
"""
Parameters
----------
file :
Returns
-------
"""
print(f".. topic:: {self.path}",file=file)
print('',file=file)
print(pr.genDocDesc(self.description,4),file=file)
print('',file=file)
print(pr.genDocTableHeader(['Field','Value'],4,100),file=file)
for a in ['name', 'path', 'enum',
'typeStr', 'disp', 'precision', 'mode', 'units', 'minimum',
'maximum', 'lowWarning', 'lowAlarm', 'highWarning',
'highAlarm', 'alarmStatus', 'alarmSeverity']:
astr = str(getattr(self,a))
if astr != 'None':
print(pr.genDocTableRow([a,astr],4,100),file=file)
LocalSetCallback = Callable[[Any, Any, BaseVariable | None, bool | None], Any]
LocalGetCallback = Callable[[Any, BaseVariable | None], Any]
LinkedSetCallback = Callable[[Any, BaseVariable, Any, bool, int, bool, bool], Any]
LinkedGetCallback = Callable[[Any, BaseVariable, bool, int, bool], Any]
[docs]
class RemoteVariable(BaseVariable,rim.Variable):
"""Remote variable backed by a memory-mapped interface.
Parameters
----------
name
Variable name.
description
Human-readable description.
mode
Access mode: ``RW``, ``RO``, or ``WO``.
value
Default value.
disp
Display formatter or mapping.
enum
Mapping from object values to display strings.
units
Engineering units.
hidden
If True, add the variable to the ``Hidden`` group.
groups
Groups to assign.
minimum
Minimum allowed value.
maximum
Maximum allowed value.
lowWarning
Low warning threshold.
lowAlarm
Low alarm threshold.
highWarning
High warning threshold.
highAlarm
High alarm threshold.
base
Base model instance or model factory.
offset
Memory offset or list of offsets.
numValues
Number of values for array variables.
valueBits
Bits per value for array variables.
valueStride
Bit stride between values.
bitSize
Bit size of the variable.
bitOffset
Bit offset of the variable.
pollInterval
Polling interval.
updateNotify
Enable update notifications.
overlapEn
Allow overlapping variables.
bulkOpEn
Enable bulk operations.
verify
Enable verify on write.
retryCount
Retry count for transactions.
guiGroup
GUI grouping label.
**kwargs
Additional arguments forwarded to ``BaseVariable``.
"""
PROPS = BaseVariable.PROPS + [
'address', 'overlapEn', 'offset', 'bitOffset', 'bitSize',
'verifyEn', 'numValues', 'valueBits', 'valueStride', 'retryCount',
'varBytes', 'bulkEn']
def __init__(self, *,
name: str,
description: str = '',
mode: str = 'RW',
value: Any = None,
disp: Any | None = None,
enum: dict[object, str] | None = None,
units: str | None = None,
hidden: bool = False,
groups: list[str] | None = None,
minimum: Any | None = None,
maximum: Any | None = None,
lowWarning: Any | None = None,
lowAlarm: Any | None = None,
highWarning: Any | None = None,
highAlarm: Any | None = None,
base: pr.Model | Callable[[int], pr.Model] = pr.UInt,
offset: int | list[int],
numValues: int = 0,
valueBits: int = 0,
valueStride: int = 0,
bitSize: int | list[int] = 32,
bitOffset: int | list[int] = 0,
pollInterval: Any = 0,
updateNotify: bool = True,
overlapEn: bool = False,
bulkOpEn: bool = True,
verify: bool = True,
retryCount: int = 0,
guiGroup: str | None = None,
**kwargs: Any) -> None:
"""Initialize a remote variable."""
if disp is None:
disp = base.defaultdisp
self._block = None
# Convert the address parameters into lists
addrParams = [offset, bitOffset, bitSize] # Make a copy
addrParams = [list(x) if isinstance(x, Iterable) else [x] for x in addrParams] # convert to lists
length = max((len(x) for x in addrParams))
addrParams = [x*length if len(x)==1 else x for x in addrParams] # Make single element lists as long as max
offset, bitOffset, bitSize = addrParams # Assign back
# Verify the the list lengths match
if len(offset) != len(bitOffset) != len(bitSize):
raise VariableError('Lengths of offset: {}, bitOffset: {}, bitSize {} must match'.format(offset, bitOffset, bitSize))
# Check for invalid values
if 0 in bitSize:
raise VariableError('BitSize of 0 is invalid')
# Normalize bitOffsets relative to the smallest offset
baseAddr = min(offset)
bitOffset = [x+((y-baseAddr)*8) for x,y in zip(bitOffset, offset)]
offset = baseAddr
if isinstance(base, pr.Model):
self._base = base
elif numValues != 0:
self._base = base(valueBits)
else:
self._base = base(sum(bitSize))
# Apply default min and max
if minimum is None or (self._base.minValue() is not None and minimum < self._base.minValue()):
minimum = self._base.minValue()
if maximum is None or (self._base.maxValue() is not None and maximum > self._base.maxValue()):
maximum = self._base.maxValue()
BaseVariable.__init__(self, name=name, description=description,
mode=mode, value=value, disp=disp,
enum=enum, units=units, hidden=hidden, groups=groups,
minimum=minimum, maximum=maximum, bulkOpEn=bulkOpEn,
lowWarning=lowWarning, lowAlarm=lowAlarm,
highWarning=highWarning, highAlarm=highAlarm,
pollInterval=pollInterval,updateNotify=updateNotify,
guiGroup=guiGroup, **kwargs)
# If numValues > 0 the bit size array must only have one entry
# Auto calculate the total number of bits
if numValues != 0:
self._nativeType = np.ndarray
self._ndType = self._base.ndType
self._typeStr = f'{self.ndType}({numValues},)'
if len(bitSize) != 1:
raise VariableError('BitSize array must have a length of one when numValues > 0')
if valueBits > valueStride:
raise VariableError(f'ValueBits {valueBits} is greater than valueStrude {valueStride}')
# Override the bitSize
bitSize[0] = numValues * valueBits
if self._ndType is None:
raise VariableError(f'Invalid base type {self._base} with numValues = {numValues}')
else:
self._typeStr = self._base.name
listData = VariableListData(numValues,valueBits,valueStride)
# Setup C++ Base class
rim.Variable.__init__(
self,
self._name,
self._mode,
self._minimum,
self._maximum,
offset,
bitOffset,
bitSize,
overlapEn,
verify,
self._bulkOpEn,
self._updateNotify,
self._base,
listData,
retryCount)
##############################
# Properties held by C++ class
##############################
@pr.expose
@property
def address(self) -> int:
"""Return the absolute address of the variable."""
return self._block.address
@property
def numValues(self) -> int:
"""Return the configured number of values for array variables."""
return self._numValues()
@property
def valueBits(self) -> int:
"""Return number of valid bits per array value."""
return self._valueBits()
@property
def valueStride(self) -> int:
"""Return bit-stride between adjacent array values."""
return self._valueStride()
@property
def retryCount(self) -> int:
"""Return transaction retry count for this variable."""
return self._retryCount()
@pr.expose
@property
def varBytes(self) -> int:
"""Return the byte size of the variable."""
return self._varBytes()
@pr.expose
@property
def offset(self) -> int:
"""Return the memory offset."""
return self._offset()
@pr.expose
@property
def overlapEn(self) -> bool:
"""Return True if overlap is enabled."""
return self._overlapEn()
@pr.expose
@property
def bitSize(self) -> int:
"""Return the bit size."""
return self._bitSize()
@pr.expose
@property
def bitOffset(self) -> int:
"""Return the bit offset."""
return self._bitOffset()
@pr.expose
@property
def verifyEn(self) -> bool:
"""Return True if verify is enabled."""
return self._verifyEn()
########################
# Local Properties
########################
@pr.expose
@property
def base(self) -> pr.Model:
"""Return the base model or type."""
return self._base
@pr.expose
@property
def bulkEn(self) -> bool:
"""Return True if bulk operations are enabled."""
return self._bulkOpEn
[docs]
@pr.expose
def set(
self,
value: Any,
*,
index: int = -1,
write: bool = True,
verify: bool = True,
check: bool = True,
) -> None:
"""
Set the value and write to hardware if applicable
Writes to hardware are blocking if check=True, otherwise non-blocking.
A verify will be performed according to self.verifyEn if verify=True
A verify will not be performed if verify=False
An error will result in a logged exception.
Parameters
----------
value : object
Value to set.
index : int, optional (default = -1)
Optional index for array variables.
write : bool, optional (default = True)
If True, perform a write transaction.
verify : bool, optional (default = True)
If True, verify after write.
check : bool, optional (default = True)
If True, check transaction completion.
Returns
-------
"""
try:
# Set value to block
self._set(value,index)
if write:
self._parent.writeBlocks(force=True, recurse=False, variable=self, index=index)
if verify:
self._parent.verifyBlocks(recurse=False, variable=self)
if check:
self._parent.checkBlocks(recurse=False, variable=self)
except Exception as e:
pr.logException(self._log,e)
self._log.error("Error setting value '{}' to variable '{}' with type {}. Exception={}".format(value,self.path,self.typeStr,e))
raise e
[docs]
@pr.expose
def post(self, value: Any, *, index: int = -1) -> None:
"""
Set the value and write to hardware if applicable using a posted write.
This method does not call through parent.writeBlocks(), but rather
calls on self._block directly.
Parameters
----------
value : object
Value to set.
index : int, optional (default = -1)
Optional index for array variables.
Returns
-------
"""
try:
# Set value to block
self._set(value,index)
pr.startTransaction(self._block, type=rim.Post, forceWr=False, checkEach=True, variable=self, index=index)
except Exception as e:
pr.logException(self._log,e)
self._log.error("Error posting value '{}' to variable '{}' with type {}".format(value,self.path,self.typeStr))
raise e
[docs]
@pr.expose
def get(self, *, index: int = -1, read: bool = True, check: bool = True) -> Any:
"""Return the value, optionally reading from hardware.
Hardware read is blocking if check=True, otherwise non-blocking.
An error will result in a logged exception.
Listeners will be informed of the update.
Parameters
----------
index : int, optional (default = -1)
Optional index for array variables.
read : bool, optional (default = True)
If True, perform a read transaction.
check : bool, optional (default = True)
If True, check transaction completion.
"""
try:
if read:
self._parent.readBlocks(recurse=False, variable=self, index=index)
if check:
self._parent.checkBlocks(recurse=False, variable=self)
return self._get(index)
except Exception as e:
pr.logException(self._log,e)
self._log.error("Error reading value from variable '{}'".format(self.path))
raise e
[docs]
@pr.expose
def write(self, *, verify: bool = True, check: bool = True) -> None:
"""
Force a write of the variable.
Hardware write is blocking if check=True.
A verify will be performed according to self.verifyEn if verify=True
A verify will not be performed if verify=False
An error will result in a logged exception
Parameters
----------
verify : bool, optional (default = True)
If True, verify after write.
check : bool, optional (default = True)
If True, check transaction completion.
Returns
-------
"""
try:
self._parent.writeBlocks(force=True, recurse=False, variable=self)
if verify:
self._parent.verifyBlocks(recurse=False, variable=self)
if check:
self._parent.checkBlocks(recurse=False, variable=self)
except Exception as e:
pr.logException(self._log,e)
raise e
def _parseDispValue(self, sValue: str) -> Any:
"""Parse a string representation of a value into a Python object.
Parameters
----------
sValue : str
String representation of the value to parse.
Returns
-------
"""
if self.disp == 'enum':
return self.revEnum[sValue]
else:
return self._base.fromString(sValue)
def _genDocs(self, file: Any) -> None:
"""
Parameters
----------
file :
Returns
-------
"""
BaseVariable._genDocs(self,file)
for a in ['offset', 'numValues', 'bitSize', 'bitOffset', 'verifyEn', 'varBytes']:
astr = str(getattr(self,a))
if astr != 'None':
print(pr.genDocTableRow([a,astr],4,100),file=file)
[docs]
class LocalVariable(BaseVariable):
"""Local variable backed by an in-process value.
Parameters
----------
name : str
Variable name.
value : object, optional
Default value.
description : str, optional (default = "")
Human-readable description.
mode : str, optional (default = "RW")
Access mode: ``RW``, ``RO``, or ``WO``.
disp : object, optional (default = "{}")
Display formatter or mapping.
enum : dict, optional
Mapping from object values to display strings.
units : str, optional
Engineering units.
hidden : bool, optional (default = False)
If True, add the variable to the ``Hidden`` group.
groups : list[str], optional
Groups to assign.
minimum : object, optional
Minimum allowed value.
maximum : object, optional
Maximum allowed value.
lowWarning : object, optional
Low warning threshold.
lowAlarm : object, optional
Low alarm threshold.
highWarning : object, optional
High warning threshold.
highAlarm : object, optional
High alarm threshold.
localSet : callable, optional
Setter callback. Expected form:
``localSet(value, dev=None, var=None, changed=None)``.
The callback may accept any subset of these named arguments.
localGet : callable, optional
Getter callback. Expected form:
``localGet(dev=None, var=None)``.
The callback may accept any subset of these named arguments.
pollInterval : object, optional (default = 0)
Polling interval.
updateNotify : bool, optional (default = True)
Enable update notifications.
typeStr : str, optional (default = "Unknown")
Type string for display.
bulkOpEn : bool, optional (default = True)
Enable bulk operations.
guiGroup : str, optional
GUI grouping label.
**kwargs : Any
Additional arguments forwarded to ``BaseVariable``.
"""
def __init__(self, *,
name: str,
value: Any = None,
description: str = '',
mode: str = 'RW',
disp: Any = '{}',
enum: dict[object, str] | None = None,
units: str | None = None,
hidden: bool = False,
groups: list[str] | None = None,
minimum: Any | None = None,
maximum: Any | None = None,
lowWarning: Any | None = None,
lowAlarm: Any | None = None,
highWarning: Any | None = None,
highAlarm: Any | None = None,
localSet: LocalSetCallback | None = None,
localGet: LocalGetCallback | None = None,
pollInterval: Any = 0,
updateNotify: bool = True,
typeStr: str = 'Unknown',
bulkOpEn: bool = True,
guiGroup: str | None = None,
**kwargs: Any) -> None:
"""Initialize a local variable."""
if value is None and localGet is None:
raise VariableError(f'LocalVariable {self.path} without localGet() must specify value= argument in constructor')
BaseVariable.__init__(self, name=name, description=description,
mode=mode, value=value, disp=disp,
enum=enum, units=units, hidden=hidden, groups=groups,
minimum=minimum, maximum=maximum, typeStr=typeStr,
lowWarning=lowWarning, lowAlarm=lowAlarm,
highWarning=highWarning, highAlarm=highAlarm,
pollInterval=pollInterval,updateNotify=updateNotify, bulkOpEn=bulkOpEn,
guiGroup=guiGroup, **kwargs)
self._block = pr.LocalBlock(variable=self,
localSet=localSet,
localGet=localGet,
minimum=minimum,
maximum=maximum,
value=self._default)
[docs]
@pr.expose
def set(
self,
value: Any,
*,
index: int = -1,
write: bool = True,
verify: bool = True,
check: bool = True,
) -> None:
"""
Set the value and write to hardware if applicable
Writes to hardware are blocking. An error will result in a logged exception.
Parameters
----------
value : object
Value to set.
index : int, optional (default = -1)
Optional index for array variables.
write : bool, optional (default = True)
If True, perform a write transaction.
verify : bool, optional (default = True)
If True, verify after write.
check : bool, optional (default = True)
If True, check transaction completion.
Returns
-------
"""
self._log.debug("{}.set({})".format(self, value))
try:
# Set value to block
self._block.set(self, value, index)
if write:
self._block._checkTransaction()
except Exception as e:
pr.logException(self._log,e)
self._log.error("Error setting value '{}' to variable '{}' with type {}. Exception={}".format(value,self.path,self.typeStr,e))
raise e
[docs]
@pr.expose
def post(self, value: Any, *, index: int = -1) -> None:
"""
Set the value and write to hardware if applicable using a posted write.
This method does not call through parent.writeBlocks(), but rather
calls on self._block directly.
Parameters
----------
value : object
Value to set.
index : int, optional (default = -1)
Optional index for array variables.
Returns
-------
"""
self._log.debug("{}.post({})".format(self, value))
try:
self._block.set(self, value, index)
self._block._checkTransaction()
except Exception as e:
pr.logException(self._log,e)
self._log.error("Error posting value '{}' to variable '{}' with type {}".format(value,self.path,self.typeStr))
raise e
[docs]
@pr.expose
def get(self, *, index: int = -1, read: bool = True, check: bool = True) -> Any:
"""
Parameters
----------
index : int, optional (default = -1)
Optional index for array variables.
read : bool, optional (default = True)
If True, perform a read transaction.
check : bool, optional (default = True)
If True, check transaction completion.
Returns
-------
type
Hardware read is blocking. An error will result in a logged exception.
Listeners will be informed of the update.
"""
try:
if read and check:
self._block._checkTransaction()
return self._block.get(self,index)
except Exception as e:
pr.logException(self._log,e)
self._log.error("Error reading value from variable '{}'".format(self.path))
raise e
def __get__(self) -> Any:
"""Return the current local-variable value without hardware read."""
return self.get(read=False)
def __iadd__(self, other: Any) -> LocalVariable:
"""In-place add on the local variable value."""
self._block._iadd(other)
return self
def __isub__(self, other: Any) -> LocalVariable:
"""In-place subtract on the local variable value."""
self._block._isub(other)
return self
def __imul__(self, other: Any) -> LocalVariable:
"""In-place multiply on the local variable value."""
self._block._imul(other)
return self
def __imatmul__(self, other: Any) -> LocalVariable:
"""In-place matrix-multiply on the local variable value."""
self._block._imatmul(other)
return self
def __itruediv__(self, other: Any) -> LocalVariable:
"""In-place true-division on the local variable value."""
self._block._itruediv(other)
return self
def __ifloordiv__(self, other: Any) -> LocalVariable:
"""In-place floor-division on the local variable value."""
self._block._ifloordiv(other)
return self
def __imod__(self, other: Any) -> LocalVariable:
"""In-place modulo on the local variable value."""
self._block._imod(other)
return self
def __ipow__(self, other: Any) -> LocalVariable:
"""In-place exponentiation on the local variable value."""
self._block._ipow(other)
return self
def __ilshift__(self, other: Any) -> LocalVariable:
"""In-place left-shift on the local variable value."""
self._block._ilshift(other)
return self
def __irshift__(self, other: Any) -> LocalVariable:
"""In-place right-shift on the local variable value."""
self._block._irshift(other)
return self
def __iand__(self, other: Any) -> LocalVariable:
"""In-place bitwise-and on the local variable value."""
self._block._iand(other)
return self
def __ixor__(self, other: Any) -> LocalVariable:
"""In-place bitwise-xor on the local variable value."""
self._block._ixor(other)
return self
def __ior__(self, other: Any) -> LocalVariable:
"""In-place bitwise-or on the local variable value."""
self._block._ior(other)
return self
[docs]
class LinkVariable(BaseVariable):
"""Variable linked to another variable or custom functions.
Parameters
----------
name : str
Variable name.
variable : object, optional
Optional variable to link to.
This is a useful shortcut for linking to a variable without having to declare dependencies.
dependencies : iterable, optional
List of variables that this variable depends on.
linkedSet : callable, optional
Setter callback. Expected keyword arguments are
``dev, var, value, write, index, verify, check``.
These match the parameters of BaseVariable.set().
The callback may accept any subset of these names.
Only value is required.
If not provided, will infer mode = 'RO'.
linkedGet : callable, optional
Getter callback. Expected keyword arguments are
``dev, var, read, index, check``.
These match the parameters of BaseVariable.get().
The callback may accept any subset of these names.
minimum : object, optional
Minimum allowed value.
maximum : object, optional
Maximum allowed value.
**kwargs : Any
Additional arguments forwarded to ``BaseVariable``.
"""
def __init__(self, *,
name: str,
variable: BaseVariable | None = None,
dependencies: Iterable[BaseVariable] | None = None,
linkedSet: LinkedSetCallback | None = None,
linkedGet: LinkedGetCallback | None = None,
minimum: Any | None = None,
maximum: Any | None = None,
**kwargs: Any) -> None: # Args passed to BaseVariable
"""Initialize a link variable."""
# Set and get functions
self._linkedGet = linkedGet
self._linkedSet = linkedSet
if minimum is not None or maximum is not None:
raise VariableError("Invalid use of min or max values with LinkVariable")
if variable is not None:
# If directly linked to a variable, use it's value and set by defualt
# for linkedGet and linkedSet unless overridden
self._linkedGet = linkedGet if linkedGet else variable.get
self._linkedSet = linkedSet if linkedSet else variable.set
# Search the kwargs for overridden properties, otherwise the properties from the linked variable will be used
args = ['disp', 'enum', 'units', 'minimum', 'maximum']
for arg in args:
if arg not in kwargs:
kwargs[arg] = getattr(variable, arg)
# Why not inhertit mode from link variable?
if not self._linkedSet:
kwargs['mode'] = 'RO'
if not self._linkedGet:
kwargs['mode'] = 'WO'
# Wrap linked get and set functions
self._linkedGetWrap = pr.functionWrapper(function=self._linkedGet, callArgs=['dev', 'var', 'read', 'index', 'check'])
self._linkedSetWrap = pr.functionWrapper(function=self._linkedSet, callArgs=['dev', 'var', 'value', 'write', 'index', 'verify', 'check'])
# Call super constructor
BaseVariable.__init__(self, name=name, **kwargs)
# Dependency tracking
if variable is not None:
# Add the directly linked variable as a dependency
self.addDependency(variable)
if dependencies is not None:
for d in dependencies:
self.addDependency(d)
self.__depBlocks = []
def __getitem__(self, key: Any) -> Any:
"""Return dependency by index/key from ``dependencies``."""
# Allow dependencies to be accessed as indices of self
return self.dependencies[key]
[docs]
@pr.expose
def set(
self,
value: Any,
*,
write: bool = True,
index: int = -1,
verify: bool = True,
check: bool = True,
) -> None:
"""
Parameters
----------
value : object
Value to set.
write : bool, optional (default = True)
If True, perform a write transaction.
index : int, optional (default = -1)
Optional index for array variables.
verify : bool, optional (default = True)
If True, verify after write.
check : bool, optional (default = True)
If True, check transaction completion.
Returns
-------
"""
try:
self._linkedSetWrap(function=self._linkedSet, dev=self.parent, var=self, value=value, write=write, index=index, verify=verify, check=check)
except Exception as e:
pr.logException(self._log,e)
self._log.error("Error setting link variable '{}'".format(self.path))
raise e
[docs]
@pr.expose
def get(self, read: bool = True, index: int = -1, check: bool = True) -> Any:
"""
Parameters
----------
read : bool, optional (default = True)
If True, perform a read transaction.
index : int, optional (default = -1)
Optional index for array variables.
check : bool, optional (default = True)
If True, check transaction completion.
Returns
-------
"""
try:
return self._linkedGetWrap(function=self._linkedGet, dev=self.parent, var=self, read=read, index=index, check=check)
except Exception as e:
pr.logException(self._log,e)
self._log.error("Error getting link variable '{}'".format(self.path))
raise e
def __getBlocks(self) -> list[pr.Block]:
"""Collect dependency blocks recursively for this link variable."""
b = []
for d in self.dependencies:
if isinstance(d, LinkVariable):
b.extend(d.__getBlocks())
elif hasattr(d, '_block') and d._block is not None:
b.append(d._block)
return b
def _finishInit(self) -> None:
"""Resolve dependency blocks after tree attachment."""
super()._finishInit()
self.__depBlocks = self.__getBlocks()
@property
def depBlocks(self) -> list[pr.Block]:
"""Return a list of blocks that this LinkVariable depends on."""
return self.__depBlocks
@pr.expose
@property
def pollInterval(self) -> float:
"""Return the minimum non-zero poll interval across dependencies."""
depIntervals = [dep.pollInterval for dep in self.dependencies if dep.pollInterval > 0]
if len(depIntervals) == 0:
return 0
else:
return min(depIntervals)