#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# PyRogue base module - Virtual Classes
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
import os
import pyrogue as pr
import rogue.interfaces
import pickle
import time
import threading
from typing import Any, Callable
class VirtualProperty:
    """Read-only descriptor whose value is fetched from the remote on each read.

    Parameters
    ----------
    node : VirtualNode
        Virtual node instance that owns this property.
    attr : str
        Name of the attribute requested from the remote.
    """

    def __init__(self, node: "VirtualNode", attr: str) -> None:
        self._attr = attr
        self._node = node

    def __get__(self, obj: Any = None, objtype: Any = None) -> Any:
        """Ask the owning node's client for the current attribute value."""
        # The owning node captured at construction is used, not ``obj``.
        owner = self._node
        client = owner._client
        return client._remoteAttr(owner._path, self._attr)

    def __set__(self, obj: Any, value: Any) -> None:
        """Ignore assignments; the value is silently discarded."""
        return None
class VirtualMethod:
    """Callable stand-in that forwards invocations to a remote attribute.

    Parameters
    ----------
    node : VirtualNode
        Virtual node instance that owns this method.
    attr : str
        Name of the attribute invoked on the remote.
    info : dict[str, Any]
        Method metadata; must contain the ``args`` and ``kwargs`` entries.
    """

    def __init__(self, node: "VirtualNode", attr: str, info: dict[str, Any]) -> None:
        self._attr = attr
        self._node = node
        # Kept for introspection; not consulted when the call is forwarded.
        self._args = info['args']
        self._kwargs = info['kwargs']

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Forward the call, with its arguments, through the owning node's client."""
        owner = self._node
        client = owner._client
        return client._remoteAttr(owner._path, self._attr, *args, **kwargs)
def VirtualFactory(data: dict[str, Any]) -> Any:
    """Build a one-off ``VirtualNode`` subclass for ``data`` and instantiate it.

    Parameters
    ----------
    data : dict
        Serialized node data. The keys read here are ``'funcs'``, ``'props'``,
        ``'bases'`` and ``'class'``; the dict is also handed unchanged to
        ``VirtualNode.__init__``.

    Returns
    -------
    Any
        Instance of the freshly created ``Virtual<class>`` type with its
        dynamic methods and properties attached.
    """

    def __init__(self, data: dict[str, Any]) -> None:
        # Everything is attached to the per-instance class created below.
        cls = self.__class__

        # Remote methods
        for name, info in data['funcs'].items():
            setattr(cls, name, VirtualMethod(self, name, info))

        # Remote properties
        for name in data['props']:
            setattr(cls, name, VirtualProperty(self, name))

        bases = data['bases']

        # Commands and processes are callable
        if str(pr.BaseCommand) in bases or str(pr.Process) in bases:
            setattr(cls, '__call__', self._call)

        # Roots expose getNode and addVarListener
        if str(pr.Root) in bases:
            for public, private in (('getNode', '_getNode'),
                                    ('addVarListener', '_addVarListener')):
                setattr(cls, public, getattr(self, private))

        # Variables expose addListener and delListener
        if str(pr.BaseVariable) in bases:
            for public, private in (('addListener', '_addListener'),
                                    ('delListener', '_delListener')):
                setattr(cls, public, getattr(self, private))

        VirtualNode.__init__(self, data)

    class_name = 'Virtual' + data['class']
    newclass = type(class_name, (VirtualNode,), {"__init__": __init__})
    return newclass(data)
class VirtualNode(pr.Node):
    """Client-side proxy for a node of the remote tree.

    Child nodes are fetched from the remote the first time they are needed
    (see ``nodes``, ``node`` and ``__getattr__``).

    Parameters
    ----------
    attrs : dict[str, Any]
        Serialized node attributes. The keys read are ``name``,
        ``description``, ``expand``, ``groups``, ``guiGroup``, ``path``,
        ``class``, ``nodes`` and ``bases``.
    """

    def __init__(self, attrs: dict[str, Any]) -> None:
        super().__init__(
            name=attrs['name'],
            description=attrs['description'],
            expand=attrs['expand'],
            groups=attrs['groups'],
            guiGroup=attrs['guiGroup'],
        )

        self._path = attrs['path']
        self._class = attrs['class']
        self._nodes = attrs['nodes']
        self._bases = attrs['bases']

        # Tracking; parent, root and client are filled in by whoever loads this node.
        self._parent = None
        self._root = None
        self._client = None
        self._functions = []
        self._loadLock = threading.Lock()
        self._loaded = False

        # Setup logging
        self._log = pr.logInit(cls=self, name=self.name, path=self._path)

    # ------------------------------------------------------------------
    # Tree-editing operations are rejected on a virtual node
    # ------------------------------------------------------------------

    def addToGroup(self, group: str) -> None:
        """Unsupported on a virtual node; always raises NodeError."""
        raise pr.NodeError('addToGroup not supported in VirtualNode')

    def removeFromGroup(self, group: str) -> None:
        """Unsupported on a virtual node; always raises NodeError."""
        raise pr.NodeError('removeFromGroup not supported in VirtualNode')

    def add(self, node: "VirtualNode") -> None:
        """Unsupported on a virtual node; always raises NodeError."""
        raise pr.NodeError('add not supported in VirtualNode')

    def callRecursive(self, func: Callable[..., Any], nodeTypes: Any = None, **kwargs: Any) -> None:
        """Unsupported on a virtual node; always raises NodeError."""
        raise pr.NodeError('callRecursive not supported in VirtualNode')

    # ------------------------------------------------------------------
    # Lazy child loading
    # ------------------------------------------------------------------

    def _loadChildrenOnce(self) -> None:
        """Load the children if that has not happened yet."""
        # Cheap unlocked test first; re-test under the lock before loading.
        if self._loaded:
            return
        with self._loadLock:
            if not self._loaded:
                self._loadNodes()

    def __getattr__(self, name: str) -> Any:
        """Load children on first use, then defer to ``pr.Node.__getattr__``."""
        self._loadChildrenOnce()
        return pr.Node.__getattr__(self, name)

    @property
    def nodes(self) -> dict[str, Any]:
        """Child nodes keyed by name (loaded on first access)."""
        self._loadChildrenOnce()
        return self._nodes

    def node(self, name: str, load: bool = True) -> "VirtualNode | None":
        """Look up a direct child by name.

        Parameters
        ----------
        name : str
            Child node name.
        load : bool, optional (default = True)
            If True, make sure the children are loaded before the lookup.

        Returns
        -------
        Any
            The child entry, or None when ``name`` is not a child.
        """
        if load:
            self._loadChildrenOnce()
        return self._nodes[name] if name in self._nodes else None

    def _loadNodes(self) -> None:
        """Fetch the remote ``nodes`` mapping and wire matching children in."""
        remote_children = self._client._remoteAttr(self._path, 'nodes')
        for key, child in remote_children.items():
            # Only names already present in the local metadata are adopted.
            if key not in self._nodes:
                continue
            child._parent = self
            child._root = self._root
            child._client = self._client
            self._nodes[key] = child
            self._addArrayNode(child)
        self._loaded = True

    # ------------------------------------------------------------------
    # Implementations bound to public names by VirtualFactory
    # ------------------------------------------------------------------

    def _call(self, *args: Any, **kwargs: Any) -> Any:
        """Forward a call on this node to the remote ``__call__``."""
        return self._client._remoteAttr(self._path, '__call__', *args, **kwargs)

    def _addListener(self, listener: Callable[..., Any]) -> None:
        """Register an update listener unless it is already registered."""
        if listener in self._functions:
            return
        self._functions.append(listener)

    def _delListener(self, listener: Callable[..., Any]) -> None:
        """Unregister an update listener if it is registered."""
        if listener not in self._functions:
            return
        self._functions.remove(listener)

    def _addVarListener(self, func: Callable[..., Any]) -> None:
        """Hand the listener to the client's ``_addVarListener``."""
        self._client._addVarListener(func)

    def _getNode(self, path: str, load: bool = True) -> "VirtualNode | None":
        """Walk a dotted path starting at this node.

        Parameters
        ----------
        path : str
            Dotted path (e.g. 'root.Child.GrandChild'). The first element
            must be this node's name or the literal 'root'.
        load : bool, optional (default = True)
            Passed to ``node()`` at every step.

        Returns
        -------
        Any
            Node at ``path`` or None if it cannot be resolved.
        """
        segments = path.split('.') if '.' in path else [path]
        head, *rest = segments

        if head not in (self.name, 'root'):
            return None

        current = self
        for part in rest:
            # A previous step may have produced None or a non-node entry.
            if not hasattr(current, 'node'):
                return None
            current = current.node(part, load)
        return current

    def isinstance(self, typ: type["VirtualNode"]) -> bool:
        """Return True when ``str(typ)`` is listed in this node's bases."""
        return str(typ) in self._bases

    def _rootAttached(self, parent: "VirtualNode", root: "VirtualNode") -> None:
        """Unsupported on a virtual node; always raises NodeError."""
        raise pr.NodeError('_rootAttached not supported in VirtualNode')

    def _getDict(self, modes: list[str]) -> Any:
        """Unsupported on a virtual node; always raises NodeError."""
        raise pr.NodeError('_getDict not supported in VirtualNode')

    def _setDict(self, *args: Any, **kwargs: Any) -> None:
        """Unsupported on a virtual node; always raises NodeError."""
        raise pr.NodeError('_setDict not supported in VirtualNode')

    def printYaml(
        self,
        readFirst: bool = False,
        modes: list[str] = ['RW', 'RO', 'WO'],
        incGroups: str | list[str] | None = None,
        excGroups: str | list[str] | None = ['Hidden'],
        recurse: bool = False,
    ) -> None:
        """Print the text returned by ``getYaml`` for the given selection.

        Parameters
        ----------
        readFirst : bool, optional (default = False)
            If ``True``, perform a read before generating YAML output.
        modes : list['RW' | 'WO' | 'RO'], optional (default = ['RW','RO','WO'])
            Variable modes to include.
        incGroups : str or list[str], optional
            Group name or group names to include.
        excGroups : str or list[str], optional
            Group name or group names to exclude.
        recurse : bool, optional (default = False)
            If ``True``, include child nodes recursively.
        """
        yaml_text = self.getYaml(
            readFirst=readFirst,
            modes=modes,
            incGroups=incGroups,
            excGroups=excGroups,
            recurse=recurse,
        )
        print(yaml_text)

    def _doUpdate(self, val: Any) -> None:
        """Call every registered listener with this node's path and ``val``."""
        for listener in self._functions:
            listener(self.path, val)
# (stray "[docs]" documentation-link artifact removed)
class VirtualClient(rogue.interfaces.ZmqClient):
    """
    A full featured client interface for Rogue. This can be used in
    scripts or other clients which require remote access to the running
    Rogue instance.

    This class uses a custom factory ensuring that only one instance of this
    class is created in a python script for a given remote connection.

    Parameters
    ----------
    addr : str
        host address
    port : int
        host port
    linkTimeout : float, optional
        Idle timeout in seconds before the client marks the link down when no
        publish updates or successful replies have been observed. This is the
        primary operational timeout knob for both hardware and simulation
        applications.
    requestStallTimeout : float | None, optional
        Optional policy timeout for how long a single in-flight request may
        remain outstanding before the client declares the connection stalled.
        This is disabled by default and is typically only useful when a
        deployment has a well-defined upper bound for legitimate request time.

    Attributes
    ----------
    linked: bool
        link state
    root: obj
        root class reference
    """

    # Shared instance cache, keyed by hash((addr, port)).
    ClientCache = {}

    @staticmethod
    def _defaultLinkTimeout() -> float:
        """Return the default idle link timeout in seconds."""
        return float(os.getenv("ROGUE_VIRTUAL_LINK_TIMEOUT", "10.0"))

    @staticmethod
    def _defaultRequestStallTimeout() -> float | None:
        """Return the default request-stall timeout in seconds, if configured."""
        value = os.getenv("ROGUE_VIRTUAL_REQUEST_STALL_TIMEOUT")
        if value is None or value == "":
            return None
        stall = float(value)
        # Non-positive values disable the policy.
        return None if stall <= 0 else stall

    @staticmethod
    def _defaultConnectTimeout() -> float:
        """Return the startup handshake timeout in seconds (at least 0.1)."""
        return max(0.1, float(os.getenv("ROGUE_VIRTUAL_CONNECT_TIMEOUT", "5.0")))

    def __new__(
        cls: type["VirtualClient"],
        addr: str = "localhost",
        port: int = 9099,
        linkTimeout: float | None = None,
        requestStallTimeout: float | None = None,
    ) -> "VirtualClient":
        """Return cached client instances keyed by ``(addr, port)``.

        Timeout arguments configure the cached instance for that server. If a
        client already exists for the same ``(addr, port)``, new timeout values
        are applied to the shared instance (by ``__init__``, which Python still
        runs on the cached object).
        """
        newHash = hash((addr, port))

        if newHash in cls.ClientCache:
            return VirtualClient.ClientCache[newHash]
        else:
            return super(VirtualClient, cls).__new__(cls, addr, port)

    def __init__(
        self,
        addr: str = "localhost",
        port: int = 9099,
        linkTimeout: float | None = None,
        requestStallTimeout: float | None = None,
    ) -> None:
        """Create or reconfigure a cached virtual client instance.

        Parameters
        ----------
        addr : str, optional
            Server host name or address.
        port : int, optional
            Base ZMQ server port.
        linkTimeout : float | None, optional
            Idle timeout in seconds before the link is considered down. If not
            provided, the default is 10 seconds or the value from
            ``ROGUE_VIRTUAL_LINK_TIMEOUT``.
        requestStallTimeout : float | None, optional
            Optional maximum age of an in-flight request before the client
            treats it as stalled. ``None`` or non-positive values disable this
            policy. In practice this is usually left disabled unless a system
            has a known upper bound for valid request duration.

        Raises
        ------
        ConnectionError
            If the bootstrap ``__ROOT__`` handshake fails within the window
            defined by ``ROGUE_VIRTUAL_CONNECT_TIMEOUT`` (default 5 s).
            Causes include the server process not running, an address/port
            mismatch, or ports blocked by firewall. The handshake failure
            is preserved via standard exception chaining: the immediate
            ``__cause__`` is the wrapping ``Exception`` raised by
            ``_remoteAttr``, and its own ``__cause__`` is the underlying
            transport error (typically a ``rogue.GeneralError`` from
            ``ZmqClient::send``). Sockets and the ZMQ context are torn
            down before the raise, so no partially-initialised instance
            is left in the singleton cache.
        """
        # __new__ may have returned an already-initialised cached instance;
        # in that case only apply any explicitly supplied timeouts.
        if getattr(self, "_vcInitialized", False):
            if linkTimeout is not None or requestStallTimeout is not None:
                self.setTimeoutConfig(linkTimeout=linkTimeout, requestStallTimeout=requestStallTimeout)
            return

        self._cacheKey = hash((addr, port))
        self._monEnable = False
        self._monThread = None
        self._vcInitialized = True
        VirtualClient.ClientCache[self._cacheKey] = self

        # ZmqClient.__init__ spawns the SUB thread, so every attribute its
        # callbacks (_doUpdate / _requestDone / _checkLinkState) can touch must
        # exist before the base ctor runs.
        self._varListeners = []
        self._monitors = []
        self._root = None
        self._link = False
        self._ltime = time.time()
        self._reqLock = threading.Lock()
        self._reqCount = 0
        self._reqSince = None
        self._linkTimeout = self._defaultLinkTimeout()
        self._requestStallTimeout = self._defaultRequestStallTimeout()
        self.setTimeoutConfig(linkTimeout=linkTimeout, requestStallTimeout=requestStallTimeout)

        try:
            rogue.interfaces.ZmqClient.__init__(self, addr, port, False)
        except Exception:
            self._removeFromCache()
            self._vcInitialized = False
            raise

        # Setup logging
        self._log = pr.logInit(cls=self, name="VirtualClient", path=None)

        # Get root name as a connection test
        self.setTimeout(1000, False)
        try:
            self._root = self._waitForRoot()
        except Exception as exc:
            # Tear down sockets/context before raising so the caller cannot
            # inadvertently use a half-initialised instance. Pre-fix this
            # path printed the banner and returned normally, leaving the
            # caller with a dead VirtualClient whose first _remoteAttr
            # crashed in ZmqClient::send (issue #1234).
            self._cleanupFailedInit()
            raise ConnectionError(
                f"Failed to connect to {addr}:{port}. "
                "Possible causes: ZeroMQ Server not included in the root "
                "class; Root process not running; client/server address "
                "mismatch; client/server port mismatch; server ports "
                "blocked by firewall."
            ) from exc

        print("Connected to {} at {}:{}".format(self._root.name, addr, port))

        self.setTimeout(1000, True)
        self._root._parent = self._root
        self._root._root = self._root
        self._root._client = self

        # Expose the root as an attribute named after it.
        setattr(self, self._root.name, self._root)

        # Link tracking
        self._link = True
        self._ltime = self._root.Time.value()

        # Create monitoring thread. daemon=True so the interpreter can exit
        # cleanly even when ClientCache still pins this instance (issue #1238);
        # without it, an ipython/host script that forgot to call stop() hangs
        # on quit().
        self._monEnable = True
        self._monThread = threading.Thread(target=self._monWorker, daemon=True)
        self._monThread.start()

    def _removeFromCache(self) -> None:
        """Remove this client from the shared cache when it is no longer valid."""
        # Identity check so a newer instance registered under the same key survives.
        if getattr(self, "_cacheKey", None) in VirtualClient.ClientCache and VirtualClient.ClientCache[self._cacheKey] is self:
            del VirtualClient.ClientCache[self._cacheKey]

    def _cleanupFailedInit(self) -> None:
        """Release transport resources after a failed bootstrap handshake."""
        self._monEnable = False
        self._removeFromCache()
        self._vcInitialized = False
        try:
            rogue.interfaces.ZmqClient._stop(self)
        except Exception:
            # Best effort: the caller is about to raise ConnectionError anyway.
            pass

    def _waitForRoot(self) -> "VirtualNode":
        """Retry the initial ``__ROOT__`` handshake for a bounded startup window.

        Raises the last handshake error once the window defined by
        ``_defaultConnectTimeout()`` has elapsed.
        """
        deadline = time.monotonic() + self._defaultConnectTimeout()
        last_error = None
        while time.monotonic() < deadline:
            try:
                return self._remoteAttr('__ROOT__', None)
            except Exception as exc:
                last_error = exc
        if last_error is not None:
            raise last_error
        raise Exception("Timed out waiting for initial root handshake")

    def addLinkMonitor(self, function: Callable[[bool], None]) -> None:
        """
        Add a link monitor callback function. This function will be called
        any time the link state changes. A single boolean argument will be passed to
        the callback function containing the current link state.

        Parameters
        ----------
        function : callable
            Callback function with the form ``function(linkState: bool)``.
        """
        if function not in self._monitors:
            self._monitors.append(function)

    def remLinkMonitor(self, function: Callable[[bool], None]) -> None:
        """
        Remove a previously added link monitor function.

        Parameters
        ----------
        function : callable
            Previously registered callback function.
        """
        if function in self._monitors:
            self._monitors.remove(function)

    @property
    def linked(self) -> bool:
        """Whether the client is currently linked to the server."""
        with self._reqLock:
            return self._link

    @property
    def linkTimeout(self) -> float:
        """Idle timeout in seconds before the link is considered down."""
        return self._linkTimeout

    @property
    def requestStallTimeout(self) -> float | None:
        """Optional maximum in-flight request age before it is treated as stalled."""
        return self._requestStallTimeout

    def setTimeoutConfig(
        self,
        *,
        linkTimeout: float | None = None,
        requestStallTimeout: float | None = None,
    ) -> None:
        """Update link and request-stall timeout settings.

        ``linkTimeout`` is the normal tuning knob for clients that need to
        tolerate longer busy periods. ``requestStallTimeout`` is an optional
        policy threshold and is most useful only when a deployment has a clear,
        finite upper bound for legitimate request duration.

        Arguments left as ``None`` keep the current setting; a non-positive
        ``requestStallTimeout`` disables the stall policy.
        """
        if linkTimeout is not None:
            self._linkTimeout = float(linkTimeout)
        if requestStallTimeout is not None:
            stall = float(requestStallTimeout)
            self._requestStallTimeout = None if stall <= 0 else stall

    def _monWorker(self) -> None:
        """Monitor link heartbeat and emit link-state callbacks."""
        while self._monEnable:
            time.sleep(1)
            self._checkLinkState()

    def _requestStart(self) -> None:
        """Record that a request/reply transaction is in flight."""
        with self._reqLock:
            # The age clock starts with the first outstanding request.
            if self._reqCount == 0:
                self._reqSince = time.time()
            self._reqCount += 1

    def _requestDone(self, success: bool) -> None:
        """Record request completion and treat successful replies as activity."""
        # Defer monitor callbacks until after _reqLock is released: a re-entrant
        # callback would deadlock, and a slow one would stall other requests.
        notify = False
        with self._reqLock:
            self._reqCount = max(0, self._reqCount - 1)
            if self._reqCount == 0:
                self._reqSince = None
            if success:
                self._ltime = time.time()
                if not self._link:
                    self._link = True
                    notify = True

        # _waitForRoot triggers the first _requestDone(True) before __init__
        # assigns self._root; suppress the notify in that window.
        if notify and self._root is not None:
            self._log.warning(f"I have finally heard from {self._root.name}. All is good!")
            for mon in self._monitors:
                mon(True)

    def _checkLinkState(self) -> None:
        """Update link state from recent activity while tolerating busy requests."""
        # Snapshot all link state under one lock acquisition so the decision
        # below is coherent; emit logs and monitor callbacks after releasing.
        log_message = None
        notify_link: bool | None = None

        with self._reqLock:
            now = time.time()
            delta = now - self._ltime
            link = self._link
            reqPending = self._reqCount > 0
            reqAge = (now - self._reqSince) if (reqPending and self._reqSince is not None) else None
            new_link = link

            if link and delta > self._linkTimeout:
                if reqPending:
                    # A pending request keeps the link up unless the optional
                    # stall policy is enabled and has been exceeded.
                    if (self._requestStallTimeout is None
                            or reqAge is None
                            or reqAge <= self._requestStallTimeout):
                        return
                    new_link = False
                    log_message = (
                        f"Request has been pending for {reqAge:0.1f} seconds without updates. "
                        f"Declaring link to {self._root.name} stalled."
                    )
                else:
                    new_link = False
                    log_message = (
                        f"I have not heard from {self._root.name} in {self._linkTimeout:0.1f} seconds. "
                        "It may be busy, continuing to wait..."
                    )
            elif (not link) and delta < self._linkTimeout:
                new_link = True
                log_message = f"I have finally heard from {self._root.name}. All is good!"

            if new_link != link:
                self._link = new_link
                notify_link = new_link

        if log_message is not None:
            self._log.warning(log_message)
        if notify_link is not None:
            for mon in self._monitors:
                mon(notify_link)

    def _remoteAttr(self, path: str, attr: str | None, *args: Any, **kwargs: Any) -> Any:
        """Invoke a remote attribute on the server.

        Raises
        ------
        RuntimeError
            If the client is not initialised (failed init or after ``stop()``).
        Exception
            ``"ZMQ Interface Exception: ..."`` wrapping any transport or
            decoding failure; also any exception object returned by the
            server, which is re-raised locally.
        """
        # Liveness guard: fail fast on a dead instance (stop() was called or
        # a prior init failed). Without this guard the call slips through to
        # ZmqClient::send on a torn-down socket and surfaces as the
        # "zmq_sendmsg failed" crash reported in issue #1234.
        if not getattr(self, "_vcInitialized", False):
            raise RuntimeError(
                "VirtualClient is not connected: __init__ failed or stop() "
                "was called. Construct a new VirtualClient before issuing "
                "remote attribute calls."
            )
        self._requestStart()
        try:
            # NOTE(security): the reply is decoded with pickle.loads, so only
            # connect to servers you trust.
            ret = pickle.loads(self._send(pickle.dumps({ 'path':path, 'attr':attr, 'args':args, 'kwargs':kwargs })))
        except Exception as e:
            self._requestDone(False)
            # Chain explicitly so the underlying transport error (e.g. the
            # rogue.GeneralError from ZmqClient::send) is reachable via the
            # standard __cause__ chain from any wrapping exception higher up
            # the stack (notably the ConnectionError raised by __init__).
            raise Exception(f"ZMQ Interface Exception: {e}") from e
        else:
            # Kept outside the try so an exception from a link-monitor callback
            # cannot reach the except branch and complete the request twice.
            self._requestDone(True)

        if isinstance(ret, Exception):
            raise ret

        return ret

    def _addVarListener(self, func: Callable[..., Any]) -> None:
        """Register a variable update listener."""
        if func not in self._varListeners:
            self._varListeners.append(func)

    def _doUpdate(self, data: bytes) -> None:
        """Process variable update data from the server."""
        # Publishes count as link activity; mutate _ltime under _reqLock.
        with self._reqLock:
            self._ltime = time.time()

        if self._root is None:
            return

        # NOTE(security): published data is decoded with pickle.loads, so only
        # connect to servers you trust.
        d = pickle.loads(data)

        for k, val in d.items():
            n = self._root.getNode(k, False)
            if n is not None:
                n._doUpdate(val)

            # Call listener functions,
            for func in self._varListeners:
                func(k, val)

    def stopMonitor(self) -> None:
        """Stop the link-monitor thread without releasing ZMQ resources.

        Use this when a long-lived client must drop link-heartbeat polling
        but the ZMQ transport must remain open. The intended caller is a
        host script that hands a ``VirtualClient`` to an interactive
        wrapper (for example ``pysmurf``) and wants to silence the monitor
        on shutdown without losing the connection. ``stop()`` is the
        wrong choice for that case because it also closes the C++
        ``ZmqClient`` sockets and removes the instance from
        ``ClientCache``.

        After ``stopMonitor()`` returns the client still services
        ``_remoteAttr`` calls and SUB publish updates; only the periodic
        link-state heartbeat is disabled. Idempotent; safe to call from
        any thread other than ``_monThread`` itself.
        """
        self._monEnable = False
        thr = self._monThread
        # is_alive() guards against join() on a Thread that was never started.
        if (thr is not None
                and hasattr(thr, 'is_alive') and thr.is_alive()
                and hasattr(thr, 'join')
                and threading.current_thread() is not thr):
            thr.join(timeout=3.0)
            if thr.is_alive():
                self._log.warning("Monitor thread did not stop within timeout")

    def stop(self) -> None:
        """Stop the monitor thread and release the underlying ZMQ resources.

        After ``stop()`` returns the C++ sockets and ZMQ context are released
        and this instance is removed from ``ClientCache``; a subsequent
        ``VirtualClient(addr, port)`` therefore constructs a fresh, fully
        connected instance instead of returning the torn-down one.
        """
        self.stopMonitor()
        # ClientCache pins this instance, so the C++ stop() must run explicitly
        # to release SUB/REQ sockets and the ZMQ context. _stop() is idempotent.
        try:
            rogue.interfaces.ZmqClient._stop(self)
        except Exception:
            # Best effort: shutdown continues even if the transport stop fails.
            pass
        # Drop the cache entry so a follow-on VirtualClient(addr, port) gets
        # a fresh instance instead of this torn-down one.
        self._removeFromCache()
        self._vcInitialized = False

    @property
    def root(self) -> "VirtualNode":
        """Return the connected virtual root node."""
        return self._root

    def __hash__(self) -> int:
        """Hash based on host and port.

        Uses ``_cacheKey`` (``hash((addr, port))`` recorded by ``__init__``),
        the same key under which the instance is stored in ``ClientCache``.
        """
        return self._cacheKey

    def __eq__(self, other: object) -> bool:
        """Compare by host and port; unrelated types are not equal."""
        # Returning NotImplemented lets Python fall back to its default
        # comparison instead of raising AttributeError on foreign objects.
        if not isinstance(other, VirtualClient):
            return NotImplemented
        return self._cacheKey == other._cacheKey

    def __ne__(self, other: object) -> bool:
        """Compare by host and port."""
        return not (self == other)

    def __enter__(self) -> "VirtualClient":
        """Return self for context-manager use."""
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        """Call ``stop()`` (monitor thread and ZMQ transport) on scope exit."""
        self.stop()