#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# PyRogue base module - Root Class
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
from __future__ import annotations
import sys
import os
import glob
import rogue
import rogue.interfaces.memory as rim
import threading
import logging
import pyrogue as pr
import functools as ft
import time
import queue
import json
import zipfile
import traceback
import datetime
from contextlib import contextmanager
from typing import Any, Callable, Iterator
SystemLogInit = '[]'
class UpdateTracker(object):
"""Track grouped variable updates for root listeners."""
def __init__(self, q: Any) -> None:
"""Initialize update tracking state for one worker thread."""
self._count = 0
self._list = {}
self._period = 0
self._last = time.time()
self._q = q
def increment(self, period: float) -> None:
"""
Increment active update-group depth.
Parameters
----------
period : float, optional (default = 0)
Maximum group flush period in seconds.
"""
if self._count == 0 or self._period < period:
self._period = period
self._count +=1
def decrement(self) -> None:
"""Decrement active update-group depth."""
if self._count != 0:
self._count -= 1
self._check()
def _check(self) -> None:
"""Flush queued updates when depth or period criteria are met."""
if self._count == 0 or (self._period != 0 and (time.time() - self._last) > self._period):
if len(self._list) != 0:
#print(f"Update fired {time.time()}")
self._last = time.time()
self._q.put(self._list)
self._list = {}
def update(self, var: pr.BaseVariable) -> None:
"""
Queue a variable update.
Parameters
----------
var : pr.BaseVariable
Variable object that was updated.
"""
self._list[var.path] = var
self._check()
class RootLogHandler(logging.Handler):
"""Listen to log entries and mirror them into root log variables."""
def __init__(self,*, root: Any) -> None:
"""Initialize a log handler bound to a Root instance."""
logging.Handler.__init__(self)
self._root = root
def emit(self, record: logging.LogRecord) -> None:
"""
Parameters
----------
record : logging.LogRecord
Logging record to store in ``SystemLog``.
"""
if not self._root.running:
return
with self._root.updateGroup():
try:
se = { 'created' : record.created,
'name' : record.name,
'message' : record.getMessage(),
'exception' : None,
'traceBack' : None,
'levelName' : record.levelname,
'levelNumber' : record.levelno }
if record.exc_info is not None:
se['exception'] = record.exc_info[0].__name__
se['traceBack'] = []
for tb in traceback.format_tb(record.exc_info[2]):
se['traceBack'].append(tb.rstrip())
self._root.SystemLogLast.set(json.dumps(se))
# System log is a running json encoded list
with self._root.SystemLog.lock:
lst = json.loads(self._root.SystemLog.value())
# Limit system log size
lst = lst[-1 * (self._root._maxLog-1):]
lst.append(se)
self._root.SystemLog.set(json.dumps(lst))
except Exception as e:
print("-----------Error Logging Exception -------------")
print(e)
print(traceback.print_exc(file=sys.stdout))
print("-----------Original Error-----------------------")
print(self.format(record))
print("------------------------------------------------")
[docs]
class Root(pr.Device):
"""
Class which serves as the root of a tree of Nodes.
The root is the interface point for tree level access and updates.
Parameters
----------
name : str, optional
Root name. Defaults to class name.
description : str, optional (default = "")
Human-readable description.
expand : bool, optional (default = True)
Default GUI expand state.
timeout : float, optional (default = 1.0)
Transaction timeout in seconds.
initRead : bool, optional (default = False)
Perform an initial read on start.
initWrite : bool, optional (default = False)
Perform an initial write on start.
pollEn : bool, optional (default = True)
Enable polling on start.
maxLog : int, optional (default = 1000)
Maximum log entries to retain.
"""
def __enter__(self) -> Root:
"""Root enter."""
self.start()
return self
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
"""Root exit."""
self.stop()
def __init__(self, *,
name: str | None = None,
description: str = '',
expand: bool = True,
timeout: float = 1.0,
initRead: bool = False,
initWrite: bool = False,
pollEn: bool = True,
maxLog: int = 1000) -> None:
"""Initialize the root node, workers, and built-in variables/commands."""
rogue.interfaces.stream.Master.__init__(self)
# Store startup parameters
self._timeout = timeout
self._initRead = initRead
self._initWrite = initWrite
self._pollEn = pollEn
self._maxLog = maxLog
self._doHeartbeat = True # Backdoor flag
# Create log listener to add to SystemLog variable
formatter = logging.Formatter("%(msg)s")
handler = RootLogHandler(root=self)
handler.setFormatter(formatter)
self._logger = logging.getLogger('pyrogue')
self._logger.addHandler(handler)
# Running status
self._running = False
# Polling worker
self._pollQueue = self._pollQueue = pr.PollQueue(root=self)
# List of variable listeners
self._varListeners = []
self._varListenLock = threading.Lock()
# Variable update worker
self._updateQueue = queue.Queue()
self._updateThread = None
self._updateLock = threading.Lock()
self._updateTrack = {}
# Init
pr.Device.__init__(self, name=name, description=description, expand=expand)
# Variables
self.add(pr.LocalVariable(name='RogueVersion', value=rogue.Version.current(), mode='RO', hidden=False,
description='Rogue Version String'))
self.add(pr.LocalVariable(name='RogueDirectory', value=os.path.dirname(pr.__file__), mode='RO', hidden=False,
description='Rogue Library Directory'))
self.add(pr.LocalVariable(name='SystemLog', value=SystemLogInit, mode='RO', hidden=True, groups=['NoStream','NoSql','NoState'],
description='String containing newline separated system logic entries'))
self.add(pr.LocalVariable(name='SystemLogLast', value='', mode='RO', hidden=True, groups=['NoStream','NoState'],
description='String containing last system log entry'))
self.add(pr.LocalVariable(name='ForceWrite', value=False, mode='RW', hidden=True,
description='Configuration Flag To Always Write Non Stale Blocks For WriteAll, LoadConfig and setYaml'))
self.add(pr.LocalVariable(name='InitAfterConfig', value=False, mode='RW', hidden=True,
description='Configuration Flag To Execute Initialize after LoadConfig or setYaml'))
self.add(pr.LocalVariable(name='Time', value=time.time(), mode='RO', hidden=True,
description='Current Time In Seconds Since EPOCH UTC', groups=['NoSql']))
self.add(pr.LinkVariable(name='LocalTime', value='', mode='RO', groups=['NoStream','NoSql','NoState'],
linkedGet=lambda: time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime(self.Time.value())),
dependencies=[self.Time], description='Local Time'))
self.add(pr.LocalVariable(name='PollEn', value=False, mode='RW',groups=['NoStream','NoSql','NoState'],
localSet=lambda value: self._pollQueue.pause(not value),
localGet=lambda: not self._pollQueue.paused()))
# Commands
self.add(pr.LocalCommand(name='WriteAll', function=self._write, hidden=True,
description='Write all values to the hardware'))
self.add(pr.LocalCommand(name="ReadAll", function=self._read, hidden=True,
description='Read all values from the hardware'))
self.add(pr.LocalCommand(name='SaveState', value='',
function=lambda arg: self.saveYaml(name=arg,
readFirst=True,
modes=['RW','RO','WO'],
incGroups=None,
excGroups='NoState',
autoPrefix='state',
autoCompress=True),
hidden=True,
description='Save state to file. Data is saved in YAML format. Passed arg is full path to file to sore data to.'))
self.add(pr.LocalCommand(name='SaveConfig', value='',
function=lambda arg: self.saveYaml(name=arg,
readFirst=True,
modes=['RW','WO'],
incGroups=None,
excGroups='NoConfig',
autoPrefix='config',
autoCompress=False),
hidden=True,
description='Save configuration to file. Data is saved in YAML format. Passed arg is full path to file to sore data to.'))
self.add(pr.LocalCommand(name='LoadConfig', value='',
function=lambda arg: self.loadYaml(name=arg,
writeEach=False,
modes=['RW','WO'],
incGroups=None,
excGroups='NoConfig'),
hidden=True,
description='Read configuration from file. Data is read in YAML format. Passed arg is full path to file to read data from.'))
self.add(pr.LocalCommand(name='RemoteVariableDump', value='',
function=lambda arg: self.remoteVariableDump(name=arg,
modes=['RW','WO','RO'],
readFirst=True),
hidden=True,
description='Save a dump of the remote variable state'))
self.add(pr.LocalCommand(name='RemoteConfigDump', value='',
function=lambda arg: self.remoteVariableDump(name=arg,
modes=['RW','WO'],
readFirst=True),
hidden=True,
description='Save a dump of the remote variable state'))
self.add(pr.LocalCommand(name='Initialize', function=self.initialize, hidden=True,
description='Generate a soft reset to each device in the tree'))
self.add(pr.LocalCommand(name='HardReset', function=self.hardReset, hidden=True,
description='Generate a hard reset to each device in the tree'))
self.add(pr.LocalCommand(name='CountReset', function=self.countReset, hidden=True,
description='Generate a count reset to each device in the tree'))
self.add(pr.LocalCommand(name='ClearLog', function=self._clearLog, hidden=True,
description='Clear the message log contained in the SystemLog variable'))
self.add(pr.LocalCommand(name='SetYamlConfig', value='',
function=lambda arg: self.setYaml(yml=arg,
writeEach=False,
modes=['RW','WO'],
incGroups=None,
excGroups='NoConfig'),
hidden=True,
description='Set configuration from YAML string. Passed arg is configuration string in YAML format.'))
self.add(pr.LocalCommand(name='GetYamlConfig', value=True, retValue='',
function=lambda arg: self.getYaml(readFirst=arg,
modes=['RW','WO'],
incGroups=None,
excGroups='NoConfig',
recurse=True),
hidden=True,
description='Get configuration in YAML string. '
'Passed arg is a boolean indicating if a full system read should be generated. '
'Return data is configuration as a YAML string.'))
self.add(pr.LocalCommand(name='GetYamlState', value=True, retValue='',
function=lambda arg: self.getYaml(readFirst=arg,
modes=['RW','RO','WO'],
incGroups=None,
excGroups='NoState',
recurse=True),
hidden=True,
description='Get current state as YAML string. Pass read first arg.'))
#self.add(pr.LocalCommand(name='Restart', function=self._restart,
# description='Restart and reload the server application'))
#self.add(pr.LocalCommand(name='Exit', function=self._exit,
# description='Exit the server application'))
[docs]
def start(self) -> None:
"""Setup the tree and start background threads for pollQueue and updateQueue.
Call Device._start() recursively on child Nodes.
"""
if self._running:
raise pr.NodeError("Root is already started! Can't restart!")
# Call special root level rootAttached
self._rootAttached()
# Finish Initialization
self._finishInit()
# Get full list of Blocks and Devices with size
tmpList = []
for d in self.deviceList:
for b in d._blocks:
if isinstance(b, rim.Block):
tmpList.append(b)
# Sort the list by address/size
tmpList.sort(key=lambda x: (x._reqSlaveId(), x.address, x.size))
# Look for overlaps
for i in range(1,len(tmpList)):
self._log.debug("Comparing {} with address={:#x} to {} with address={:#x} and size={}".format(
tmpList[i].path, tmpList[i].address,
tmpList[i-1].path,tmpList[i-1].address, tmpList[i-1].size))
# Detect overlaps
if (tmpList[i].size != 0) and (tmpList[i]._reqSlaveId() == tmpList[i-1]._reqSlaveId()) and \
(tmpList[i].address < (tmpList[i-1].address + tmpList[i-1].size)):
raise pr.NodeError("{} at address={:#x} overlaps {} at address={:#x} with size={}".format(
tmpList[i].path,tmpList[i].address,
tmpList[i-1].path,tmpList[i-1].address,tmpList[i-1].size))
# Set timeout if not default
if self._timeout != 1.0:
for key,value in self._nodes.items():
value._setTimeout(self._timeout)
# Detect large timeout
if self._timeout > 10.0:
self._log.warning(f"Large timeout value of {self._timeout} seconds detected. This may cause unexpected system behavior.")
# Start update thread
self._running = True
self._updateThread = threading.Thread(target=self._updateWorker)
self._updateThread.start()
# Start heartbeat
if self._doHeartbeat:
self._hbeatThread = threading.Thread(target=self._hbeatWorker)
self._hbeatThread.start()
# Start interfaces and protocols
pr.Device._start(self)
# Read current state
if self._initRead:
self._read()
# Commit default values
# Read did not override defaults because set values are cached
if self._initWrite:
self._write()
# Start poller if enabled
self._pollQueue._start()
self.PollEn.set(self._pollEn)
[docs]
def stop(self) -> None:
"""Stop background threads. Must be called for clean exit.
Call Device._stop() to recursively stop all Devices in the tree.
"""
self._running = False
self._updateQueue.put(None)
self._updateThread.join()
if self._pollQueue:
self._pollQueue._stop()
pr.Device._stop(self)
@pr.expose
@property
def running(self) -> bool:
"""Return True if the root is running."""
return self._running
[docs]
def addVarListener(
self,
func: Callable[[str, pr.VariableValue], None],
*,
done: Callable[[], None] | None = None,
incGroups: str | list[str] | None = None,
excGroups: str | list[str] | None = None,
) -> None:
"""
Add a variable update listener function.
The variable path and value are passed as ``func(path, varValue)``.
Parameters
----------
func : callable
Listener callback of the form ``func(path, varValue)`` where
``path`` is a full variable path and ``varValue`` is a
:py:class:`pyrogue.VariableValue`.
done : callable, optional
Optional callback of the form ``done()`` executed after each update
batch.
incGroups : str or list[str], optional
Group name or group names to include.
excGroups : str or list[str], optional
Group name or group names to exclude.
"""
with self._varListenLock:
self._varListeners.append((func, done, incGroups, excGroups))
def _addVarListenerCpp(
self,
func: Callable[[str, str], None],
done: Callable[[], None],
) -> None:
"""Add a listener callback using display-string values.
Parameters
----------
func : callable
Callback of the form ``func(path, valueDisp)``.
done : callable
Callback of the form ``done()`` called after each batch.
"""
self.addVarListener(lambda path, varValue: func(path, varValue.valueDisp), done=done)
[docs]
@contextmanager
def updateGroup(self, period: float = 0) -> Iterator[None]:
"""Get a context manager within which many Variable updates will be broadcast as one.
Functions that operate on and set() more than one Variable should do so within a Root.updateGroup() context.
This will reduce the number of update broadcasts that the Root has to make.
The optional 'period' parameter will allow broadcasts of the state every 'period' seconds, useful for long running functions.
Parameters
----------
period : float, optional (default = 0)
Maximum update period in seconds.
"""
tid = threading.get_ident()
# At with call
try:
self._updateTrack[tid].increment(period)
except Exception:
with self._updateLock:
self._updateTrack[tid] = UpdateTracker(self._updateQueue)
self._updateTrack[tid].increment(period)
try:
yield
finally:
# After with is done
self._updateTrack[tid].decrement()
[docs]
@contextmanager
def pollBlock(self) -> Iterator[None]:
"""
Context manager that blocks poll activity while active.
"""
# At with call
self._pollQueue._blockIncrement()
# Return to block within with call
try:
yield
finally:
# After with is done
self._pollQueue._blockDecrement()
[docs]
@pr.expose
def waitOnUpdate(self) -> None:
"""Wait until all update queue items have been processed."""
self._updateQueue.join()
[docs]
def hardReset(self) -> None:
"""Generate a hard reset on all devices.
Called recursively on the entire tree.
"""
super().hardReset()
self._clearLog()
def __reduce__(self) -> tuple[Any, tuple[dict]]:
"""Return reduced state used by virtual-client serialization."""
return pr.Node.__reduce__(self)
[docs]
@ft.lru_cache(maxsize=None)
def getNode(self, path: str) -> pr.Node:
"""Get a Node of the tree by its path string
Parameters
----------
path : str
Node path. Accepts absolute dotted path, root name, or ``root``.
Returns
-------
pr.Node or None
Located node object, or ``None`` if no node matches.
"""
obj = self
if '.' in path:
lst = path.split('.')
if lst[0] != self.name and lst[0] != 'root':
return None
for a in lst[1:]:
if not hasattr(obj,'node'):
return None
obj = obj.node(a)
elif path != self.name and path != 'root':
return None
return obj
[docs]
@pr.expose
def saveAddressMap(self, fname: str) -> None:
"""Dump the tree address map to a file
Parameters
----------
fname : str
Destination file path.
"""
# First form header
header = "Path\t"
header += "TypeStr\t"
header += "Address\t"
header += "Offset\t"
header += "Mode\t"
header += "BitOffset\t"
header += "BitSize\t"
header += "Minimum\t"
header += "Maximum\t"
header += "Enum\t"
header += "OverlapEn\t"
header += "Verify\t"
header += "ModelId\t"
header += "ByteReverse\t"
header += "BitReverse\t"
header += "BinPoint\t"
header += "BulkEn\t"
header += "UpdateNotify\t"
header += "MemBaseName\t"
header += "BlockName\t"
header += "BlockSize\t"
header += "NumValues\t"
header += "ValueBits\t"
header += "ValueStride\t"
header += "RetryCount\t"
header += "Description"
lines = []
for v in self.variableList:
if v.isinstance(pr.RemoteVariable):
data = "{}\t".format(v.path)
data += "{}\t".format(v.typeStr)
data += "{:#x}\t".format(v.address)
data += "{:#x}\t".format(v.offset)
data += "{}\t".format(v.mode)
data += "{}\t".format(v.bitOffset)
data += "{}\t".format(v.bitSize)
data += "{}\t".format(v.minimum)
data += "{}\t".format(v.maximum)
data += "{}\t".format(v.enum)
data += "{}\t".format(v.overlapEn)
data += "{}\t".format(v.verifyEn)
data += "{}\t".format(v._base.modelId)
data += "{}\t".format(v._base.isBigEndian)
data += "{}\t".format(v._base.bitReverse)
data += "{}\t".format(v._base.binPoint)
data += "{}\t".format(v.bulkEn)
data += "{}\t".format(v.updateNotify)
data += "{}\t".format(v._block._reqSlaveName())
data += "{}\t".format(v._block.path)
data += "{:#x}\t".format(v._block.size)
data += "{}\t".format(v._numValues())
data += "{}\t".format(v._valueBits())
data += "{}\t".format(v._valueStride())
data += "{}\t".format(v._retryCount())
# Escape " characters
description = v.description.replace('"',r'\"')
# Escape \n characters and strip each line in the description field
description = description.split('\n')
description = '\\\n'.join([x.strip() for x in description])
data += "{}".format(description)
lines.append(data)
with open(fname,'w') as f:
f.write(header + '\n')
for line in lines:
f.write(line + '\n')
[docs]
@pr.expose
def saveVariableList(
self,
fname: str,
polledOnly: bool = False,
incGroups: str | list[str] | None = None,
) -> None:
"""Save a string representing the entire tree
Parameters
----------
fname : str
Destination file path.
polledOnly : bool, optional (default = False)
If True, include only polled variables.
incGroups : str or list[str], optional
Group name or group names to include.
"""
with open(fname,'w') as f:
f.write("Path\t")
f.write("TypeStr\t")
f.write("Mode\t")
f.write("Enum\t")
f.write("PollInterval\t")
f.write("Groups\t")
f.write("Description\n")
for v in self.variableList:
if ((not polledOnly) or (v.pollInterval > 0)) and v.filterByGroup(incGroups=incGroups,excGroups=None):
f.write("{}\t".format(v.path))
f.write("{}\t".format(v.typeStr))
f.write("{}\t".format(v.mode))
f.write("{}\t".format(v.enum))
f.write("{}\t".format(v.pollInterval))
f.write("{}\t".format(v.groups))
f.write("{}\n".format(v.description))
def _hbeatWorker(self) -> None:
"""Heartbeat worker which updates the ``Time`` variable."""
while self._running:
time.sleep(1)
with self.updateGroup():
self.Time.set(time.time())
def _rootAttached(self) -> None:
"""Attach root references to the full node tree."""
self._parent = self
self._root = self
self._path = self.name
for key,value in self._nodes.items():
value._rootAttached(self,self)
self._buildBlocks()
# Some variable initialization can run until the blocks are built
for v in self.variables.values():
v._finishInit()
def _write(self) -> bool:
"""Write and verify all blocks."""
self._log.info("Start root write")
with self.pollBlock(), self.updateGroup():
self.writeBlocks(force=self.ForceWrite.value(), recurse=True)
self._log.info("Verify root read")
self.verifyBlocks(recurse=True)
self._log.info("Check root read")
self.checkBlocks(recurse=True)
self._log.info("Done root write")
return True
def _read(self) -> bool:
"""Read and check all blocks."""
self._log.info("Start root read")
with self.pollBlock(), self.updateGroup():
self.readBlocks(recurse=True)
self._log.info("Check root read")
self.checkBlocks(recurse=True)
self._log.info("Done root read")
return True
[docs]
@pr.expose
def saveYaml(
self,
name: str | None,
readFirst: bool,
modes: list[str] = ['RW', 'RO', 'WO'],
incGroups: str | list[str] | None = None,
excGroups: str | list[str] | None = None,
autoPrefix: str = '',
autoCompress: bool = False,
) -> bool:
"""
Save YAML configuration or status to a file.
Parameters
----------
name : str, optional
Destination file path. If empty, a timestamped name is generated.
readFirst : bool
Read values from hardware before exporting.
modes : list['RW' | 'WO' | 'RO'], optional (default = ['RW', 'RO', 'WO'])
Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``.
incGroups : str or list[str], optional
Group name or group names to include.
excGroups : str or list[str], optional
Group name or group names to exclude.
autoPrefix : str, optional
Prefix for auto-generated filenames.
autoCompress : bool, optional
Generate a ``.zip`` file when auto-generating names. Default False
Returns
-------
bool
Returns ``True`` when export completes.
"""
# Auto generate name if no arg
if name is None or name == '':
name = datetime.datetime.now().strftime(autoPrefix + "_%Y%m%d_%H%M%S.yml")
if autoCompress:
name += '.zip'
yml = self.getYaml(readFirst=readFirst,modes=modes,incGroups=incGroups,excGroups=excGroups, recurse=True)
if name.split('.')[-1] == 'zip':
with zipfile.ZipFile(name, 'w', compression=zipfile.ZIP_LZMA) as zf:
with zf.open(os.path.basename(name[:-4]),'w') as f:
f.write(yml.encode('utf-8'))
else:
with open(name,'w') as f:
f.write(yml)
return True
[docs]
def loadYaml(
self,
name: str | list[str],
writeEach: bool,
modes: list[str],
incGroups: str | list[str] | None = None,
excGroups: str | list[str] | None = None,
) -> bool:
"""
Load YAML configuration from files or directories.
Parameters
----------
name : str or list[str]
Input file, directory, zip-path, or list of those entries.
writeEach : bool
Write each variable as it is applied.
modes : list['RW' | 'WO' | 'RO']
Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``.
incGroups : str or list[str], optional
Group name or group names to include.
excGroups : str or list[str], optional
Group name or group names to exclude.
Returns
-------
bool
Returns ``True`` when load completes.
"""
# Pass arg is a python list
if isinstance(name,list):
rawlst = name
# Passed arg is a comma separated list of files
elif ',' in name:
rawlst = name.split(',')
# Not a list
else:
rawlst = [name]
# Init final list
lst = []
# Iterate through raw list and look for directories
for rl in rawlst:
# Name ends with .yml or .yaml
if rl[-4:] == '.yml' or rl[-5:] == '.yaml':
lst.append(rl)
# Entry is a zip file directory
elif '.zip' in rl:
base = rl.split('.zip')[0] + '.zip'
sub = rl.split('.zip')[1][1:]
# Open zipfile
with zipfile.ZipFile(base, 'r', compression=zipfile.ZIP_LZMA) as myzip:
# Check if passed name is a directory, otherwise generate an error
if not any(x.startswith("%s/" % sub.rstrip("/")) for x in myzip.namelist()):
raise Exception("loadYaml: Invalid load file: {}, must be a directory or end in .yml or .yaml".format(rl))
else:
# Iterate through directory contents
for zfn in myzip.namelist():
# Filter by base directory
if zfn.find(sub) == 0:
spt = zfn.split('%s/' % sub.rstrip('/'))[1]
# Entry ends in .yml or *.yml and is in current directory
if '/' not in spt and (spt[-4:] == '.yml' or spt[-5:] == '.yaml'):
lst.append(base + '/' + zfn)
# Entry is a directory
elif os.path.isdir(rl):
dlst = glob.glob('{}/*.yml'.format(rl))
dlst.extend(glob.glob('{}/*.yaml'.format(rl)))
lst.extend(sorted(dlst))
# Not a zipfile, not a directory and does not end in .yml
else:
raise Exception("loadYaml: Invalid load file: {}, must be a directory or end in .yml or .yaml".format(rl))
# Read each file
with self.pollBlock(), self.updateGroup():
for fn in lst:
d = pr.yamlToData(fName=fn)
self._setDictRoot(d=d,writeEach=writeEach,modes=modes,incGroups=incGroups,excGroups=excGroups)
if not writeEach:
self._write()
if self.InitAfterConfig.value():
self.initialize()
return True
[docs]
def treeDict(
self,
modes: list[str] = ['RW', 'RO', 'WO'],
incGroups: str | list[str] | None = None,
excGroups: str | list[str] | None = None,
properties: bool = True,
) -> dict[str, Any]:
"""
Return the root tree as a dictionary.
Parameters
----------
modes : list['RW' | 'WO' | 'RO'], optional (default = ['RW', 'RO', 'WO'])
Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``.
incGroups : str or list[str], optional
Group names to include.
excGroups : str or list[str], optional
Group names to exclude.
properties : bool, optional (default = True)
Include variable property fields.
Returns
-------
dict[str, object]
Dictionary keyed by root name with tree data.
"""
d = self._getDict(modes, incGroups, excGroups, properties=properties)
return {self.name: d}
[docs]
def treeYaml(
self,
modes: list[str] = ['RW', 'RO', 'WO'],
incGroups: str | list[str] | None = None,
excGroups: str | list[str] | None = None,
properties: bool | None = None,
) -> str:
"""
Return the root tree as YAML text.
Parameters
----------
modes : list['RW' | 'WO' | 'RO'], optional (default = ['RW', 'RO', 'WO'])
Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``.
incGroups : str or list[str], optional
Group names to include.
excGroups : str or list[str], optional
Group names to exclude.
properties : bool, optional
Include variable property fields. If ``None``, defaults to ``True``.
Returns
-------
str
YAML representation of the root tree.
"""
if properties is None:
properties = True
return pr.dataToYaml(self.treeDict(modes, incGroups, excGroups, properties))
[docs]
def setYaml(
self,
yml: str,
writeEach: bool,
modes: list[str],
incGroups: str | list[str] | None = None,
excGroups: str | list[str] | None = None,
) -> None:
"""
Set variable values from YAML text.
Parameters
----------
yml : str
YAML text containing values to apply.
writeEach : bool
Write each variable as it is applied.
modes : list['RW' | 'WO' | 'RO']
Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``.
incGroups : str or list[str], optional
Group name or group names to include.
excGroups : str or list[str], optional
Group name or group names to exclude.
"""
d = pr.yamlToData(yml)
with self.pollBlock(), self.updateGroup():
self._setDictRoot(d=d,writeEach=writeEach,modes=modes,incGroups=incGroups,excGroups=excGroups)
if not writeEach:
self._write()
if self.InitAfterConfig.value():
self.initialize()
[docs]
def remoteVariableDump(
self,
name: str | None,
modes: list[str],
readFirst: bool,
) -> bool:
"""
Dump remote variable values to a file.
Parameters
----------
name : str, optional
Destination file path. If empty, a timestamped name is generated.
modes : list['RW' | 'WO' | 'RO']
Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``.
readFirst : bool
Read values from hardware before dumping.
Returns
-------
bool
Returns ``True`` when dump completes.
"""
# Auto generate name if no arg
if name is None or name == '':
name = datetime.datetime.now().strftime("regdump_%Y%m%d_%H%M%S.txt")
if readFirst:
self._read()
with open(name,'w') as f:
for v in self.variableList:
if hasattr(v,'_getDumpValue') and v.mode in modes:
f.write(v._getDumpValue(False))
return True
def _setDictRoot(
self,
d: dict[str, Any],
writeEach: bool,
modes: list[str],
incGroups: str | list[str] | None = None,
excGroups: str | list[str] | None = None,
) -> None:
"""
Parameters
----------
d : dict[str, object]
Root-level dictionary to apply.
writeEach : bool
Write each variable as it is applied.
modes : list['RW' | 'WO' | 'RO']
Variable modes to include. Allowed values are ``'RW'``, ``'WO'``, and ``'RO'``.
incGroups : str or list[str], optional
Group name or group names to include.
excGroups : str or list[str], optional
Group name or group names to exclude.
"""
for key, value in d.items():
# Attempt to get node
node = self.getNode(key)
# Call setDict on node
if node is not None:
node._setDict(d=value,writeEach=writeEach,modes=modes,incGroups=incGroups,excGroups=excGroups,keys=None)
else:
self._log.error("Entry {} not found".format(key))
def _clearLog(self) -> None:
"""Clear the system log."""
self.SystemLog.set(SystemLogInit)
self.SystemLogLast.set('')
def _queueUpdates(self, var: Any) -> None:
"""
Parameters
----------
var : object
Variable object queued for listener update.
"""
tid = threading.get_ident()
try:
self._updateTrack[tid].update(var)
except Exception:
with self._updateLock:
self._updateTrack[tid] = UpdateTracker(self._updateQueue)
self._updateTrack[tid].update(var)
# Recursively add listeners to update list
def _recurseAddListeners(self, nvars: dict[str, Any], var: Any) -> None:
"""Collect listener variables recursively into an update dictionary."""
for vl in var._listeners:
nvars[vl.path] = vl
self._recurseAddListeners(nvars, vl)
# Worker thread
def _updateWorker(self) -> None:
"""Update-thread worker for variable notifications."""
self._log.info("Starting update thread")
while True:
uvars = self._updateQueue.get()
# Done
if uvars is None:
self._log.info("Stopping update thread")
self._updateQueue.task_done()
return
# Process list
elif len(uvars) > 0:
self._log.debug(F'Process update group. Length={len(uvars)}. Entry={list(uvars.keys())[0]}')
# Copy list and add listeners
nvars = uvars.copy()
for p,v in uvars.items():
self._recurseAddListeners(nvars, v)
# Process the new list
for p,v in nvars.items():
# Process updates
val = v._doUpdate()
# Call root listener functions,
with self._varListenLock:
for func,doneFunc,incGroups,excGroups in self._varListeners:
if v.filterByGroup(incGroups, excGroups):
try:
func(p,val)
except Exception as e:
if v == self.SystemLog or v == self.SystemLogLast:
print("------- Error Executing Syslog Listeners -------")
print("Error: {}".format(e))
print("------------------------------------------------")
else:
pr.logException(self._log,e)
# Finalize root listeners
with self._varListenLock:
for func,doneFunc,incGroups,excGroups in self._varListeners:
if doneFunc is not None:
try:
doneFunc()
except Exception as e:
pr.logException(self._log,e)
self._log.debug(F"Done update group. Length={len(uvars)}. Entry={list(uvars.keys())[0]}")
# Set done
self._updateQueue.task_done()