#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# PyRogue base module - Command Class
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
from __future__ import annotations
import inspect
import threading
from typing import Any, Callable, Iterable
import pyrogue as pr
import rogue.interfaces.memory
class CommandError(Exception):
"""Raised when command execution or verification fails."""
pass
[docs]
class BaseCommand(pr.BaseVariable):
"""Base class for PyRogue commands.
Parameters
----------
name : str, optional
Command name.
description : str, optional (default = "")
Human-readable description.
value : object, optional (default = 0)
Default command value.
retValue : object, optional
Example return value used to infer display type.
enum : dict, optional
Mapping from object values to display strings.
hidden : bool, optional (default = False)
If True, add the command to the ``Hidden`` group.
groups : list[str], optional
Groups to assign.
minimum : object, optional
Minimum allowed value.
maximum : object, optional
Maximum allowed value.
function : callable, optional
Callback executed when the command is invoked. The wrapper provides
keyword arguments ``root``, ``dev``, ``cmd``, and ``arg``; the
callback may accept any subset of these names.
Default to no-op for command if None.
background : bool, optional (default = False)
Reserved for background execution.
guiGroup : str, optional
GUI grouping label.
**kwargs : Any
Additional arguments forwarded to ``BaseVariable``.
"""
def __init__(
self,
*,
name: str | None = None,
description: str = "",
value: Any = 0,
retValue: Any | None = None,
enum: dict[object, str] | None = None,
hidden: bool = False,
groups: list[str] | None = None,
minimum: Any | None = None,
maximum: Any | None = None,
function: Callable[..., Any] | None = None,
background: bool = False,
guiGroup: str | None = None,
**kwargs: Any,
) -> None:
"""Initialize a command variable."""
pr.BaseVariable.__init__(
self,
name=name,
description=description,
mode='WO',
value=value,
enum=enum,
hidden=hidden,
groups=groups,
minimum=minimum,
maximum=maximum,
bulkOpEn=False,
guiGroup=guiGroup,
**kwargs)
self._function = function
self._functionWrap = pr.functionWrapper(function=self._function, callArgs=['root', 'dev', 'cmd', 'arg'])
self._thread = None
self._lock = threading.Lock()
self._retDisp = "{}"
if retValue is None:
self._retTypeStr = None
elif isinstance(retValue, list):
self._retTypeStr = f'List[{retValue[0].__class__.__name__}]'
else:
self._retTypeStr = retValue.__class__.__name__
# args flag
try:
self._arg = 'arg' in inspect.getfullargspec(self._function).args
# C++ functions
except Exception:
self._arg = False
@pr.expose
@property
def arg(self) -> bool:
"""Return True if the command accepts an argument."""
return self._arg
@pr.expose
@property
def retTypeStr(self) -> str | None:
"""Return the display string for the return type."""
return self._retTypeStr
def __call__(self, arg: Any = None) -> Any:
"""Invoke the command.
Parameters
----------
arg : object, optional
Command argument. If ``None``, uses the default value.
"""
return self._doFunc(arg)
def _doFunc(self, arg: Any) -> Any:
"""Execute command callback.
Execute command: TODO: Update comments.
Parameters
----------
arg : object
Command argument.
"""
if (self.parent.enable.value() is not True):
return
try:
# Convert arg
if arg is None:
arg = self._default
else:
arg = self.parseDisp(arg)
ret = self._functionWrap(function=self._function, root=self.root, dev=self.parent, cmd=self, arg=arg)
# Set arg to local variable if not a remote variable
if self._arg and not isinstance(self,RemoteCommand):
self._default = arg
self._queueUpdate()
return ret
except Exception as e:
pr.logException(self._log,e)
raise e
[docs]
@pr.expose
def call(self, arg: Any = None) -> Any:
"""Invoke the command and return the raw result.
Parameters
----------
arg : object, optional
Command argument.
"""
return self.__call__(arg)
[docs]
@pr.expose
def callDisp(self, arg: Any = None) -> str:
"""Invoke the command and return the display-formatted result.
Parameters
----------
arg : object, optional
Command argument.
"""
return self.genDisp(self.__call__(arg),useDisp=self._retDisp)
[docs]
@staticmethod
def nothing() -> None:
"""No-op command handler."""
pass
[docs]
@staticmethod
def read(cmd: BaseCommand) -> None:
"""Read the command variable.
Parameters
----------
cmd : BaseCommand
Command to read.
"""
cmd.get(read=True)
[docs]
@staticmethod
def setArg(cmd: BaseCommand, arg: Any) -> None:
"""Set the command argument.
Parameters
----------
cmd : BaseCommand
Command to write.
arg : object
Value to write.
"""
cmd.set(arg)
[docs]
@staticmethod
def setAndVerifyArg(cmd: BaseCommand, arg: Any) -> None:
"""Set the argument and verify by reading it back.
Parameters
----------
cmd : BaseCommand
Command to write.
arg : object
Value to write and verify.
"""
cmd.set(arg)
ret = cmd.get()
if ret != arg:
raise CommandError(f'Verification failed for {cmd.path}. \nSet to {arg} but read back {ret}')
[docs]
@staticmethod
def createToggle(sets: Iterable[Any]) -> Callable[[BaseCommand], None]:
"""Create a toggle function that iterates over provided values.
Parameters
----------
sets : iterable
Values to write sequentially.
"""
def toggle(cmd: BaseCommand) -> None:
"""Write each configured toggle value to ``cmd``."""
for s in sets:
cmd.set(s)
return toggle
[docs]
@staticmethod
def toggle(cmd: BaseCommand) -> None:
"""Toggle a command by writing 1 then 0.
Parameters
----------
cmd : BaseCommand
Command to toggle.
"""
cmd.set(1)
cmd.set(0)
[docs]
@staticmethod
def createTouch(value: Any) -> Callable[[BaseCommand], None]:
"""Create a touch function that writes a fixed value.
Parameters
----------
value : object
Value to write when the touch function is called.
"""
def touch(cmd: BaseCommand) -> None:
"""Write the captured fixed value to ``cmd``."""
cmd.set(value)
return touch
[docs]
@staticmethod
def touch(cmd: BaseCommand, arg: Any) -> None:
"""Touch the command with a provided argument or 1.
Parameters
----------
cmd : BaseCommand
Command to touch.
arg : object
Value to write. If ``None``, writes ``1``.
"""
if arg is not None:
cmd.set(arg)
else:
cmd.set(1)
[docs]
@staticmethod
def touchZero(cmd: BaseCommand) -> None:
"""Touch the command with 0.
Parameters
----------
cmd : BaseCommand
Command to touch.
"""
cmd.set(0)
[docs]
@staticmethod
def touchOne(cmd: BaseCommand) -> None:
"""Touch the command with 1.
Parameters
----------
cmd : BaseCommand
Command to touch.
"""
cmd.set(1)
[docs]
@staticmethod
def createPostedTouch(value: Any) -> Callable[[BaseCommand], None]:
"""Create a posted touch function for asynchronous writes.
Parameters
----------
value : object
Value to post.
"""
def postedTouch(cmd: BaseCommand) -> None:
"""Post the captured fixed value to ``cmd`` asynchronously."""
cmd.post(value)
return postedTouch
[docs]
@staticmethod
def postedTouch(cmd: BaseCommand, arg: Any) -> None:
"""Post a command value without waiting for completion.
Parameters
----------
cmd : BaseCommand
Command to post.
arg : object
Value to post.
"""
cmd.post(arg)
[docs]
@staticmethod
def postedTouchOne(cmd: BaseCommand) -> None:
"""Post a value of 1.
Parameters
----------
cmd : BaseCommand
Command to post.
"""
cmd.post(1)
[docs]
@staticmethod
def postedTouchZero(cmd: BaseCommand) -> None:
"""Post a value of 0.
Parameters
----------
cmd : BaseCommand
Command to post.
"""
cmd.post(0)
[docs]
def replaceFunction(self, function: Callable[..., Any]) -> None:
"""Replace the command callback function.
Parameters
----------
function : callable
New callback for this command. The wrapper provides keyword
arguments ``root``, ``dev``, ``cmd``, and ``arg``; the callback
may accept any subset of these names.
"""
self._function = function
self._functionWrap = pr.functionWrapper(function=self._function, callArgs=['root', 'dev', 'cmd', 'arg'])
def _setDict(
self,
d: dict[Any, Any],
writeEach: bool,
modes: list[str],
incGroups: str | list[str] | None = None,
excGroups: str | list[str] | None = None,
keys: Any = None,
) -> None:
"""Commands do not support dict writes."""
pass
def _getDict(
self,
modes: list[str],
incGroups: str | list[str] | None = None,
excGroups: str | list[str] | None = None,
properties: Any = None,
) -> None:
"""Commands do not support dict reads."""
return None
[docs]
def get(self, read: bool = True, index: int = -1) -> Any:
"""Return the cached command argument value.
Parameters
----------
read : bool, optional (default = True)
Unused for commands.
index : int, optional (default = -1)
Unused for commands.
"""
return self._default
def _genDocs(self, file: Any) -> None:
"""Emit Sphinx documentation for this command."""
print(f".. topic:: {self.path}",file=file)
print('',file=file)
print(pr.genDocDesc(self.description,4),file=file)
print('',file=file)
print(pr.genDocTableHeader(['Field','Value'],4,100),file=file)
for a in ['name', 'path', 'enum', 'typeStr', 'disp']:
astr = str(getattr(self,a))
if astr != 'None':
print(pr.genDocTableRow([a,astr],4,100),file=file)
# LocalCommand is the same as BaseCommand
LocalCommand = BaseCommand
[docs]
class RemoteCommand(BaseCommand, pr.RemoteVariable):
"""Remote command backed by a memory-mapped variable.
Parameters
----------
name : str
Command name.
description : str, optional (default = "")
Human-readable description.
value : object, optional
Default value.
retValue : object, optional
Example return value used to infer display type.
enum : dict, optional
Mapping from object values to display strings.
hidden : bool, optional (default = False)
If True, add the command to the ``Hidden`` group.
groups : list[str], optional
Groups to assign.
minimum : object, optional
Minimum allowed value.
maximum : object, optional
Maximum allowed value.
function : callable, optional
Callback executed when the command is invoked. The wrapper provides
keyword arguments ``root``, ``dev``, ``cmd``, and ``arg``; the
callback may accept any subset of these names.
Default to no-op for command if None.
base : object, optional (default = ``pr.UInt``)
Base data type for the underlying remote variable.
offset : int, optional
Memory offset.
bitSize : int, optional (default = 32)
Bit width of the value.
bitOffset : int, optional (default = 0)
Bit offset of the value.
overlapEn : bool, optional (default = False)
Allow overlapping remote variables.
guiGroup : str, optional
GUI grouping label.
**kwargs : Any
Additional arguments forwarded to ``RemoteVariable``.
"""
def __init__(
self,
*,
name: str,
description: str = '',
value: Any = None,
retValue: Any = None,
enum: dict[object, str] | None = None,
hidden: bool = False,
groups: list[str] | None = None,
minimum: Any | None = None,
maximum: Any | None = None,
function: Callable[..., Any] | None = None,
base: Any = pr.UInt,
offset: int | None = None,
bitSize: int = 32,
bitOffset: int = 0,
overlapEn: bool = False,
guiGroup: str | None = None,
**kwargs: Any,
) -> None:
"""Initialize a remote command."""
# RemoteVariable constructor will handle assignment of most params
BaseCommand.__init__(
self,
name=name,
retValue=retValue,
function=function,
guiGroup=guiGroup,
**kwargs)
pr.RemoteVariable.__init__(
self,
name=name,
description=description,
mode='WO',
value=value,
enum=enum,
hidden=hidden,
groups=groups,
minimum=minimum,
maximum=maximum,
base=base,
offset=offset,
bitSize=bitSize,
bitOffset=bitOffset,
overlapEn=overlapEn,
bulkOpEn=False,
verify=False,
guiGroup=guiGroup,
**kwargs)
[docs]
def set(self, value: Any, *, index: int = -1, write: bool = True) -> None:
"""Write a value to the remote command variable.
Parameters
----------
value : object
Value to write.
index : int, optional (default = -1)
Optional index for array variables.
write : bool, optional (default = True)
If True, perform a hardware write transaction.
"""
self._log.debug("{}.set({})".format(self, value))
try:
self._set(value,index)
if write:
pr.startTransaction(self._block, type=rogue.interfaces.memory.Write, forceWr=True, checkEach=True, variable=self, index=index)
except Exception as e:
pr.logException(self._log,e)
raise e
[docs]
def get(self, *, index: int = -1, read: bool = True) -> Any:
"""Read a value from the remote command variable.
Parameters
----------
index : int, optional (default = -1)
Optional index for array variables.
read : bool, optional (default = True)
If True, perform a hardware read transaction.
Returns
-------
object
Retrieved value.
"""
try:
if read:
pr.startTransaction(self._block, type=rogue.interfaces.memory.Read, forceWr=False, checkEach=True, variable=self, index=index)
return self._get(index)
except Exception as e:
pr.logException(self._log,e)
raise e
def _genDocs(self, file: Any) -> None:
"""Emit Sphinx documentation for this remote command."""
BaseCommand._genDocs(self,file)
for a in ['offset', 'bitSize', 'bitOffset', 'varBytes']:
astr = str(getattr(self,a))
if astr != 'None':
print(pr.genDocTableRow([a,astr],4,100),file=file)
# Alias, this should go away
Command = BaseCommand