Source code for pyrogue.interfaces._Virtual

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
#  Description:
#       PyRogue base module - Virtual Classes
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------

import os
import pyrogue as pr
import rogue.interfaces
import pickle
import time
import threading
from typing import Any, Callable


class VirtualProperty(object):
    """Descriptor for virtual properties that fetch values from a remote client.

    Parameters
    ----------
    node : VirtualNode
        Virtual node instance owning this property.
    attr : str
        Attribute name to fetch from the remote.
    """

    def __init__(self, node: "VirtualNode", attr: str) -> None:
        self._attr = attr
        self._node = node

    def __get__(self, obj: Any = None, objtype: Any = None) -> Any:
        """Fetch attribute value from the remote client."""
        return self._node._client._remoteAttr(self._node._path, self._attr)

    def __set__(self, obj: Any, value: Any) -> None:
        """No-op setter; virtual properties are read-only."""
        pass


class VirtualMethod(object):
    """Descriptor for virtual methods that invoke remote calls.

    Parameters
    ----------
    node : VirtualNode
        Virtual node instance owning this method.
    attr : str
        Attribute name to invoke on the remote.
    info : dict[str, Any]
        Method metadata including ``args`` and ``kwargs``.
    """

    def __init__(self, node: "VirtualNode", attr: str, info: dict[str, Any]) -> None:
        self._attr   = attr
        self._node   = node
        self._args   = info['args']
        self._kwargs = info['kwargs']

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Invoke the remote attribute with the given arguments."""
        return self._node._client._remoteAttr(self._node._path, self._attr, *args, **kwargs)


def VirtualFactory(data: dict[str, Any]) -> Any:
    """Create a virtual class instance from serialized node data.

    Parameters
    ----------
    data : dict
        Serialized node data including 'funcs', 'props', 'bases', 'class', etc.

    Returns
    -------
    Any
        Instantiated virtual node with dynamic methods and properties.
    """

    def __init__(self, data: dict[str, Any]) -> None:
        # Add dynamic methods
        for k,v in data['funcs'].items():
            setattr(self.__class__,k,VirtualMethod(self,k,v))

        # Add dynamic properties
        for k in data['props']:
            setattr(self.__class__,k,VirtualProperty(self,k))

        # Add __call__ if command or Process
        if str(pr.BaseCommand) in data['bases'] or str(pr.Process) in data['bases']:
            setattr(self.__class__,'__call__',self._call)

        # Add getNode and addVarListener if root
        if str(pr.Root) in data['bases']:
            setattr(self.__class__,'getNode',self._getNode)
            setattr(self.__class__,'addVarListener',self._addVarListener)

        # Add addListener if Variable
        if str(pr.BaseVariable) in data['bases']:
            setattr(self.__class__,'addListener',self._addListener)
            setattr(self.__class__,'delListener',self._delListener)

        VirtualNode.__init__(self,data)

    newclass = type('Virtual' + data['class'], (VirtualNode,), {"__init__": __init__})
    return newclass(data)


class VirtualNode(pr.Node):
    """Virtual proxy node populated from remote tree metadata.

    Parameters
    ----------
    attrs : dict[str, Any]
        Serialized node attributes including ``name``, ``description``,
        ``path``, ``class``, ``bases``, and ``nodes``.
    """

    def __init__(self, attrs: dict[str, Any]) -> None:
        super().__init__(name=attrs['name'],
                         description=attrs['description'],
                         expand=attrs['expand'],
                         groups=attrs['groups'],
                         guiGroup=attrs['guiGroup'])

        self._path  = attrs['path']
        self._class = attrs['class']
        self._nodes = attrs['nodes']
        self._bases = attrs['bases']

        # Tracking
        self._parent    = None
        self._root      = None
        self._client    = None
        self._functions = []
        self._loaded    = False

        # Setup logging
        self._log = pr.logInit(cls=self,name=self.name,path=self._path)

    def addToGroup(self, group: str) -> None:
        """Not supported; raises NodeError."""
        raise pr.NodeError('addToGroup not supported in VirtualNode')

    def removeFromGroup(self, group: str) -> None:
        """Not supported; raises NodeError."""
        raise pr.NodeError('removeFromGroup not supported in VirtualNode')

    def add(self, node: "VirtualNode") -> None:
        """Not supported; raises NodeError."""
        raise pr.NodeError('add not supported in VirtualNode')

    def callRecursive(self, func: Callable[..., Any], nodeTypes: Any = None, **kwargs: Any) -> None:
        """Not supported; raises NodeError."""
        raise pr.NodeError('callRecursive not supported in VirtualNode')

    def __getattr__(self, name: str) -> Any:
        """Lazy-load children on first access, then resolve as a child attribute."""
        if not self._loaded:
            self._loadNodes()
        return pr.Node.__getattr__(self,name)

    @property
    def nodes(self) -> dict[str, Any]:
        """Child nodes keyed by name."""
        if not self._loaded:
            self._loadNodes()
        return self._nodes

    def node(self, name: str, load: bool = True) -> "VirtualNode | None":
        """Return child node by name, optionally loading children first.

        Parameters
        ----------
        name : str
            Child node name.
        load : bool, optional (default = True)
            If True, load child nodes before lookup.

        Returns
        -------
        Any
            Child node or None if not found.
        """
        if (not self._loaded) and load:
            self._loadNodes()

        if name in self._nodes:
            return self._nodes[name]
        else:
            return None

    def _call(self, *args: Any, **kwargs: Any) -> Any:
        """Invoke remote __call__ on this node."""
        return self._client._remoteAttr(self._path, '__call__', *args, **kwargs)

    def _addListener(self, listener: Callable[..., Any]) -> None:
        """Add a variable update listener."""
        if listener not in self._functions:
            self._functions.append(listener)

    def _delListener(self, listener: Callable[..., Any]) -> None:
        """Remove a variable update listener."""
        if listener in self._functions:
            self._functions.remove(listener)

    def _addVarListener(self, func: Callable[..., Any]) -> None:
        """Forward addVarListener to the client."""
        self._client._addVarListener(func)

    def _loadNodes(self) -> None:
        """Populate child nodes from remote metadata."""
        self._loaded = True

        for k,node in self._client._remoteAttr(self._path, 'nodes').items():
            if k in self._nodes:
                node._parent = self
                node._root   = self._root
                node._client = self._client

                self._nodes[k] = node
                self._addArrayNode(node)

    def _getNode(self, path: str, load: bool = True) -> "VirtualNode | None":
        """Resolve a dotted path to a node.

        Parameters
        ----------
        path : str
            Dotted path (e.g. 'root.Child.GrandChild').
        load : bool, optional (default = True)
            If True, load child nodes as needed.

        Returns
        -------
        Any
            Node at path or None if not found.
        """
        obj = self

        if '.' in path:
            lst = path.split('.')

            if lst[0] != self.name and lst[0] != 'root':
                return None

            for a in lst[1:]:
                if not hasattr(obj,'node'):
                    return None
                obj = obj.node(a,load)

        elif path != self.name and path != 'root':
            return None

        return obj

    def isinstance(self, typ: type["VirtualNode"]) -> bool:
        """Check if this node's type string matches the given type."""
        cs = str(typ)
        return cs in self._bases

    def _rootAttached(self, parent: "VirtualNode", root: "VirtualNode") -> None:
        """Not supported; raises NodeError."""
        raise pr.NodeError('_rootAttached not supported in VirtualNode')

    def _getDict(self, modes: list[str]) -> Any:
        """Not supported; raises NodeError."""
        raise pr.NodeError('_getDict not supported in VirtualNode')

    def _setDict(self, *args: Any, **kwargs: Any) -> None:
        """Not supported; raises NodeError."""
        raise pr.NodeError('_setDict not supported in VirtualNode')

    def printYaml(
        self,
        readFirst: bool = False,
        modes: list[str] = ['RW', 'RO', 'WO'],
        incGroups: str | list[str] | None = None,
        excGroups: str | list[str] | None = ['Hidden'],
        recurse: bool = False,
    ) -> None:
        """Print remote YAML with selected access modes.

        Parameters
        ----------
        readFirst : bool, optional (default = False)
            If ``True``, perform a read before generating YAML output.
        modes : list['RW' | 'WO' | 'RO'], optional (default = ['RW','RO','WO'])
            Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``.
        incGroups : str or list[str], optional
            Group name or group names to include.
        excGroups : str or list[str], optional
            Group name or group names to exclude.
        recurse : bool, optional (default = False)
            If ``True``, include child nodes recursively.
        """
        print(self.getYaml(readFirst=readFirst, modes=modes, incGroups=incGroups, excGroups=excGroups, recurse=recurse))

    def _doUpdate(self, val: Any) -> None:
        """Notify listeners of a value update."""
        for func in self._functions:
            func(self.path,val)


[docs] class VirtualClient(rogue.interfaces.ZmqClient): """ A full featured client interface for Rogue. This can be used in scripts or other clients which require remote access to the running Rogue instance. This class ues a custom factory ensuring that only one instance of this class is created in a python script for a given remote connection. Parameters ---------- addr : str host address port : int host port linkTimeout : float, optional Idle timeout in seconds before the client marks the link down when no publish updates or successful replies have been observed. This is the primary operational timeout knob for both hardware and simulation applications. requestStallTimeout : float | None, optional Optional policy timeout for how long a single in-flight request may remain outstanding before the client declares the connection stalled. This is disabled by default and is typically only useful when a deployment has a well-defined upper bound for legitimate request time. Attributes ---------- linked: bool link state root: obj root class reference """ ClientCache = {} @staticmethod def _defaultLinkTimeout() -> float: """Return the default idle link timeout in seconds.""" return float(os.getenv("ROGUE_VIRTUAL_LINK_TIMEOUT", "10.0")) @staticmethod def _defaultRequestStallTimeout() -> float | None: """Return the default request-stall timeout in seconds, if configured.""" value = os.getenv("ROGUE_VIRTUAL_REQUEST_STALL_TIMEOUT") if value is None or value == "": return None stall = float(value) return None if stall <= 0 else stall @staticmethod def _defaultConnectTimeout() -> float: """Return the startup handshake timeout in seconds.""" return max(0.1, float(os.getenv("ROGUE_VIRTUAL_CONNECT_TIMEOUT", "5.0"))) def __new__( cls: type["VirtualClient"], addr: str = "localhost", port: int = 9099, linkTimeout: float | None = None, requestStallTimeout: float | None = None, ) -> "VirtualClient": """Return cached client instances keyed by ``(addr, port)``. Timeout arguments configure the cached instance for that server. If a client already exists for the same ``(addr, port)``, new timeout values are applied to the shared instance. """ newHash = hash((addr, port)) if newHash in cls.ClientCache: return VirtualClient.ClientCache[newHash] else: return super(VirtualClient, cls).__new__(cls, addr, port) def __init__( self, addr: str = "localhost", port: int = 9099, linkTimeout: float | None = None, requestStallTimeout: float | None = None, ) -> None: """Create or reconfigure a cached virtual client instance. Parameters ---------- addr : str, optional Server host name or address. port : int, optional Base ZMQ server port. linkTimeout : float | None, optional Idle timeout in seconds before the link is considered down. If not provided, the default is 10 seconds or the value from ``ROGUE_VIRTUAL_LINK_TIMEOUT``. requestStallTimeout : float | None, optional Optional maximum age of an in-flight request before the client treats it as stalled. ``None`` or non-positive values disable this policy. In practice this is usually left disabled unless a system has a known upper bound for valid request duration. """ if getattr(self, "_vcInitialized", False): if linkTimeout is not None or requestStallTimeout is not None: self.setTimeoutConfig(linkTimeout=linkTimeout, requestStallTimeout=requestStallTimeout) return self._cacheKey = hash((addr, port)) self._monEnable = False self._monThread = None self._vcInitialized = True VirtualClient.ClientCache[self._cacheKey] = self try: rogue.interfaces.ZmqClient.__init__(self,addr,port,False) except Exception: self._removeFromCache() self._vcInitialized = False raise self._varListeners = [] self._monitors = [] self._root = None self._link = False self._ltime = time.time() self._reqLock = threading.Lock() self._reqCount = 0 self._reqSince = None self._linkTimeout = self._defaultLinkTimeout() self._requestStallTimeout = self._defaultRequestStallTimeout() self.setTimeoutConfig(linkTimeout=linkTimeout, requestStallTimeout=requestStallTimeout) # Setup logging self._log = pr.logInit(cls=self,name="VirtualClient",path=None) # Get root name as a connection test self._root = None self.setTimeout(1000,False) try: self._root = self._waitForRoot() except Exception: self._cleanupFailedInit() error_message = ( f"\n\nFailed to connect to {addr}:{port}!\n\n" "Possible causes for the issue:\n" "- ZeroMQ Server not included in the root class\n" "- Root process not running\n" "- Mismatch between Client address and Server address\n" "- Mismatch between Client port and Server port\n" "- Server ports being blocked\n" ) print(error_message) return print("Connected to {} at {}:{}".format(self._root.name,addr,port)) self.setTimeout(1000,True) self._root._parent = self._root self._root._root = self._root self._root._client = self setattr(self,self._root.name,self._root) # Link tracking self._link = True self._ltime = self._root.Time.value() # Create monitoring thread self._monEnable = True self._monThread = threading.Thread(target=self._monWorker) self._monThread.start() def _removeFromCache(self) -> None: """Remove this client from the shared cache when it is no longer valid.""" if getattr(self, "_cacheKey", None) in VirtualClient.ClientCache and VirtualClient.ClientCache[self._cacheKey] is self: del VirtualClient.ClientCache[self._cacheKey] def _cleanupFailedInit(self) -> None: """Release transport resources after a failed bootstrap handshake.""" self._monEnable = False self._removeFromCache() self._vcInitialized = False try: rogue.interfaces.ZmqClient._stop(self) except Exception: pass def _waitForRoot(self) -> "VirtualNode": """Retry the initial ``__ROOT__`` handshake for a bounded startup window.""" deadline = time.monotonic() + self._defaultConnectTimeout() last_error = None while time.monotonic() < deadline: try: return self._remoteAttr('__ROOT__',None) except Exception as exc: last_error = exc if last_error is not None: raise last_error raise Exception("Timed out waiting for initial root handshake")
[docs] def addLinkMonitor(self, function: Callable[[bool], None]) -> None: """ Add a link monitor callback function. This function will be called any time the link state changes. A single boolean argument will be passed to the callback function containing the current link state. Parameters ---------- function : callable Callback function with the form ``function(linkState: bool)``. """ if function not in self._monitors: self._monitors.append(function)
[docs] def remLinkMonitor(self, function: Callable[[bool], None]) -> None: """ Remove a previously added link monitor function. Parameters ---------- function : callable Previously registered callback function. """ if function in self._monitors: self._monitors.remove(function)
@property def linked(self) -> bool: """Whether the client is currently linked to the server.""" return self._link @property def linkTimeout(self) -> float: """Idle timeout in seconds before the link is considered down.""" return self._linkTimeout @property def requestStallTimeout(self) -> float | None: """Optional maximum in-flight request age before it is treated as stalled.""" return self._requestStallTimeout
[docs] def setTimeoutConfig( self, *, linkTimeout: float | None = None, requestStallTimeout: float | None = None, ) -> None: """Update link and request-stall timeout settings. ``linkTimeout`` is the normal tuning knob for clients that need to tolerate longer busy periods. ``requestStallTimeout`` is an optional policy threshold and is most useful only when a deployment has a clear, finite upper bound for legitimate request duration. """ if linkTimeout is not None: self._linkTimeout = float(linkTimeout) if requestStallTimeout is not None: stall = float(requestStallTimeout) self._requestStallTimeout = None if stall <= 0 else stall
def _monWorker(self) -> None: """Monitor link heartbeat and emit link-state callbacks.""" while self._monEnable: time.sleep(1) self._checkLinkState() def _requestStart(self) -> None: """Record that a request/reply transaction is in flight.""" with self._reqLock: if self._reqCount == 0: self._reqSince = time.time() self._reqCount += 1 def _requestDone(self, success: bool) -> None: """Record request completion and treat successful replies as activity.""" with self._reqLock: self._reqCount = max(0, self._reqCount - 1) if self._reqCount == 0: self._reqSince = None if success: self._ltime = time.time() if not self._link: self._link = True self._log.warning(f"I have finally heard from {self._root.name}. All is good!") for mon in self._monitors: mon(self._link) def _requestPending(self) -> bool: """Whether at least one request/reply transaction is in flight.""" with self._reqLock: return self._reqCount > 0 def _requestAge(self) -> float | None: """Return the age of the oldest in-flight request, if any.""" with self._reqLock: if self._reqCount == 0 or self._reqSince is None: return None return time.time() - self._reqSince def _checkLinkState(self) -> None: """Update link state from recent activity while tolerating busy requests.""" delta = time.time() - self._ltime if self._link and delta > self._linkTimeout: if self._requestPending(): reqAge = self._requestAge() if self._requestStallTimeout is None or reqAge is None or reqAge <= self._requestStallTimeout: return self._link = False self._log.warning( f"Request has been pending for {reqAge:0.1f} seconds without updates. " f"Declaring link to {self._root.name} stalled." ) for mon in self._monitors: mon(self._link) return self._link = False self._log.warning( f"I have not heard from {self._root.name} in {self._linkTimeout:0.1f} seconds. " "It may be busy, continuing to wait..." ) for mon in self._monitors: mon(self._link) elif (not self._link) and delta < self._linkTimeout: self._link = True self._log.warning(f"I have finally heard from {self._root.name}. All is good!") for mon in self._monitors: mon(self._link) def _remoteAttr(self, path: str, attr: str, *args: Any, **kwargs: Any) -> Any: """Invoke a remote attribute on the server.""" self._requestStart() try: ret = pickle.loads(self._send(pickle.dumps({ 'path':path, 'attr':attr, 'args':args, 'kwargs':kwargs }))) self._requestDone(True) except Exception as e: self._requestDone(False) raise Exception(f"ZMQ Interface Exception: {e}") if isinstance(ret,Exception): raise ret return ret def _addVarListener(self, func: Callable[..., Any]) -> None: """Register a variable update listener.""" if func not in self._varListeners: self._varListeners.append(func) def _doUpdate(self, data: bytes) -> None: """Process variable update data from the server.""" self._ltime = time.time() if self._root is None: return d = pickle.loads(data) for k,val in d.items(): n = self._root.getNode(k,False) if n is not None: n._doUpdate(val) # Call listener functions, for func in self._varListeners: func(k,val)
[docs] def stop(self) -> None: """Stop the monitor thread and release resources.""" self._monEnable = False
@property def root(self) -> "VirtualNode": """Return the connected virtual root node.""" return self._root def __hash__(self) -> int: """Hash based on host and port.""" return hash((self._host, self._port)) def __eq__(self, other: "VirtualClient") -> bool: """Compare by host and port.""" return (self.host, self.port) == (other._host, other._port) def __ne__(self, other: "VirtualClient") -> bool: """Compare by host and port.""" return not (self == other) def __enter__(self) -> "VirtualClient": """Return self for context-manager use.""" return self def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: """Stop monitoring when leaving context-manager scope.""" self.stop()