Skip to content

Data Insight Tool

CAGetThread(parent=None, address='')

Bases: QThread

Thread for making a CA get request to the given address. This is used to get the description of the curve.

Source code in trace/widgets/data_insight_tool.py
63
64
65
66
def __init__(self, parent: QObject = None, address: str = "") -> None:
    super().__init__(parent=parent)
    self.address = address
    self.stop_flag = False

run()

Get the value for the given address. Interruptable via the stop_flag. Does not attempt to emit the PV Value if interrupted.

Source code in trace/widgets/data_insight_tool.py
68
69
70
71
72
73
74
75
76
77
78
79
80
def run(self) -> None:
    """Get the value for the given address. Interruptable via the
    stop_flag. Does not attempt to emit the PV Value if interrupted.
    """
    pv = epics.PV(self.address)

    if self.stop_flag:
        return

    try:
        self.result_ready.emit(pv.value)
    except epics.ca.ChannelAccessException as e:
        logger.warning(f"Channel Access error: {e}")

stop()

Set the stop flag

Source code in trace/widgets/data_insight_tool.py
82
83
84
def stop(self) -> None:
    """Set the stop flag"""
    self.stop_flag = True

DataVisualizationModel(parent=None)

Bases: QAbstractTableModel

Table Model for fetching and storing the data for a given curve on the model. Gathers live data directly from the curve, but makes an HTTP request to the Archiver Appliance

Source code in trace/widgets/data_insight_tool.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
def __init__(self, parent: QObject = None) -> None:
    super().__init__(parent)
    self.df = pd.DataFrame(columns=self._df_columns)

    self.address = None
    self.unit = None
    self.description = None
    self.caget_thread = None
    self._decode_as_string = False

    self.network_manager = QNetworkAccessManager()
    self.network_manager.finished.connect(self.recieve_archive_reply)

decode_as_string property writable

weather or not to show the value column as a string or raw data

rowCount(index=QModelIndex())

Return the row count of the table

Source code in trace/widgets/data_insight_tool.py
110
111
112
113
114
def rowCount(self, index: QModelIndex = QModelIndex()) -> int:
    """Return the row count of the table"""
    if index is not None and index.isValid():
        return 0
    return self.df.shape[0]

columnCount(index=QModelIndex())

Return the column count of the table

Source code in trace/widgets/data_insight_tool.py
116
117
118
119
120
def columnCount(self, index: QModelIndex = QModelIndex()) -> int:
    """Return the column count of the table"""
    if index is not None and index.isValid():
        return 0
    return self.df.shape[1]

data(index, role=Qt.DisplayRole)

Return the data for the associated role. Currently only supporting DisplayRole.

Source code in trace/widgets/data_insight_tool.py
122
123
124
125
126
127
128
129
130
131
def data(self, index: QModelIndex, role: Qt.ItemDataRole = Qt.DisplayRole) -> str:
    """Return the data for the associated role. Currently only supporting DisplayRole."""
    if not index.isValid():
        return None
    elif role == Qt.DisplayRole:
        val = self.df.iat[index.row(), index.column()]
        if index.column() == 1 and self.decode_as_string:
            val = self.list_to_ascii(val)
        return str(val)
    return None

headerData(section, orientation, role=Qt.DisplayRole)

Return data associated with the header

Source code in trace/widgets/data_insight_tool.py
133
134
135
136
def headerData(self, section: int, orientation: Qt.Orientation, role: Qt.ItemDataRole = Qt.DisplayRole) -> str:
    """Return data associated with the header"""
    if orientation == Qt.Horizontal and role == Qt.DisplayRole:
        return self.df.columns[section]

set_description(description)

Set the description of the curve. This is called when the CAGetThread emits a result_ready signal.

Parameters:

Name Type Description Default
description str

The description of the curve

required
Source code in trace/widgets/data_insight_tool.py
152
153
154
155
156
157
158
159
160
161
162
def set_description(self, description: str) -> None:
    """Set the description of the curve. This is called when the CAGetThread
    emits a result_ready signal.

    Parameters
    ----------
    description : str
        The description of the curve
    """
    self.description = description
    self.description_changed.emit()

set_all_data(curve_item, x_range)

Set the model's data for the given curve and the given time range. This function determines what kind of data should be saved and prompts the methods for setting live or archived data as necessary. This also saves the meta data.

Parameters:

Name Type Description Default
curve_item TimePlotCurveItem

The curve for the model to collect and store data on

required
x_range list[int] | tuple[int, int]

The time range to collect and store data between

required
Source code in trace/widgets/data_insight_tool.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def set_all_data(self, curve_item: TimePlotCurveItem, x_range: list[int] | tuple[int, int]) -> None:
    """Set the model's data for the given curve and the given time range.
    This function determines what kind of data should be saved and prompts
    the methods for setting live or archived data as necessary. This also
    saves the meta data.

    Parameters
    ----------
    curve_item : TimePlotCurveItem
        The curve for the model to collect and store data on
    x_range : list[int] | tuple[int, int]
        The time range to collect and store data between
    """
    self.address = curve_item.address if curve_item.address else ""
    self.unit = curve_item.units

    # Set the meta data label of the DataInsightTool
    self.set_description("Loading...")

    # Create a new CAGetThread to get the description of the curve
    if isinstance(self.caget_thread, CAGetThread) and self.caget_thread.isRunning():
        self.caget_thread.stop()
    self.caget_thread = CAGetThread(self, self.address + ".DESC")
    self.caget_thread.result_ready.connect(self.set_description)
    self.caget_thread.start()

    curve_range = (curve_item.min_x(), curve_item.max_x())
    left_ts = max(x_range[0], curve_range[0])
    right_ts = min(x_range[1], curve_range[1])

    # Reset data model to empty state
    self.beginResetModel()
    self.df = pd.DataFrame(columns=self._df_columns)
    self.endResetModel()

    # Populate the model with live data if it is shown on the plot
    if (curve_range[0] <= x_range[1]) and (x_range[0] <= curve_range[1]):
        self.set_live_data(curve_item, (left_ts, right_ts))

    # Populate the model with archive data if it is shown on the plot
    if x_range[0] <= curve_range[0]:
        self.request_archive_data(curve_item.address, (x_range[0], left_ts))
    else:
        # Emulate network reply being recieved for parent widget
        self.reply_recieved.emit()

set_live_data(curve_item, x_range)

Set the live data for the given curve in the given time range. Appends rows within the time range to the end of the model's dataframe.

Parameters:

Name Type Description Default
curve_item TimePlotCurveItem

The curve for the model to collect and store data on

required
x_range list[int] | tuple[int, int]

The time range to collect and store data between

required
Source code in trace/widgets/data_insight_tool.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def set_live_data(self, curve_item: TimePlotCurveItem, x_range: list[int] | tuple[int, int]) -> None:
    """Set the live data for the given curve in the given time range. Appends
    rows within the time range to the end of the model's dataframe.

    Parameters
    ----------
    curve_item : TimePlotCurveItem
        The curve for the model to collect and store data on
    x_range : list[int] | tuple[int, int]
        The time range to collect and store data between
    """
    data_n = curve_item.points_accumulated
    if data_n == 0:
        return

    data = curve_item.data_buffer[:, -data_n:]
    indices = np.where((x_range[0] <= data[0]) & (data[0] <= x_range[1]))[0]

    live_df = pd.DataFrame(
        {
            "Datetime": [datetime.fromtimestamp(ts) for ts in data[0, indices]],
            "Value": data[1, indices],
            "Severity": ["NaN"] * indices.size,
            "Source": ["Live"] * indices.size,
        }
    )

    if live_df.empty:
        return

    self.beginInsertRows(QModelIndex(), 0, live_df.shape[0] - 1)
    self.df = pd.concat([live_df, self.df]) if not self.df.empty else live_df
    self.endInsertRows()

request_archive_data(pv_name, x_range)

Request data from the Archiver Appliance for the given PV and time range. Only gets raw data, never optimized. Ends early if there is no environment variable PYDM_ARCHIVER_URL, which would contain the url for the Archiver Appliance.

Parameters:

Name Type Description Default
pv_name str

The PV address to request data for

required
x_range list[int] | tuple[int, int]

The time range to collect and store data between

required
Source code in trace/widgets/data_insight_tool.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def request_archive_data(self, pv_name: str, x_range: list[int] | tuple[int, int]) -> None:
    """Request data from the Archiver Appliance for the given PV and time range.
    Only gets raw data, never optimized. Ends early if there is no environment
    variable PYDM_ARCHIVER_URL, which would contain the url for the Archiver
    Appliance.

    Parameters
    ----------
    pv_name : str
        The PV address to request data for
    x_range : list[int] | tuple[int, int]
        The time range to collect and store data between
    """
    # Check the $PYDM_ARCHIVER_URL is populated
    base_url = os.getenv("PYDM_ARCHIVER_URL")
    if base_url is None:
        logger.error(
            "Environment variable: PYDM_ARCHIVER_URL must be defined to use the archiver plugin, for "
            "example: http://lcls-archapp.slac.stanford.edu"
        )
        return

    # Correctly format the timestamps for the Archiver Appliance
    from_dt = datetime.fromtimestamp(x_range[0], tz=timezone.utc)
    from_date_str = from_dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

    to_dt = datetime.fromtimestamp(x_range[1], tz=timezone.utc)
    to_date_str = to_dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

    # Construct the request url and make the request
    url_string = f"{base_url}/retrieval/data/getData.json?pv={pv_name}&from={from_date_str}&to={to_date_str}"
    request = QNetworkRequest(QUrl(url_string))
    self.network_manager.get(request)

recieve_archive_reply(reply)

Process the recieved reply to the request made in request_archive_data. Unpack the data and call set_archive_data. Mostly checks if the reply contains an error.

Parameters:

Name Type Description Default
reply QNetworkReply

Reply to the network request made in request_archive_data

required
Source code in trace/widgets/data_insight_tool.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def recieve_archive_reply(self, reply: QNetworkReply) -> None:
    """Process the recieved reply to the request made in request_archive_data.
    Unpack the data and call set_archive_data. Mostly checks if the reply
    contains an error.

    Parameters
    ----------
    reply : QNetworkReply
        Reply to the network request made in request_archive_data
    """
    if reply.error() == QNetworkReply.NoError:
        bytes_str = reply.readAll()
        try:
            data_dict = json.loads(str(bytes_str, "utf-8"))
            self.set_archive_data(data_dict)
        except json.JSONDecodeError:
            logger.warning("Data Insight Tool: No data received from archiver")
    else:
        logger.debug(
            f"Request for data from archiver failed, request url: {reply.url()} retrieved header: "
            f"{reply.header(QNetworkRequest.ContentTypeHeader)} error: {reply.error()}"
        )
    self.reply_recieved.emit()
    reply.deleteLater()

set_archive_data(data_dict)

Set the live data for the given curve in the given time range. Appends rows within the time range to the end of the model's dataframe.

Parameters:

Name Type Description Default
data_dict dict

Dictionary containing all data to be added to the model's dataframe

required
Source code in trace/widgets/data_insight_tool.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
def set_archive_data(self, data_dict: dict) -> None:
    """Set the live data for the given curve in the given time range. Appends
    rows within the time range to the end of the model's dataframe.

    Parameters
    ----------
    data_dict : dict
        Dictionary containing all data to be added to the model's dataframe
    """
    points = data_dict[0]["data"]
    archive_df = pd.DataFrame(
        {
            "Datetime": [datetime.fromtimestamp(p["secs"] + p["nanos"] * 1e-9) for p in points],
            "Value": [p["val"] for p in points],
            "Severity": [SEVERITY_MAP[p["severity"]] for p in points],
            "Source": ["Archive"] * len(points),
        }
    )

    self.beginInsertRows(QModelIndex(), 0, archive_df.shape[0] - 1)
    self.df = pd.concat([archive_df, self.df]) if not self.df.empty else archive_df
    self.endInsertRows()

export_data(file_path, extension)

Export the model's data to the given file. Adds metadata to the top of the exported file with the curve's address, unit (if any), and description.

Parameters:

Name Type Description Default
file_path Path

The path of the file to be (over)written with the exported data

required
extension str

The extension of the file to be (over)written

required

Raises:

Type Description
ValueError

Raised when export is requested without data in the model, or when an invalid file format is requested for export

IsADirectoryError

Raised when the provided filepath is a directory

Source code in trace/widgets/data_insight_tool.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
def export_data(self, file_path: Path, extension: str) -> None:
    """Export the model's data to the given file. Adds metadata to the top of
    the exported file with the curve's address, unit (if any), and description.

    Parameters
    ----------
    file_path : Path
        The path of the file to be (over)written with the exported data
    extension : str
        The extension of the file to be (over)written

    Raises
    ------
    ValueError
        Raised when export is requested without data in the model, or when an
        invalid file format is requested for export
    IsADirectoryError
        Raised when the provided filepath is a directory
    """
    if self.df.empty:
        raise ValueError("No data to export. Request data first.")
    if file_path.is_dir():
        raise IsADirectoryError("The selected path is a directory. Select a file to export to.")
    if extension not in [".csv", ".mat", ".json"]:
        raise ValueError("Unrecognized file format requested. Skipping export.")

    header_dict = {"Address": self.address, "Unit": self.unit, "Description": self.description}

    export_df = self.df.copy(deep=True)
    export_df["Datetime"] = export_df["Datetime"].astype("int64") / 1e9

    if self.decode_as_string:
        export_df["Value"] = export_df["Value"].apply(self.list_to_ascii)

    if extension == ".csv":
        file_header = "".join([f"{k}: {v}\n" for k, v in header_dict.items()])
        with file_path.open("w") as file:
            file.write(file_header)
            export_df.to_csv(file, index=False, mode="a")
    elif extension == ".mat":
        header_dict.update({name: col.values for name, col in export_df.items()})
        savemat(file_path, header_dict)
    elif extension == ".json":
        if export_df["Value"].dtype == object:
            export_df["Value"] = export_df["Value"].astype("str")
        data_dict = export_df.to_dict(orient="records")
        export_dict = {"meta": header_dict, "data": data_dict}
        with file_path.open("w") as file:
            json.dump(export_dict, file, indent=2)

has_waveform_data()

Return True if any value in the Value column is a list or numpy array

Source code in trace/widgets/data_insight_tool.py
376
377
378
def has_waveform_data(self) -> bool:
    """Return True if any value in the Value column is a list or numpy array"""
    return bool(self.df["Value"].apply(lambda v: isinstance(v, (list, np.ndarray))).any())

list_to_ascii(val) staticmethod

Convert a list of integers into an ASCII string, ignoring null characters

Source code in trace/widgets/data_insight_tool.py
380
381
382
383
384
385
386
387
388
@staticmethod
def list_to_ascii(val: list[int]) -> str:
    """Convert a list of integers into an ASCII string, ignoring null characters"""
    if not isinstance(val, (list, np.ndarray)):
        return str(val)
    characters = map(chr, list(val))
    string = "".join(characters)
    string = string.replace("\u0000", "")
    return string

DataInsightTool(parent, plot=None)

Bases: QWidget

The Data Insight Tool is a standalone widget that allows users to display all archive and live data on the plot for any given curve. Users are also able to export the raw data from this tool.

Source code in trace/widgets/data_insight_tool.py
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
def __init__(self, parent: QObject, plot: PyDMArchiverTimePlot = None) -> None:
    super().__init__(parent=parent)
    self.setWindowFlag(Qt.Window)
    self.resize(600, 600)
    self.setWindowTitle("Data Insight Tool")

    self.layout_init()

    self.unopened = True

    self.data_vis_model.reply_recieved.connect(self.loading_label.hide)
    self.data_vis_model.reply_recieved.connect(self.update_decode_as_string_visibility)
    self.data_vis_model.description_changed.connect(self.set_meta_data)
    self.export_button.clicked.connect(self.export_data_to_file)
    self.decode_as_string_checkbox.toggled.connect(self.set_decode_as_string)
    self.pv_select_box.currentIndexChanged.connect(self.get_data)
    self.refresh_button.clicked.connect(self.get_data)

    if isinstance(plot, PyDMArchiverTimePlot):
        self.plot = plot

plot property writable

Return the plot associated with this widget

layout_init()

Initialize the layout of the Data Insight Tool widget.

Source code in trace/widgets/data_insight_tool.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
def layout_init(self) -> None:
    """Initialize the layout of the Data Insight Tool widget."""
    self.main_layout = QVBoxLayout()

    # Populate the PV selection and request layout at the top of the widget
    self.request_layout = QHBoxLayout()
    self.pv_select_box = QComboBox()
    self.pv_select_box.setSizeAdjustPolicy(QComboBox.AdjustToContents)
    self.request_layout.addWidget(self.pv_select_box, alignment=Qt.AlignLeft)

    self.loading_label = QLabel("Loading...")
    self.loading_label.hide()
    self.request_layout.addWidget(self.loading_label, alignment=Qt.AlignCenter)

    self.export_button = QPushButton("Export to File")
    self.request_layout.addWidget(self.export_button, alignment=Qt.AlignRight)
    self.main_layout.addLayout(self.request_layout)

    # Create the metadata label and refresh button
    self.metadata_layout = QHBoxLayout()
    self.meta_data_label = QLabel()
    self.metadata_layout.addWidget(self.meta_data_label, alignment=Qt.AlignLeft)

    self.decode_as_string_checkbox = QCheckBox()
    self.decode_as_string_checkbox.setText("Decode As String")
    self.decode_as_string_checkbox.hide()
    self.metadata_layout.addWidget(self.decode_as_string_checkbox)

    self.refresh_button = QPushButton("Refresh Data")
    self.metadata_layout.addWidget(self.refresh_button, alignment=Qt.AlignRight)
    self.main_layout.addLayout(self.metadata_layout)

    # Set up the main data table in the center of the widget
    self.data_vis_model = DataVisualizationModel()
    self.data_table = FrozenTableView(self.data_vis_model)
    self.main_layout.addWidget(self.data_table)

    self.setLayout(self.main_layout)

set_meta_data()

Populate the meta_data_label with the curve's unit (if any) and description.

Source code in trace/widgets/data_insight_tool.py
476
477
478
479
480
481
482
483
def set_meta_data(self) -> None:
    """Populate the meta_data_label with the curve's unit (if any) and description."""
    meta_labels = []
    if self.data_vis_model.unit:
        meta_labels.append(str(self.data_vis_model.unit))
    if self.data_vis_model.description:
        meta_labels.append(str(self.data_vis_model.description))
    self.meta_data_label.setText(", ".join(meta_labels))

combobox_to_curve(combobox_ind)

Convert an index for the pv_select_box combobox to the corresponding curve item from the curves model.

Parameters:

Name Type Description Default
combobox_ind int

The index for pv_select_box

required

Returns:

Type Description
ArchivePlotCurveItem

The curve item that corresponds to the PV chosen on the combobox

Source code in trace/widgets/data_insight_tool.py
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
def combobox_to_curve(self, combobox_ind: int) -> ArchivePlotCurveItem:
    """Convert an index for the pv_select_box combobox to the corresponding
    curve item from the curves model.

    Parameters
    ----------
    combobox_ind : int
        The index for pv_select_box

    Returns
    -------
    ArchivePlotCurveItem
        The curve item that corresponds to the PV chosen on the combobox
    """
    if combobox_ind < 0 or self.pv_select_box.count() <= combobox_ind:
        combobox_ind = self.pv_select_box.currentIndex()
    return self.plot.curveAtIndex(combobox_ind)

set_decode_as_string()

set the decode_as_string flag on the self.data_vis_model based off of the self.decode_as_string_checkbox then emit the dataChanged signal from the data_vis_model for the value column

Source code in trace/widgets/data_insight_tool.py
503
504
505
506
507
508
509
def set_decode_as_string(self) -> None:
    """set the decode_as_string flag on the self.data_vis_model based off of the self.decode_as_string_checkbox
    then emit the dataChanged signal from the data_vis_model for the value column"""
    if self.decode_as_string_checkbox.isChecked():
        self.data_vis_model.decode_as_string = True
    else:
        self.data_vis_model.decode_as_string = False

update_decode_as_string_visibility()

Show the decode_as_string_checkbox only when the model's Value column contains arrays

Source code in trace/widgets/data_insight_tool.py
511
512
513
514
@Slot()
def update_decode_as_string_visibility(self) -> None:
    """Show the decode_as_string_checkbox only when the model's Value column contains arrays"""
    self.decode_as_string_checkbox.setVisible(self.data_vis_model.has_waveform_data())

update_pv_select_box()

Populate the pv_select_box with all curves in the plot. This is called when the plot is updated.

Source code in trace/widgets/data_insight_tool.py
516
517
518
519
520
521
522
523
524
525
@Slot()
def update_pv_select_box(self) -> None:
    """Populate the pv_select_box with all curves in the plot. This is called
    when the plot is updated.
    """
    self.pv_select_box.blockSignals(True)
    self.pv_select_box.clear()
    curve_names = [c.address for c in self.plot._curves if isinstance(c, ArchivePlotCurveItem)]
    self.pv_select_box.addItems(curve_names)
    self.pv_select_box.blockSignals(False)

export_data_to_file()

Prompt the user to select a file to export data to then prompt the DataVisualizationModel to export its data to the selected file.

Source code in trace/widgets/data_insight_tool.py
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
@Slot()
def export_data_to_file(self) -> None:
    """Prompt the user to select a file to export data to then prompt the
    DataVisualizationModel to export its data to the selected file.
    """
    file_name, extension_filter = QFileDialog.getSaveFileName(
        self,
        "Export Archive Data",
        Path(".").name,
        "Comma-Separated Values File (*.csv);;MAT-File (*.mat);;JSON File (*.json)",
    )
    if not extension_filter:
        return
    extension = re.search(r"\*(.*?)\)", extension_filter).group(1)
    file_name = Path(file_name).with_suffix(extension)

    try:
        self.data_vis_model.export_data(file_name, extension)
    except (ValueError, IsADirectoryError) as e:
        logger.error(str(e))
        QMessageBox.critical(self, "Error", str(e))

get_data(combobox_index=-1)

Prompt the DataVisualizationModel to fetch and save the data for the curve chosen by the user for the time range on the associated plot.

Parameters:

Name Type Description Default
combobox_index int

The index in the pv_select_box for the user selected curve, by default -1

-1
Source code in trace/widgets/data_insight_tool.py
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
@Slot()
@Slot(int)
def get_data(self, combobox_index: int = -1) -> None:
    """Prompt the DataVisualizationModel to fetch and save the data for the
    curve chosen by the user for the time range on the associated plot.

    Parameters
    ----------
    combobox_index : int, optional
        The index in the pv_select_box for the user selected curve, by default -1
    """
    if self.pv_select_box.count() < 1:
        logger.warning("Curves must be added to the main display before data can be requested.")
        return

    curve_item = self.combobox_to_curve(combobox_index)
    x_range = self.plot.getXAxis().range

    self.decode_as_string_checkbox.hide()
    self.data_vis_model.set_all_data(curve_item, x_range)
    self.set_meta_data()
    self.loading_label.show()