Source code for pyrogue.interfaces._Virtual

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
#  Description:
#       PyRogue base module - Virtual Classes
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------

import pyrogue as pr
import rogue.interfaces
import pickle
import time
import threading


class VirtualProperty(object):
    def __init__(self, node, attr):
        self._attr = attr
        self._node = node

    def __get__(self, obj=None, objtype=None):
        return self._node._client._remoteAttr(self._node._path, self._attr)

    def __set__(self, obj, value):
        pass


class VirtualMethod(object):
    def __init__(self, node, attr, info):
        self._attr   = attr
        self._node   = node
        self._args   = info['args']
        self._kwargs = info['kwargs']

    def __call__(self, *args, **kwargs):
        return self._node._client._remoteAttr(self._node._path, self._attr, *args, **kwargs)


def VirtualFactory(data):

    def __init__(self,data):

        # Add dynamic methods
        for k,v in data['funcs'].items():
            setattr(self.__class__,k,VirtualMethod(self,k,v))

        # Add dynamic properties
        for k in data['props']:
            setattr(self.__class__,k,VirtualProperty(self,k))

        # Add __call__ if command or Process
        if str(pr.BaseCommand) in data['bases'] or str(pr.Process) in data['bases']:
            setattr(self.__class__,'__call__',self._call)

        # Add getNode and addVarListener if root
        if str(pr.Root) in data['bases']:
            setattr(self.__class__,'getNode',self._getNode)
            setattr(self.__class__,'addVarListener',self._addVarListener)

        # Add addListener if Variable
        if str(pr.BaseVariable) in data['bases']:
            setattr(self.__class__,'addListener',self._addListener)
            setattr(self.__class__,'delListener',self._delListener)

        VirtualNode.__init__(self,data)

    newclass = type('Virtual' + data['class'], (VirtualNode,), {"__init__": __init__})
    return newclass(data)


class VirtualNode(pr.Node):
    def __init__(self, attrs):
        super().__init__(name=attrs['name'],
                         description=attrs['description'],
                         expand=attrs['expand'],
                         groups=attrs['groups'],
                         guiGroup=attrs['guiGroup'])

        self._path  = attrs['path']
        self._class = attrs['class']
        self._nodes = attrs['nodes']
        self._bases = attrs['bases']

        # Tracking
        self._parent    = None
        self._root      = None
        self._client    = None
        self._functions = []
        self._loaded    = False

        # Setup logging
        self._log = pr.logInit(cls=self,name=self.name,path=self._path)

    def addToGroup(self,group):
        raise pr.NodeError('addToGroup not supported in VirtualNode')

    def removeFromGroup(self,group):
        raise pr.NodeError('removeFromGroup not supported in VirtualNode')

    def add(self,node):
        raise pr.NodeError('add not supported in VirtualNode')

    def callRecursive(self, func, nodeTypes=None, **kwargs):
        raise pr.NodeError('callRecursive not supported in VirtualNode')

    def __getattr__(self, name):
        if not self._loaded:
            self._loadNodes()
        return pr.Node.__getattr__(self,name)

    @property
    def nodes(self):
        if not self._loaded:
            self._loadNodes()
        return self._nodes

    def node(self, name, load=True):
        if (not self._loaded) and load:
            self._loadNodes()

        if name in self._nodes:
            return self._nodes[name]
        else:
            return None

    def _call(self, *args, **kwargs):
        return self._client._remoteAttr(self._path, '__call__', *args, **kwargs)

    def _addListener(self, listener):
        if listener not in self._functions:
            self._functions.append(listener)

    def _delListener(self, listener):
        if listener in self._functions:
            self._functions.remove(listener)

    def _addVarListener(self,func):
        self._client._addVarListener(func)

    def _loadNodes(self):
        self._loaded = True

        for k,node in self._client._remoteAttr(self._path, 'nodes').items():
            if k in self._nodes:
                node._parent = self
                node._root   = self._root
                node._client = self._client

                self._nodes[k] = node
                self._addArrayNode(node)

    def _getNode(self,path,load=True):
        obj = self

        if '.' in path:
            lst = path.split('.')

            if lst[0] != self.name and lst[0] != 'root':
                return None

            for a in lst[1:]:
                if not hasattr(obj,'node'):
                    return None
                obj = obj.node(a,load)

        elif path != self.name and path != 'root':
            return None

        return obj

    def isinstance(self,typ):
        cs = str(typ)
        return cs in self._bases

    def _rootAttached(self,parent,root):
        raise pr.NodeError('_rootAttached not supported in VirtualNode')

    def _getDict(self,modes):
        raise pr.NodeError('_getDict not supported in VirtualNode')

    def _setDict(self,*args,**kwargs):
        raise pr.NodeError('_setDict not supported in VirtualNode')

    def printYaml(self, readFirst=False, modes=['RW','RO','WO'], incGroups=None, excGroups=['Hidden'], recurse=False):
        print(self.getYaml(readFirst=readFirst, modes=modes, incGroups=incGroups, excGroups=excGroups, recurse=recurse))

    def _doUpdate(self, val):
        for func in self._functions:
            func(self.path,val)


[docs]class VirtualClient(rogue.interfaces.ZmqClient): """ A full featured client interface for Rogue. This can be used in scripts or other clients which require remote access to the running Rogue instance. This class ues a custom factory ensuring that only one instance of this class is created in a python script for a given remote connection. Parameters ---------- addr : str host address port : int host port Attributes ---------- linked: bool link state root: obj root class reference """ ClientCache = {} def __new__(cls, addr="localhost", port=9099): newHash = hash((addr, port)) if newHash in cls.ClientCache: return VirtualClient.ClientCache[newHash] else: return super(VirtualClient, cls).__new__(cls, addr, port) def __init__(self, addr="localhost", port=9099): if hash((addr,port)) in VirtualClient.ClientCache: return VirtualClient.ClientCache[hash((addr, port))] = self rogue.interfaces.ZmqClient.__init__(self,addr,port,False) self._varListeners = [] self._monitors = [] self._root = None self._link = False self._ltime = time.time() # Setup logging self._log = pr.logInit(cls=self,name="VirtualClient",path=None) # Get root name as a connection test self._root = None self.setTimeout(1000,False) try: self._root = self._remoteAttr('__ROOT__',None) except Exception: print("\n\nFailed to connected to {}:{}!!!! Please Close PyDM Window!!!!\n\n".format(addr,port)) return print("Connected to {} at {}:{}".format(self._root.name,addr,port)) self.setTimeout(1000,True) self._root._parent = self._root self._root._root = self._root self._root._client = self setattr(self,self._root.name,self._root) # Link tracking self._link = True self._ltime = self._root.Time.value() # Create monitoring thread self._monEnable = True self._monThread = threading.Thread(target=self._monWorker) self._monThread.start()
[docs] def addLinkMonitor(self, function): """ Add a link monitor callback function. This function will be called any time the link state changes. A single boolean argument will be passed to the callback function containing the current link state. Parameters ---------- function : obj Call back function with the form function(linkState) """ if function not in self._monitors: self._monitors.append(function)
[docs] def remLinkMonitor(self, function): """ Remove a previously added link monitor function. Parameters ---------- function : obj Call back function """ if function in self._monitors: self._monitors.remove(function)
@property def linked(self): return self._link def _monWorker(self): while self._monEnable: time.sleep(1) if self._link and (time.time() - self._ltime) > 10.0: self._link = False self._log.warning(f"I have not heard from {self._root.name} in 10 seconds. It may be busy, continuing to wait...") for mon in self._monitors: mon(self._link) elif (not self._link) and (time.time() - self._ltime) < 10.0: self._link = True self._log.warning(f"I have finally heard from {self._root.name}. All is good!") for mon in self._monitors: mon(self._link) def _remoteAttr(self, path, attr, *args, **kwargs): try: ret = pickle.loads(self._send(pickle.dumps({ 'path':path, 'attr':attr, 'args':args, 'kwargs':kwargs }))) except Exception as e: raise Exception(f"ZMQ Interface Exception: {e}") if isinstance(ret,Exception): raise ret return ret def _addVarListener(self,func): if func not in self._varListeners: self._varListeners.append(func) def _doUpdate(self,data): self._ltime = time.time() if self._root is None: return d = pickle.loads(data) for k,val in d.items(): n = self._root.getNode(k,False) if n is not None: n._doUpdate(val) # Call listener functions, for func in self._varListeners: func(k,val) def stop(self): self._monEnable = False @property def root(self): return self._root def __hash__(self): return hash((self._host, self._port)) def __eq__(self, other): return (self.host, self.port) == (other._host, other._port) def __ne__(self, other): return not (self == other) def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self.stop()