Source code for pyrogue.interfaces.simulation

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# Module containing simulation support classes and routines
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------

import threading
import pyrogue
import rogue.interfaces.stream
import zmq
from typing import Any, Callable


[docs] class SideBandSim(): """Sideband simulator using paired ZMQ sockets. Parameters ---------- host : str Hostname or IP address. port : int Base sideband port. """ def __init__(self, host: str, port: int) -> None: self._log = pyrogue.logInit(cls=self, name=f'{host}.{port}') self._ctx = zmq.Context() self._sbPush = self._ctx.socket(zmq.PUSH) self._sbPush.connect(f"tcp://{host}:{port+1}") self._sbPull = self._ctx.socket(zmq.PULL) self._sbPull.connect(f"tcp://{host}:{port}") self._log.info("Connected to port {} on host {}".format(port,host)) self._recvCb = self._defaultRecvCb self._lock = threading.Lock() self._run = True self._recvThread = threading.Thread(target=self._recvWorker) self._recvThread.start() def _defaultRecvCb(self, opCode: int | None, remData: int | None) -> None: """Default callback that prints received opCode and remData.""" if opCode is not None: print(f'Received opCode: {opCode:02x}') if remData is not None: print(f'Received remData: {remData:02x}')
[docs] def setRecvCb(self, cbFunc: Callable[[int | None, int | None], Any]) -> None: """Set the callback for received sideband data. Parameters ---------- cbFunc : callable Callback of the form ``cbFunc(opCode, remData)``. """ self._recvCb = cbFunc
[docs] def send(self, opCode: int | None = None, remData: int | None = None) -> None: """Send opCode and/or remData on the sideband channel. Parameters ---------- opCode : int, optional Operation code to send. remData : int, optional Remote data byte to send. """ ba = bytearray(4) if opCode is not None: ba[0] = 0x01 ba[1] = opCode if remData is not None: ba[2] = 0x01 ba[3] = remData self._sbPush.send(ba) self._log.debug(f'Sent opCode: {opCode} remData: {remData}')
def _stop(self) -> None: """Stop the receive thread.""" with self._lock: self._log.debug('Stopping receive thread') self._run = False def __enter__(self) -> "SideBandSim": """Return self for context-manager use.""" return self def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: """Stop receive worker on context-manager exit.""" self._stop() def _recvWorker(self) -> None: """Worker thread that receives sideband data and invokes the callback.""" while True: # Exit thread when stop() called with self._lock: if self._run is False: self._log.debug('Exiting receive thread') return # Wait for new data socks, x, y = zmq.select([self._sbPull], [], [], 1.0) if self._sbPull in socks: ba = self._sbPull.recv() if len(ba) != 4: self._log.error(f'Got bad size frame: {ba} size: {len(ba)}') # Got normal data opCode = None remData = None if ba[0] == 0x01: opCode = ba[1] if ba[2] == 0x01: remData = ba[3] self._log.debug(f'Received opCode: {opCode}, remData {remData}') self._recvCb(opCode, remData)
[docs] class Pgp2bSim(): """Simulated PGP2B with virtual channels and sideband. Parameters ---------- vcCount : int Number of virtual channels. host : str Host address. port : int Base port number. """ def __init__(self, vcCount: int, host: str, port: int) -> None: # virtual channels self.vc = [rogue.interfaces.stream.TcpClient(host, p) for p in range(port, port+(vcCount*2), 2)] # sideband self.sb = SideBandSim(host, port+8) def _stop(self) -> None: """Stop the sideband simulation.""" self.sb._stop() def __enter__(self) -> "Pgp2bSim": """Return self for context-manager use.""" return self def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: """Stop simulator resources on context-manager exit.""" self._stop()
def connectPgp2bSim(pgpA: "Pgp2bSim", pgpB: "Pgp2bSim") -> None: """Connect two PGP2B simulators bidirectionally. Parameters ---------- pgpA : Pgp2bSim First PGP2B simulator. pgpB : Pgp2bSim Second PGP2B simulator. """ for a,b in zip(pgpA.vc, pgpB.vc): pyrogue.streamConnectBiDir(a, b) pgpA.sb.setRecvCb(pgpB.sb.send) pgpB.sb.setRecvCb(pgpA.sb.send)
[docs] class MemEmulate(rogue.interfaces.memory.Slave): """In-memory emulation of a memory slave device. Parameters ---------- minWidth : int, optional Minimum access width in bytes. maxSize : int, optional Maximum transaction size in bytes. dropCount : int, optional Number of transactions to drop before accepting (for testing). """ def __init__( self, *, minWidth: int = 4, maxSize: int = 0xFFFFFFFF, dropCount: int = 0, ) -> None: rogue.interfaces.memory.Slave.__init__(self,4,4) self._minWidth = minWidth self._maxSize = maxSize self._data = {} self._cb = {} self._count = 0 self._dropCount = dropCount def _checkRange(self, address: int, size: int) -> int: """Check if address range is valid. Returns 0 for valid.""" return 0 def _doMaxAccess(self) -> int: """Return maximum access size.""" return self._maxSize def _doMinAccess(self) -> int: """Return minimum access width.""" return self._minWidth def _doTransaction(self, transaction: Any) -> None: """Process a memory read or write transaction.""" address = transaction.address() size = transaction.size() type = transaction.type() if self._count < self._dropCount: self._count += 1 return self._count = 0 if (address % self._minWidth) != 0: transaction.error("Transaction address {address:#x} is not aligned to min width {self._minWidth:#x}") return elif size > self._maxSize: transaction.error("Transaction size {size} exceeds max {self._maxSize}") return for i in range (0, size): if not (address+i) in self._data: self._data[address+i] = 0 ba = bytearray(size) if type == rogue.interfaces.memory.Write or type == rogue.interfaces.memory.Post: transaction.getData(ba,0) for i in range(0, size): self._data[address+i] = ba[i] transaction.done() else: for i in range(0, size): ba[i] = self._data[address+i] transaction.setData(ba,0) transaction.done()