Source code for pyrogue.protocols._uart

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
#       PyRogue protocols / UART register protocol
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------

import rogue.interfaces.memory
import pyrogue
import serial
import queue
import threading
from typing import Any


[docs] class UartMemory(rogue.interfaces.memory.Slave): """ UART-based memory slave interface for register access over serial. Implements a simple text-based protocol for reading and writing 32-bit register values over a serial UART connection. Parameters ---------- device : str Serial device path (for example ``/dev/ttyUSB0`` or ``COM1``). baud : int Baud rate for serial communication. timeout : float, optional Read timeout in seconds. **kwargs : Any Additional keyword arguments passed to :class:`serial.Serial`. """ def __init__( self, device: str, baud: int, timeout: float = 1, **kwargs: Any, ) -> None: super().__init__(4,4096) # Set min and max size to 4 bytes self._log = pyrogue.logInit(cls=self, name=f'{device}') self.serialPort = serial.Serial(device, baud, timeout=timeout, **kwargs) self._workerQueue = queue.Queue() self._workerThread = threading.Thread(target=self._worker) self._workerThread.start() def _stop(self) -> None: """Stop the worker thread and close the serial port.""" self._workerQueue.put(None) self._workerQueue.join() self.serialPort.close() def __enter__(self) -> "UartMemory": """Context manager entry.""" return self def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: """Context manager exit; stops the interface.""" self._stop()
[docs] def readline(self) -> str: """ Read a line from the serial port until newline or carriage return. Returns ------- str Line read from the serial port (empty on timeout). """ line = [] while True: ch = self.serialPort.read().decode('ASCII') # Break and return if timeout occurs if len(ch) == 0: break line.append(ch) if ch == '\n' or ch == '\r': break return ''.join(line)
def _doTransaction(self, transaction: rogue.interfaces.memory.Transaction) -> None: """ Queue a memory transaction for processing by the worker thread. Parameters ---------- transaction : object Memory transaction to execute. """ self._workerQueue.put(transaction) def _doWrite(self, transaction: rogue.interfaces.memory.Transaction) -> None: """Execute a write transaction over the UART register protocol.""" address = transaction.address() dataBa = bytearray(transaction.size()) transaction.getData(dataBa, 0) dataWords = [int.from_bytes(dataBa[i:i+4], 'little', signed=False) for i in range(0, len(dataBa), 4)] # Need to issue a UART command for each 32 bit word for i, (addr, data) in enumerate(zip(range(address, address+len(dataBa), 4), dataWords)): sendString = f'w {addr:08x} {data:08x} \n'.encode('ASCII') # self._log.debug(f'Sending write transaction part {i}: {repr(sendString)}') self.serialPort.write(sendString) response = self.readline() #self.serialPort.readline().decode('ASCII') # If response is empty, a timeout occurred if len(response) == 0: transaction.error(f'Empty transaction response (likely timeout) for transaction part {i}: {repr(sendString)}') return # parse the response string parts = response.split() # Check for correct response if (len(parts) != 4 or parts[0].lower() != 'w' or int(parts[1], 16) != addr or int(parts[2],16) != data): transaction.error(f'Malformed response for part {i}: {repr(response)} to transaction: {repr(sendString)}') return # else: # self._log.debug(f'Transaction part {i}: {repr(sendString)} completed successfully') # Check response code resp = int(parts[3],16) if resp != 0: transaction.error(f"Non zero status message returned on axi bus in hardware: {resp:#x}") return transaction.done() def _doRead(self, transaction: rogue.interfaces.memory.Transaction) -> None: """Execute a read transaction over the UART register protocol.""" address = transaction.address() size = transaction.size() for i, addr in enumerate(range(address, address+size, 4)): sendString = f'r {addr:08x} \n'.encode('ASCII') # self._log.debug(f'Sending read transaction part {i}: {repr(sendString)}') self.serialPort.write(sendString) response = self.readline() #self.serialPort.readline().decode('ASCII') # If response is empty, a timeout occurred if len(response) == 0: transaction.error(f'Empty transaction response (likely timeout) for transaction part {i}: {repr(sendString)}') return # parse the response string parts = response.split() if (len(parts) != 4 or parts[0].lower() != 'r' or int(parts[1], 16) != addr): transaction.error(f'Malformed response part {i}: {repr(response)} to transaction: {repr(sendString)}') else: dataInt = int(parts[2], 16) rdData = bytearray(dataInt.to_bytes(4, 'little', signed=False)) transaction.setData(rdData, i*4) # self._log.debug(f'Transaction part {i}: {repr(sendString)} with response data: {dataInt:#08x} completed successfully') # Check response code resp = int(parts[3],16) if resp != 0: transaction.error(f"Non zero status message returned on axi bus in hardware: {resp:#x}") return transaction.done() def _worker(self) -> None: """Worker thread that processes queued memory transactions.""" while True: transaction = self._workerQueue.get() if transaction is None: break with transaction.lock(): # Write path if transaction.type() == rogue.interfaces.memory.Write: self._doWrite(transaction) elif (transaction.type() == rogue.interfaces.memory.Read or transaction.type() == rogue.interfaces.memory.Verify): self._doRead(transaction) else: # Posted writes not supported (for now) transaction.error(f'Unsupported transaction type: {transaction.type()}')