Source code for pyrogue.interfaces._OsCommandMemorySlave

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# OS Command Slave Module
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------

import pyrogue
import rogue.interfaces.stream
from typing import Any, Callable


class OsCommandMemorySlave(rogue.interfaces.memory.Slave):
    """Memory slave that maps transactions to Python functions via address-based commands.

    Functions are registered per address with the :meth:`command` decorator.
    A write or posted-write transaction at a registered address decodes the
    transaction data with the registered ``base`` object and calls the
    function with the decoded value. Any other transaction type calls the
    function with ``arg=None`` and sends its encoded return value back as the
    transaction data.

    Parameters
    ----------
    minWidth : int, optional
        Minimum access width in bytes.
    maxSize : int, optional
        Maximum transaction size in bytes.
    """

    def __init__(self, *, minWidth: int = 4, maxSize: int = 1024) -> None:
        rogue.interfaces.memory.Slave.__init__(self, minWidth, maxSize)

        # address -> {'base': serializer, 'func': callable}; filled by command()
        self._cmdList = {}

    def _doTransaction(self, transaction: rogue.interfaces.memory.Transaction) -> None:
        """
        Execute a memory transaction as a command read or write.

        The transaction is completed with ``done()`` on success. It is
        completed with ``error()`` when the address has no registered command
        or when the command (or the data conversion) raises.

        Parameters
        ----------
        transaction : object
            Rogue memory transaction to process.
        """
        address = transaction.address()
        size = transaction.size()
        txType = transaction.type()  # named txType to avoid shadowing builtin `type`

        # Single lookup of the registered command for this address
        try:
            entry = self._cmdList[address]
        except KeyError:
            transaction.error(f"Transaction address {address:#08x} not found in os command list")
            return

        isWrite = txType in (rogue.interfaces.memory.Write, rogue.interfaces.memory.Post)

        # Write: decode the incoming bytes and hand the value to the command
        if isWrite:
            try:
                ba = bytearray(size)
                transaction.getData(ba, 0)
                arg = entry['base'].fromBytes(ba)

                # Registered functions receive this slave as their first argument
                entry['func'](self, arg=arg)
                transaction.done()

            except Exception as err:
                # Any failure is reported through the transaction rather than raised
                transaction.error(f"Transaction write error in command at {address:#08x}: {err}")

        # Read: every non-write transaction type runs the command with no argument
        else:
            try:
                ret = entry['func'](self, arg=None)
                ba = entry['base'].toBytes(ret)
                transaction.setData(ba, 0)
                transaction.done()

            except Exception as err:
                # Any failure is reported through the transaction rather than raised
                transaction.error(f"Transaction read error in command at {address:#08x}: {err}")

    def command(self, addr: int, base: pyrogue.Model) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        """Decorator to register a function as a command at the given address.

        The registered function is later invoked as ``func(slave, arg=value)``,
        where ``value`` is the decoded write data, or ``None`` for reads.

        Parameters
        ----------
        addr : int
            Memory-mapped address for this command.
        base : pyrogue.Model
            Object with fromBytes/toBytes for argument serialization.

        Returns
        -------
        callable
            Decorator that registers the wrapped function.

        Raises
        ------
        pyrogue.CommandError
            Raised by the returned decorator when ``addr`` already has a
            registered command.
        """

        def _decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            """Register the wrapped function at ``addr`` and return it."""
            if addr in self._cmdList:
                raise pyrogue.CommandError("Duplicate address in command generation")

            self._cmdList[addr] = {'base': base, 'func': func}

            # The function is returned unwrapped so it stays directly callable
            return func

        return _decorator