#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# PyRogue base module - Variable Class
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
import pyrogue as pr
import rogue.interfaces.memory as rim
import threading
import re
import time
import shlex
import numpy as np
import ast
import sys
from collections import OrderedDict as odict
from collections.abc import Iterable
class VariableError(Exception):
""" """
pass
class VariableWaitClass(object):
"""
Wait for a number of variable conditions to be true.
Pass a variable or list of variables, and a test function.
The test function is passed a list containing the current
variableValue state indexed by position as passed in the wait list.
Each variableValue entry has an additional field updated which indicates
if the variable has refreshed while waiting. This can be used to trigger
on any update to the variable, regardless of value.
i.e. w = VariableWaitClass([root.device.var1,root.device.var2],
lambda varValues: varValues[0].value >= 10 and \
varValues[1].value >= 20)
w.wait()
i.e. w = VariableWaitClass([root.device.var1,root.device.var2],
lambda varValues: varValues[0].updated and \
varValues[1].updated)
w.wait()
If no function is provided, the class will return when all variables are updated,
regardless of value.
The routine wait class can be used multiple times with subsequent calls using arm() to trigger:
w.wait()
w.arm()
w.wait()
getValues() can be called to get the final version of the tiles after the test passed.
Parameters
----------
varList :
List of variables to monitor.
testFunction :
Function which will test the state of the values, or None to trigger on update
timeout : int
(Default value = 0)
"""
def __init__(self, varList, testFunction=None, timeout=0):
self._values = odict()
self._updated = odict()
self._cv = threading.Condition()
self._testFunc = testFunction
self._timeout = timeout
# Convert single variable to a list
if not isinstance(varList,list):
self._vlist = [varList]
else:
self._vlist = varList
self.arm()
def arm(self):
with self._cv:
for v in self._vlist:
v.addListener(self._varUpdate)
self._values[v.path] = v.getVariableValue(read=False)
self._updated[v.path] = False
def wait(self):
start = time.time()
with self._cv:
ret = self._check()
# Run until timeout or all conditions have been met
while (not ret) and ((self._timeout == 0) or ((time.time()-start) < self._timeout)):
self._cv.wait(0.5)
ret = self._check()
# Cleanup
for v in self._vlist:
v.delListener(self._varUpdate)
return ret
def get_values(self):
return {k: self._values[k] for k in self._vlist}
def _varUpdate(self, path, varValue):
with self._cv:
if path in self._values:
self._values[path] = varValue
self._updated[path] = True
self._cv.notify()
def _check(self):
if self._testFunc is not None:
return (self._testFunc(list(self._values.values())))
else:
return (all(self._updated.values()))
def VariableWait(varList, testFunction=None, timeout=0):
"""
Wait for a number of variable conditions to be true.
Pass a variable or list of variables, and a test function.
The test function is passed a list containing the current
variableValue state indexed by position as passed in the wait list.
Each variableValue entry has an additional field updated which indicates
if the variable has refreshed while waiting. This can be used to trigger
on any update to the variable, regardless of value.
i.e. w = VariableWait([root.device.var1,root.device.var2],
lambda varValues: varValues[0].value >= 10 and \
varValues[1].value >= 20)
i.e. w = VariableWait([root.device.var1,root.device.var2],
lambda varValues: varValues[0].updated and \
varValues[1].updated)
If no function is provided, the class will return when all variables are updated,
regardless of value.
Parameters
----------
varList :
List of variables to monitor.
testFunction :
Function which will test the state of the values, or None to trigger on update
timeout : int
(Default value = 0)
Returns
-------
True if conditions were met, false if there was a timeout
"""
wc = VariableWaitClass(varList, testFunction, timeout)
return wc.wait()
class VariableValue(object):
""" """
def __init__(self, var, read=False, index=-1):
self.value = var.get(read=read,index=index)
self.valueDisp = var.genDisp(self.value)
self.disp = var.disp
self.enum = var.enum
self.status, self.severity = var._alarmState(self.value)
def __repr__(self):
return f'{str(self.__class__)}({self.__dict__})'
class VariableListData(object):
""" """
def __init__(self, numValues, valueBits, valueStride):
self.numValues = numValues
self.valueBits = valueBits
self.valueStride = valueStride
def __repr__(self):
return f'{self.__class__}({self.__dict__})'
[docs]
class BaseVariable(pr.Node):
"""
Documentation string for __init__ goes here
(under the 'class' definition)
Fill in the paremeters.
"""
PROPS = ['name', 'path', 'mode', 'typeStr', 'enum',
'disp', 'precision', 'units', 'minimum', 'maximum',
'nativeType', 'ndType', 'updateNotify',
'hasAlarm', 'lowWarning', 'highWarning', 'lowAlarm',
'highAlarm', 'alarmStatus', 'alarmSeverity', 'pollInterval']
def __init__(self, *,
name,
description='',
mode='RW',
value=None,
disp='{}',
enum=None,
units=None,
hidden=False,
groups=None,
minimum=None,
maximum=None,
lowWarning=None,
lowAlarm=None,
highWarning=None,
highAlarm=None,
pollInterval=0,
updateNotify=True,
typeStr='Unknown',
bulkOpEn=True,
offset=0,
guiGroup=None,
**kwargs):
# Public Attributes
self._bulkOpEn = bulkOpEn
self._updateNotify = updateNotify
self._mode = mode
self._units = units
self._minimum = minimum
self._maximum = maximum
self._lowWarning = lowWarning
self._lowAlarm = lowAlarm
self._highWarning = highWarning
self._highAlarm = highAlarm
self._default = value
self._typeStr = typeStr
self._block = None
self._pollInterval = pollInterval
self._nativeType = None
self._ndType = None
self._extraAttr = kwargs
self._listeners = []
self.__functions = []
self.__dependencies = []
# Build enum if specified
self._disp = disp
self._enum = enum
if isinstance(disp, dict):
self._enum = disp
elif isinstance(disp, list):
self._enum = {k:str(k) for k in disp}
elif isinstance(value, bool) and enum is None:
self._enum = {False: 'False', True: 'True'}
if self._enum is not None:
self._disp = 'enum'
# Create inverted enum
self._revEnum = None
if self._enum is not None:
self._revEnum = {v:k for k,v in self._enum.items()}
# Auto determine types
if value is not None:
self._nativeType = type(value)
if self._enum is not None and (self._nativeType is np.ndarray or self._nativeType is list or self._nativeType is dict):
raise VariableError(f"Invalid use of enum with value of type {self._nativeType}")
if self._nativeType is np.ndarray:
self._ndType = value.dtype
self._typeStr = f'{value.dtype}{value.shape}'
elif self._typeStr == 'Unknown':
self._typeStr = value.__class__.__name__
# Check modes
if (self._mode != 'RW') and (self._mode != 'RO') and \
(self._mode != 'WO'):
raise VariableError(f'Invalid variable mode {self._mode}. Supported: RW, RO, WO')
# Call super constructor
pr.Node.__init__(self, name=name, description=description, hidden=hidden, groups=groups, guiGroup=guiGroup)
@pr.expose
@property
def enum(self):
""" """
return self._enum
@property
def enumYaml(self):
""" """
return pr.dataToYaml(self._enum)
@pr.expose
@property
def revEnum(self):
""" """
return self._revEnum
@pr.expose
@property
def typeStr(self):
""" """
if self._typeStr == 'Unknown':
v = self.value()
if self.nativeType is np.ndarray:
self._typeStr = f'{v.dtype}{v.shape}'
else:
self._typeStr = v.__class__.__name__
return self._typeStr
@pr.expose
@property
def disp(self):
""" """
return self._disp
@pr.expose
@property
def precision(self):
""" """
if self.nativeType is float or self.nativeType is np.ndarray:
res = re.search(r':([0-9])\.([0-9]*)f',self._disp)
try:
return int(res[2])
except Exception:
return 8
else:
return 0
@pr.expose
@property
def mode(self):
""" """
return self._mode
@pr.expose
@property
def units(self):
""" """
return self._units
@pr.expose
@property
def minimum(self):
""" """
return self._minimum
@pr.expose
@property
def maximum(self):
""" """
return self._maximum
@pr.expose
@property
def hasAlarm(self):
""" """
return (self._lowWarning is not None or self._lowAlarm is not None or self._highWarning is not None or self._highAlarm is not None)
@pr.expose
@property
def lowWarning(self):
""" """
return self._lowWarning
@pr.expose
@property
def lowAlarm(self):
""" """
return self._lowAlarm
@pr.expose
@property
def highWarning(self):
""" """
return self._highWarning
@pr.expose
@property
def highAlarm(self):
""" """
return self._highAlarm
@pr.expose
@property
def alarmStatus(self):
""" """
stat,sevr = self._alarmState(self.value())
return stat
@pr.expose
@property
def alarmSeverity(self):
""" """
stat,sevr = self._alarmState(self.value())
return sevr
def properties(self):
d = odict()
d['value'] = self.value()
d['valueDisp'] = self.valueDisp()
d['class'] = self.__class__.__name__
for p in self.PROPS:
d[p] = getattr(self, p)
return d
def getExtraAttribute(self, name):
if name in self._extraAttr:
return self._extraAttr[name]
else:
return None
[docs]
def addDependency(self, dep):
"""
Parameters
----------
dep :
Returns
-------
"""
if dep not in self.__dependencies:
self.__dependencies.append(dep)
dep.addListener(self)
@pr.expose
@property
def pollInterval(self):
""" """
return self._pollInterval
@pr.expose
def setPollInterval(self, interval):
self._log.debug(f'{self.path}.setPollInterval({interval}]')
self._pollInterval = interval
self._updatePollInterval()
@pr.expose
@property
def lock(self):
""" """
if self._block is not None:
return self._block._lock
else:
return None
@pr.expose
@property
def updateNotify(self):
""" """
return self._updateNotify
@property
def dependencies(self):
""" """
return self.__dependencies
[docs]
def addListener(self, listener):
"""
Add a listener Variable or function to call when variable changes.
This is useful when chaining variables together. (ADC conversions, etc)
The variable and value class are passed as an arg: func(path,varValue)
Parameters
----------
listener :
Returns
-------
"""
if isinstance(listener, BaseVariable):
if listener not in self._listeners:
self._listeners.append(listener)
else:
if listener not in self.__functions:
self.__functions.append(listener)
def _addListenerCpp(self, func):
self.addListener(lambda path, varValue: func(path, varValue.valueDisp))
[docs]
def delListener(self, listener):
"""
Remove a listener Variable or function
Parameters
----------
listener :
Returns
-------
"""
if isinstance(listener, BaseVariable):
if listener in self._listeners:
self._listeners.remove(listener)
else:
if listener in self.__functions:
self.__functions.remove(listener)
[docs]
@pr.expose
def set(self, value, *, index=-1, write=True, verify=True, check=True):
"""
Set the value and write to hardware if applicable
Writes to hardware are blocking. An error will result in a logged exception.
Parameters
----------
value :
* :
index : int
(Default value = -1)
write : bool
(Default value = True)
verify : bool
(Default value = True)
check : bool
(Default value = True)
Returns
-------
"""
pass
[docs]
@pr.expose
def post(self, value, *, index=-1):
"""
Set the value and write to hardware if applicable using a posted write.
This method does not call through parent.writeBlocks(), but rather
calls on self._block directly.
Parameters
----------
value :
* :
index : int
(Default value = -1)
Returns
-------
"""
pass
[docs]
@pr.expose
def get(self, *, index=-1, read=True, check=True):
"""
Return the value after performing a read from hardware if applicable.
Hardware read is blocking. An error will result in a logged exception.
Listeners will be informed of the update.
Parameters
----------
* :
index : int
(Default value = -1)
read : bool
(Default value = True)
check : bool
(Default value = True)
Returns
-------
"""
return None
[docs]
@pr.expose
def write(self, *, verify=True, check=True):
"""
Force a write of the variable.
Parameters
----------
* :
verify : bool
(Default value = True)
check : bool
(Default value = True)
Returns
-------
"""
pass
[docs]
@pr.expose
def getVariableValue(self,read=True,index=-1):
"""
Return the value after performing a read from hardware if applicable.
Hardware read is blocking. An error will result in a logged exception.
Listeners will be informed of the update.
Parameters
----------
read : bool
(Default value = True)
index : int
(Default value = -1)
Returns
-------
type
Hardware read is blocking. An error will result in a logged exception.
Listeners will be informed of the update.
"""
return VariableValue(self,read=read,index=index)
@pr.expose
def value(self, index=-1):
""" """
return self.get(read=False, index=index)
[docs]
@pr.expose
def genDisp(self, value, *, useDisp=None):
"""
Parameters
----------
value :
Returns
-------
"""
try:
if useDisp is None:
useDisp=self.disp
if useDisp == '':
return ''
elif isinstance(value,np.ndarray):
return np.array2string(value,
separator=', ',
formatter={'all':useDisp.format},
threshold=sys.maxsize,
max_line_width=1000)
elif useDisp == 'enum':
if value in self.enum:
return self.enum[value]
else:
self._log.warning("Invalid enum value {} in variable '{}'".format(value,self.path))
return f'INVALID: {value}'
else:
return useDisp.format(value)
except Exception as e:
pr.logException(self._log,e)
self._log.error(f"Error generating disp for value {value} with type {type(value)} in variable {self.path}")
raise e
[docs]
@pr.expose
def getDisp(self, read=True, index=-1):
"""
Parameters
----------
read : bool
(Default value = True)
index : int
(Default value = -1)
Returns
-------
"""
return self.genDisp(self.get(read=read,index=index))
[docs]
@pr.expose
def valueDisp(self, index=-1): #, read=True, index=-1):
"""
Parameters
----------
read : bool
(Default value = True)
index : int
(Default value = -1)
Returns
-------
"""
return self.getDisp(read=False, index=index)
[docs]
@pr.expose
def parseDisp(self, sValue):
"""
Parameters
----------
sValue :
Returns
-------
"""
try:
if not isinstance(sValue,str):
return sValue
elif self.nativeType is np.ndarray:
return np.array(ast.literal_eval(sValue),self._ndType)
elif self.disp == 'enum':
return self.revEnum[sValue]
elif self.nativeType is str:
return sValue
else:
return ast.literal_eval(sValue)
except Exception as e:
msg = "Invalid value {} for variable {} with type {}: {}".format(sValue,self.name,self.nativeType,e)
self._log.error(msg)
raise VariableError(msg)
[docs]
@pr.expose
def setDisp(self, sValue, write=True, index=-1):
"""
Parameters
----------
sValue :
write : bool
(Default value = True)
index : int
(Default value = -1)
Returns
-------
"""
self.set(self.parseDisp(sValue), write=write, index=index)
@property
def nativeType(self):
""" """
return self._nativeType
@property
def ndType(self):
""" """
return self._ndType
@property
def ndTypeStr(self):
""" """
return str(self.ndType)
def _setDefault(self):
""" """
if self._default is not None:
self.setDisp(self._default, write=False)
def _updatePollInterval(self):
""" """
if self.root is not None and self.root._pollQueue is not None:
self.root._pollQueue.updatePollInterval(self)
def _finishInit(self):
""" """
# Set the default value but dont write
self._setDefault()
self._updatePollInterval()
# Setup native type
if self._nativeType is None:
v = self.value()
self._nativeType = type(v)
if self._nativeType is np.ndarray:
self._ndType = v.dtype
def _setDict(self,d,writeEach,modes,incGroups,excGroups,keys):
"""
Parameters
----------
d :
writeEach :
modes :
incGroups :
excGroups :
keys :
Returns
-------
"""
# If keys is not none, it should only contain one entry
# and the variable should be a list variable
if keys is not None:
if len(keys) != 1 or (self.nativeType is not list and self.nativeType is not np.ndarray):
self._log.error(f"Entry {self.name} with key {keys} not found")
elif self._mode in modes:
if keys[0] == '*' or keys[0] == ':':
for i in range(self._numValues()):
self.setDisp(d, write=writeEach, index=i)
else:
idxSlice = eval(f'[i for i in range(self._numValues())][{keys[0]}]')
# Single entry item
if ':' not in keys[0]:
self.setDisp(d, write=writeEach, index=idxSlice[0])
# Multi entry item
else:
if isinstance(d,str):
s = shlex.shlex(" " + d.lstrip('[').rstrip(']') + " ",posix=True)
s.whitespace_split=True
s.whitespace=','
else:
s = d
for val,i in zip(s,idxSlice):
self.setDisp(val.strip(), write=writeEach, index=i)
else:
self._log.warning(f"Skipping set for Entry {self.name} with mode {self._mode}. Enabled Modes={modes}.")
# Standard set
elif self._mode in modes:
# Array variables can be set with a dict of index/value pairs
if isinstance(d, dict):
for k,v in d.items():
self.setDisp(v, index=k, write=writeEach)
else:
self.setDisp(d,writeEach)
else:
self._log.warning(f"Skipping set for Entry {self.name} with mode {self._mode}. Enabled Modes={modes}.")
def _getDict(self, modes=['RW', 'RO', 'WO'], incGroups=None, excGroups=None, properties=False):
"""
Parameters
----------
modes :
incGroups :
excGroups :
Returns
-------
"""
if self._mode in modes:
if properties is False:
return VariableValue(self)
else:
return self.properties()
else:
return None
def _queueUpdate(self):
""" """
self._root._queueUpdates(self)
def _doUpdate(self):
""" """
val = VariableValue(self)
for func in self.__functions:
func(self.path,val)
return val
def _alarmState(self,value):
"""
Parameters
----------
value :
Returns
-------
type
"""
if isinstance(value,list) or isinstance(value,dict):
return 'None','None'
if (self.hasAlarm is False):
return "None", "None"
elif (self._lowAlarm is not None and value < self._lowAlarm):
return 'AlarmLoLo', 'AlarmMajor'
elif (self._highAlarm is not None and value > self._highAlarm):
return 'AlarmHiHi', 'AlarmMajor'
elif (self._lowWarning is not None and value < self._lowWarning):
return 'AlarmLow', 'AlarmMinor'
elif (self._highWarning is not None and value > self._highWarning):
return 'AlarmHigh', 'AlarmMinor'
else:
return 'Good','Good'
def _genDocs(self,file):
"""
Parameters
----------
file :
Returns
-------
"""
print(f".. topic:: {self.path}",file=file)
print('',file=file)
print(pr.genDocDesc(self.description,4),file=file)
print('',file=file)
print(pr.genDocTableHeader(['Field','Value'],4,100),file=file)
for a in ['name', 'path', 'enum',
'typeStr', 'disp', 'precision', 'mode', 'units', 'minimum',
'maximum', 'lowWarning', 'lowAlarm', 'highWarning',
'highAlarm', 'alarmStatus', 'alarmSeverity']:
astr = str(getattr(self,a))
if astr != 'None':
print(pr.genDocTableRow([a,astr],4,100),file=file)
[docs]
class RemoteVariable(BaseVariable,rim.Variable):
""" """
PROPS = BaseVariable.PROPS + [
'address', 'overlapEn', 'offset', 'bitOffset', 'bitSize',
'verifyEn', 'numValues', 'valueBits', 'valueStride', 'retryCount',
'varBytes', 'bulkEn']
def __init__(self, *,
name,
description='',
mode='RW',
value=None,
disp=None,
enum=None,
units=None,
hidden=False,
groups=None,
minimum=None,
maximum=None,
lowWarning=None,
lowAlarm=None,
highWarning=None,
highAlarm=None,
base=pr.UInt,
offset,
numValues=0,
valueBits=0,
valueStride=0,
bitSize=32,
bitOffset=0,
pollInterval=0,
updateNotify=True,
overlapEn=False,
bulkOpEn=True,
verify=True,
retryCount=0,
guiGroup=None,
**kwargs):
if disp is None:
disp = base.defaultdisp
self._block = None
# Convert the address parameters into lists
addrParams = [offset, bitOffset, bitSize] # Make a copy
addrParams = [list(x) if isinstance(x, Iterable) else [x] for x in addrParams] # convert to lists
length = max((len(x) for x in addrParams))
addrParams = [x*length if len(x)==1 else x for x in addrParams] # Make single element lists as long as max
offset, bitOffset, bitSize = addrParams # Assign back
# Verify the the list lengths match
if len(offset) != len(bitOffset) != len(bitSize):
raise VariableError('Lengths of offset: {}, bitOffset: {}, bitSize {} must match'.format(offset, bitOffset, bitSize))
# Check for invalid values
if 0 in bitSize:
raise VariableError('BitSize of 0 is invalid')
# Normalize bitOffsets relative to the smallest offset
baseAddr = min(offset)
bitOffset = [x+((y-baseAddr)*8) for x,y in zip(bitOffset, offset)]
offset = baseAddr
if isinstance(base, pr.Model):
self._base = base
elif numValues != 0:
self._base = base(valueBits)
else:
self._base = base(sum(bitSize))
# Apply default min and max
if minimum is None or (self._base.minValue() is not None and minimum < self._base.minValue()):
minimum = self._base.minValue()
if maximum is None or (self._base.maxValue() is not None and maximum > self._base.maxValue()):
maximum = self._base.maxValue()
BaseVariable.__init__(self, name=name, description=description,
mode=mode, value=value, disp=disp,
enum=enum, units=units, hidden=hidden, groups=groups,
minimum=minimum, maximum=maximum, bulkOpEn=bulkOpEn,
lowWarning=lowWarning, lowAlarm=lowAlarm,
highWarning=highWarning, highAlarm=highAlarm,
pollInterval=pollInterval,updateNotify=updateNotify,
guiGroup=guiGroup, **kwargs)
# If numValues > 0 the bit size array must only have one entry
# Auto calculate the total number of bits
if numValues != 0:
self._nativeType = np.ndarray
self._ndType = self._base.ndType
self._typeStr = f'{self.ndType}({numValues},)'
if len(bitSize) != 1:
raise VariableError('BitSize array must have a length of one when numValues > 0')
if valueBits > valueStride:
raise VariableError(f'ValueBits {valueBits} is greater than valueStrude {valueStride}')
# Override the bitSize
bitSize[0] = numValues * valueBits
if self._ndType is None:
raise VariableError(f'Invalid base type {self._base} with numValues = {numValues}')
else:
self._typeStr = self._base.name
listData = VariableListData(numValues,valueBits,valueStride)
# Setup C++ Base class
rim.Variable.__init__(
self,
self._name,
self._mode,
self._minimum,
self._maximum,
offset,
bitOffset,
bitSize,
overlapEn,
verify,
self._bulkOpEn,
self._updateNotify,
self._base,
listData,
retryCount)
##############################
# Properties held by C++ class
##############################
@pr.expose
@property
def address(self):
return self._block.address
@property
def numValues(self):
return self._numValues()
@property
def valueBits(self):
return self._valueBits()
@property
def valueStride(self):
return self._valueStride()
@property
def retryCount(self):
return self._retryCount()
@pr.expose
@property
def varBytes(self):
""" """
return self._varBytes()
@pr.expose
@property
def offset(self):
""" """
return self._offset()
@pr.expose
@property
def overlapEn(self):
""" """
return self._overlapEn()
@pr.expose
@property
def bitSize(self):
""" """
return self._bitSize()
@pr.expose
@property
def bitOffset(self):
""" """
return self._bitOffset()
@pr.expose
@property
def verifyEn(self):
""" """
return self._verifyEn()
########################
# Local Properties
########################
@pr.expose
@property
def base(self):
""" """
return self._base
@pr.expose
@property
def bulkEn(self):
""" """
return self._bulkOpEn
[docs]
@pr.expose
def set(self, value, *, index=-1, write=True, verify=True, check=True):
"""
Set the value and write to hardware if applicable
Writes to hardware are blocking if check=True, otherwise non-blocking.
A verify will be performed according to self.verifyEn if verify=True
A verify will not be performed if verify=False
An error will result in a logged exception.
Parameters
----------
value :
* :
index : int
(Default value = -1)
write : bool
(Default value = True)
verify : bool
(Default value = True)
check : bool
(Default value = True)
Returns
-------
"""
try:
# Set value to block
self._set(value,index)
if write:
self._parent.writeBlocks(force=True, recurse=False, variable=self, index=index)
if verify:
self._parent.verifyBlocks(recurse=False, variable=self)
if check:
self._parent.checkBlocks(recurse=False, variable=self)
except Exception as e:
pr.logException(self._log,e)
self._log.error("Error setting value '{}' to variable '{}' with type {}. Exception={}".format(value,self.path,self.typeStr,e))
raise e
[docs]
@pr.expose
def post(self, value, *, index=-1):
"""
Set the value and write to hardware if applicable using a posted write.
This method does not call through parent.writeBlocks(), but rather
calls on self._block directly.
Parameters
----------
value :
* :
index : int
(Default value = -1)
Returns
-------
"""
try:
# Set value to block
self._set(value,index)
pr.startTransaction(self._block, type=rim.Post, forceWr=False, checkEach=True, variable=self, index=index)
except Exception as e:
pr.logException(self._log,e)
self._log.error("Error posting value '{}' to variable '{}' with type {}".format(value,self.path,self.typeStr))
raise e
[docs]
@pr.expose
def get(self, *, index=-1, read=True, check=True):
"""
Parameters
----------
* :
index : int
(Default value = -1)
read : bool
(Default value = True)
check : bool
(Default value = True)
Returns
-------
type
Hardware read is blocking if check=True, otherwise non-blocking.
An error will result in a logged exception.
Listeners will be informed of the update.
"""
try:
if read:
self._parent.readBlocks(recurse=False, variable=self, index=index)
if check:
self._parent.checkBlocks(recurse=False, variable=self)
return self._get(index)
except Exception as e:
pr.logException(self._log,e)
self._log.error("Error reading value from variable '{}'".format(self.path))
raise e
[docs]
@pr.expose
def write(self, *, verify=True, check=True):
"""
Force a write of the variable.
Hardware write is blocking if check=True.
A verify will be performed according to self.verifyEn if verify=True
A verify will not be performed if verify=False
An error will result in a logged exception
Parameters
----------
* :
verify : bool
(Default value = True)
check : bool
(Default value = True)
Returns
-------
"""
try:
self._parent.writeBlocks(force=True, recurse=False, variable=self)
if verify:
self._parent.verifyBlocks(recurse=False, variable=self)
if check:
self._parent.checkBlocks(recurse=False, variable=self)
except Exception as e:
pr.logException(self._log,e)
raise e
def _parseDispValue(self, sValue):
"""
Parameters
----------
sValue :
Returns
-------
"""
if self.disp == 'enum':
return self.revEnum[sValue]
else:
return self._base.fromString(sValue)
def _genDocs(self,file):
"""
Parameters
----------
file :
Returns
-------
"""
BaseVariable._genDocs(self,file)
for a in ['offset', 'numValues', 'bitSize', 'bitOffset', 'verifyEn', 'varBytes']:
astr = str(getattr(self,a))
if astr != 'None':
print(pr.genDocTableRow([a,astr],4,100),file=file)
[docs]
class LocalVariable(BaseVariable):
""" """
def __init__(self, *,
name,
value=None,
description='',
mode='RW',
disp='{}',
enum=None,
units=None,
hidden=False,
groups=None,
minimum=None,
maximum=None,
lowWarning=None,
lowAlarm=None,
highWarning=None,
highAlarm=None,
localSet=None,
localGet=None,
pollInterval=0,
updateNotify=True,
typeStr='Unknown',
bulkOpEn=True,
guiGroup=None,
**kwargs):
if value is None and localGet is None:
raise VariableError(f'LocalVariable {self.path} without localGet() must specify value= argument in constructor')
BaseVariable.__init__(self, name=name, description=description,
mode=mode, value=value, disp=disp,
enum=enum, units=units, hidden=hidden, groups=groups,
minimum=minimum, maximum=maximum, typeStr=typeStr,
lowWarning=lowWarning, lowAlarm=lowAlarm,
highWarning=highWarning, highAlarm=highAlarm,
pollInterval=pollInterval,updateNotify=updateNotify, bulkOpEn=bulkOpEn,
guiGroup=guiGroup, **kwargs)
self._block = pr.LocalBlock(variable=self,
localSet=localSet,
localGet=localGet,
minimum=minimum,
maximum=maximum,
value=self._default)
[docs]
@pr.expose
def set(self, value, *, index=-1, write=True, verify=True, check=True):
"""
Set the value and write to hardware if applicable
Writes to hardware are blocking. An error will result in a logged exception.
Parameters
----------
value :
* :
index : int
(Default value = -1)
write : bool
(Default value = True)
verify : bool
(Default value = True)
check : bool
(Default value = True)
Returns
-------
"""
self._log.debug("{}.set({})".format(self, value))
try:
# Set value to block
self._block.set(self, value, index)
if write:
self._block._checkTransaction()
except Exception as e:
pr.logException(self._log,e)
self._log.error("Error setting value '{}' to variable '{}' with type {}. Exception={}".format(value,self.path,self.typeStr,e))
raise e
[docs]
@pr.expose
def post(self,value, *, index=-1):
"""
Set the value and write to hardware if applicable using a posted write.
This method does not call through parent.writeBlocks(), but rather
calls on self._block directly.
Parameters
----------
value :
* :
index : int
(Default value = -1)
Returns
-------
"""
self._log.debug("{}.post({})".format(self, value))
try:
self._block.set(self, value, index)
self._block._checkTransaction()
except Exception as e:
pr.logException(self._log,e)
self._log.error("Error posting value '{}' to variable '{}' with type {}".format(value,self.path,self.typeStr))
raise e
[docs]
@pr.expose
def get(self, *, index=-1, read=True, check=True):
"""
Parameters
----------
* :
index : int
(Default value = -1)
read : bool
(Default value = True)
check : bool
(Default value = True)
Returns
-------
type
Hardware read is blocking. An error will result in a logged exception.
Listeners will be informed of the update.
"""
try:
if read and check:
self._block._checkTransaction()
return self._block.get(self,index)
except Exception as e:
pr.logException(self._log,e)
self._log.error("Error reading value from variable '{}'".format(self.path))
raise e
def __get__(self):
return self.get(read=False)
def __iadd__(self, other):
self._block._iadd(other)
return self
def __isub__(self, other):
self._block._isub(other)
return self
def __imul__(self, other):
self._block._imul(other)
return self
def __imatmul__(self, other):
self._block._imatmul(other)
return self
def __itruediv__(self, other):
self._block._itruediv(other)
return self
def __ifloordiv__(self, other):
self._block._ifloordiv(other)
return self
def __imod__(self, other):
self._block._imod(other)
return self
def __ipow__(self, other):
self._block._ipow(other)
return self
def __ilshift__(self, other):
self._block._ilshift(other)
return self
def __irshift__(self, other):
self._block._irshift(other)
return self
def __iand__(self, other):
self._block._iand(other)
return self
def __ixor__(self, other):
self._block._ixor(other)
return self
def __ior__(self, other):
self._block._ior(other)
return self
[docs]
class LinkVariable(BaseVariable):
""" """
def __init__(self, *,
name,
variable=None,
dependencies=None,
linkedSet=None,
linkedGet=None,
minimum=None,
maximum=None,
**kwargs): # Args passed to BaseVariable
# Set and get functions
self._linkedGet = linkedGet
self._linkedSet = linkedSet
if minimum is not None or maximum is not None:
raise VariableError("Invalid use of min or max values with LinkVariable")
if variable is not None:
# If directly linked to a variable, use it's value and set by defualt
# for linkedGet and linkedSet unless overridden
self._linkedGet = linkedGet if linkedGet else variable.get
self._linkedSet = linkedSet if linkedSet else variable.set
# Search the kwargs for overridden properties, otherwise the properties from the linked variable will be used
args = ['disp', 'enum', 'units', 'minimum', 'maximum']
for arg in args:
if arg not in kwargs:
kwargs[arg] = getattr(variable, arg)
# Why not inhertit mode from link variable?
if not self._linkedSet:
kwargs['mode'] = 'RO'
if not self._linkedGet:
kwargs['mode'] = 'WO'
# Wrap linked get and set functions
self._linkedGetWrap = pr.functionWrapper(function=self._linkedGet, callArgs=['dev', 'var', 'read', 'index', 'check'])
self._linkedSetWrap = pr.functionWrapper(function=self._linkedSet, callArgs=['dev', 'var', 'value', 'write', 'index', 'verify', 'check'])
# Call super constructor
BaseVariable.__init__(self, name=name, **kwargs)
# Dependency tracking
if variable is not None:
# Add the directly linked variable as a dependency
self.addDependency(variable)
if dependencies is not None:
for d in dependencies:
self.addDependency(d)
self.__depBlocks = []
def __getitem__(self, key):
# Allow dependencies to be accessed as indices of self
return self.dependencies[key]
[docs]
@pr.expose
def set(self, value, *, write=True, index=-1, verify=True, check=True):
"""
Parameters
----------
value :
* :
write : bool
(Default value = True)
index : int
(Default value = -1)
verify : bool
(Default value = True)
check : bool
(Default value = True)
Returns
-------
"""
try:
self._linkedSetWrap(function=self._linkedSet, dev=self.parent, var=self, value=value, write=write, index=index, verify=verify, check=check)
except Exception as e:
pr.logException(self._log,e)
self._log.error("Error setting link variable '{}'".format(self.path))
raise e
[docs]
@pr.expose
def get(self, read=True, index=-1, check=True):
"""
Parameters
----------
read : bool
(Default value = True)
index : int
(Default value = -1)
check : bool
(Default value = True)
Returns
-------
"""
try:
return self._linkedGetWrap(function=self._linkedGet, dev=self.parent, var=self, read=read, index=index, check=check)
except Exception as e:
pr.logException(self._log,e)
self._log.error("Error getting link variable '{}'".format(self.path))
raise e
def __getBlocks(self):
b = []
for d in self.dependencies:
if isinstance(d, LinkVariable):
b.extend(d.__getBlocks())
elif hasattr(d, '_block') and d._block is not None:
b.append(d._block)
return b
def _finishInit(self):
super()._finishInit()
self.__depBlocks = self.__getBlocks()
@property
def depBlocks(self):
""" Return a list of Blocks that this LinkVariable depends on """
return self.__depBlocks
@pr.expose
@property
def pollInterval(self):
depIntervals = [dep.pollInterval for dep in self.dependencies if dep.pollInterval > 0]
if len(depIntervals) == 0:
return 0
else:
return min(depIntervals)