#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# PyRogue base module - Root Class
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
import sys
import os
import glob
import rogue
import rogue.interfaces.memory as rim
import rogue.interfaces.stream
import threading
import logging
import pyrogue as pr
import pyrogue.interfaces.stream
import functools as ft
import time
import queue
import json
import zipfile
import traceback
import datetime
from contextlib import contextmanager
SystemLogInit = '[]'
class UpdateTracker(object):
"""
"""
def __init__(self,q):
self._count = 0
self._list = {}
self._period = 0
self._last = time.time()
self._q = q
def increment(self, period):
"""
Parameters
----------
period : int
default value period = 0
Returns
-------
"""
if self._count == 0 or self._period < period:
self._period = period
self._count +=1
def decrement(self):
if self._count != 0:
self._count -= 1
self._check()
def _check(self):
if self._count == 0 or (self._period != 0 and (time.time() - self._last) > self._period):
if len(self._list) != 0:
#print(f"Update fired {time.time()}")
self._last = time.time()
self._q.put(self._list)
self._list = {}
def update(self,var):
"""
Parameters
----------
var :
Returns
-------
"""
self._list[var.path] = var
self._check()
class RootLogHandler(logging.Handler):
"""Class to listen to log entries and add them to syslog variable"""
def __init__(self,*, root):
logging.Handler.__init__(self)
self._root = root
def emit(self,record):
"""
Parameters
----------
record :
Returns
-------
"""
if not self._root.running:
return
with self._root.updateGroup():
try:
se = { 'created' : record.created,
'name' : record.name,
'message' : record.getMessage(),
'exception' : None,
'traceBack' : None,
'levelName' : record.levelname,
'levelNumber' : record.levelno }
if record.exc_info is not None:
se['exception'] = record.exc_info[0].__name__
se['traceBack'] = []
for tb in traceback.format_tb(record.exc_info[2]):
se['traceBack'].append(tb.rstrip())
self._root.SystemLogLast.set(json.dumps(se))
# System log is a running json encoded list
with self._root.SystemLog.lock:
lst = json.loads(self._root.SystemLog.value())
# Limit system log size
lst = lst[-1 * (self._root._maxLog-1):]
lst.append(se)
self._root.SystemLog.set(json.dumps(lst))
except Exception as e:
print("-----------Error Logging Exception -------------")
print(e)
traceback.print_exc(file=sys.stdout)
print("-----------Original Error-----------------------")
print(self.format(record))
print("------------------------------------------------")
class Root(rogue.interfaces.stream.Master, pr.Device):
"""
Class which serves as the root of a tree of nodes.
The root is the interface point for tree level access and updates.
The root is a stream master which generates frames containing tree
configuration and status values. This allows configuration and status
to be stored in data files.
Root inherits from both rogue.interfaces.stream.Master and pyrogue.Device.
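
Example
-------
A minimal usage sketch; ``MyRoot`` is a hypothetical Root subclass which
adds the devices for a particular system in its constructor:

>>> with MyRoot(pollEn=True, timeout=1.0) as root:
...     root.ReadAll()
...     root.SaveConfig('config_snapshot.yml')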
"""
def __enter__(self):
"""Root enter."""
self.start()
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Root exit."""
self.stop()
def __init__(self, *,
name=None,
description='',
expand=True,
timeout=1.0,
initRead=False,
initWrite=False,
pollEn=True,
maxLog=1000):
"""Init the node with passed attributes"""
rogue.interfaces.stream.Master.__init__(self)
# Store startup parameters
self._timeout = timeout
self._initRead = initRead
self._initWrite = initWrite
self._pollEn = pollEn
self._maxLog = maxLog
self._doHeartbeat = True # Backdoor flag
# Create log listener to add to SystemLog variable
formatter = logging.Formatter("%(msg)s")
handler = RootLogHandler(root=self)
handler.setFormatter(formatter)
self._logger = logging.getLogger('pyrogue')
self._logger.addHandler(handler)
# Running status
self._running = False
# Polling worker
self._pollQueue = pr.PollQueue(root=self)
# List of variable listeners
self._varListeners = []
self._varListenLock = threading.Lock()
# Variable update worker
self._updateQueue = queue.Queue()
self._updateThread = None
self._updateLock = threading.Lock()
self._updateTrack = {}
# Init
pr.Device.__init__(self, name=name, description=description, expand=expand)
# Variables
self.add(pr.LocalVariable(name='RogueVersion', value=rogue.Version.current(), mode='RO', hidden=False,
description='Rogue Version String'))
self.add(pr.LocalVariable(name='RogueDirectory', value=os.path.dirname(pr.__file__), mode='RO', hidden=False,
description='Rogue Library Directory'))
self.add(pr.LocalVariable(name='SystemLog', value=SystemLogInit, mode='RO', hidden=True, groups=['NoStream','NoSql','NoState'],
description='String containing the JSON encoded list of system log entries'))
self.add(pr.LocalVariable(name='SystemLogLast', value='', mode='RO', hidden=True, groups=['NoStream','NoState'],
description='String containing last system log entry'))
self.add(pr.LocalVariable(name='ForceWrite', value=False, mode='RW', hidden=True,
description='Configuration Flag To Always Write Non Stale Blocks For WriteAll, LoadConfig and setYaml'))
self.add(pr.LocalVariable(name='InitAfterConfig', value=False, mode='RW', hidden=True,
description='Configuration Flag To Execute Initialize after LoadConfig or setYaml'))
self.add(pr.LocalVariable(name='Time', value=time.time(), mode='RO', hidden=True,
description='Current Time In Seconds Since EPOCH UTC', groups=['NoSql']))
self.add(pr.LinkVariable(name='LocalTime', value='', mode='RO', groups=['NoStream','NoSql','NoState'],
linkedGet=lambda: time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime(self.Time.value())),
dependencies=[self.Time], description='Local Time'))
self.add(pr.LocalVariable(name='PollEn', value=False, mode='RW',groups=['NoStream','NoSql','NoState'],
localSet=lambda value: self._pollQueue.pause(not value),
localGet=lambda: not self._pollQueue.paused()))
# Commands
self.add(pr.LocalCommand(name='WriteAll', function=self._write, hidden=True,
description='Write all values to the hardware'))
self.add(pr.LocalCommand(name="ReadAll", function=self._read, hidden=True,
description='Read all values from the hardware'))
self.add(pr.LocalCommand(name='SaveState', value='',
function=lambda arg: self.saveYaml(name=arg,
readFirst=True,
modes=['RW','RO','WO'],
incGroups=None,
excGroups='NoState',
autoPrefix='state',
autoCompress=True),
hidden=True,
description='Save state to file. Data is saved in YAML format. Passed arg is full path to file to store data to.'))
self.add(pr.LocalCommand(name='SaveConfig', value='',
function=lambda arg: self.saveYaml(name=arg,
readFirst=True,
modes=['RW','WO'],
incGroups=None,
excGroups='NoConfig',
autoPrefix='config',
autoCompress=False),
hidden=True,
description='Save configuration to file. Data is saved in YAML format. Passed arg is full path to file to store data to.'))
self.add(pr.LocalCommand(name='LoadConfig', value='',
function=lambda arg: self.loadYaml(name=arg,
writeEach=False,
modes=['RW','WO'],
incGroups=None,
excGroups='NoConfig'),
hidden=True,
description='Read configuration from file. Data is read in YAML format. Passed arg is full path to file to read data from.'))
self.add(pr.LocalCommand(name='RemoteVariableDump', value='',
function=lambda arg: self.remoteVariableDump(name=arg,
modes=['RW','WO','RO'],
readFirst=True),
hidden=True,
description='Save a dump of the remote variable state'))
self.add(pr.LocalCommand(name='RemoteConfigDump', value='',
function=lambda arg: self.remoteVariableDump(name=arg,
modes=['RW','WO'],
readFirst=True),
hidden=True,
description='Save a dump of the remote configuration (RW/WO) variable state'))
self.add(pr.LocalCommand(name='Initialize', function=self.initialize, hidden=True,
description='Generate a soft reset to each device in the tree'))
self.add(pr.LocalCommand(name='HardReset', function=self.hardReset, hidden=True,
description='Generate a hard reset to each device in the tree'))
self.add(pr.LocalCommand(name='CountReset', function=self.countReset, hidden=True,
description='Generate a count reset to each device in the tree'))
self.add(pr.LocalCommand(name='ClearLog', function=self._clearLog, hidden=True,
description='Clear the message log contained in the SystemLog variable'))
self.add(pr.LocalCommand(name='SetYamlConfig', value='',
function=lambda arg: self.setYaml(yml=arg,
writeEach=False,
modes=['RW','WO'],
incGroups=None,
excGroups='NoConfig'),
hidden=True,
description='Set configuration from YAML string. Passed arg is configuration string in YAML format.'))
self.add(pr.LocalCommand(name='GetYamlConfig', value=True, retValue='',
function=lambda arg: self.getYaml(readFirst=arg,
modes=['RW','WO'],
incGroups=None,
excGroups='NoConfig',
recurse=True),
hidden=True,
description='Get configuration in YAML string. '
'Passed arg is a boolean indicating if a full system read should be generated. '
'Return data is configuration as a YAML string.'))
self.add(pr.LocalCommand(name='GetYamlState', value=True, retValue='',
function=lambda arg: self.getYaml(readFirst=arg,
modes=['RW','RO','WO'],
incGroups=None,
excGroups='NoState',
recurse=True),
hidden=True,
description='Get current state as YAML string. Pass read first arg.'))
#self.add(pr.LocalCommand(name='Restart', function=self._restart,
# description='Restart and reload the server application'))
#self.add(pr.LocalCommand(name='Exit', function=self._exit,
# description='Exit the server application'))
def start(self):
"""Setup the tree. Start the polling thread."""
if self._running:
raise pr.NodeError("Root is already started! Can't restart!")
# Call special root level rootAttached
self._rootAttached()
# Finish Initialization
self._finishInit()
# Get full list of Blocks and Devices with size
tmpList = []
for d in self.deviceList:
for b in d._blocks:
if isinstance(b, rim.Block):
tmpList.append(b)
# Sort the list by address/size
tmpList.sort(key=lambda x: (x._reqSlaveId(), x.address, x.size))
# Look for overlaps
for i in range(1,len(tmpList)):
self._log.debug("Comparing {} with address={:#x} to {} with address={:#x} and size={}".format(
tmpList[i].path, tmpList[i].address,
tmpList[i-1].path,tmpList[i-1].address, tmpList[i-1].size))
# Detect overlaps
if (tmpList[i].size != 0) and (tmpList[i]._reqSlaveId() == tmpList[i-1]._reqSlaveId()) and \
(tmpList[i].address < (tmpList[i-1].address + tmpList[i-1].size)):
raise pr.NodeError("{} at address={:#x} overlaps {} at address={:#x} with size={}".format(
tmpList[i].path,tmpList[i].address,
tmpList[i-1].path,tmpList[i-1].address,tmpList[i-1].size))
# Set timeout if not default
if self._timeout != 1.0:
for key,value in self._nodes.items():
value._setTimeout(self._timeout)
# Detect large timeout
if self._timeout > 10.0:
self._log.warning(f"Large timeout value of {self._timeout} seconds detected. This may cause unexpected system behavior.")
# Start update thread
self._running = True
self._updateThread = threading.Thread(target=self._updateWorker)
self._updateThread.start()
# Start heartbeat
if self._doHeartbeat:
self._hbeatThread = threading.Thread(target=self._hbeatWorker)
self._hbeatThread.start()
# Start interfaces and protocols
pr.Device._start(self)
# Read current state
if self._initRead:
self._read()
# Commit default values
# Read did not override defaults because set values are cached
if self._initWrite:
self._write()
# Start poller if enabled
self._pollQueue._start()
self.PollEn.set(self._pollEn)
def stop(self):
"""Stop the polling thread. Must be called for clean exit."""
self._running = False
self._updateQueue.put(None)
self._updateThread.join()
if self._pollQueue:
self._pollQueue._stop()
pr.Device._stop(self)
@pr.expose
@property
def running(self):
"""
"""
return self._running
def addVarListener(self,func,*,done=None,incGroups=None,excGroups=None):
"""
Add a variable update listener function.
The variable and value structure will be passed as args: func(path,varValue)
Parameters
----------
func : callable
Listener called as func(path, varValue) for each variable update.
done : callable, optional
Called with no arguments after each update group has been processed.
incGroups : list or str, optional
Only report variables which are members of at least one of these groups.
excGroups : list or str, optional
Do not report variables which are members of any of these groups.
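
Example
-------
A minimal sketch which prints the display value of every updated variable:

>>> def listener(path, varValue):
...     print(f'{path} = {varValue.valueDisp}')
>>> root.addVarListener(listener)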
"""
with self._varListenLock:
self._varListeners.append((func,done,incGroups,excGroups))
def _addVarListenerCpp(self, func, done):
self.addVarListener(lambda path, varValue: func(path, varValue.valueDisp), done=done)
@contextmanager
def updateGroup(self, period=0):
"""
Context manager which groups variable updates generated inside the with
block into a single update notification.

Parameters
----------
period : int or float
Minimum number of seconds between update notifications while the group
is open. A value of 0 (the default) holds all updates until the
outermost group exits.
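
Example
-------
A minimal sketch; ``Dev0`` and its variables are hypothetical nodes. Both
updates are reported to listeners as a single group when the with block
exits:

>>> with root.updateGroup():
...     root.Dev0.VarA.set(1)
...     root.Dev0.VarB.set(2)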
"""
tid = threading.get_ident()
# At with call
try:
self._updateTrack[tid].increment(period)
except Exception:
with self._updateLock:
self._updateTrack[tid] = UpdateTracker(self._updateQueue)
self._updateTrack[tid].increment(period)
try:
yield
finally:
# After with is done
self._updateTrack[tid].decrement()
@contextmanager
def pollBlock(self):
"""
"""
# At with call
self._pollQueue._blockIncrement()
# Yield to the body of the with block
try:
yield
finally:
# After with is done
self._pollQueue._blockDecrement()
@pr.expose
def waitOnUpdate(self):
"""Wait until all update queue items have been processed."""
self._updateQueue.join()
def hardReset(self):
"""Generate a hard reset on all devices"""
super().hardReset()
self._clearLog()
def __reduce__(self):
return pr.Node.__reduce__(self)
@ft.lru_cache(maxsize=None)
def getNode(self,path):
"""
Return the node at the given dotted path, or None if it does not exist.

Parameters
----------
path : str
Dotted path to the node, beginning with the root name or 'root'.

Returns
-------
pyrogue.Node or None
The matching node, or None if the path is not found.
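
Example
-------
A minimal sketch; ``Dev0`` and ``VarA`` are hypothetical node names:

>>> var = root.getNode('root.Dev0.VarA')   # returns None if the path does not exist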
"""
obj = self
if '.' in path:
lst = path.split('.')
if lst[0] != self.name and lst[0] != 'root':
return None
for a in lst[1:]:
if not hasattr(obj,'node'):
return None
obj = obj.node(a)
elif path != self.name and path != 'root':
return None
return obj
@pr.expose
def saveAddressMap(self,fname):
"""
Write a tab separated dump of the remote variable address map to a file.

Parameters
----------
fname : str
Path of the output file.
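
Example
-------
A minimal sketch; the output path is hypothetical:

>>> root.saveAddressMap('address_map.tsv')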
"""
# First form header
header = "Path\t"
header += "TypeStr\t"
header += "Address\t"
header += "Offset\t"
header += "Mode\t"
header += "BitOffset\t"
header += "BitSize\t"
header += "Minimum\t"
header += "Maximum\t"
header += "Enum\t"
header += "OverlapEn\t"
header += "Verify\t"
header += "ModelId\t"
header += "ByteReverse\t"
header += "BitReverse\t"
header += "BinPoint\t"
header += "BulkEn\t"
header += "UpdateNotify\t"
header += "MemBaseName\t"
header += "BlockName\t"
header += "BlockSize\t"
header += "NumValues\t"
header += "ValueBits\t"
header += "ValueStride\t"
header += "RetryCount\t"
header += "Description"
lines = []
for v in self.variableList:
if v.isinstance(pr.RemoteVariable):
data = "{}\t".format(v.path)
data += "{}\t".format(v.typeStr)
data += "{:#x}\t".format(v.address)
data += "{:#x}\t".format(v.offset)
data += "{}\t".format(v.mode)
data += "{}\t".format(v.bitOffset)
data += "{}\t".format(v.bitSize)
data += "{}\t".format(v.minimum)
data += "{}\t".format(v.maximum)
data += "{}\t".format(v.enum)
data += "{}\t".format(v.overlapEn)
data += "{}\t".format(v.verifyEn)
data += "{}\t".format(v._base.modelId)
data += "{}\t".format(v._base.isBigEndian)
data += "{}\t".format(v._base.bitReverse)
data += "{}\t".format(v._base.binPoint)
data += "{}\t".format(v.bulkEn)
data += "{}\t".format(v.updateNotify)
data += "{}\t".format(v._block._reqSlaveName())
data += "{}\t".format(v._block.path)
data += "{:#x}\t".format(v._block.size)
data += "{}\t".format(v._numValues())
data += "{}\t".format(v._valueBits())
data += "{}\t".format(v._valueStride())
data += "{}\t".format(v._retryCount())
# Escape " characters
description = v.description.replace('"',r'\"')
# Escape \n characters and strip each line in the description field
description = description.split('\n')
description = '\\\n'.join([x.strip() for x in description])
data += "{}".format(description)
lines.append(data)
with open(fname,'w') as f:
f.write(header + '\n')
for line in lines:
f.write(line + '\n')
@pr.expose
def saveVariableList(self,fname,polledOnly=False,incGroups=None):
"""
Write a tab separated list of the tree's variables to a file.

Parameters
----------
fname : str
Path of the output file.
polledOnly : bool
If True only include variables with a poll interval greater than zero.
(Default value = False)
incGroups : list or str, optional
Only include variables which are members of at least one of these groups.
(Default value = None)
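
Example
-------
A minimal sketch listing only the polled variables; the output path is hypothetical:

>>> root.saveVariableList('polled_variables.tsv', polledOnly=True)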
"""
with open(fname,'w') as f:
f.write("Path\t")
f.write("TypeStr\t")
f.write("Mode\t")
f.write("Enum\t")
f.write("PollInterval\t")
f.write("Groups\t")
f.write("Description\n")
for v in self.variableList:
if ((not polledOnly) or (v.pollInterval > 0)) and v.filterByGroup(incGroups=incGroups,excGroups=None):
f.write("{}\t".format(v.path))
f.write("{}\t".format(v.typeStr))
f.write("{}\t".format(v.mode))
f.write("{}\t".format(v.enum))
f.write("{}\t".format(v.pollInterval))
f.write("{}\t".format(v.groups))
f.write("{}\n".format(v.description))
def _hbeatWorker(self):
"""
"""
while self._running:
time.sleep(1)
with self.updateGroup():
self.Time.set(time.time())
def _rootAttached(self):
"""
"""
self._parent = self
self._root = self
self._path = self.name
for key,value in self._nodes.items():
value._rootAttached(self,self)
self._buildBlocks()
# Some variable initialization can't run until the blocks are built
for v in self.variables.values():
v._finishInit()
def _write(self):
"""Write all blocks"""
self._log.info("Start root write")
with self.pollBlock(), self.updateGroup():
self.writeBlocks(force=self.ForceWrite.value(), recurse=True)
self._log.info("Verify root read")
self.verifyBlocks(recurse=True)
self._log.info("Check root read")
self.checkBlocks(recurse=True)
self._log.info("Done root write")
return True
def _read(self):
"""Read all blocks"""
self._log.info("Start root read")
with self.pollBlock(), self.updateGroup():
self.readBlocks(recurse=True)
self._log.info("Check root read")
self.checkBlocks(recurse=True)
self._log.info("Done root read")
return True
@pr.expose
def saveYaml(self,name,readFirst,modes,incGroups,excGroups,autoPrefix,autoCompress):
"""
Save YAML configuration/status to a file. Called from the SaveState and
SaveConfig commands.

Parameters
----------
name : str
Path of the output file. If empty, a name is auto generated from
autoPrefix and the current date and time.
readFirst : bool
If True perform a full read before saving.
modes : list
Variable modes to include, e.g. ['RW', 'WO'].
incGroups : list or str, optional
Only include variables which are members of at least one of these groups.
excGroups : list or str, optional
Exclude variables which are members of any of these groups.
autoPrefix : str
Prefix used when auto generating the file name.
autoCompress : bool
If True an auto generated file is compressed into a zip archive.

Returns
-------
bool
True on success.
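
Example
-------
A minimal sketch equivalent to the SaveConfig command; the output path is
hypothetical:

>>> root.saveYaml(name='config_snapshot.yml', readFirst=True,
...               modes=['RW', 'WO'], incGroups=None, excGroups='NoConfig',
...               autoPrefix='config', autoCompress=False)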
"""
# Auto generate name if no arg
if name is None or name == '':
name = datetime.datetime.now().strftime(autoPrefix + "_%Y%m%d_%H%M%S.yml")
if autoCompress:
name += '.zip'
yml = self.getYaml(readFirst=readFirst,modes=modes,incGroups=incGroups,excGroups=excGroups, recurse=True)
if name.split('.')[-1] == 'zip':
with zipfile.ZipFile(name, 'w', compression=zipfile.ZIP_LZMA) as zf:
with zf.open(os.path.basename(name[:-4]),'w') as f:
f.write(yml.encode('utf-8'))
else:
with open(name,'w') as f:
f.write(yml)
return True
def loadYaml(self,name,writeEach,modes,incGroups,excGroups):
"""
Load YAML configuration from one or more files. Called from the
LoadConfig command.

Parameters
----------
name : str or list
File path, comma separated list of paths, directory, or path to a
directory inside a zip archive.
writeEach : bool
If True write each variable to hardware as it is set, otherwise
perform a single bulk write after all variables have been set.
modes : list
Variable modes to act on, e.g. ['RW', 'WO'].
incGroups : list or str, optional
Only include variables which are members of at least one of these groups.
excGroups : list or str, optional
Exclude variables which are members of any of these groups.

Returns
-------
bool
True on success.
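
Example
-------
A minimal sketch equivalent to the LoadConfig command; the file path is
hypothetical:

>>> root.loadYaml(name='config_snapshot.yml', writeEach=False,
...               modes=['RW', 'WO'], incGroups=None, excGroups='NoConfig')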
"""
# Passed arg is a python list
if isinstance(name,list):
rawlst = name
# Passed arg is a comma separated list of files
elif ',' in name:
rawlst = name.split(',')
# Not a list
else:
rawlst = [name]
# Init final list
lst = []
# Iterate through raw list and look for directories
for rl in rawlst:
# Name ends with .yml or .yaml
if rl[-4:] == '.yml' or rl[-5:] == '.yaml':
lst.append(rl)
# Entry is a zip file directory
elif '.zip' in rl:
base = rl.split('.zip')[0] + '.zip'
sub = rl.split('.zip')[1][1:]
# Open zipfile
with zipfile.ZipFile(base, 'r', compression=zipfile.ZIP_LZMA) as myzip:
# Check if passed name is a directory, otherwise generate an error
if not any(x.startswith("%s/" % sub.rstrip("/")) for x in myzip.namelist()):
raise Exception("loadYaml: Invalid load file: {}, must be a directory or end in .yml or .yaml".format(rl))
else:
# Iterate through directory contents
for zfn in myzip.namelist():
# Filter by base directory
if zfn.find(sub) == 0:
spt = zfn.split('%s/' % sub.rstrip('/'))[1]
# Entry ends in .yml or .yaml and is in the current directory
if '/' not in spt and (spt[-4:] == '.yml' or spt[-5:] == '.yaml'):
lst.append(base + '/' + zfn)
# Entry is a directory
elif os.path.isdir(rl):
dlst = glob.glob('{}/*.yml'.format(rl))
dlst.extend(glob.glob('{}/*.yaml'.format(rl)))
lst.extend(sorted(dlst))
# Not a zipfile, not a directory and does not end in .yml or .yaml
else:
raise Exception("loadYaml: Invalid load file: {}, must be a directory or end in .yml or .yaml".format(rl))
# Read each file
with self.pollBlock(), self.updateGroup():
for fn in lst:
d = pr.yamlToData(fName=fn)
self._setDictRoot(d=d,writeEach=writeEach,modes=modes,incGroups=incGroups,excGroups=excGroups)
if not writeEach:
self._write()
if self.InitAfterConfig.value():
self.initialize()
return True
def treeDict(self, modes=['RW', 'RO', 'WO'], incGroups=None, excGroups=None, properties=True):
d = self._getDict(modes, incGroups, excGroups, properties=properties)
return {self.name: d}
def treeYaml(self, modes=['RW', 'RO', 'WO'], incGroups=None, excGroups=None, properties=True):
return pr.dataToYaml(self.treeDict(modes, incGroups, excGroups, properties))
def setYaml(self,yml,writeEach,modes,incGroups,excGroups):
"""
Set variable values from a YAML string.

modes is a list of variable modes to act on. If writeEach is True a
write is performed as each variable is updated; if False a single bulk
write is performed after all of the variable updates are complete,
which gives better performance when updating a large number of
variables.

Parameters
----------
yml : str
Configuration data in YAML format.
writeEach : bool
If True write each variable to hardware as it is set.
modes : list
Variable modes to act on, e.g. ['RW', 'WO'].
incGroups : list or str, optional
Only include variables which are members of at least one of these groups.
excGroups : list or str, optional
Exclude variables which are members of any of these groups.
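
Example
-------
A minimal sketch; ``Dev0`` and ``VarA`` are hypothetical nodes:

>>> yml = "root: {Dev0: {VarA: 1}}"
>>> root.setYaml(yml=yml, writeEach=False, modes=['RW', 'WO'],
...              incGroups=None, excGroups='NoConfig')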
"""
d = pr.yamlToData(yml)
with self.pollBlock(), self.updateGroup():
self._setDictRoot(d=d,writeEach=writeEach,modes=modes,incGroups=incGroups,excGroups=excGroups)
if not writeEach:
self._write()
if self.InitAfterConfig.value():
self.initialize()
def remoteVariableDump(self,name,modes,readFirst):
"""
Dump remote variable values to a file.
Parameters
----------
name : str
Path of the output file. If empty, a name is auto generated from the
current date and time.
modes : list
Variable modes to include, e.g. ['RW', 'WO', 'RO'].
readFirst : bool
If True perform a full read before dumping.

Returns
-------
bool
True on success.
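
Example
-------
A minimal sketch equivalent to the RemoteConfigDump command; the output
path is hypothetical:

>>> root.remoteVariableDump(name='regdump.txt', modes=['RW', 'WO'], readFirst=True)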
"""
# Auto generate name if no arg
if name is None or name == '':
name = datetime.datetime.now().strftime("regdump_%Y%m%d_%H%M%S.txt")
if readFirst:
self._read()
with open(name,'w') as f:
for v in self.variableList:
if hasattr(v,'_getDumpValue') and v.mode in modes:
f.write(v._getDumpValue(False))
return True
def _setDictRoot(self,d,writeEach,modes,incGroups,excGroups):
"""
Parameters
----------
d :
writeEach :
modes :
incGroups :
excGroups :
Returns
-------
"""
for key, value in d.items():
# Attempt to get node
node = self.getNode(key)
# Call setDict on node
if node is not None:
node._setDict(d=value,writeEach=writeEach,modes=modes,incGroups=incGroups,excGroups=excGroups,keys=None)
else:
self._log.error("Entry {} not found".format(key))
def _clearLog(self):
"""Clear the system log"""
self.SystemLog.set(SystemLogInit)
self.SystemLogLast.set('')
def _queueUpdates(self,var):
"""
Parameters
----------
var :
Returns
-------
"""
tid = threading.get_ident()
try:
self._updateTrack[tid].update(var)
except Exception:
with self._updateLock:
self._updateTrack[tid] = UpdateTracker(self._updateQueue)
self._updateTrack[tid].update(var)
# Recursively add listeners to update list
def _recurseAddListeners(self, nvars, var):
for vl in var._listeners:
nvars[vl.path] = vl
self._recurseAddListeners(nvars, vl)
# Worker thread
def _updateWorker(self):
""" """
self._log.info("Starting update thread")
while True:
uvars = self._updateQueue.get()
# Done
if uvars is None:
self._log.info("Stopping update thread")
self._updateQueue.task_done()
return
# Process list
elif len(uvars) > 0:
self._log.debug(F'Process update group. Length={len(uvars)}. Entry={list(uvars.keys())[0]}')
# Copy list and add listeners
nvars = uvars.copy()
for p,v in uvars.items():
self._recurseAddListeners(nvars, v)
# Process the new list
for p,v in nvars.items():
# Process updates
val = v._doUpdate()
# Call root listener functions
with self._varListenLock:
for func,doneFunc,incGroups,excGroups in self._varListeners:
if v.filterByGroup(incGroups, excGroups):
try:
func(p,val)
except Exception as e:
if v == self.SystemLog or v == self.SystemLogLast:
print("------- Error Executing Syslog Listeners -------")
print("Error: {}".format(e))
print("------------------------------------------------")
else:
pr.logException(self._log,e)
# Finalize root listeners
with self._varListenLock:
for func,doneFunc,incGroups,excGroups in self._varListeners:
if doneFunc is not None:
try:
doneFunc()
except Exception as e:
pr.logException(self._log,e)
self._log.debug(F"Done update group. Length={len(uvars)}. Entry={list(uvars.keys())[0]}")
# Set done
self._updateQueue.task_done()