Source code for pyrogue._PollQueue

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
#  Description:
#       PyRogue base module - PollQueue Class
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
from __future__ import annotations

import sys
import threading
import datetime
import itertools
import heapq
from typing import Any, Iterator

import pyrogue as pr
import rogue.interfaces.memory


class PollQueueEntry(object):
    """Data class for a poll queue entry."""
    def __init__(
        self,
        readTime: datetime.datetime,
        count: int,
        interval: datetime.timedelta,
        block: Any,
    ) -> None:
        """Initialize a poll-scheduler queue entry."""
        self.readTime = readTime
        self.count    = count
        self.interval = interval
        self.block    = block

    def __lt__(self, other: PollQueueEntry) -> bool:
        """Return heap ordering by scheduled read time."""
        return self.readTime < other.readTime

    def __gt__(self, other: PollQueueEntry) -> bool:
        """Return reverse heap ordering by scheduled read time."""
        return self.readTime > other.readTime


[docs] class PollQueue(object): """Poll queue scheduler. Parameters ---------- root : object Root object used for update grouping. """ def __init__(self, *, root: pr.Root) -> None: """Initialize the poll queue.""" self._pq = [] # The heap queue self._entries = {} # {Block: Entry} mapping to look up if a block is already in the queue self._counter = itertools.count() self._condLock = threading.Condition(threading.RLock()) self._run = True self._pause = True self._root = root self.blockCount = 0 self._pollThread = threading.Thread(target=self._poll) # Setup logging self._log = pr.logInit(cls=self) def _start(self) -> None: """Start the poll thread.""" self._pollThread.start() self._log.info("PollQueue Started") def _addEntry(self, block: pr.Block, interval: float) -> None: """Add an entry to the poll queue. Parameters ---------- block : pr.Block Block object to poll. interval : float Poll interval in seconds. Returns ------- None """ with self._condLock: timedelta = datetime.timedelta(seconds=interval) # new entries are always polled first immediately # (rounded up to the next second) readTime = datetime.datetime.now() readTime = readTime.replace(microsecond=0) entry = PollQueueEntry(readTime, next(self._counter), timedelta, block) self._entries[block] = entry heapq.heappush(self._pq, entry) # Wake up the thread self._condLock.notify() def _blockIncrement(self) -> None: """Increment block count to pause poll activity in critical sections.""" with self._condLock: self.blockCount += 1 self._condLock.notify() def _blockDecrement(self) -> None: """Decrement block count and wake poll thread if needed.""" with self._condLock: self.blockCount -= 1 self._condLock.notify()
[docs] def updatePollInterval(self, var: pr.BaseVariable) -> None: """Update polling interval for a variable. Parameters ---------- var : pr.BaseVariable Variable whose poll interval changed. """ with self._condLock: self._log.debug(f'updatePollInterval {var} - {var._pollInterval}') # Special case: Variable has no block and just depends on other variables # Then do update on each dependency instead if not hasattr(var, '_block') or var._block is None: if len(var.dependencies) > 0: for dep in var.dependencies: if var._pollInterval != 0 and (dep.pollInterval == 0 or var._pollInterval < dep.pollInterval): dep.setPollInterval(var._pollInterval) return if var._block in self._entries.keys(): oldInterval = self._entries[var._block].interval blockVars = [v for v in var._block.variables if v.pollInterval > 0] if len(blockVars) > 0: minVar = min(blockVars, key=lambda x: x.pollInterval) newInterval = datetime.timedelta(seconds=minVar.pollInterval) # If block interval has changed, invalidate the current entry for the block # and re-add it with the new interval if newInterval != oldInterval: self._entries[var._block].block = None self._addEntry(var._block, minVar.pollInterval) else: # No more variables belong to block entry, can remove it self._entries[var._block].block = None del self._entries[var._block] # New entry with non-zero poll interval, pure entry add elif var.pollInterval > 0: self._addEntry(var._block, var.pollInterval)
def _poll(self) -> None: """Run by the poll thread""" while True: if self.empty() or self.paused(): # Sleep until woken with self._condLock: self._condLock.wait() if self._run is False: self._log.info("PollQueue thread exiting") return else: continue else: # Sleep until the top entry is ready to be polled # Or a new entry is added by updatePollInterval now = datetime.datetime.now() readTime = self.peek().readTime waitTime = (readTime - now).total_seconds() with self._condLock: self._log.debug(f'Poll thread sleeping for {waitTime}') self._condLock.wait(waitTime) self._log.debug(f'Global reference count: {sys.getrefcount(None)}') with self._condLock: # Stop the thread if someone set run to False if self._run is False: self._log.info("PollQueue thread exiting") return # Wait for block count to be zero while self.blockCount > 0: self._condLock.wait() # Start update capture with self._root.updateGroup(): # Pop all timed out entries from the queue now = datetime.datetime.now() blockEntries = [] for entry in self._expiredEntries(now): self._log.debug(f'Polling Block {entry.block.path}') blockEntries.append(entry) try: pr.startTransaction(entry.block, type=rogue.interfaces.memory.Read) except Exception as e: pr.logException(self._log,e) # Update the entry with new read time entry.readTime = now + entry.interval entry.count = next(self._counter) # Push the updated entry back into the queue heapq.heappush(self._pq, entry) for entry in blockEntries: try: pr.checkTransaction(entry.block) except Exception as e: pr.logException(self._log,e) def _expiredEntries(self, time: datetime.datetime | None = None) -> Iterator[PollQueueEntry]: """ An iterator of all entries that expire by a given time. Use datetime.datetime.now() if no time provided. Each entry is popped from the queue before being yielded by the iterator Parameters ---------- time : datetime.datetime, optional Time cutoff to use; defaults to ``datetime.datetime.now()``. Returns ------- Iterator of expired entries """ with self._condLock: if time is None: time = datetime.datetime.now() while self.empty() is False and self.peek().readTime <= time: entry = heapq.heappop(self._pq) if entry.block is not None: yield entry
[docs] def peek(self) -> PollQueueEntry | None: """Return (but don't pop) the top entry in the queue. """ with self._condLock: if self.empty() is False: return self._pq[0] else: return None
[docs] def empty(self) -> bool: """Return True of queue is empty, else False """ with self._condLock: return len(self._pq)==0
def _stop(self) -> None: """Stop the poll queue stread """ with self._condLock: self._run = False self._condLock.notify()
[docs] def pause(self, value: bool) -> None: """Pause or unpause the poll queue Parameters ---------- value : bool True for pause, False for unpause Returns ------- None """ if value is True: with self._condLock: self._pause = True else: with self._condLock: self._pause = False self._condLock.notify()
[docs] def paused(self) -> bool: """Check pause state """ with self._condLock: return self._pause