Source code for pyrogue.protocols._Network
#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# PyRogue protocols / Network wrappers
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
import pyrogue as pr
import rogue.protocols.udp
import rogue.protocols.rssi
import rogue.protocols.packetizer
import time
import threading
from typing import Any
[docs]
class UdpRssiPack(pr.Device):
    """
    UDP-based network device with RSSI and Packetizer protocols.

    Combines UDP transport with reliable stream protocol (rUDP) and packetizer
    for communication over network links. The RSSI status counters and the
    local/negotiated RSSI parameters are exposed as local variables, and
    ``start``/``stop`` are exposed as local commands.

    Parameters
    ----------
    port : int
        UDP port number for the connection.
    host : str, optional
        Host address. Only used when running as a client.
    jumbo : bool, optional
        Enable jumbo frames.
    wait : bool, optional
        Wait for link-up before finishing start. Ignored when ``server`` is
        True.
    packVer : int, optional
        Packetizer version. ``2`` selects the V2 core; any other value
        selects the V1 core.
    pollInterval : int, optional
        Poll interval in seconds for status variables.
    enSsi : bool, optional
        Enable SSI in the packetizer.
    server : bool, optional
        Run as server instead of client.
    **kwargs : Any
        Additional arguments passed to :class:`pyrogue.Device`. If a
        ``defaults`` dict is supplied, any of the keys ``locMaxBuffers``,
        ``locCumAckTout``, ``locRetranTout``, ``locNullTout``,
        ``locMaxRetran`` and ``locMaxCumAck`` found in it are also applied to
        the RSSI endpoint during construction.
    """

    def __init__(
        self,
        *,
        port: int,
        host: str = '127.0.0.1',
        jumbo: bool = False,
        wait: bool = True,
        packVer: int = 1,
        pollInterval: int = 1,
        enSsi: bool = True,
        server: bool = False,
        **kwargs: Any,
    ) -> None:
        # Zero-argument super(): super(self.__class__, self) recurses forever
        # as soon as this class is subclassed.
        super().__init__(**kwargs)

        # Local copy host/port arg values
        self._host = host
        self._port = port
        self._wait = wait
        self._jumbo = jumbo
        self._server = server

        # State shared with the server-side background thread that programs
        # the UDP RX buffer count once the link opens.
        self._rxBufferConfigured = False
        self._rxBufferThreadStop = False

        # The RSSI segment size is the UDP max payload minus 8
        # (presumably the RSSI header size -- TODO confirm).
        # Check if running as server
        if server:
            self._udp = rogue.protocols.udp.Server(port, jumbo)
            self._rssi = rogue.protocols.rssi.Server(self._udp.maxPayload() - 8)

        # Else running as client
        else:
            self._udp = rogue.protocols.udp.Client(host, port, jumbo)
            self._rssi = rogue.protocols.rssi.Client(self._udp.maxPayload() - 8)

        # Check if Packetizer Version 2: https://confluence.slac.stanford.edu/x/3nh4DQ
        if packVer == 2:
            self._pack = rogue.protocols.packetizer.CoreV2(False, True, enSsi)  # ibCRC = False, obCRC = True

        # Else using Packetizer Version 1: https://confluence.slac.stanford.edu/x/1oyfD
        else:
            self._pack = rogue.protocols.packetizer.Core(enSsi)

        # Connect the streams together.
        # NOTE(review): `==` is used here as the rogue stream-connection
        # operator, not as a comparison -- do not "fix" these statements.
        self._udp == self._rssi.transport()
        self._rssi.application() == self._pack.transport()

        # Set any user override of defaults.
        # `or {}` also covers an explicit defaults=None from the caller.
        defaults = kwargs.get('defaults') or {}

        param_setters = {
            'locMaxBuffers': self._rssi.setLocMaxBuffers,
            'locCumAckTout': self._rssi.setLocCumAckTout,
            'locRetranTout': self._rssi.setLocRetranTout,
            'locNullTout':   self._rssi.setLocNullTout,
            'locMaxRetran':  self._rssi.setLocMaxRetran,
            'locMaxCumAck':  self._rssi.setLocMaxCumAck,
        }

        for key, setter in param_setters.items():
            if key in defaults:
                setter(defaults[key])

        # Add variables
        # NOTE(review): the localSet lambdas keep the parameter name `value`;
        # pyrogue presumably matches callback arguments by name -- confirm
        # before renaming or replacing them with bound methods.

        # ---- Polled, read-only link status and counters ----
        self.add(pr.LocalVariable(
            name         = 'rssiOpen',
            mode         = 'RO',
            value        = False,
            localGet     = lambda: self._rssi.getOpen(),
            pollInterval = pollInterval,
        ))

        self.add(pr.LocalVariable(
            name         = 'rssiDownCount',
            mode         = 'RO',
            value        = 0,
            typeStr      = 'UInt32',
            localGet     = lambda: self._rssi.getDownCount(),
            pollInterval = pollInterval,
        ))

        self.add(pr.LocalVariable(
            name         = 'rssiDropCount',
            mode         = 'RO',
            value        = 0,
            typeStr      = 'UInt32',
            localGet     = lambda: self._rssi.getDropCount(),
            pollInterval = pollInterval,
        ))

        self.add(pr.LocalVariable(
            name         = 'rssiRetranCount',
            mode         = 'RO',
            value        = 0,
            typeStr      = 'UInt32',
            localGet     = lambda: self._rssi.getRetranCount(),
            pollInterval = pollInterval,
        ))

        self.add(pr.LocalVariable(
            name         = 'locBusy',
            mode         = 'RO',
            value        = True,
            localGet     = lambda: self._rssi.getLocBusy(),
            hidden       = True,
            pollInterval = pollInterval,
        ))

        self.add(pr.LocalVariable(
            name         = 'locBusyCnt',
            mode         = 'RO',
            value        = 0,
            typeStr      = 'UInt32',
            localGet     = lambda: self._rssi.getLocBusyCnt(),
            pollInterval = pollInterval,
        ))

        self.add(pr.LocalVariable(
            name         = 'remBusy',
            mode         = 'RO',
            value        = True,
            localGet     = lambda: self._rssi.getRemBusy(),
            hidden       = True,
            pollInterval = pollInterval,
        ))

        self.add(pr.LocalVariable(
            name         = 'remBusyCnt',
            mode         = 'RO',
            value        = 0,
            typeStr      = 'UInt32',
            localGet     = lambda: self._rssi.getRemBusyCnt(),
            pollInterval = pollInterval,
        ))

        # ---- Read/write local RSSI parameters (not polled) ----
        self.add(pr.LocalVariable(
            name     = 'locTryPeriod',
            mode     = 'RW',
            value    = self._rssi.getLocTryPeriod(),
            typeStr  = 'UInt32',
            localGet = lambda: self._rssi.getLocTryPeriod(),
            localSet = lambda value: self._rssi.setLocTryPeriod(value)
        ))

        self.add(pr.LocalVariable(
            name     = 'locMaxBuffers',
            mode     = 'RW',
            value    = self._rssi.getLocMaxBuffers(),
            typeStr  = 'UInt8',
            localGet = lambda: self._rssi.getLocMaxBuffers(),
            localSet = lambda value: self._rssi.setLocMaxBuffers(value)
        ))

        self.add(pr.LocalVariable(
            name     = 'locMaxSegment',
            mode     = 'RW',
            value    = self._rssi.getLocMaxSegment(),
            typeStr  = 'UInt16',
            localGet = lambda: self._rssi.getLocMaxSegment(),
            localSet = lambda value: self._rssi.setLocMaxSegment(value)
        ))

        self.add(pr.LocalVariable(
            name     = 'locCumAckTout',
            mode     = 'RW',
            value    = self._rssi.getLocCumAckTout(),
            typeStr  = 'UInt16',
            localGet = lambda: self._rssi.getLocCumAckTout(),
            localSet = lambda value: self._rssi.setLocCumAckTout(value)
        ))

        self.add(pr.LocalVariable(
            name     = 'locRetranTout',
            mode     = 'RW',
            value    = self._rssi.getLocRetranTout(),
            typeStr  = 'UInt16',
            localGet = lambda: self._rssi.getLocRetranTout(),
            localSet = lambda value: self._rssi.setLocRetranTout(value)
        ))

        self.add(pr.LocalVariable(
            name     = 'locNullTout',
            mode     = 'RW',
            value    = self._rssi.getLocNullTout(),
            typeStr  = 'UInt16',
            localGet = lambda: self._rssi.getLocNullTout(),
            localSet = lambda value: self._rssi.setLocNullTout(value)
        ))

        self.add(pr.LocalVariable(
            name     = 'locMaxRetran',
            mode     = 'RW',
            value    = self._rssi.getLocMaxRetran(),
            typeStr  = 'UInt8',
            localGet = lambda: self._rssi.getLocMaxRetran(),
            localSet = lambda value: self._rssi.setLocMaxRetran(value)
        ))

        self.add(pr.LocalVariable(
            name     = 'locMaxCumAck',
            mode     = 'RW',
            value    = self._rssi.getLocMaxCumAck(),
            typeStr  = 'UInt8',
            localGet = lambda: self._rssi.getLocMaxCumAck(),
            localSet = lambda value: self._rssi.setLocMaxCumAck(value)
        ))

        # ---- Polled, read-only current ("cur*") RSSI parameters ----
        self.add(pr.LocalVariable(
            name         = 'curMaxBuffers',
            mode         = 'RO',
            value        = 0,
            typeStr      = 'UInt8',
            localGet     = lambda: self._rssi.curMaxBuffers(),
            pollInterval = pollInterval
        ))

        self.add(pr.LocalVariable(
            name         = 'curMaxSegment',
            mode         = 'RO',
            value        = 0,
            typeStr      = 'UInt16',
            localGet     = lambda: self._rssi.curMaxSegment(),
            pollInterval = pollInterval
        ))

        self.add(pr.LocalVariable(
            name         = 'curCumAckTout',
            mode         = 'RO',
            value        = 0,
            typeStr      = 'UInt16',
            localGet     = lambda: self._rssi.curCumAckTout(),
            pollInterval = pollInterval
        ))

        self.add(pr.LocalVariable(
            name         = 'curRetranTout',
            mode         = 'RO',
            value        = 0,
            typeStr      = 'UInt16',
            localGet     = lambda: self._rssi.curRetranTout(),
            pollInterval = pollInterval
        ))

        self.add(pr.LocalVariable(
            name         = 'curNullTout',
            mode         = 'RO',
            value        = 0,
            typeStr      = 'UInt16',
            localGet     = lambda: self._rssi.curNullTout(),
            pollInterval = pollInterval
        ))

        self.add(pr.LocalVariable(
            name         = 'curMaxRetran',
            mode         = 'RO',
            value        = 0,
            typeStr      = 'UInt8',
            localGet     = lambda: self._rssi.curMaxRetran(),
            pollInterval = pollInterval
        ))

        self.add(pr.LocalVariable(
            name         = 'curMaxCumAck',
            mode         = 'RO',
            value        = 0,
            typeStr      = 'UInt8',
            localGet     = lambda: self._rssi.curMaxCumAck(),
            pollInterval = pollInterval
        ))

        # ---- Commands ----
        # 'stop' runs this device's _stop(); 'start' only restarts the RSSI
        # endpoint and does not go through this device's _start().
        self.add(pr.LocalCommand(
            name     = 'stop',
            function = self._stop
        ))

        self.add(pr.LocalCommand(
            name     = 'start',
            function = lambda: self._rssi._start()
        ))

    def application(self, dest: int) -> rogue.protocols.packetizer.Application:
        """
        Get the packetizer application interface for a destination.

        Parameters
        ----------
        dest : int
            Destination value forwarded unchanged to the packetizer's
            ``application()`` call.

        Returns
        -------
        rogue.protocols.packetizer.Application
            Application interface returned by the packetizer for ``dest``.
        """
        return self._pack.application(dest)

    def countReset(self) -> None:
        """Reset RSSI connection counters."""
        self._rssi.resetCounters()

    def _start(self) -> None:
        """Start RSSI/UDP transport and optionally wait for link-up."""
        # Start the RSSI connection
        self._rxBufferThreadStop = False
        self._rssi._start()

        # Only a client with wait=True blocks here until the link is open.
        if self._wait and not self._server:
            curr = int(time.time())
            last = curr
            cnt = 0

            while not self._rssi.getOpen():
                time.sleep(.0001)
                curr = int(time.time())

                # Emit at most one warning per wall-clock second.
                if last != curr:
                    last = curr

                    # The attempt counter only advances with jumbo frames, so
                    # the MTU hint below is only ever shown for jumbo links.
                    if self._jumbo:
                        cnt += 1

                    if cnt < 10:
                        self._log.warning("host=%s, port=%d -> Establishing link ..." % (self._host, self._port))
                    else:
                        self._log.warning('host=%s, port=%d -> Failing to connect using jumbo frames! Be sure to check interface MTU settings with ifconfig -a' % (self._host, self._port))

        # On the client side, getOpen() only returns after negotiation is
        # complete, so curMaxBuffers() is stable here. On the server side,
        # waiting in _start() can deadlock if both endpoints are managed in the
        # same Root, so defer buffer programming until the link actually opens.
        if self._server:
            self._armServerRxBufferUpdate()
        else:
            self._applyNegotiatedRxBufferCount()

        super()._start()

    def _applyNegotiatedRxBufferCount(self) -> None:
        """Program UDP RX buffers from the negotiated RSSI max-buffer count."""
        # Only program the buffer count once per start/stop cycle.
        if self._rxBufferConfigured:
            return

        self._udp.setRxBufferCount(self._rssi.curMaxBuffers())
        self._rxBufferConfigured = True

    def _armServerRxBufferUpdate(self) -> None:
        """Wait for server-side RSSI open, then program the negotiated RX depth."""
        self._rxBufferConfigured = False

        def _wait_open() -> None:
            # Poll until the link opens or _stop() requests an exit.
            while not self._rxBufferThreadStop and not self._rssi.getOpen():
                time.sleep(0.0001)

            # Re-check both conditions: the loop also exits on a stop request.
            if not self._rxBufferThreadStop and self._rssi.getOpen():
                self._applyNegotiatedRxBufferCount()

        # Daemon thread so a link that never opens cannot block interpreter exit.
        threading.Thread(target=_wait_open, daemon=True).start()

    def _stop(self) -> None:
        """Stop the UDP/RSSI connection and clean up resources."""
        # Tell any pending server-side wait thread to exit.
        self._rxBufferThreadStop = True
        self._rssi._stop()

        # Allow the RX buffer count to be programmed again on the next start.
        self._rxBufferConfigured = False

        # This Device may not necessarily be added to a tree
        # So check if it has a parent first
        if self.parent is not None:
            # Re-read rssiOpen after stopping (presumably so the variable
            # reflects the closed link right away -- TODO confirm).
            self.rssiOpen.get()