#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# PyRogue base module - Block Class
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
from __future__ import annotations
import threading
from typing import Any, Callable, Iterable
import numpy as np
import pyrogue as pr
import rogue.interfaces.memory as rim
def startTransaction(
block: rim.Block,
*,
type: int,
forceWr: bool = False,
check: bool = False,
variable: pr.BaseVariable = None,
index: int = -1,
**kwargs: Any,
) -> None:
"""Helper function for starting a transaction on a block.
This helper function ensures future changes to the API do not break custom
code in a Device's writeBlocks, readBlocks and checkBlocks functions.
Parameters
----------
block : rim.Block
Block instance to start a transaction on.
type : {rim.Read, rim.Write, rim.Post, rim.Verify}
Transaction type
forceWr : bool, optional (default = False)
Force the write even if block values are unchanged.
check : bool, optional (default = False)
Wait for transaction to complete before returning
variable : pr.BaseVariable, optional
Variable associated with the transaction, None for pure Block transactions.
index : int, optional (default = -1)
Optional index for array variables.
**kwargs : Any, optional
Unused, provided for future compatibility.
"""
block._startTransaction(type, forceWr, check, variable, index)
def checkTransaction(block: rim.Block, **kwargs: Any) -> None:
"""Wait for completion of transaction on a block.
Helper function for calling the checkTransaction function in a block. This helper
function ensures future changes to the API do not break custom code in a Device's
writeBlocks, readBlocks and checkBlocks functions.
Parameters
----------
block : rim.Block
Block instance to operate on.
**kwargs : Any, optional
Unused, provided for future compatibility.
"""
block._checkTransaction()
def writeBlocks(
blocks: Iterable[rim.Block],
force: bool = False,
checkEach: bool = False,
index: int = -1,
**kwargs: Any,
) -> None:
"""Write a list of blocks efficiently.
Helper function for writing and verifying a list of blocks. Allows a custom list
of blocks to be efficiently written, similar to ``Device.writeBlocks()``.
Parameters
----------
blocks : iterable[rim.Block]
Blocks to write.
force : bool, optional (default = False)
Force the write even if values are unchanged.
checkEach : bool, optional (default = False)
Wait for completion of each block transaction before initiating the next one
index : int, optional (default = -1)
Optional index for array variables.
**kwargs : Any
Additional arguments passed through to startTransaction().
"""
for b in blocks:
startTransaction(b, type=rim.Write, forceWr=force, check=checkEach, index=index, **kwargs)
def verifyBlocks(
blocks: Iterable[rim.Block],
checkEach: bool = False,
**kwargs: Any,
) -> None:
"""Verify a list of blocks efficiently.
Helper function for verifying a list of blocks. Allows a custom list of blocks
to be efficiently verified without blocking between each read, similar to
``Device.verifyBlocks()``.
Parameters
----------
blocks : iterable[rim.Block]
Blocks to verify.
checkEach : bool, optional (default = False)
Wait for completion of each block transation before initiating the next one
**kwargs : Any
Additional arguments passed through to startTransaction().
"""
for b in blocks:
startTransaction(b, type=rim.Verify, check=checkEach, **kwargs)
def readBlocks(
blocks: Iterable[rim.Block],
checkEach: bool = False,
**kwargs: Any,
) -> None:
"""Read a list of blocks efficiently.
Helper function for reading a list of blocks. Allows a custom list of blocks
to be efficiently read without blocking between each read, similar to
``Device.readBlocks()``.
Parameters
----------
blocks : iterable[rim.Block]
Blocks to read.
checkEach : bool, optional (default = False)
Wait for completion of each block transation before initiating the next one
**kwargs : Any
Additional arguments passed through to startTransaction()
"""
for b in blocks:
startTransaction(b, type=rim.Read, check=checkEach, **kwargs)
def checkBlocks(blocks: Iterable[rim.Block], **kwargs: Any) -> None:
"""Wait on block transactions for a list of blocks.
Helper function for waiting on block transactions to complete.
Allows a custom list of blocks to be efficiently operated on
without blocking between each transaction, similar to ``Device.checkBlocks()``.
Parameters
----------
blocks : iterable[rim.Block]
Blocks to check.
**kwargs : Any
Unused
"""
for b in blocks:
checkTransaction(b)
def writeAndVerifyBlocks(
blocks: Iterable[rim.Block],
force: bool = False,
checkEach: bool = False,
index: int = -1,
**kwargs: Any,
) -> None:
"""Write and verify a list of blocks efficiently.
Helper function for writing and verifying a list of blocks. Allows a custom
list of blocks to be efficiently written and verified similar to
``Device.writeAndVerifyBlocks()``.
Parameters
----------
blocks : iterable[rim.Block]
Blocks to write and verify.
force : bool, optional (default = False)
Force the write even if values are unchanged.
checkEach : bool, optional (default = False)
Wait for each block transaction to complete before starting the next one
index : int, optional (default = -1)
Optional index for array variables.
**kwargs : Any
Additional arguments passed through to the block transaction.
"""
writeBlocks(blocks, force=force, checkEach=checkEach, index=index, **kwargs)
verifyBlocks(blocks, checkEach=checkEach, **kwargs)
checkBlocks(blocks)
def readAndCheckBlocks(
blocks: Iterable[rim.Block],
checkEach: bool = False,
**kwargs: Any,
) -> None:
"""Read blocks and wait for completion.
Helper function for reading a list of blocks. Allows a custom list of blocks
to be efficiently read without blocking between each read, similar to
``Device.readAndCheckBlocks()``.
Parameters
----------
blocks : iterable[rim.Block]
Blocks to read.
checkEach : bool, optional (default = False)
Wait for each block transaction to complete before starting the next one
**kwargs : Any
Additional arguments passed through to the block transaction.
"""
readBlocks(blocks, checkEach, **kwargs)
checkBlocks(blocks)
class MemoryError(Exception):
"""Raised when a memory access fails.
Parameters
----------
name : str
Name of the device or memory target.
address : int
Address where the error occurred.
msg : str, optional
Additional error message.
size : int, optional (default = 0)
Size of the attempted access.
"""
def __init__(
self,
*,
name: str,
address: int,
msg: str | None = None,
size: int = 0,
) -> None:
"""Initialize a memory-access exception message."""
self._value = f'Memory Error for {name} at address {address:#08x}'
if msg is not None:
self._value += " " + msg
def __str__(self) -> str:
"""Return the formatted exception string."""
return repr(self._value)
[docs]
class LocalBlock(object):
"""Back-end Block class used by LocalVariable.
Stores value in local memory.
Parameters
----------
variable : pr.LocalVariable
Variable associated with this block.
localSet : callable
Setter callback. Expected signature:
``localSet(value, dev=None, var=None, changed=None)``.
localGet : callable
Getter callback. Expected signature:
``localGet(dev=None, var=None)``.
minimum : object
Minimum allowed value (may be ``None``).
maximum : object
Maximum allowed value (may be ``None``).
value : object
Initial value stored in the block.
"""
def __init__(
self,
*,
variable: pr.LocalVariable,
localSet: Callable[[Any, pr.Device | None, pr.BaseVariable | None, bool | None], Any] | None,
localGet: Callable[[pr.Device | None, pr.BaseVariable | None], Any] | None,
minimum: Any | None,
maximum: Any | None,
value: Any,
) -> None:
"""Initialize the local block wrapper for a variable."""
self._path = variable.path
self._mode = variable.mode
self._device = variable.parent
self._localSet = localSet
self._localGet = localGet
self._minimum = minimum
self._maximum = maximum
self._variable = variable
self._variables = [variable] # Used by poller
self._value = value
self._lock = threading.RLock()
self._enable = True
# Setup logging
self._log = pr.logInit(cls=self,name=self._path)
# Wrap local functions
self._localSetWrap = pr.functionWrapper(function=self._localSet, callArgs=['dev', 'var', 'value', 'changed'])
self._localGetWrap = pr.functionWrapper(function=self._localGet, callArgs=['dev', 'var'])
def __repr__(self) -> str:
"""Return the block path representation."""
return repr(self._path)
@property
def path(self) -> str:
"""Return the block path."""
return self._path
@property
def mode(self) -> str:
"""Return the variable access mode. ('RW', 'RO', etc)."""
return self._mode
@property
def bulkOpEn(self) -> bool:
"""Return True to enable bulk operations."""
return True
[docs]
def forceStale(self) -> None:
"""Mark the block as stale (no-op for local blocks)."""
pass
[docs]
def setEnable(self, value: bool) -> None:
"""Enable or disable local access.
Parameters
----------
value : bool
True to enable local access, False to disable.
"""
with self._lock:
self._enable = value
def _setTimeout(self, value: Any) -> None:
"""Set the timeout (not used for local blocks).
Parameters
----------
value : object
Timeout value, currently ignored for local blocks.
"""
pass
@property
def variables(self) -> list:
"""Return the list of variables in this block."""
return self._variables
[docs]
def set(self, var: pr.LocalVariable, value: Any, index: int = -1) -> None:
"""Set the block value, optionally for an index.
Parameters
----------
var : pr.LocalVariable
Variable associated with this block.
value : object
Value to write.
index : int, optional (default = -1)
Index for array variables.
"""
with self._lock:
if index < 0 and (isinstance(value, list) or isinstance(value, dict)):
changed = True
elif index >= 0:
changed = self._value[index] != value
elif isinstance(value, np.ndarray):
changed = np.array_equal(self._value, value)
else:
if (self._minimum is not None and value < self._minimum) or \
(self._maximum is not None and value > self._maximum):
raise pr.VariableError(f'Value range error for {self._path}. Value={value}, Min={self._minimum}, Max={self._maximum}')
changed = self._value != value
# Ignore type check if value is accessed with an index
if index >= 0:
self._value[index] = value
else:
if not isinstance(value, var._nativeType):
self._log.warning( f'{var.path}: Expecting {var._nativeType}: Currently a warning but in the future this will be an error' )
#raise TypeError(f"Error - {var.path}: Expecting {var._nativeType} but got {type(value)}")
self._value = value
# If a setFunction exists, call it (Used by local variables)
if self._enable and self._localSet is not None:
self._localSetWrap(function=self._localSet, dev=self._device, var=self._variable, value=self._value, changed=changed)
[docs]
def get(self, var: pr.LocalVariable, index: int = -1) -> Any:
"""Get the block value, optionally for an index.
Parameters
----------
var : pr.LocalVariable
Variable associated with this block.
index : int, optional (default = -1)
Index for array variables.
Returns
-------
object
Current value.
"""
if self._enable and self._localGet is not None:
with self._lock:
self._value = self._localGetWrap(function=self._localGet, dev=self._device, var=self._variable)
if index >= 0:
return self._value[index]
else:
return self._value
def _startTransaction(
self,
type: Any,
forceWr: bool,
check: bool,
variable: pr.LocalVariable,
index: int,
) -> None:
"""Start a transaction (local blocks perform no hardware IO)."""
pass
def _checkTransaction(self) -> None:
"""Notify listeners of transaction completion"""
if self._enable and self._variable._updateNotify:
self._variable._queueUpdate()
def _iadd(self, other: Any) -> None:
"""In-place add helper for local blocks."""
with self._lock:
self.set(None, self.get(None) + other)
if self._enable:
self._variable._queueUpdate()
def _isub(self, other: Any) -> None:
"""In-place subtract helper for local blocks."""
with self._lock:
self.set(None, self.get(None) - other)
if self._enable:
self._variable._queueUpdate()
def _imul(self, other: Any) -> None:
"""In-place multiply helper for local blocks."""
with self._lock:
self.set(None, self.get(None) * other)
if self._enable:
self._variable._queueUpdate()
def _imatmul(self, other: Any) -> None:
"""In-place matrix multiply helper for local blocks."""
with self._lock:
self.set(None, self.get(None) @ other)
self._variable._queueUpdate()
def _itruediv(self, other: Any) -> None:
"""In-place true division helper for local blocks."""
with self._lock:
self.set(None, self.get(None) / other)
if self._enable:
self._variable._queueUpdate()
def _ifloordiv(self, other: Any) -> None:
"""In-place floor division helper for local blocks."""
with self._lock:
self.set(None, self.get(None) // other)
if self._enable:
self._variable._queueUpdate()
def _imod(self, other: Any) -> None:
"""In-place modulo helper for local blocks."""
with self._lock:
self.set(None, self.get(None) % other)
if self._enable:
self._variable._queueUpdate()
def _ipow(self, other: Any) -> None:
"""In-place power helper for local blocks."""
with self._lock:
self.set(None, self.get(None) ** other)
if self._enable:
self._variable._queueUpdate()
def _ilshift(self, other: Any) -> None:
"""In-place left shift helper for local blocks."""
with self._lock:
self.set(None, self.get(None) << other)
if self._enable:
self._variable._queueUpdate()
def _irshift(self, other: Any) -> None:
"""In-place right shift helper for local blocks."""
with self._lock:
self.set(None, self.get(None) >> other)
if self._enable:
self._variable._queueUpdate()
def _iand(self, other: Any) -> None:
"""In-place bitwise-and helper for local blocks."""
with self._lock:
self.set(None, self.get(None) & other)
if self._enable:
self._variable._queueUpdate()
def _ixor(self, other: Any) -> None:
"""In-place bitwise-xor helper for local blocks."""
with self._lock:
self.set(None, self.get(None) ^ other)
if self._enable:
self._variable._queueUpdate()
def _ior(self, other: Any) -> None:
"""In-place bitwise-or helper for local blocks."""
with self._lock:
self.set(None, self.get(None) | other)
if self._enable:
self._variable._queueUpdate()