#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# PyRogue base module - Device Class
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
import rogue.interfaces.memory as rim
import collections
import functools as ft
import pyrogue as pr
import threading
class EnableVariable(pr.BaseVariable):
""" """
def __init__(self, *, enabled, deps=None):
pr.BaseVariable.__init__(
self,
description='Determines if device is enabled for hardware access',
name='enable',
mode='RW',
value=enabled,
groups='Enable',
disp={False: 'False', True: 'True', 'parent': 'ParentFalse', 'deps': 'ExtDepFalse'})
if deps is None:
self._deps = []
self._depDis = False
else:
self._deps = deps
self._depDis = True
for d in self._deps:
d.addListener(self)
self._value = enabled
self._lock = threading.Lock()
@pr.expose
def get(self, read=False, index=-1):
"""
Parameters
----------
read : bool
(Default value = False)
index : int
(Default value = -1)
Returns
ret :
-------
"""
ret = self._value
with self._lock:
if self._value is False:
ret = False
elif self._depDis:
ret = 'deps'
elif self._parent is self._root:
#print("Root enable = {}".format(self._value))
ret = self._value
else:
if self._parent._parent.enable.value() is not True:
ret = 'parent'
else:
ret = True
return ret
@pr.expose
def set(self, value, write=True, index=-1):
"""
Parameters
----------
value :
(Default value = enabled)
write : bool
(Default value = True)
index : int
(Default value = -1)
Returns
-------
"""
if value != 'parent' and value != 'deps':
old = self.value()
with self._lock:
self._value = value
if old != value and old != 'parent' and old != 'deps':
self.parent._updateBlockEnable()
self.parent.enableChanged(value)
with self.parent.root.updateGroup():
self._queueUpdate()
# The following concept will trigger enable listeners
# directly. This is causing lock contentions in practice
# (epics4 as an example)
#self._doUpdate()
#for var in self._listeners:
# var._doUpdate()
def _doUpdate(self):
""" """
if len(self._deps) != 0:
oldEn = (self.value() is True)
with self._lock:
self._depDis = not all(x.value() for x in self._deps)
newEn = (self.value() is True)
if oldEn != newEn:
self.parent._updateBlockEnable()
self.parent.enableChanged(newEn)
return super()._doUpdate()
def _rootAttached(self,parent,root):
"""
Parameters
----------
parent :
root :
Returns
-------
"""
pr.Node._rootAttached(self,parent,root)
if parent is not root:
parent._parent.enable.addListener(self)
class DeviceError(Exception):
""" """
pass
[docs]
class Device(pr.Node,rim.Hub):
"""Device class holder. TODO: Update comments"""
def __init__(self, *,
name=None,
description='',
memBase=None,
offset=0,
hidden=False,
groups=None,
expand=False,
enabled=True,
defaults=None,
enableDeps=None,
hubMin=0,
hubMax=0,
guiGroup=None):
"""Initialize device class"""
if name is None:
name = self.__class__.__name__
# Hub.__init__ must be called first for _setSlave to work below
rim.Hub.__init__(self,offset,hubMin,hubMax)
# Blocks
self._blocks = []
self._custBlocks = []
self._memBase = memBase
self._memLock = threading.RLock()
self._defaults = defaults if defaults is not None else {}
self._ifAndProto = []
self.forceCheckEach = False
# Connect to memory slave
if memBase:
self._setSlave(memBase)
# Node.__init__ can't be called until after self._memBase is created
pr.Node.__init__(self, name=name, hidden=hidden, groups=groups, description=description, expand=expand, guiGroup=guiGroup)
self._log.info("Making device {:s}".format(name))
# Convenience methods
self.addRemoteCommands = ft.partial(self.addNodes, pr.RemoteCommand)
# Variable interface to enable flag
self.add(EnableVariable(enabled=enabled, deps=enableDeps))
self.add(pr.LocalCommand(name='ReadDevice', value=False, hidden=True,
function=lambda arg: self.readAndCheckBlocks(recurse=arg),
description='Force read of device without recursion'))
self.add(pr.LocalCommand(name='WriteDevice', value='', hidden=True,
function=lambda arg: self.writeAndVerifyBlocks(force=True,recurse=arg),
description='Force write of device without recursion'))
@pr.expose
@property
def address(self):
""" """
return self._getAddress()
@pr.expose
@property
def offset(self):
""" """
return self._getOffset()
[docs]
def addCustomBlock(self, block):
"""
Parameters
----------
block :
Returns
-------
"""
self._custBlocks.append(block)
self._custBlocks.sort(key=lambda x: (x.offset, x.size))
[docs]
def add(self,node):
"""
Parameters
----------
node :
Returns
-------
"""
# Call node add
pr.Node.add(self,node)
# Adding device
if isinstance(node,Device):
# Device does not have a membase
if node._memBase is None:
node._setSlave(self)
[docs]
def addInterface(self, *interfaces):
"""
Add one or more rogue.interfaces.stream.Master or rogue.interfaces.memory.Master
Also accepts iterables for adding multiple at once
Parameters
----------
*interfaces :
Returns
-------
"""
for interface in interfaces:
if isinstance(interface, collections.abc.Iterable):
self._ifAndProto.extend(interface)
else:
self._ifAndProto.append(interface)
[docs]
def addProtocol(self, *protocols):
"""
Add a protocol entity.
Also accepts iterables for adding multiple at once
Parameters
----------
*protocols :
Returns
-------
"""
self.addInterface(protocols)
[docs]
def manage(self, *interfaces):
"""
Parameters
----------
*interfaces :
Returns
-------
"""
self._ifAndProto.extend(interfaces)
def _start(self):
""" Called recursively from Root.start when starting """
for intf in self._ifAndProto:
if hasattr(intf,"_start"):
intf._start()
for d in self.devices.values():
d._start()
def _stop(self):
"""Called recursively from Root.stop when exiting"""
for intf in self._ifAndProto:
if hasattr(intf,"_stop"):
intf._stop()
for d in self.devices.values():
d._stop()
@property
def running(self):
""" Check if Device._start() has been called """
return self.root is not None and self.root.running
[docs]
def addRemoteVariables(self, number, stride, pack=False, **kwargs):
"""
Parameters
----------
number :
stride :
pack : bool
(Default value = False)
**kwargs :
Returns
-------
"""
if pack:
hidden=True
else:
hidden=kwargs.pop('hidden', False)
self.addNodes(pr.RemoteVariable, number, stride, hidden=hidden, **kwargs)
# If pack specified, create a linked variable to combine everything
if pack:
varList = getattr(self, kwargs['name']).values()
def linkedSet(dev, var, val, write):
"""
Parameters
----------
dev :
var :
val :
write :
Returns
-------
"""
if val == '':
return
values = reversed(val.split('_'))
for variable, value in zip(varList, values):
variable.setDisp(value, write=write)
def linkedGet(dev, var, read):
"""
Parameters
----------
dev :
var :
read :
Returns
-------
"""
values = [v.getDisp(read=read) for v in varList]
return '_'.join(reversed(values))
name = kwargs.pop('name')
kwargs.pop('value', None)
lv = pr.LinkVariable(name=name, value='', dependencies=varList, linkedGet=linkedGet, linkedSet=linkedSet, **kwargs)
self.add(lv)
[docs]
def setPollInterval(self, interval, variables=None):
"""
Set the poll interval for a group of variables.
The variables param is an Iterable of strings
If variables=None, set interval for all variables that currently have nonzero pollInterval
Parameters
----------
interval :
variables : str
(Default value = None)
Returns
-------
"""
if variables is None:
variables = [k for k,v in self.variables.items() if v.pollInterval != 0]
for x in variables:
self.node(x).setPollInterval(interval)
[docs]
def hideVariables(self, hidden, variables=None):
"""
Hide a list of Variables (or Variable names)
Parameters
----------
hidden :
variables : str
(Default value = None)
Returns
-------
"""
if variables is None:
variables=self.variables.values()
for v in variables:
if isinstance(v, pr.BaseVariable):
v.hidden = hidden
elif isinstance(variables[0], str):
self.variables[v].hidden = hidden
def initialize(self):
""" """
for key,value in self.devices.items():
value.initialize()
def hardReset(self):
""" """
for key,value in self.devices.items():
value.hardReset()
def countReset(self):
""" """
for key,value in self.devices.items():
value.countReset()
[docs]
def enableChanged(self,value):
"""
Parameters
----------
value :
Returns
-------
"""
pass
#if value is True:
# self.writeAndVerifyBlocks(force=True, recurse=True, variable=None)
[docs]
def writeBlocks(self, *, force=False, recurse=True, variable=None, checkEach=False, index=-1, **kwargs):
"""
Write all of the blocks held by this Device to memory
Parameters
----------
* :
force : bool
(Default value = False)
recurse : bool
(Default value = True)
variable : str
(Default value = None)
checkEach : bool
(Default value = False)
index : int
(Default value = -1)
**kwargs :
Returns
-------
"""
checkEach = checkEach or self.forceCheckEach
if variable is not None:
pr.startTransaction(variable._block, type=rim.Write, forceWr=force, checkEach=checkEach, variable=variable, index=index, **kwargs)
else:
for block in self._blocks:
if block.bulkOpEn:
pr.startTransaction(block, type=rim.Write, forceWr=force, checkEach=checkEach, **kwargs)
if recurse:
for key,value in self.devices.items():
value.writeBlocks(force=force, recurse=True, checkEach=checkEach, **kwargs)
[docs]
def verifyBlocks(self, *, recurse=True, variable=None, checkEach=False, **kwargs):
"""
Perform background verify
Parameters
----------
* :
recurse : bool
(Default value = True)
variable : str
(Default value = None)
checkEach : bool
(Default value = False)
**kwargs :
Returns
-------
"""
checkEach = checkEach or self.forceCheckEach
if variable is not None:
pr.startTransaction(variable._block, type=rim.Verify, checkEach=checkEach, **kwargs) # Verify range is set by previous write
else:
for block in self._blocks:
if block.bulkOpEn:
pr.startTransaction(block, type=rim.Verify, checkEach=checkEach, **kwargs)
if recurse:
for key,value in self.devices.items():
value.verifyBlocks(recurse=True, checkEach=checkEach, **kwargs)
[docs]
def readBlocks(self, *, recurse=True, variable=None, checkEach=False, index=-1, **kwargs):
"""
Perform background reads
Parameters
----------
* :
recurse : bool
(Default value = True)
variable : str
(Default value = None)
checkEach : bool
(Default value = False)
index : int
(Default value = -1)
**kwargs :
Returns
-------
"""
checkEach = checkEach or self.forceCheckEach
if variable is not None:
pr.startTransaction(variable._block, type=rim.Read, checkEach=checkEach, variable=variable, index=index, **kwargs)
else:
for block in self._blocks:
if block.bulkOpEn:
pr.startTransaction(block, type=rim.Read, checkEach=checkEach, **kwargs)
if recurse:
for key,value in self.devices.items():
value.readBlocks(recurse=True, checkEach=checkEach, **kwargs)
[docs]
def checkBlocks(self, *, recurse=True, variable=None, **kwargs):
"""
Check errors in all blocks and generate variable update notifications
Parameters
----------
* :
recurse : bool
(Default value = True)
variable : str
(Default value = None)
**kwargs :
Returns
-------
"""
if variable is not None:
pr.checkTransaction(variable._block, **kwargs)
else:
for block in self._blocks:
pr.checkTransaction(block, **kwargs)
if recurse:
for key,value in self.devices.items():
value.checkBlocks(recurse=True, **kwargs)
[docs]
def writeAndVerifyBlocks(self, force=False, recurse=True, variable=None, checkEach=False):
"""
Perform a write, verify and check. Useful for committing any stale variables
Parameters
----------
force : bool
(Default value = False)
recurse : bool
(Default value = True)
variable : str
(Default value = None)
checkEach : bool
(Default value = False)
Returns
-------
"""
self.writeBlocks(force=force, recurse=recurse, variable=variable, checkEach=checkEach)
self.verifyBlocks(recurse=recurse, variable=variable, checkEach=checkEach)
self.checkBlocks(recurse=recurse, variable=variable)
[docs]
def readAndCheckBlocks(self, recurse=True, variable=None, checkEach=False):
"""
Perform a read and check.
Parameters
----------
recurse : bool
(Default value = True)
variable : str
(Default value = None)
checkEach : bool
(Default value = False)
Returns
-------
"""
self.readBlocks(recurse=recurse, variable=variable, checkEach=checkEach)
self.checkBlocks(recurse=recurse, variable=variable)
def _updateBlockEnable(self):
""" """
for block in self._blocks:
block.setEnable(self.enable.value() is True)
for key,value in self.devices.items():
value._updateBlockEnable()
def _buildBlocks(self):
""" """
remVars = []
# Use min block size, larger blocks can be pre-created
blkSize = self._blkMinAccess()
# Process all of the variables
for k,n in self.nodes.items():
# Local variables have a 1:1 block association
if isinstance(n,pr.LocalVariable):
self._blocks.append(n._block)
# Align to min access, create list of remote variables
elif isinstance(n,pr.RemoteVariable) and n.offset is not None:
self._log.info(f"Before Shift variable {n.name} offset={n.offset} bitSize={n.bitSize} bytes={n.varBytes}")
n._updatePath(n.path)
n._shiftOffsetDown(n.offset % blkSize, blkSize)
remVars += [n]
self._log.info(f"Creating variable {n.name} offset={n.offset} bitSize={n.bitSize} bytes={n.varBytes}")
# Sort var list by offset, size
remVars.sort(key=lambda x: (x.offset, x.varBytes))
blocks = []
blk = None
# Go through sorted variable list, look for overlaps, group into blocks
for n in remVars:
if blk is not None and ( (blk['offset'] + blk['size']) > n.offset):
self._log.info("Overlap detected var offset={} block offset={} block bytes={}".format(n.offset,blk['offset'],blk['size']))
n._shiftOffsetDown(n.offset - blk['offset'], blkSize)
blk['vars'].append(n)
if n.varBytes > blk['size']:
blk['size'] = n.varBytes
# We need a new block for this variable
else:
blk = None
# Look for pre-made block which overlaps
for b in self._custBlocks:
if ( (n.offset >= b.offset) and ((b.offset + b.size) > n.offset)):
# Just in case a variable extends past the end of pre-made block, user mistake
if n.varBytes > b.size:
msg = 'Failed to add variable {n.name} to pre-made block with offset {b.offset} and size {b.size}'
raise pr.MemoryError(name=self.path, address=self.address, msg=msg)
blk = {'offset':b.offset, 'size':b.size, 'vars':[n], 'block':b}
break
# Block not found
if blk is None:
blk = {'offset':n.offset, 'size':n.varBytes, 'vars':[n], 'block':None}
blocks.append(blk)
# Clear pre-made list
self._custBlocks = []
# Create new blocks and add new and pre-made blocks to device
# Add variables to the block
for b in blocks:
# Create new block
if b['block'] is None:
newBlock = rim.Block(b['offset'], b['size'])
self._log.debug("Adding new block at offset {:#02x}, size {}".format(b['offset'], b['size']))
else:
newBlock = b['block']
# Set memory slave
newBlock._setSlave(self)
# Verify the block is not too small or large for the memory interface
if newBlock.size > self._blkMaxAccess() or newBlock.size < self._blkMinAccess():
msg = f'Block size {newBlock.size} is not in the range: {self._blkMinAccess()} - {self._blkMaxAccess()}'
raise pr.MemoryError(name=self.path, address=self.address, msg=msg)
# Add variables to the block
newBlock.addVariables(b['vars'])
# Set varible block links
for v in b['vars']:
v._block = newBlock
# Add to device
self._blocks.append(newBlock)
newBlock.setEnable(self.enable.value() is True)
def _rootAttached(self, parent, root):
"""
Parameters
----------
parent :
root :
Returns
-------
"""
pr.Node._rootAttached(self, parent, root)
for key,value in self._nodes.items():
value._rootAttached(self,root)
self._buildBlocks()
# Override defaults as dictated by the _defaults dict
for varName, defValue in self._defaults.items():
nodes,keys = self.nodeMatch(varName)
if keys is None:
for var in nodes:
var._default = defValue
def _setTimeout(self,timeout):
"""
Set timeout value on all devices & blocks
Parameters
----------
timeout :
Returns
-------
"""
for block in self._blocks:
block._setTimeout(int(timeout*1000000))
rim.Master._setTimeout(self, int(timeout*1000000))
for key,value in self._nodes.items():
if isinstance(value,Device):
value._setTimeout(timeout)
[docs]
def command(self, **kwargs):
"""
A Decorator to add inline constructor functions as commands
Parameters
----------
**kwargs :
Returns
-------
"""
def _decorator(func):
"""
Parameters
----------
func :
Returns
-------
"""
if 'name' not in kwargs:
kwargs['name'] = func.__name__
self.add(pr.LocalCommand(function=func, **kwargs))
return func
return _decorator
[docs]
def linkVariableGet(self, **kwargs):
"""
Decorator to add inline constructor functions as LinkVariable.linkedGet functions
Parameters
----------
**kwargs :
Returns
-------
"""
def _decorator(func):
"""
Parameters
----------
func :
Returns
-------
"""
if 'name' not in kwargs:
kwargs['name'] = func.__name__
self.add(pr.LinkVariable(linkedGet=func, **kwargs))
return func
return _decorator
[docs]
def genDocuments(self,path,incGroups, excGroups):
"""
Parameters
----------
path :
incGroups :
excGroups :
Returns
-------
"""
with open(path + '/' + self.path.replace('.','_') + '.rst','w') as file:
print("****************************",file=file)
print(self.name,file=file)
print("****************************",file=file)
print('',file=file)
print(pr.genDocDesc(self.description,0),file=file)
print('',file=file)
plist = self.getNodes(typ=pr.Process,incGroups=incGroups,excGroups=excGroups)
dlist = self.devicesByGroup(incGroups=incGroups,excGroups=excGroups)
vlist = self.variablesByGroup(incGroups=incGroups,excGroups=excGroups)
clist = self.commandsByGroup(incGroups=incGroups,excGroups=excGroups)
tlist = {k:v for k,v in dlist.items() if k not in plist}
if len(tlist) > 0:
print(".. toctree::",file=file)
print(" :maxdepth: 1",file=file)
print(" :caption: Sub Devices:",file=file)
print('',file=file)
for k,v in tlist.items():
print(' ' + v.path.replace('.','_'),file=file)
v.genDocuments(path,incGroups,excGroups)
print('',file=file)
if len(plist) > 0:
print(".. toctree::",file=file)
print(" :maxdepth: 1",file=file)
print(" :caption: Processes:",file=file)
print('',file=file)
for k,v in plist.items():
print(' ' + v.path.replace('.','_'),file=file)
v.genDocuments(path,incGroups,excGroups)
print('',file=file)
print("Summary",file=file)
print("#######",file=file)
print('',file=file)
if len(vlist):
print("Variable List",file=file)
print("*************",file=file)
print('',file=file)
for k,v in vlist.items():
print(f"* {k}",file=file)
print('',file=file)
if len(clist):
print("Command List",file=file)
print("*************",file=file)
print('',file=file)
for k,v in clist.items():
print(f"* {k}",file=file)
print('',file=file)
print("Details",file=file)
print("#######",file=file)
print('',file=file)
if len(vlist):
print("Variables",file=file)
print("*********",file=file)
print('',file=file)
for k,v in vlist.items():
v._genDocs(file=file)
print('',file=file)
if len(clist):
print("Commands",file=file)
print("********",file=file)
print('',file=file)
for k,v in clist.items():
v._genDocs(file=file)
print('',file=file)
class ArrayDevice(Device):
""" """
def __init__(self, *, arrayClass, number, stride=0, arrayArgs=None, **kwargs):
if 'name' not in kwargs:
kwargs['name'] = f'{arrayClass.__name__}Array'
super().__init__(**kwargs)
if arrayArgs is None:
arrayArgs = [{} for x in range(number)]
elif isinstance(arrayArgs, dict):
arrayArgs = [arrayArgs.copy() for x in range(number)]
for i in range(number):
args = arrayArgs[i]
if 'name' in args:
name = args.pop('name')
else:
name = f'{arrayClass.__name__}[{i}]'
self.add(arrayClass(
name=name,
offset=i*stride,
**args))