# Source code for pyrogue.interfaces._Virtual

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
#  Description:
#       PyRogue base module - Virtual Classes
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------

import pyrogue as pr
import rogue.interfaces
import pickle
import time
import threading
from typing import Any, Callable


class VirtualProperty(object):
    """Read-only descriptor whose value is fetched from the remote on each access.

    Parameters
    ----------
    node : VirtualNode
        Virtual node that owns the property; its ``_client`` and ``_path``
        are used to perform the remote lookup.
    attr : str
        Name of the remote attribute to fetch.
    """

    def __init__(self, node: "VirtualNode", attr: str) -> None:
        self._node = node
        self._attr = attr

    def __get__(self, obj: Any = None, objtype: Any = None) -> Any:
        """Ask the owning node's client for the current remote value."""
        owner = self._node
        return owner._client._remoteAttr(owner._path, self._attr)

    def __set__(self, obj: Any, value: Any) -> None:
        """Discard the assignment; virtual properties are read-only."""
        return None


class VirtualMethod(object):
    """Callable stand-in for a method that lives on the remote node.

    Parameters
    ----------
    node : VirtualNode
        Virtual node that owns the method; its ``_client`` and ``_path``
        are used to perform the remote call.
    attr : str
        Name of the remote attribute to invoke.
    info : dict[str, Any]
        Method metadata; the ``args`` and ``kwargs`` entries are stored on
        the instance as ``_args`` and ``_kwargs``.
    """

    def __init__(self, node: "VirtualNode", attr: str, info: dict[str, Any]) -> None:
        self._node   = node
        self._attr   = attr
        # Metadata is only recorded here; it is not consulted when calling
        self._args   = info['args']
        self._kwargs = info['kwargs']

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Forward the call, with all arguments, to the remote attribute."""
        owner = self._node
        return owner._client._remoteAttr(owner._path, self._attr, *args, **kwargs)


def VirtualFactory(data: dict[str, Any]) -> Any:
    """Build a one-off ``VirtualNode`` subclass for ``data`` and instantiate it.

    A new class named ``'Virtual' + data['class']`` is created on every call,
    so the attributes attached to the class during construction belong to
    that single node only.

    Parameters
    ----------
    data : dict
        Serialized node data. The keys ``funcs``, ``props``, ``bases`` and
        ``class`` are read here; the dictionary is also passed on to
        ``VirtualNode.__init__``.

    Returns
    -------
    Any
        Instance of the freshly created virtual class.
    """

    def __init__(self, data: dict[str, Any]) -> None:
        cls = self.__class__

        # Remote callables are exposed as VirtualMethod objects
        for name, info in data['funcs'].items():
            setattr(cls, name, VirtualMethod(self, name, info))

        # Remote properties are exposed as VirtualProperty descriptors
        for name in data['props']:
            setattr(cls, name, VirtualProperty(self, name))

        bases = data['bases']

        def has_base(kind: Any) -> bool:
            # Bases are compared by the string form of the class
            return str(kind) in bases

        # Commands and processes can be called directly
        if has_base(pr.BaseCommand) or has_base(pr.Process):
            setattr(cls, '__call__', self._call)

        # A root gains path lookup and variable-listener registration
        if has_base(pr.Root):
            setattr(cls, 'getNode', self._getNode)
            setattr(cls, 'addVarListener', self._addVarListener)

        # A variable gains per-variable listener management
        if has_base(pr.BaseVariable):
            setattr(cls, 'addListener', self._addListener)
            setattr(cls, 'delListener', self._delListener)

        VirtualNode.__init__(self, data)

    virtual_cls = type('Virtual' + data['class'], (VirtualNode,), {"__init__": __init__})
    return virtual_cls(data)


class VirtualNode(pr.Node):
    """Virtual proxy node populated from remote tree metadata.

    Child nodes are fetched from the remote lazily, the first time they are
    needed (see ``_loadNodes``). Operations that would modify the tree
    locally are not supported and raise ``pr.NodeError``.

    Parameters
    ----------
    attrs : dict[str, Any]
        Serialized node attributes. The keys ``name``, ``description``,
        ``expand``, ``groups``, ``guiGroup``, ``path``, ``class``, ``nodes``
        and ``bases`` are read.
    """

    def __init__(self, attrs: dict[str, Any]) -> None:
        super().__init__(name=attrs['name'],
                         description=attrs['description'],
                         expand=attrs['expand'],
                         groups=attrs['groups'],
                         guiGroup=attrs['guiGroup'])

        self._path  = attrs['path']
        self._class = attrs['class']
        self._nodes = attrs['nodes']
        self._bases = attrs['bases']

        # Tracking
        self._parent    = None
        self._root      = None
        self._client    = None
        self._functions = []
        self._loaded    = False

        # Setup logging
        self._log = pr.logInit(cls=self,name=self.name,path=self._path)

    def addToGroup(self, group: str) -> None:
        """Not supported; raises NodeError."""
        raise pr.NodeError('addToGroup not supported in VirtualNode')

    def removeFromGroup(self, group: str) -> None:
        """Not supported; raises NodeError."""
        raise pr.NodeError('removeFromGroup not supported in VirtualNode')

    def add(self, node: "VirtualNode") -> None:
        """Not supported; raises NodeError."""
        raise pr.NodeError('add not supported in VirtualNode')

    def callRecursive(self, func: Callable[..., Any], nodeTypes: Any = None, **kwargs: Any) -> None:
        """Not supported; raises NodeError."""
        raise pr.NodeError('callRecursive not supported in VirtualNode')

    def __getattr__(self, name: str) -> Any:
        """Resolve an attribute that normal lookup did not find.

        Children are loaded first (if not already), then the lookup is
        delegated to ``pr.Node.__getattr__``.
        """
        if not self._loaded:
            self._loadNodes()
        return pr.Node.__getattr__(self,name)

    @property
    def nodes(self) -> dict[str, Any]:
        """Child nodes keyed by name; loads them from the remote on first use."""
        if not self._loaded:
            self._loadNodes()
        return self._nodes

    def node(self, name: str, load: bool = True) -> "VirtualNode | None":
        """Return child node by name, optionally loading children first.

        Parameters
        ----------
        name : str
            Child node name.
        load : bool, optional (default = True)
            If True, load child nodes before lookup.

        Returns
        -------
        Any
            The entry stored under ``name``, or None if there is no such entry.
        """
        if (not self._loaded) and load:
            self._loadNodes()

        if name in self._nodes:
            return self._nodes[name]
        else:
            return None

    def _call(self, *args: Any, **kwargs: Any) -> Any:
        """Invoke remote __call__ on this node."""
        return self._client._remoteAttr(self._path, '__call__', *args, **kwargs)

    def _addListener(self, listener: Callable[..., Any]) -> None:
        """Add a variable update listener; duplicates are ignored."""
        if listener not in self._functions:
            self._functions.append(listener)

    def _delListener(self, listener: Callable[..., Any]) -> None:
        """Remove a variable update listener; unknown listeners are ignored."""
        if listener in self._functions:
            self._functions.remove(listener)

    def _addVarListener(self, func: Callable[..., Any]) -> None:
        """Forward addVarListener to the client."""
        self._client._addVarListener(func)

    def _loadNodes(self) -> None:
        """Populate child nodes from remote metadata.

        Only remote children whose key already exists in ``self._nodes`` are
        adopted. Each adopted child is linked to this node, its root and its
        client.

        Raises
        ------
        Exception
            Whatever the remote fetch raises. In that case the node is left
            marked as not loaded so that a later access can try again.
        """
        # The flag is set before any other work so that an attribute lookup
        # made while loading cannot re-enter this method through __getattr__.
        self._loaded = True

        try:
            remote = self._client._remoteAttr(self._path, 'nodes')
        except Exception:
            # Without this reset a single failed fetch would permanently
            # freeze the node with whatever attrs['nodes'] contained.
            self._loaded = False
            raise

        for k,node in remote.items():
            if k in self._nodes:
                node._parent = self
                node._root   = self._root
                node._client = self._client

                self._nodes[k] = node
                self._addArrayNode(node)

    def _getNode(self, path: str, load: bool = True) -> "VirtualNode | None":
        """Resolve a dotted path to a node.

        The first path element must be this node's name or the literal
        ``'root'``; the remaining elements are resolved one child at a time.

        Parameters
        ----------
        path : str
            Dotted path (e.g. 'root.Child.GrandChild').
        load : bool, optional (default = True)
            If True, load child nodes as needed.

        Returns
        -------
        Any
            Node at path or None if not found.
        """
        obj = self

        if '.' in path:
            lst = path.split('.')

            if lst[0] != self.name and lst[0] != 'root':
                return None

            for a in lst[1:]:
                # A failed lookup on the previous step leaves obj as None,
                # which has no 'node' attribute and ends the walk here.
                if not hasattr(obj,'node'):
                    return None
                obj = obj.node(a,load)

        elif path != self.name and path != 'root':
            return None

        return obj

    def isinstance(self, typ: type["VirtualNode"]) -> bool:
        """Check whether ``str(typ)`` is listed in this node's serialized bases."""
        cs = str(typ)
        return cs in self._bases

    def _rootAttached(self, parent: "VirtualNode", root: "VirtualNode") -> None:
        """Not supported; raises NodeError."""
        raise pr.NodeError('_rootAttached not supported in VirtualNode')

    def _getDict(self, modes: list[str]) -> Any:
        """Not supported; raises NodeError."""
        raise pr.NodeError('_getDict not supported in VirtualNode')

    def _setDict(self, *args: Any, **kwargs: Any) -> None:
        """Not supported; raises NodeError."""
        raise pr.NodeError('_setDict not supported in VirtualNode')

    # NOTE(review): the list defaults below are shared between calls. This
    # method only forwards them to getYaml; they are kept as-is so the
    # signature stays unchanged.
    def printYaml(
        self,
        readFirst: bool = False,
        modes: list[str] = ['RW', 'RO', 'WO'],
        incGroups: str | list[str] | None = None,
        excGroups: str | list[str] | None = ['Hidden'],
        recurse: bool = False,
    ) -> None:
        """Print remote YAML with selected access modes.

        Parameters
        ----------
        readFirst : bool, optional (default = False)
            If ``True``, perform a read before generating YAML output.
        modes : list['RW' | 'WO' | 'RO'], optional (default = ['RW','RO','WO'])
            Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``.
        incGroups : str or list[str], optional
            Group name or group names to include.
        excGroups : str or list[str], optional
            Group name or group names to exclude.
        recurse : bool, optional (default = False)
            If ``True``, include child nodes recursively.
        """
        print(self.getYaml(readFirst=readFirst, modes=modes, incGroups=incGroups, excGroups=excGroups, recurse=recurse))

    def _doUpdate(self, val: Any) -> None:
        """Notify listeners of a value update.

        Each listener is called as ``listener(path, val)``.
        """
        # Iterate over a snapshot: a listener that adds or removes listeners
        # while being notified would otherwise shift the list under the loop
        # and cause another listener to be skipped.
        for func in list(self._functions):
            func(self.path,val)


class VirtualClient(rogue.interfaces.ZmqClient):
    """
    A full featured client interface for Rogue. This can be used in scripts or other
    clients which require remote access to the running Rogue instance.

    This class uses a custom factory ensuring that only one instance of this class
    is created in a python script for a given remote connection.

    Parameters
    ----------
    addr : str
        host address
    port : int
        host port

    Attributes
    ----------
    linked: bool
        link state
    root: obj
        root class reference
    """

    # Instances keyed by hash((addr, port)); shared by the whole process
    ClientCache = {}

    def __new__(cls: type["VirtualClient"], addr: str = "localhost", port: int = 9099) -> "VirtualClient":
        """Return cached client instances keyed by ``(addr, port)``."""
        newHash = hash((addr, port))

        if newHash in cls.ClientCache:
            return VirtualClient.ClientCache[newHash]
        else:
            return super(VirtualClient, cls).__new__(cls, addr, port)

    def __init__(self, addr: str = "localhost", port: int = 9099) -> None:
        """Connect to the server, fetch the root node and start link monitoring.

        If the connection test fails, a diagnostic message is printed and
        the client is left with ``root`` set to ``None`` and no monitor
        thread running.
        """
        # __new__ hands back the cached instance and Python then calls
        # __init__ on it again; an already initialized client is left alone.
        if hash((addr,port)) in VirtualClient.ClientCache:
            return

        VirtualClient.ClientCache[hash((addr, port))] = self

        rogue.interfaces.ZmqClient.__init__(self,addr,port,False)

        # Connection identity used by __hash__ and __eq__; nothing else in
        # this class assigns these attributes.
        self._host = addr
        self._port = port

        self._varListeners = []
        self._monitors = []
        self._root  = None
        self._link  = False
        self._ltime = time.time()

        # Setup logging
        self._log = pr.logInit(cls=self,name="VirtualClient",path=None)

        # Get root name as a connection test
        self.setTimeout(1000,False)
        try:
            self._root = self._remoteAttr('__ROOT__',None)
        except Exception:
            error_message = (
                f"\n\nFailed to connect to {addr}:{port}!\n\n"
                "Possible causes for the issue:\n"
                "- ZeroMQ Server not included in the root class\n"
                "- Root process not running\n"
                "- Mismatch between Client address and Server address\n"
                "- Mismatch between Client port and Server port\n"
                "- Server ports being blocked\n"
            )
            print(error_message)
            return

        print("Connected to {} at {}:{}".format(self._root.name,addr,port))

        self.setTimeout(1000,True)
        self._root._parent = self._root
        self._root._root   = self._root
        self._root._client = self

        # Expose the root as an attribute named after it
        setattr(self,self._root.name,self._root)

        # Link tracking
        self._link = True
        self._ltime = self._root.Time.value()

        # Create monitoring thread
        self._monEnable = True
        self._monThread = threading.Thread(target=self._monWorker)
        self._monThread.start()

    def addLinkMonitor(self, function: Callable[[bool], None]) -> None:
        """
        Add a link monitor callback function. This function will be called
        any time the link state changes. A single boolean argument will be
        passed to the callback function containing the current link state.

        Parameters
        ----------
        function : callable
            Callback function with the form ``function(linkState: bool)``.
        """
        if function not in self._monitors:
            self._monitors.append(function)

    def remLinkMonitor(self, function: Callable[[bool], None]) -> None:
        """
        Remove a previously added link monitor function.

        Parameters
        ----------
        function : callable
            Previously registered callback function.
        """
        if function in self._monitors:
            self._monitors.remove(function)

    @property
    def linked(self) -> bool:
        """Whether the client is currently linked to the server."""
        return self._link

    def _monWorker(self) -> None:
        """Monitor link heartbeat and emit link-state callbacks.

        Runs until ``_monEnable`` is cleared, checking once per second how
        long ago the last update arrived (``_ltime``). The link is declared
        down after more than 10 seconds of silence and up again once an
        update is seen.
        """
        while self._monEnable:
            time.sleep(1)

            if self._link and (time.time() - self._ltime) > 10.0:
                self._link = False
                self._log.warning(f"I have not heard from {self._root.name} in 10 seconds. It may be busy, continuing to wait...")
                # Snapshot so a callback may add or remove monitors safely
                for mon in list(self._monitors):
                    mon(self._link)

            elif (not self._link) and (time.time() - self._ltime) < 10.0:
                self._link = True
                self._log.warning(f"I have finally heard from {self._root.name}. All is good!")
                for mon in list(self._monitors):
                    mon(self._link)

    def _remoteAttr(self, path: str, attr: str, *args: Any, **kwargs: Any) -> Any:
        """Invoke a remote attribute on the server.

        The request is pickled, sent with ``_send`` and the reply is
        unpickled.

        Raises
        ------
        Exception
            If sending or decoding fails (the original error is chained as
            the cause), or if the decoded reply is itself an exception
            instance, in which case that instance is raised.
        """
        # NOTE(review): pickle.loads runs on bytes received from the server;
        # only connect to servers you trust.
        try:
            ret = pickle.loads(self._send(pickle.dumps({
                'path':path,
                'attr':attr,
                'args':args,
                'kwargs':kwargs
            })))
        except Exception as e:
            raise Exception(f"ZMQ Interface Exception: {e}") from e

        if isinstance(ret,Exception):
            raise ret

        return ret

    def _addVarListener(self, func: Callable[..., Any]) -> None:
        """Register a variable update listener; duplicates are ignored."""
        if func not in self._varListeners:
            self._varListeners.append(func)

    def _doUpdate(self, data: bytes) -> None:
        """Process variable update data from the server.

        Every call refreshes the heartbeat timestamp. The unpickled payload
        is iterated as ``key -> value``: the node found for each key (if
        any) is updated, then every registered variable listener is called
        with ``(key, value)``.
        """
        self._ltime = time.time()

        if self._root is None:
            return

        # NOTE(review): pickle.loads on data pushed by the server; see _remoteAttr
        d = pickle.loads(data)

        for k,val in d.items():
            n = self._root.getNode(k,False)
            if n is not None:
                n._doUpdate(val)

            # Call listener functions; snapshot so a listener may register
            # further listeners without disturbing this loop
            for func in list(self._varListeners):
                func(k,val)

    def stop(self) -> None:
        """Ask the monitor thread to exit.

        Only the enable flag is cleared here; the worker checks it after
        each one second sleep.
        """
        self._monEnable = False

    @property
    def root(self) -> "VirtualNode":
        """Return the connected virtual root node (None if the connection failed)."""
        return self._root

    def __hash__(self) -> int:
        """Hash based on host and port."""
        return hash((self._host, self._port))

    def __eq__(self, other: "VirtualClient") -> bool:
        """Compare by host and port."""
        try:
            theirs = (other._host, other._port)
        except AttributeError:
            # Not something carrying a host/port identity
            return NotImplemented
        return (self._host, self._port) == theirs

    def __ne__(self, other: "VirtualClient") -> bool:
        """Compare by host and port."""
        return not (self == other)

    def __enter__(self) -> "VirtualClient":
        """Return self for context-manager use."""
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        """Stop monitoring when leaving context-manager scope."""
        self.stop()