Source code for pyrogue.protocols.gpib

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# Module to interface to GPIB devices
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------

# The following is required to use this Device:
#
# https://gist.github.com/ochococo/8362414fff28fa593bc8f368ba94d46a
#
# Install the gpib_cytpes python package from https://pypi.org/project/gpib-ctypes/
#    pip install gpib_ctypes

from gpib_ctypes import Gpib # pip install gpib_ctypes
import pyrogue
import rogue.interfaces.memory
import queue
import threading
from typing import Any


[docs] class GpibController(rogue.interfaces.memory.Slave): """ GPIB-based memory slave interface for SCPI-style instrument control. Translates memory-mapped read/write transactions into GPIB SCPI commands using variable 'key' attributes as command prefixes. Parameters ---------- gpibAddr : int GPIB primary address of the device. gpibBoard : int, optional GPIB interface board number. timeout : object, optional GPIB timeout constant. Common values include ``T1000s``, ``T300s``, ``T100s``, ``T30s``, ``T10s``, ``T3s``, ``T1s``, ``T300ms``, ``T100ms``, ``T30ms``, ``T10ms``, ``T3ms``, ``T1ms``, ``T300us``, ``T100us``, ``T30us``, ``T10us``, and ``TNONE``. """ def __init__( self, *, gpibAddr: int, gpibBoard: int = 0, timeout: Any = Gpib.gpib.T1s, ) -> None: super().__init__(1, 4096) self._log = pyrogue.logInit(cls=self, name=f'GPIB.{gpibBoard}.{gpibAddr}') self._gpib = Gpib.Gpib(gpibBoard,gpibAddr,0,timeout) self._workerQueue = queue.Queue() self._workerThread = threading.Thread(target=self._worker) self._workerThread.start() self._map = {} def _addVariable(self, var: pyrogue.RemoteVariable) -> None: """Register a variable for GPIB translation by offset.""" self._map[var.offset] = var def _stop(self) -> None: """Stop the worker thread.""" self._workerQueue.put(None) self._workerThread.join() def _doTransaction(self, transaction: rogue.interfaces.memory.Transaction) -> None: """Queue a memory transaction for the worker thread.""" self._workerQueue.put(transaction) def _worker(self) -> None: """Worker thread that processes GPIB transactions.""" while True: transaction = self._workerQueue.get() if transaction is None: return with transaction.lock(): # First lookup address addr = transaction.address() if addr not in self._map: transaction.error(f'Unknown address: {addr}') continue var = self._map[addr] byteSize = pyrogue.byteCount(var.base.bitSize) readLen = 1024 if byteSize < 1024 else byteSize+10 # Check transaction size if byteSize != transaction.size(): transaction.error(f'Transaction size mismatch: Got={transaction.size()}, Exp={byteSize}') continue # Write path if transaction.type() == rogue.interfaces.memory.Write: valBytes = bytearray(byteSize) transaction.getData(valBytes, 0) val = var.base.fromBytes(valBytes) send = var.getExtraAttribute('key') + " " + var.genDisp(val) self._log.debug(f"Write Sending {send}") self._gpib.write(send.encode('UTF-8')) transaction.done() # Read Path elif (transaction.type() == rogue.interfaces.memory.Read or transaction.type() == rogue.interfaces.memory.Verify): send = var.getExtraAttribute('key') + "?" self._log.debug(f"Read Sending {send}") self._gpib.write(send.encode('UTF-8')) valStr = self._gpib.read(readLen).decode('UTF-8').rstrip() self._log.debug(f"Read Got: {valStr}") val = var.parseDisp(valStr) valBytes = var.base.toBytes(val) transaction.setData(valBytes, 0) transaction.done() else: # Posted writes not supported (for now) transaction.error(f'Unsupported transaction type: {transaction.type()}')
[docs] class GpibDevice(pyrogue.Device): """ PyRogue Device wrapper for GPIB instruments. Provides a Device interface with automatic protocol and variable registration for GPIB-controlled instruments using SCPI-style commands. Parameters ---------- gpibAddr : int GPIB primary address of the instrument. gpibBoard : int, optional GPIB interface board number. timeout : int, optional Timeout value for GPIB operations. **kwargs : Any Additional arguments passed to :class:`pyrogue.Device`. """ def __init__( self, *, gpibAddr: int, gpibBoard: int = 0, timeout: int = 11, **kwargs: Any, ) -> None: self._gpib = GpibController(gpibAddr=gpibAddr, gpibBoard=gpibBoard, timeout=timeout) pyrogue.Device.__init__(self, memBase=self._gpib, **kwargs) self.addProtocol(self._gpib) self._nextAddr = 0 @property def nextAddr(self) -> int: """Get the next available address for variable registration.""" return self._nextAddr
[docs] def add(self, node: pyrogue.Node) -> None: """Add a node and register GPIB variables that have a 'key' attribute.""" super().add(node) if node.getExtraAttribute('key') is not None: self._gpib._addVariable(node) self._nextAddr += (node.offset + node.varBytes)