Source code for pyrogue._Node

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
#  Description:
#       PyRogue base module - Node Classes
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
import sys
from collections import OrderedDict as odict
import logging
import re
import inspect
import pyrogue as pr
import collections


def logException(log,e):
    """
    logs instances of memory error, else, will log the exception.

    Parameters
    ----------
    log :
        calls command to log instance of memory error or exception
    e :
        stores values of memory errors or exceptions

    Returns
    -------
    none
    """
    if isinstance(e,pr.MemoryError):
        log.error(e)
    else:
        log.exception(e)


def logInit(cls=None,name=None,path=None):
    """
    logs base classes in order of highest ranking class. Checks values of  cls, name, and path, if their value is not  =None, will do something

    Parameters
    ----------
    cls :
         (Default value = None)
    name :
         (Default value = None)
    path :
         (Default value = None)

    Returns
    -------
    returns logger
    """

    # Support base class in order of precedence
    baseClasses = odict({pr.BaseCommand : 'Command', pr.BaseVariable : 'Variable',
                         pr.Root : 'Root', pr.Device : 'Device'})

    """Init a logging object. Set global options."""
    logging.basicConfig(
        #level=logging.NOTSET,
        format="%(levelname)s:%(name)s:%(message)s",
        stream=sys.stdout)

    # All logging starts with rogue prefix
    ln = 'pyrogue'

    # Next add the highest ranking base class
    if cls is not None:
        for k,v in baseClasses.items():
            if isinstance(cls,k):
                ln += f'.{v}'
                break

        # Next subclass name
        ln += f'.{cls.__class__.__name__}'

    # Add full path if passed
    if path is not None:
        ln += f'.{path}'

    # Otherwise just add name if passed
    elif name is not None:
        ln += f'.{name}'

    return logging.getLogger(ln)


def expose(item):
    """
    Allows user to interface with function under item parameter

    Parameters
    ----------
    item :


    Returns
    -------
    returns item
    """

    # Property
    if inspect.isdatadescriptor(item):
        item.fget._rogueExposed = True
        return item

    # Method
    item._rogueExposed = True
    return item


class NodeError(Exception):
    """ """
    pass


[docs] class Node(object): """ Class which serves as a managed object within the pyrogue package. Each node has the following public fields: name: Global name of object description: Description of the object. groups: Group or groups this node belongs to. Examples: 'Hidden', 'NoState', 'NoConfig', 'NoStream', 'NoSql', 'NoServe' classtype: text string matching name of node sub-class path: Full path to the node (ie. node1.node2.node3) Each node is associated with a parent and has a link to the top node of a tree. A node has a list of sub-nodes as well as each sub-node being attached as an attribute. This allows tree browsing using: node1.node2.node3 Parameters ---------- name : global name of object description : Description of object path : full path to the node (ie. node1.node2.node3) expand : guiGroup : arbitrary groups for gui and graphical aesthetic purposes Returns ------- """ _nodeCount = 0 def __init__(self, *, name, description="", expand=True, hidden=False, groups=None, guiGroup=None): """ Init the node with passed attributes Parameters ---------- name : str name of variable description : str description of variable expand : bool hidden : bool Default value = False, variable is not part of hidden group groups : guiGroup : Returns ------- """ pr.Node._nodeCount += 1 # Public attributes self._name = name self._description = description self._path = name self._expand = expand self._guiGroup = guiGroup # Tracking self._parent = None self._root = None self._nodes = odict() self._anodes = odict() # Setup logging self._log = logInit(cls=self,name=name,path=None) if groups is None: self._groups = [] elif isinstance(groups,list): self._groups = groups else: self._groups = [groups] if hidden is True: self.addToGroup('Hidden') def __repr__(self): return f'{self.__class__} - {self.path}' @property def nodeCount(self): return pr.Node._nodeCount @property def name(self): """ assigns and returns name attribute to class variable """ return self._name @property def description(self): """ assigns and returns description attribute to class variable """ return self._description @property def groups(self): """ assigns and returns groups attribute to class variable """ return self._groups
[docs] def inGroup(self, group): """ Parameters ---------- group : Returns ------- """ if isinstance(group,list): return len(set(group) & set(self._groups)) > 0 else: return group in self._groups
[docs] def filterByGroup(self, incGroups, excGroups): """ Filter by the passed list of inclusion and exclusion groups Parameters ---------- incGroups : list of passed inclusion groups excGroups : list of passed exclusion groups Returns ------- """ return ((incGroups is None) or (len(incGroups) == 0) or (self.inGroup(incGroups))) and \ ((excGroups is None) or (len(excGroups) == 0) or (not self.inGroup(excGroups)))
[docs] def addToGroup(self,group): """ Add this node to the passed group, recursive to children Parameters ---------- group : variable denoting a node that has not been passed Returns ------- """ if group not in self._groups: self._groups.append(group) for k,v in self._nodes.items(): v.addToGroup(group)
[docs] def removeFromGroup(self,group): """ Remove this node from the passed group, not recursive Parameters ---------- group : variable denoting a node that is in the passed group Returns ------- if in group, remove from group """ if group in self._groups: self._groups.remove(group)
@property def hidden(self): """ assigns and returns hidden attribute to class variable """ return self.inGroup('Hidden') @hidden.setter def hidden(self, value): """ Add or remove node from the Hidden group Parameters ---------- value : bool Boolean value assigned to the Hidden group Returns ------- if value = true, add to group, else remove from group """ if value is True: self.addToGroup('Hidden') else: self.removeFromGroup('Hidden') @property def path(self): """ """ return self._path @property def expand(self): """ """ return self._expand @property def guiGroup(self): """ """ return self._guiGroup def __getattr__(self, name): """ Allow child Nodes with the 'name[key]' naming convention to be accessed as if they belong to a dictionary of Nodes with that 'name'. Raises AttributeError if no such named Nodes are found. Parameters ---------- name : str builds an OrderedDict of all child nodes named as 'name[key]' and returns it. Returns ------- """ # Node matches name in node list if name in self._nodes: return self._nodes[name] # Node matches name in node dictionary list elif name in self._anodes: return self._anodes[name] else: raise AttributeError('{} has no attribute {}'.format(self, name)) def __dir__(self): return super().__dir__() + [k for k,v in self._nodes.items()] def __reduce__(self): attr = {} attr['name'] = self._name attr['class'] = self.__class__.__name__ attr['bases'] = pr.genBaseList(self.__class__) attr['description'] = self._description attr['groups'] = self._groups attr['path'] = self._path attr['expand'] = self._expand attr['guiGroup'] = self._guiGroup attr['nodes'] = odict({k:None for k,v in self._nodes.items() if not v.inGroup('NoServe')}) attr['props'] = [] attr['funcs'] = {} # Get properties for k,v in inspect.getmembers(self.__class__, lambda a: isinstance(a,property)): if hasattr(v.fget,'_rogueExposed'): attr['props'].append(k) # Get methods for k,v in inspect.getmembers(self.__class__, callable): if hasattr(v,'_rogueExposed'): attr['funcs'][k] = {'args' : [a for a in inspect.getfullargspec(v).args if a != 'self'], 'kwargs' : inspect.getfullargspec(v).kwonlyargs} return (pr.interfaces.VirtualFactory, (attr,)) def __contains__(self, item): return item in self.nodes.values()
[docs] def add(self,node): """ Add node as sub-node Parameters ---------- node : Returns ------- """ # Special case if list (or iterable of nodes) is passed if isinstance(node, collections.abc.Iterable) and all(isinstance(n, Node) for n in node): for n in node: self.add(n) return # Fail if added to a non device node (may change in future) if not isinstance(self,pr.Device): raise NodeError('Attempting to add node with name %s to non device node %s.' % (node.name,self.name)) # Fail if root already exists if self._root is not None: raise NodeError('Error adding node with name %s to %s. Tree is already started.' % (node.name,self.name)) # Error if added node already has a parent if node._parent is not None: raise NodeError('Error adding node with name %s to %s. Node is already attached.' % (node.name,self.name)) # Names of all sub-nodes must be unique if node.name in self.__dir__() or node.name in self._anodes: raise NodeError('Error adding node with name %s to %s. Name collision.' % (node.name,self.name)) # Add some checking for characters which will make lookups problematic if re.match('^[\\w_\\[\\]]+$',node.name) is None: self._log.warning('Node {} with one or more special characters will cause lookup errors.'.format(node.name)) # Detect and add array nodes self._addArrayNode(node) # Add to primary list self._nodes[node.name] = node
def _addArrayNode(self, node): """ Parameters ---------- node : Returns ------- """ # Generic test array method fields = re.split('\\]\\[|\\[|\\]',node.name) if len(fields) < 3: return # Extract name and keys aname = fields[0] keys = fields[1:-1] if not all([key.isdigit() for key in keys]): self._log.warning('Array node {} with non numeric key will cause lookup errors.'.format(node.name)) return # Detect collisions if aname in self.__dir__(): raise NodeError('Error adding node with name %s to %s. Name collision.' % (node.name,self.name)) # Create dictionary containers if aname not in self._anodes: self._anodes[aname] = {} # Start at primary list sub = self._anodes[aname] # Iterate through keys for i in range(len(keys)): k = int(keys[i]) # Last key, set node, check for overlaps if i == (len(keys)-1): if k in sub: raise NodeError('Error adding node with name %s to %s. Name collision.' % (node.name,self.name)) sub[k] = node return # Next level is empty if k not in sub: sub[k] = {} # check for overlaps elif isinstance(sub[k],Node): raise NodeError('Error adding node with name %s to %s. Name collision.' % (node.name,self.name)) sub = sub[k]
[docs] def addNode(self, nodeClass, **kwargs): """ Parameters ---------- nodeClass : **kwargs : Returns ------- """ self.add(nodeClass(**kwargs))
[docs] def addNodes(self, nodeClass, number, stride, **kwargs): """ Parameters ---------- nodeClass : number : stride : **kwargs : Returns ------- """ name = kwargs.pop('name') offset = kwargs.pop('offset') for i in range(number): self.add(nodeClass(name='{:s}[{:d}]'.format(name, i), offset=offset+(i*stride), **kwargs))
@property def nodeList(self): """Get a recursive list of nodes.""" lst = [] for key,value in self._nodes.items(): lst.append(value) lst.extend(value.nodeList) return lst
[docs] def getNodes(self,typ,excTyp=None,incGroups=None,excGroups=None): """ Get a filtered ordered dictionary of nodes. pass a class type to receive a certain type of node class type may be a string when called over Pyro4 exc is a class type to exclude, incGroups is an optional group or list of groups that this node must be part of excGroups is an optional group or list of groups that this node must not be part of Parameters ---------- typ : excTyp : (Default value = None) incGroups : (Default value = None) excGroups : (Default value = None) Returns ------- ordered dictionary of nodes """ return odict([(k,n) for k,n in self.nodes.items() if (n.isinstance(typ) and ((excTyp is None) or (not n.isinstance(excTyp))) and n.filterByGroup(incGroups,excGroups))])
@property def nodes(self): """Get a ordered dictionary of all nodes.""" return self._nodes @property def variables(self): """ """ return self.getNodes(typ=pr.BaseVariable,excTyp=pr.BaseCommand)
[docs] def variablesByGroup(self,incGroups=None,excGroups=None): """ Parameters ---------- incGroups : (Default value = None) excGroups : (Default value = None) Returns ------- type Pass list of include and / or exclude groups """ return self.getNodes(typ=pr.BaseVariable,excTyp=pr.BaseCommand,incGroups=incGroups,excGroups=excGroups)
@property def variableList(self): """Get a recursive list of variables and commands.""" lst = [] for key,value in self.nodes.items(): if value.isinstance(pr.BaseVariable): lst.append(value) else: lst.extend(value.variableList) return lst @property def commands(self): """ """ return self.getNodes(typ=pr.BaseCommand)
[docs] def commandsByGroup(self,incGroups=None,excGroups=None): """ Parameters ---------- incGroups : (Default value = None) excGroups : (Default value = None) Returns ------- type Pass list of include and / or exclude groups """ return self.getNodes(typ=pr.BaseCommand,incGroups=incGroups,excGroups=excGroups)
@property def devices(self): """ """ return self.getNodes(pr.Device)
[docs] def devicesByGroup(self,incGroups=None,excGroups=None): """ Parameters ---------- incGroups : (Default value = None) excGroups : (Default value = None) Returns ------- type Pass list of include and / or exclude groups """ return self.getNodes(pr.Device,incGroups=incGroups,excGroups=excGroups)
@property def deviceList(self): """Get a recursive list of devices""" lst = [] for key,value in self.nodes.items(): if value.isinstance(pr.Device): lst.append(value) lst.extend(value.deviceList) return lst @property def parent(self): """ """ return self._parent @property def root(self): """ """ return self._root
[docs] def node(self, name): """ Parameters ---------- name : Returns ------- """ if name in self._nodes: return self._nodes[name] else: return None
@property def isDevice(self): """ """ return self.isinstance(pr.Device) @property def isVariable(self): """ """ return (self.isinstance(pr.BaseVariable) and (not self.isinstance(pr.BaseCommand))) @property def isCommand(self): """ """ return self.isinstance(pr.BaseCommand)
[docs] def find(self, *, recurse=True, typ=None, **kwargs): """ Find all child nodes that are a base class of 'typ' and whose properties match all of the kwargs. For string properties, accepts regexes. Parameters ---------- * : recurse : (Default value = True) typ : (Default value = None) **kwargs : Returns ------- """ if typ is None: typ = pr.Node found = [] for node in self.nodes.values(): if node.isinstance(typ): for prop, value in kwargs.items(): if not hasattr(node, prop): break attr = getattr(node, prop) if isinstance(value, str): if not re.match(value, attr): break else: if inspect.ismethod(attr): attr = attr() if not value == attr: break else: found.append(node) if recurse: found.extend(node.find(recurse=recurse, typ=typ, **kwargs)) return found
[docs] def callRecursive(self, func, nodeTypes=None, **kwargs): """ Parameters ---------- func : nodeTypes : (Default value = None) **kwargs : Returns ------- """ # Call the function getattr(self, func)(**kwargs) if nodeTypes is None: nodeTypes = [pr.Node] # Recursively call the function for key, node in self.nodes.items(): if any(isinstance(node, typ) for typ in nodeTypes): node.callRecursive(func, nodeTypes, **kwargs)
# this might be useful
[docs] def makeRecursive(self, func, nodeTypes=None): """ Parameters ---------- func : nodeTypes : (Default value = None) Returns ------- """ def closure(**kwargs): """ Parameters ---------- **kwargs : Returns ------- """ self.callRecursive(func, nodeTypes, **kwargs) return closure
[docs] def isinstance(self,typ): """ Parameters ---------- typ : Returns ------- """ return isinstance(self,typ)
def _rootAttached(self,parent,root): """ Called once the root node is attached. Parameters ---------- parent : root : Returns ------- """ self._parent = parent self._root = root self._path = parent.path + '.' + self.name self._log = logInit(cls=self,name=self._name,path=self._path) # Inherit groups from parent for grp in parent.groups: self.addToGroup(grp) def _finishInit(self): for key,value in self._nodes.items(): value._finishInit()
[docs] @expose def getYaml(self, readFirst=False, modes=['RW','RO','WO'], incGroups=None, excGroups=['Hidden'], recurse=True): """ Get current values as yaml data. modes is a list of variable modes to include. If readFirst=True a full read from hardware is performed. """ if readFirst: self.root._read() return pr.dataToYaml({self.name:self._getDict(modes=modes, incGroups=incGroups, excGroups=excGroups, recurse=recurse)})
def printYaml(self, readFirst=False, modes=['RW','RO','WO'], incGroups=None, excGroups=['Hidden'], recurse=False): print(self.getYaml(readFirst=readFirst, modes=modes, incGroups=incGroups, excGroups=excGroups, recurse=recurse)) def _getDict(self, modes=['RW', 'RO', 'WO'], incGroups=None, excGroups=None, properties=False, recurse=True): """ Get variable values in a dictionary starting from this level. Attributes that are Nodes are recursed. modes is a list of variable modes to include. Parameters ---------- modes : incGroups : excGroups : Returns ------- """ data = odict() for key,value in self.variablesByGroup(incGroups, excGroups).items(): nv = value._getDict(modes=modes,incGroups=incGroups,excGroups=excGroups, properties=properties) if nv is not None: data[key] = nv if recurse: for key, value in self.devicesByGroup(incGroups, excGroups).items(): nv = value._getDict(modes=modes,incGroups=incGroups,excGroups=excGroups, properties=properties) if nv is not None: data[key] = nv if len(data) == 0: return None else: return data def _setDict(self,d,writeEach,modes,incGroups,excGroups,keys): """ Parameters ---------- d : writeEach : modes : incGroups : excGroups : keys : Returns ------- """ # If keys is not none, someone tried to access this node with array # attributes incorrectly. This should only happen in the variable class. if keys is not None: self._log.error(f"Entry {self.name} with key {keys} not found") else: for key, value in d.items(): nodes,keys = self.nodeMatch(key) if len(nodes) == 0: self._log.error("Entry {} not found".format(key)) else: for n in nodes: if n.filterByGroup(incGroups,excGroups): n._setDict(value,writeEach,modes,incGroups,excGroups,keys) def _setTimeout(self,timeout): """ Parameters ---------- timeout : Returns ------- """ pass
[docs] def nodeMatch(self,name): """ Parameters ---------- name : Returns ------- type be a single value or a list accessor: value value[9] value[0:1] value[*] value[:] Variables will only match if their depth matches the passed lookup and wildcard: value[*] will match a variable named value[1] but not a variable named value[2][3] value[*][*] will match a variable named value[2][3]. The second return field will contain the array information if the base name matches an item in the non list array. """ # Node matches name in node list if name in self.nodes: return [self.nodes[name]],None # Otherwise we may need to slice an array else: # Generic test array method fields = re.split('\\]\\[|\\[|\\]',name) # Extract name and keys aname = fields[0] keys = fields[1:-1] # Name is in standard list if aname in self.nodes: return [self.nodes[aname]],keys # Name not in list if aname is None or aname not in self._anodes or len(keys) == 0: return [],None return _iterateDict(self._anodes[aname],keys),None
def _iterateDict(d, keys): """ Parameters ---------- d : keys : Returns ------- """ retList = [] # Wildcard, full list if keys[0] == '*' or keys[0] == ':': subList = list(d.values()) # Single item elif keys[0].isdigit(): try: subList = [d[int(keys[0])]] except Exception: subList = [] # Slice else: # Form slice-able list tmpList = [None] * (max(d.keys())+1) for k,v in d.items(): tmpList[k] = v try: subList = eval(f'tmpList[{keys[0]}]') except Exception: subList = [] for e in subList: # Add nodes at this level only if key list has been exhausted if len(keys) == 1 and isinstance(e,Node): retList.append(e) # Don't go deeper in tree than the keys provided to avoid over-matching nodes elif len(keys) > 1 and isinstance(e,dict): retList.extend(_iterateDict(e,keys[1:])) return retList def genBaseList(cls): """ """ ret = [str(cls)] for x in cls.__bases__: if x is not object: ret += genBaseList(x) return ret