#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# PyRogue base module - Command Class
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
import rogue.interfaces.memory
import pyrogue as pr
import inspect
import threading
class CommandError(Exception):
"""
Exception for command errors.
"""
pass
[docs]
class BaseCommand(pr.BaseVariable):
""" """
def __init__(self, *,
name=None,
description="",
value=0,
retValue=None,
enum=None,
hidden=False,
groups=None,
minimum=None,
maximum=None,
function=None,
background=False,
guiGroup=None,
**kwargs):
pr.BaseVariable.__init__(
self,
name=name,
description=description,
mode='WO',
value=value,
enum=enum,
hidden=hidden,
groups=groups,
minimum=minimum,
maximum=maximum,
bulkOpEn=False,
guiGroup=guiGroup,
**kwargs)
self._function = function
self._functionWrap = pr.functionWrapper(function=self._function, callArgs=['root', 'dev', 'cmd', 'arg'])
self._thread = None
self._lock = threading.Lock()
self._retDisp = "{}"
if retValue is None:
self._retTypeStr = None
elif isinstance(retValue, list):
self._retTypeStr = f'List[{retValue[0].__class__.__name__}]'
else:
self._retTypeStr = retValue.__class__.__name__
# args flag
try:
self._arg = 'arg' in inspect.getfullargspec(self._function).args
# C++ functions
except Exception:
self._arg = False
@pr.expose
@property
def arg(self):
""" """
return self._arg
@pr.expose
@property
def retTypeStr(self):
""" """
return self._retTypeStr
def __call__(self,arg=None):
return self._doFunc(arg)
def _doFunc(self,arg):
"""
Execute command: TODO: Update comments
Parameters
----------
arg :
Returns
-------
"""
if (self.parent.enable.value() is not True):
return
try:
# Convert arg
if arg is None:
arg = self._default
else:
arg = self.parseDisp(arg)
ret = self._functionWrap(function=self._function, root=self.root, dev=self.parent, cmd=self, arg=arg)
# Set arg to local variable if not a remote variable
if self._arg and not isinstance(self,RemoteCommand):
self._default = arg
self._queueUpdate()
return ret
except Exception as e:
pr.logException(self._log,e)
raise e
[docs]
@pr.expose
def call(self,arg=None):
"""
Parameters
----------
arg : str
(Default value = None)
Returns
-------
"""
return self.__call__(arg)
[docs]
@pr.expose
def callDisp(self,arg=None):
"""
Parameters
----------
arg : str
(Default value = None)
Returns
-------
"""
return self.genDisp(self.__call__(arg),useDisp=self._retDisp)
@staticmethod
def nothing():
""" """
pass
[docs]
@staticmethod
def read(cmd):
"""
Parameters
----------
cmd :
Returns
-------
"""
cmd.get(read=True)
[docs]
@staticmethod
def setArg(cmd, arg):
"""
Parameters
----------
cmd :
arg :
Returns
-------
"""
cmd.set(arg)
[docs]
@staticmethod
def setAndVerifyArg(cmd, arg):
"""
Parameters
----------
cmd :
arg :
Returns
-------
"""
cmd.set(arg)
ret = cmd.get()
if ret != arg:
raise CommandError(f'Verification failed for {cmd.path}. \nSet to {arg} but read back {ret}')
[docs]
@staticmethod
def createToggle(sets):
"""
Parameters
----------
sets :
Returns
-------
"""
def toggle(cmd):
"""
Parameters
----------
cmd :
Returns
-------
"""
for s in sets:
cmd.set(s)
return toggle
[docs]
@staticmethod
def toggle(cmd):
"""
Parameters
----------
cmd :
Returns
-------
"""
cmd.set(1)
cmd.set(0)
[docs]
@staticmethod
def createTouch(value):
"""
Parameters
----------
value :
Returns
-------
"""
def touch(cmd):
"""
Parameters
----------
cmd :
Returns
-------
"""
cmd.set(value)
return touch
[docs]
@staticmethod
def touch(cmd, arg):
"""
Parameters
----------
cmd :
arg :
Returns
-------
"""
if arg is not None:
cmd.set(arg)
else:
cmd.set(1)
[docs]
@staticmethod
def touchZero(cmd):
"""
Parameters
----------
cmd :
Returns
-------
"""
cmd.set(0)
[docs]
@staticmethod
def touchOne(cmd):
"""
Parameters
----------
cmd :
Returns
-------
"""
cmd.set(1)
[docs]
@staticmethod
def createPostedTouch(value):
"""
Parameters
----------
value :
Returns
-------
"""
def postedTouch(cmd):
"""
Parameters
----------
cmd :
Returns
-------
"""
cmd.post(value)
return postedTouch
[docs]
@staticmethod
def postedTouch(cmd, arg):
"""
Parameters
----------
cmd :
arg :
Returns
-------
"""
cmd.post(arg)
[docs]
@staticmethod
def postedTouchOne(cmd):
"""
Parameters
----------
cmd :
Returns
-------
"""
cmd.post(1)
[docs]
@staticmethod
def postedTouchZero(cmd):
"""
Parameters
----------
cmd :
Returns
-------
"""
cmd.post(0)
[docs]
def replaceFunction(self, function):
"""
Parameters
----------
function :
Returns
-------
"""
self._function = function
self._functionWrap = pr.functionWrapper(function=self._function, callArgs=['root', 'dev', 'cmd', 'arg'])
def _setDict(self,d,writeEach,modes,incGroups,excGroups,keys):
"""
Parameters
----------
d :
writeEach :
modes :
incGroups :
excGroups :
keys :
Returns
-------
"""
pass
def _getDict(self,modes,incGroups,excGroups,properties):
"""
Parameters
----------
modes :
incGroups :
excGroups :
Returns
-------
"""
return None
[docs]
def get(self,read=True, index=-1):
"""
Parameters
----------
read : bool
(Default value = True)
index : int
(Default value = -1)
Returns
-------
"""
return self._default
def _genDocs(self,file):
"""
Parameters
----------
file :
Returns
-------
"""
print(f".. topic:: {self.path}",file=file)
print('',file=file)
print(pr.genDocDesc(self.description,4),file=file)
print('',file=file)
print(pr.genDocTableHeader(['Field','Value'],4,100),file=file)
for a in ['name', 'path', 'enum', 'typeStr', 'disp']:
astr = str(getattr(self,a))
if astr != 'None':
print(pr.genDocTableRow([a,astr],4,100),file=file)
# LocalCommand is the same as BaseCommand
LocalCommand = BaseCommand
[docs]
class RemoteCommand(BaseCommand, pr.RemoteVariable):
""" """
def __init__(self, *,
name,
description='',
value=None,
retValue=None,
enum=None,
hidden=False,
groups=None,
minimum=None,
maximum=None,
function=None,
base=pr.UInt,
offset=None,
bitSize=32,
bitOffset=0,
overlapEn=False,
guiGroup=None,
**kwargs):
# RemoteVariable constructor will handle assignment of most params
BaseCommand.__init__(
self,
name=name,
retValue=retValue,
function=function,
guiGroup=guiGroup,
**kwargs)
pr.RemoteVariable.__init__(
self,
name=name,
description=description,
mode='WO',
value=value,
enum=enum,
hidden=hidden,
groups=groups,
minimum=minimum,
maximum=maximum,
base=base,
offset=offset,
bitSize=bitSize,
bitOffset=bitOffset,
overlapEn=overlapEn,
bulkOpEn=False,
verify=False,
guiGroup=guiGroup,
**kwargs)
[docs]
def set(self, value, *, index=-1, write=True):
"""
Parameters
----------
value :
* :
index : int
(Default value = -1)
write : bool
(Default value = True)
Returns
-------
"""
self._log.debug("{}.set({})".format(self, value))
try:
self._set(value,index)
if write:
pr.startTransaction(self._block, type=rogue.interfaces.memory.Write, forceWr=True, checkEach=True, variable=self, index=index)
except Exception as e:
pr.logException(self._log,e)
raise e
[docs]
def get(self, *, index=-1, read=True):
"""
Parameters
----------
* :
index : int
(Default value = -1)
read : bool
(Default value = True)
Returns
-------
"""
try:
if read:
pr.startTransaction(self._block, type=rogue.interfaces.memory.Read, forceWr=False, checkEach=True, variable=self, index=index)
return self._get(index)
except Exception as e:
pr.logException(self._log,e)
raise e
def _genDocs(self,file):
"""
Parameters
----------
file :
Returns
-------
"""
BaseCommand._genDocs(self,file)
for a in ['offset', 'bitSize', 'bitOffset', 'varBytes']:
astr = str(getattr(self,a))
if astr != 'None':
print(pr.genDocTableRow([a,astr],4,100),file=file)
# Alias, this should go away
Command = BaseCommand