# Source code for pydm.widgets.archiver_time_plot

import json
import time
import numpy as np
from collections import OrderedDict
from typing import List, Optional
from pyqtgraph import DateAxisItem, ErrorBarItem
from pydm.widgets.channel import PyDMChannel
from pydm.widgets.timeplot import TimePlotCurveItem
from pydm.widgets import PyDMTimePlot
from qtpy.QtCore import QObject, QTimer, Property, Signal, Slot
from qtpy.QtGui import QColor

import logging

logger = logging.getLogger(__name__)

DEFAULT_ARCHIVE_BUFFER_SIZE = 18000
DEFAULT_TIME_SPAN = 5.0


class ArchivePlotCurveItem(TimePlotCurveItem):
    """
    ArchivePlotCurveItem is a TimePlotCurveItem with support for receiving data
    from the archiver appliance.

    Parameters
    ----------
    channel_address : str
        The address of the scalar data to plot. Will also be used to retrieve
        data from archiver appliance if requested.
    use_archive_data : bool
        If True, requests will be made to archiver appliance for archived data
        when the plot is zoomed or scrolled to the left.
    **kws : dict[str: any]
        Additional parameters supported by pyqtgraph.PlotDataItem.
    """

    # Used to request data from archiver appliance
    # (starting timestamp, ending timestamp, processing command)
    archive_data_request_signal = Signal(float, float, str)
    # Emitted after a batch of archive data has been placed into the buffer
    archive_data_received_signal = Signal()

    def __init__(self, channel_address: Optional[str] = None, use_archive_data: bool = True, **kws):
        super(ArchivePlotCurveItem, self).__init__(channel_address, **kws)
        self.use_archive_data = use_archive_data
        # PyDMChannel connected to the archiver appliance plugin; created lazily below
        self.archive_channel = None
        self.archive_points_accumulated = 0
        self._archiveBufferSize = DEFAULT_ARCHIVE_BUFFER_SIZE
        # Row 0 holds timestamps, row 1 holds values; column-major for cheap column inserts
        self.archive_data_buffer = np.zeros((2, self._archiveBufferSize), order="f", dtype=float)

        # When optimized or mean value data is requested, we can display error bars representing
        # the full range of values retrieved
        self.error_bar_item = ErrorBarItem()
        # True until the error bar item has been added to the view box once
        self.error_bar_needs_set = True

        if channel_address is not None and use_archive_data:
            self.setArchiveChannel(channel_address)
[docs] def to_dict(self) -> OrderedDict: """Returns an OrderedDict representation with values for all properties needed to recreate this curve.""" dic_ = OrderedDict( [ ("useArchiveData", self.use_archive_data), ] ) dic_.update(super(ArchivePlotCurveItem, self).to_dict()) return dic_
[docs] def setArchiveChannel(self, address: str) -> None: """Creates the channel for the input address for communicating with the archiver appliance plugin.""" archiver_prefix = "archiver://pv=" if address.startswith("ca://"): archive_address = address.replace("ca://", archiver_prefix, 1) elif address.startswith("pva://"): archive_address = address.replace("pva://", archiver_prefix, 1) else: archive_address = archiver_prefix + address self.archive_channel = PyDMChannel( address=archive_address, value_slot=self.receiveArchiveData, value_signal=self.archive_data_request_signal )
    @Slot(np.ndarray)
    def receiveArchiveData(self, data: np.ndarray) -> None:
        """Receive data from archiver appliance and place it into the archive data buffer.
        Will overwrite any previously existing data at the indices written to.

        Parameters
        ----------
        data : np.ndarray
            A numpy array of varying shape consisting of archived data for display.
            At a minimum, index 0 will contain the timestamps and index 1 the actual data
            observations. Additional indices may be used as well based on the type of
            request made to the archiver appliance. For example optimized data will
            include standard deviations, minimums, and maximums.
        """
        archive_data_length = len(data[0])
        max_x = data[0][archive_data_length - 1]  # newest archived timestamp received

        if self.points_accumulated != 0:
            # Trim archived samples whose timestamps overlap the live buffer:
            # self.data_buffer[0][-self.points_accumulated] is the oldest live timestamp.
            while max_x > self.data_buffer[0][-self.points_accumulated]:
                # Sometimes optimized queries return data past the current timestamp,
                # this will delete those data points
                data = np.delete(data, len(data[0]) - 1, axis=1)
                archive_data_length -= 1
                # NOTE(review): if every archived point overlaps, archive_data_length can
                # reach 0 and this index would go out of range — confirm upstream requests
                # always end before the oldest live point.
                max_x = data[0][archive_data_length - 1]

        # Right-align the new data in the fixed-size buffer, overwriting old contents.
        self.archive_data_buffer[0, len(self.archive_data_buffer[0]) - archive_data_length :] = data[0]
        self.archive_data_buffer[1, len(self.archive_data_buffer[0]) - archive_data_length :] = data[1]
        self.archive_points_accumulated = archive_data_length

        # Error bars
        if data.shape[0] == 5:  # 5 indicates optimized data was requested from the archiver
            # Rows presumably are (timestamp, mean, stddev, min, max) — top/bottom span
            # the full retrieved range around the mean. TODO confirm row layout.
            self.error_bar_item.setData(
                x=self.archive_data_buffer[0, -self.archive_points_accumulated :],
                y=self.archive_data_buffer[1, -self.archive_points_accumulated :],
                top=data[4] - data[1],
                bottom=data[1] - data[3],
                beam=0.5,
                pen={"color": self.color},
            )
            if self.error_bar_needs_set:
                # Add the error bar item to the view exactly once
                self.getViewBox().addItem(self.error_bar_item)
                self.error_bar_needs_set = False

        self.data_changed.emit()
        self.archive_data_received_signal.emit()
    def insert_archive_data(self, data: np.ndarray) -> None:
        """
        Insert data directly into the archive buffer. An example use case would be
        zooming into optimized mean-value data and replacing it with the raw data.

        Parameters
        ----------
        data : np.ndarray
            A numpy array of shape (2, length_of_data). Index 0 contains timestamps
            and index 1 contains the data observations.
        """
        archive_data_length = len(data[0])
        min_x = data[0][0]
        max_x = data[0][archive_data_length - 1]
        # Get the indices between which we want to insert the data
        # (buffer row 0 is kept sorted by timestamp, so searchsorted applies)
        min_insertion_index = np.searchsorted(self.archive_data_buffer[0], min_x)
        max_insertion_index = np.searchsorted(self.archive_data_buffer[0], max_x)
        # Delete any non-raw data between the indices so we don't have multiple data
        # points for the same timestamp
        self.archive_data_buffer = np.delete(
            self.archive_data_buffer, slice(min_insertion_index, max_insertion_index), axis=1
        )
        num_points_deleted = max_insertion_index - min_insertion_index
        # Net change in column count once the new data goes in
        delta_points = archive_data_length - num_points_deleted
        if archive_data_length > num_points_deleted:
            # If the insertion will overflow the data buffer, need to delete the oldest points
            self.archive_data_buffer = np.delete(self.archive_data_buffer, slice(0, delta_points), axis=1)
        else:
            # Pad with zeros on the left to keep the buffer width constant.
            # NOTE(review): delta_points is <= 0 on this branch; np.zeros((2, delta_points))
            # only works when it is exactly 0 — confirm the strictly-negative case is unreachable.
            self.archive_data_buffer = np.insert(self.archive_data_buffer, [0], np.zeros((2, delta_points)), axis=1)
        # Re-locate the insertion point since the buffer columns shifted above
        min_insertion_index = np.searchsorted(self.archive_data_buffer[0], min_x)
        self.archive_data_buffer = np.insert(self.archive_data_buffer, [min_insertion_index], data[0:2], axis=1)
        self.archive_points_accumulated += archive_data_length - num_points_deleted
[docs] @Slot() def redrawCurve(self, min_x=None, max_x=None) -> None: """ Redraw the curve with any new data added since the last draw call. """ if self.archive_points_accumulated == 0: super(ArchivePlotCurveItem, self).redrawCurve() else: try: x = np.concatenate( ( self.archive_data_buffer[0, -self.archive_points_accumulated :].astype(float), self.data_buffer[0, -self.points_accumulated :].astype(float), ) ) y = np.concatenate( ( self.archive_data_buffer[1, -self.archive_points_accumulated :].astype(float), self.data_buffer[1, -self.points_accumulated :].astype(float), ) ) self.setData(y=y, x=x) except (ZeroDivisionError, OverflowError, TypeError): # Solve an issue with pyqtgraph and initial downsampling pass
[docs] def initializeArchiveBuffer(self) -> None: """ Initialize the archive data buffer used for this curve. """ self.archive_points_accumulated = 0 self.archive_data_buffer = np.zeros((2, self._archiveBufferSize), order="f", dtype=float)
[docs] def getArchiveBufferSize(self) -> int: """Return the length of the archive buffer""" return int(self._archiveBufferSize)
[docs] def setArchiveBufferSize(self, value: int) -> None: """Set the length of the archive data buffer and zero it out""" if self._archiveBufferSize != int(value): self._archiveBufferSize = max(int(value), 2) self.initializeArchiveBuffer()
[docs] def resetArchiveBufferSize(self) -> None: """Reset the length of the archive buffer back to the default and zero it out""" if self._archiveBufferSize != DEFAULT_ARCHIVE_BUFFER_SIZE: self._archiveBufferSize = DEFAULT_ARCHIVE_BUFFER_SIZE self.initializeArchiveBuffer()
[docs] def channels(self) -> List[PyDMChannel]: """Return the list of channels this curve is connected to""" return [self.channel, self.archive_channel]
class PyDMArchiverTimePlot(PyDMTimePlot):
    """
    PyDMArchiverTimePlot is a PyDMTimePlot with support for receiving data
    from the archiver appliance.

    Parameters
    ----------
    parent : QObject, optional
        The parent of this widget.
    init_y_channels : list, optional
        A list of scalar channels to plot vs time. Defaults to an empty list.
    background : str
        The background color for the plot. Accepts any arguments that
        pyqtgraph.mkColor will accept.
    optimized_data_bins : int
        The number of bins of data returned from the archiver when using
        optimized requests.
    """

    def __init__(
        self,
        parent: Optional[QObject] = None,
        init_y_channels: Optional[List[str]] = None,
        background: str = "default",
        optimized_data_bins: int = 2000,
    ):
        # Fix for the mutable-default-argument pitfall: a literal [] default is a
        # single list object shared by every instantiation of this class. Use a
        # None sentinel instead; behavior for all callers is unchanged.
        if init_y_channels is None:
            init_y_channels = []
        super(PyDMArchiverTimePlot, self).__init__(
            parent=parent,
            init_y_channels=init_y_channels,
            plot_by_timestamps=True,
            background=background,
            bottom_axis=DateAxisItem("bottom"),
        )
        self.optimized_data_bins = optimized_data_bins
        self._min_x = None  # Leftmost timestamp shown on the x-axis (set on first render)
        self._prev_x = None  # Holds the minimum x-value of the previous update of the plot
        self._starting_timestamp = time.time()  # The timestamp at which the plot was first rendered
        self._archive_request_queued = False  # Debounce flag for archiver requests
    def updateXAxis(self, update_immediately: bool = False) -> None:
        """Manage the requests to archiver appliance. When the user pans or zooms the
        x axis to the left, a request will be made for backfill data.

        Parameters
        ----------
        update_immediately : bool
            Passed through to pyqtgraph's setXRange as its ``update`` argument.
        """
        if len(self._curves) == 0:
            return

        min_x = self.plotItem.getAxis("bottom").range[0]  # Gets the leftmost timestamp displayed on the x-axis
        max_x = max([curve.max_x() for curve in self._curves])
        max_range = self.plotItem.getAxis("bottom").range[1]
        if min_x == 0:  # This is zero when the plot first renders
            min_x = time.time()
            self._min_x = min_x
            self._starting_timestamp = min_x - DEFAULT_TIME_SPAN  # A bit of a buffer so we don't overwrite live data
            if self.getTimeSpan() != DEFAULT_TIME_SPAN:
                # Initialize x-axis based on the time span as well as trigger a call to the archiver below
                self._min_x = self._min_x - self.getTimeSpan()
                self._archive_request_queued = True
                self.requestDataFromArchiver()
            self.plotItem.setXRange(self._min_x, time.time(), padding=0.0, update=update_immediately)
        elif min_x < self._min_x and not self.plotItem.isAnyXAutoRange():
            # This means the user has manually scrolled to the left, so request archived data
            self._min_x = min_x
            self.setTimeSpan(max_x - min_x)
            if not self._archive_request_queued:
                # Letting the user pan or scroll the plot is convenient, but can generate a lot of events in under
                # a second that would trigger a request for data. By using a timer, we avoid this burst of events
                # and consolidate what would be many requests to archiver into just one.
                self._archive_request_queued = True
                QTimer.singleShot(1000, self.requestDataFromArchiver)
        # Here we only update the x-axis if the user hasn't asked for autorange and they haven't zoomed in (as
        # detected by the max range showing on the plot being less than the data available)
        elif not self.plotItem.isAnyXAutoRange() and not max_range < max_x - 10:
            if min_x > (self._prev_x + 15) or min_x < (self._prev_x - 15):
                # The plus/minus 15 just makes sure we don't do this on every update tick of the graph
                self.setTimeSpan(max_x - min_x)
            else:
                # Keep the plot moving with a rolling window based on the current timestamp
                self.plotItem.setXRange(max_x - self.getTimeSpan(), max_x, padding=0.0, update=update_immediately)
        # Remember where the left edge was so the next tick can detect a user pan/zoom
        self._prev_x = min_x
    def requestDataFromArchiver(self, min_x: Optional[float] = None, max_x: Optional[float] = None) -> None:
        """
        Make the request to the archiver appliance data plugin for archived data.

        Parameters
        ----------
        min_x : float, optional
            Timestamp representing the start of the time period to fetch archive data from.
            Defaults to the minimum value visible on the plot when omitted.
        max_x : float, optional
            Timestamp representing the end of the time period to fetch archive data from.
            Defaults to the timestamp of the oldest live data point in the buffer if available.
            If no live points are recorded yet, then defaults to the timestamp at which the
            plot was first rendered.
        """
        processing_command = ""
        if min_x is None:
            min_x = self._min_x
        for curve in self._curves:
            if curve.use_archive_data:
                if max_x is None:
                    # NOTE(review): once set for the first archiving curve, max_x is reused
                    # for every later curve in this loop — confirm that is intended.
                    if curve.points_accumulated > 0:
                        # Oldest live timestamp currently held in the curve's buffer
                        max_x = curve.data_buffer[0][curve.getBufferSize() - curve.points_accumulated]
                    else:
                        max_x = self._starting_timestamp
                requested_seconds = max_x - min_x
                if requested_seconds <= 5:
                    self._archive_request_queued = False
                    continue  # Avoids noisy requests when first rendering the plot

                # Max amount of raw data to return before using optimized data
                # (compares seconds against a point count — implicitly assumes ~1 sample/sec)
                max_data_request = int(0.80 * self.getArchiveBufferSize())

                if requested_seconds > max_data_request:
                    processing_command = "optimized_" + str(self.optimized_data_bins)

                # End at max_x - 1 to avoid overlapping the oldest live data point
                curve.archive_data_request_signal.emit(min_x, max_x - 1, processing_command)
[docs] def getArchiveBufferSize(self) -> int: """Returns the size of the data buffer used to store archived data""" if len(self._curves) == 0: return DEFAULT_ARCHIVE_BUFFER_SIZE return self._curves[0].getArchiveBufferSize()
[docs] def createCurveItem(self, *args, **kwargs) -> ArchivePlotCurveItem: """Create and return a curve item to be plotted""" curve_item = ArchivePlotCurveItem(*args, **kwargs) curve_item.archive_data_received_signal.connect(self.archive_data_received) return curve_item
[docs] @Slot() def archive_data_received(self): """Take any action needed when this plot receives new data from archiver appliance""" max_x = max([curve.max_x() for curve in self._curves]) # Assure the user sees all data available whenever the request data is returned self.plotItem.setXRange(max_x - self.getTimeSpan(), max_x, padding=0.0, update=True) self._archive_request_queued = False
[docs] def setTimeSpan(self, value): """Set the value of the plot's timespan""" if value < DEFAULT_TIME_SPAN: # Less than 5 seconds will break the plot return self._time_span = value
[docs] def clearCurves(self) -> None: """Clear all curves from the plot""" for curve in self._curves: # Need to clear out any bars from optimized data, then super() can handle the rest if not curve.error_bar_needs_set: curve.getViewBox().removeItem(curve.error_bar_item) # reset _min_x to let updateXAxis make requests anew self._min_x = self._starting_timestamp super().clearCurves()
[docs] def getCurves(self) -> List[str]: """ Dump and return the current list of curves and each curve's settings into a list of JSON-formatted strings. """ return [json.dumps(curve.to_dict()) for curve in self._curves]
[docs] def setCurves(self, new_list: List[str]) -> None: """ Add a list of curves into the graph. Parameters ---------- new_list : list A list of JSON-formatted strings, each contains a curve and its settings """ try: new_list = [json.loads(str(i)) for i in new_list] except ValueError as e: logger.exception("Error parsing curve json data: {}".format(e)) return self.clearCurves() for d in new_list: color = d.get("color") if color: color = QColor(color) self.addYChannel( d["channel"], name=d.get("name"), color=color, lineStyle=d.get("lineStyle"), lineWidth=d.get("lineWidth"), symbol=d.get("symbol"), symbolSize=d.get("symbolSize"), yAxisName=d.get("yAxisName"), useArchiveData=d.get("useArchiveData"), )
curves = Property("QStringList", getCurves, setCurves, designable=False)