Skip to content

Data Insight Tool

CAGetThread(parent=None, address='')

Bases: QThread

Thread for making a CA get request to the given address. This is used to get the description of the curve.

Source code in trace/widgets/data_insight_tool.py
62
63
64
65
def __init__(self, parent: QObject = None, address: str = "") -> None:
    super().__init__(parent=parent)
    self.address = address
    self.stop_flag = False

run()

Get the value for the given address. Interruptable via the stop_flag. Does not attempt to emit the PV Value if interrupted.

Source code in trace/widgets/data_insight_tool.py
67
68
69
70
71
72
73
74
75
76
77
78
79
def run(self) -> None:
    """Get the value for the given address. Interruptable via the
    stop_flag. Does not attempt to emit the PV Value if interrupted.
    """
    pv = epics.PV(self.address)

    if self.stop_flag:
        return

    try:
        self.result_ready.emit(pv.value)
    except epics.ca.ChannelAccessException as e:
        logger.warning(f"Channel Access error: {e}")

stop()

Set the stop flag

Source code in trace/widgets/data_insight_tool.py
81
82
83
def stop(self) -> None:
    """Set the stop flag"""
    self.stop_flag = True

DataVisualizationModel(parent=None)

Bases: QAbstractTableModel

Table Model for fetching and storing the data for a given curve on the model. Gathers live data directly from the curve, but makes an HTTP request to the Archiver Appliance

Source code in trace/widgets/data_insight_tool.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
def __init__(self, parent: QObject = None) -> None:
    super().__init__(parent)
    self.df = pd.DataFrame(columns=["Datetime", "Value", "Severity", "Source"])

    self.address = None
    self.unit = None
    self.description = None
    self.caget_thread = None

    self.network_manager = QNetworkAccessManager()
    self.network_manager.finished.connect(self.recieve_archive_reply)

rowCount(index=QModelIndex())

Return the row count of the table

Source code in trace/widgets/data_insight_tool.py
107
108
109
110
111
def rowCount(self, index: QModelIndex = QModelIndex()) -> int:
    """Return the row count of the table"""
    if index is not None and index.isValid():
        return 0
    return self.df.shape[0]

columnCount(index=QModelIndex())

Return the column count of the table

Source code in trace/widgets/data_insight_tool.py
113
114
115
116
117
def columnCount(self, index: QModelIndex = QModelIndex()) -> int:
    """Return the column count of the table"""
    if index is not None and index.isValid():
        return 0
    return self.df.shape[1]

data(index, role=Qt.DisplayRole)

Return the data for the associated role. Currently only supporting DisplayRole.

Source code in trace/widgets/data_insight_tool.py
119
120
121
122
123
124
125
126
def data(self, index: QModelIndex, role: Qt.ItemDataRole = Qt.DisplayRole) -> str:
    """Return the data for the associated role. Currently only supporting DisplayRole."""
    if not index.isValid():
        return None
    elif role == Qt.DisplayRole:
        val = self.df.iat[index.row(), index.column()]
        return str(val)
    return None

headerData(section, orientation, role=Qt.DisplayRole)

Return data associated with the header

Source code in trace/widgets/data_insight_tool.py
128
129
130
131
def headerData(self, section: int, orientation: Qt.Orientation, role: Qt.ItemDataRole = Qt.DisplayRole) -> str:
    """Return data associated with the header"""
    if orientation == Qt.Horizontal and role == Qt.DisplayRole:
        return self.df.columns[section]

set_description(description)

Set the description of the curve. This is called when the CAGetThread emits a result_ready signal.

Parameters:

Name Type Description Default
description str

The description of the curve

required
Source code in trace/widgets/data_insight_tool.py
133
134
135
136
137
138
139
140
141
142
143
def set_description(self, description: str) -> None:
    """Set the description of the curve. This is called when the CAGetThread
    emits a result_ready signal.

    Parameters
    ----------
    description : str
        The description of the curve
    """
    self.description = description
    self.description_changed.emit()

set_all_data(curve_item, x_range)

Set the model's data for the given curve and the given time range. This function determines what kind of data should be saved and prompts the methods for setting live or archived data as necessary. This also saves the meta data.

Parameters:

Name Type Description Default
curve_item TimePlotCurveItem

The curve for the model to collect and store data on

required
x_range list[int] | tuple[int, int]

The time range to collect and store data between

required
Source code in trace/widgets/data_insight_tool.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def set_all_data(self, curve_item: TimePlotCurveItem, x_range: list[int] | tuple[int, int]) -> None:
    """Set the model's data for the given curve and the given time range.
    This function determines what kind of data should be saved and prompts
    the methods for setting live or archived data as necessary. This also
    saves the meta data.

    Parameters
    ----------
    curve_item : TimePlotCurveItem
        The curve for the model to collect and store data on
    x_range : list[int] | tuple[int, int]
        The time range to collect and store data between
    """
    self.address = curve_item.address if curve_item.address else ""
    self.unit = curve_item.units

    # Set the meta data label of the DataInsightTool
    self.set_description("Loading...")

    # Create a new CAGetThread to get the description of the curve
    if isinstance(self.caget_thread, CAGetThread) and self.caget_thread.isRunning():
        self.caget_thread.stop()
    self.caget_thread = CAGetThread(self, self.address + ".DESC")
    self.caget_thread.result_ready.connect(self.set_description)
    self.caget_thread.start()

    curve_range = (curve_item.min_x(), curve_item.max_x())
    left_ts = max(x_range[0], curve_range[0])
    right_ts = min(x_range[1], curve_range[1])

    # Populate the model with live data if it is shown on the plot
    if (curve_range[0] <= x_range[1]) and (x_range[0] <= curve_range[1]):
        self.set_live_data(curve_item, (left_ts, right_ts))

    # Populate the model with archive data if it is shown on the plot
    if x_range[0] <= curve_range[0]:
        self.request_archive_data(curve_item.address, (x_range[0], left_ts))
    else:
        # Emulate network reply being recieved for parent widget
        self.reply_recieved.emit()

set_live_data(curve_item, x_range)

Set the live data for the given curve in the given time range. Appends rows within the time range to the end of the model's dataframe.

Parameters:

Name Type Description Default
curve_item TimePlotCurveItem

The curve for the model to collect and store data on

required
x_range list[int] | tuple[int, int]

The time range to collect and store data between

required
Source code in trace/widgets/data_insight_tool.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def set_live_data(self, curve_item: TimePlotCurveItem, x_range: list[int] | tuple[int, int]) -> None:
    """Set the live data for the given curve in the given time range. Appends
    rows within the time range to the end of the model's dataframe.

    Parameters
    ----------
    curve_item : TimePlotCurveItem
        The curve for the model to collect and store data on
    x_range : list[int] | tuple[int, int]
        The time range to collect and store data between
    """
    data_n = curve_item.points_accumulated
    if data_n == 0:
        return

    data = curve_item.data_buffer[:, -data_n:]
    indices = np.where((x_range[0] <= data[0]) & (data[0] <= x_range[1]))[0]

    convert_data = {"Datetime": [], "Value": [], "Severity": []}
    convert_data["Datetime"] = data[0, indices]
    convert_data["Value"] = data[1, indices]
    convert_data["Severity"] = ["NaN"] * indices.size
    convert_data["Source"] = ["Live"] * indices.size

    live_df = pd.DataFrame(convert_data)
    live_df["Datetime"] = live_df["Datetime"].apply(datetime.fromtimestamp)

    self.beginResetModel()
    self.df = live_df
    self.endResetModel()

request_archive_data(pv_name, x_range)

Request data from the Archiver Appliance for the given PV and time range. Only gets raw data, never optimized. Ends early if there is no environment variable PYDM_ARCHIVER_URL, which would contain the url for the Archiver Appliance.

Parameters:

Name Type Description Default
pv_name str

The PV address to request data for

required
x_range list[int] | tuple[int, int]

The time range to collect and store data between

required
Source code in trace/widgets/data_insight_tool.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def request_archive_data(self, pv_name: str, x_range: list[int] | tuple[int, int]) -> None:
    """Request data from the Archiver Appliance for the given PV and time range.
    Only gets raw data, never optimized. Ends early if there is no environment
    variable PYDM_ARCHIVER_URL, which would contain the url for the Archiver
    Appliance.

    Parameters
    ----------
    pv_name : str
        The PV address to request data for
    x_range : list[int] | tuple[int, int]
        The time range to collect and store data between
    """
    # Check the $PYDM_ARCHIVER_URL is populated
    base_url = os.getenv("PYDM_ARCHIVER_URL")
    if base_url is None:
        logger.error(
            "Environment variable: PYDM_ARCHIVER_URL must be defined to use the archiver plugin, for "
            "example: http://lcls-archapp.slac.stanford.edu"
        )
        return

    # Correctly format the timestamps for the Archiver Appliance
    from_dt = datetime.fromtimestamp(x_range[0], tz=timezone.utc)
    from_date_str = from_dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

    to_dt = datetime.fromtimestamp(x_range[1], tz=timezone.utc)
    to_date_str = to_dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

    # Construct the request url and make the request
    url_string = f"{base_url}/retrieval/data/getData.json?pv={pv_name}&from={from_date_str}&to={to_date_str}"
    request = QNetworkRequest(QUrl(url_string))
    self.network_manager.get(request)

recieve_archive_reply(reply)

Process the recieved reply to the request made in request_archive_data. Unpack the data and call set_archive_data. Mostly checks if the reply contains an error.

Parameters:

Name Type Description Default
reply QNetworkReply

Reply to the network request made in request_archive_data

required
Source code in trace/widgets/data_insight_tool.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def recieve_archive_reply(self, reply: QNetworkReply) -> None:
    """Process the recieved reply to the request made in request_archive_data.
    Unpack the data and call set_archive_data. Mostly checks if the reply
    contains an error.

    Parameters
    ----------
    reply : QNetworkReply
        Reply to the network request made in request_archive_data
    """
    self.reply_recieved.emit()
    if reply.error() == QNetworkReply.NoError:
        bytes_str = reply.readAll()
        try:
            data_dict = json.loads(str(bytes_str, "utf-8"))
            self.set_archive_data(data_dict)
        except json.JSONDecodeError:
            logger.warning("Data Insight Tool: No data received from archiver")
    else:
        logger.debug(
            f"Request for data from archiver failed, request url: {reply.url()} retrieved header: "
            f"{reply.header(QNetworkRequest.ContentTypeHeader)} error: {reply.error()}"
        )
    reply.deleteLater()

set_archive_data(data_dict)

Set the live data for the given curve in the given time range. Appends rows within the time range to the end of the model's dataframe.

Parameters:

Name Type Description Default
data_dict dict

Dictionary containing all data to be added to the model's dataframe

required
Source code in trace/widgets/data_insight_tool.py
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def set_archive_data(self, data_dict: dict) -> None:
    """Set the live data for the given curve in the given time range. Appends
    rows within the time range to the end of the model's dataframe.

    Parameters
    ----------
    data_dict : dict
        Dictionary containing all data to be added to the model's dataframe
    """
    convert_data = {"Datetime": [], "Value": [], "Severity": []}
    for point in data_dict[0]["data"]:
        ts = point["secs"] + (point["nanos"] * 1e-9)
        convert_data["Datetime"].append(datetime.fromtimestamp(ts))
        convert_data["Value"].append(point["val"])
        convert_data["Severity"].append(SEVERITY_MAP[point["severity"]])
    convert_data["Source"] = ["Archive"] * len(data_dict[0]["data"])
    archive_df = pd.DataFrame(convert_data)

    if self.df.empty:
        self.beginResetModel()
        self.df = archive_df
        self.endResetModel()
    else:
        self.beginInsertRows(QModelIndex(), 0, archive_df.shape[0] - 1)
        self.df = pd.concat([archive_df, self.df])
        self.endInsertRows()
    self.layoutChanged.emit()

export_data(file_path, extension)

Export the model's data to the given file. Adds metadata to the top of the exported file with the curve's address, unit (if any), and description.

Parameters:

Name Type Description Default
file_path Path

The path of the file to be (over)written with the exported data

required
extension str

The extension of the file to be (over)written

required

Raises:

Type Description
ValueError

Raised when export is requested without data in the model, or when an invalid file format is requested for export

IsADirectoryError

Raised when the provided filepath is a directory

Source code in trace/widgets/data_insight_tool.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
def export_data(self, file_path: Path, extension: str) -> None:
    """Export the model's data to the given file. Adds metadata to the top of
    the exported file with the curve's address, unit (if any), and description.

    Parameters
    ----------
    file_path : Path
        The path of the file to be (over)written with the exported data
    extension : str
        The extension of the file to be (over)written

    Raises
    ------
    ValueError
        Raised when export is requested without data in the model, or when an
        invalid file format is requested for export
    IsADirectoryError
        Raised when the provided filepath is a directory
    """
    if self.df.empty:
        raise ValueError("No data to export. Request data first.")
    if file_path.is_dir():
        raise IsADirectoryError("The selected path is a directory. Select a file to export to.")
    if extension not in [".csv", ".mat", ".json"]:
        raise ValueError("Unrecognized file format requested. Skipping export.")

    header_dict = {"Address": self.address, "Unit": self.unit, "Description": self.description}

    export_df = self.df.copy()
    export_df["Datetime"] = export_df["Datetime"].astype("int64") / 1e9

    if extension == ".csv":
        file_header = "".join([f"{k}: {v}\n" for k, v in header_dict.items()])
        with file_path.open("w") as file:
            file.write(file_header)
            export_df.to_csv(file, index=False, mode="a")
    elif extension == ".mat":
        header_dict.update({name: col.values for name, col in export_df.items()})
        savemat(file_path, header_dict)
    elif extension == ".json":
        if export_df["Value"].dtype == object:
            export_df["Value"] = export_df["Value"].astype("str")
        data_dict = export_df.to_dict(orient="records")
        export_dict = {"meta": header_dict, "data": data_dict}
        with file_path.open("w") as file:
            json.dump(export_dict, file, indent=2)

DataInsightTool(parent, plot=None)

Bases: QWidget

The Data Insight Tool is a standalone widget that allows users to display all archive and live data on the plot for any given curve. Users are also able to export the raw data from this tool.

Source code in trace/widgets/data_insight_tool.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
def __init__(self, parent: QObject, plot: PyDMArchiverTimePlot = None) -> None:
    super().__init__(parent=parent)
    self.setWindowFlag(Qt.Window)
    self.resize(600, 600)
    self.setWindowTitle("Data Insight Tool")

    self.layout_init()

    self.data_vis_model.reply_recieved.connect(self.loading_label.hide)
    self.data_vis_model.description_changed.connect(self.set_meta_data)
    self.export_button.clicked.connect(self.export_data_to_file)
    self.pv_select_box.currentIndexChanged.connect(self.get_data)
    self.refresh_button.clicked.connect(self.get_data)

    if isinstance(plot, PyDMArchiverTimePlot):
        self.plot = plot

plot property writable

Return the plot associated with this widget

layout_init()

Initialize the layout of the Data Insight Tool widget.

Source code in trace/widgets/data_insight_tool.py
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
def layout_init(self) -> None:
    """Initialize the layout of the Data Insight Tool widget."""
    self.main_layout = QVBoxLayout()

    # Populate the PV selection and request layout at the top of the widget
    self.request_layout = QHBoxLayout()
    self.pv_select_box = QComboBox()
    self.pv_select_box.setSizeAdjustPolicy(QComboBox.AdjustToContents)
    self.request_layout.addWidget(self.pv_select_box, alignment=Qt.AlignLeft)

    self.loading_label = QLabel("Loading...")
    self.loading_label.hide()
    self.request_layout.addWidget(self.loading_label, alignment=Qt.AlignCenter)

    self.export_button = QPushButton("Export to File")
    self.request_layout.addWidget(self.export_button, alignment=Qt.AlignRight)
    self.main_layout.addLayout(self.request_layout)

    # Create the metadata label and refresh button
    self.metadata_layout = QHBoxLayout()
    self.meta_data_label = QLabel()
    self.metadata_layout.addWidget(self.meta_data_label, alignment=Qt.AlignLeft)

    self.refresh_button = QPushButton("Refresh Data")
    self.metadata_layout.addWidget(self.refresh_button, alignment=Qt.AlignRight)
    self.main_layout.addLayout(self.metadata_layout)

    # Set up the main data table in the center of the widget
    self.data_vis_model = DataVisualizationModel()
    self.data_table = FrozenTableView(self.data_vis_model)
    self.main_layout.addWidget(self.data_table)

    self.setLayout(self.main_layout)

set_meta_data()

Populate the meta_data_label with the curve's unit (if any) and description.

Source code in trace/widgets/data_insight_tool.py
422
423
424
425
426
427
428
429
def set_meta_data(self) -> None:
    """Populate the meta_data_label with the curve's unit (if any) and description."""
    meta_labels = []
    if self.data_vis_model.unit:
        meta_labels.append(str(self.data_vis_model.unit))
    if self.data_vis_model.description:
        meta_labels.append(str(self.data_vis_model.description))
    self.meta_data_label.setText(", ".join(meta_labels))

combobox_to_curve(combobox_ind)

Convert an index for the pv_select_box combobox to the corresponding curve item from the curves model.

Parameters:

Name Type Description Default
combobox_ind int

The index for pv_select_box

required

Returns:

Type Description
ArchivePlotCurveItem

The curve item that corresponds to the PV chosen on the combobox

Source code in trace/widgets/data_insight_tool.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
def combobox_to_curve(self, combobox_ind: int) -> ArchivePlotCurveItem:
    """Convert an index for the pv_select_box combobox to the corresponding
    curve item from the curves model.

    Parameters
    ----------
    combobox_ind : int
        The index for pv_select_box

    Returns
    -------
    ArchivePlotCurveItem
        The curve item that corresponds to the PV chosen on the combobox
    """
    if combobox_ind < 0 or self.pv_select_box.count() <= combobox_ind:
        combobox_ind = self.pv_select_box.currentIndex()
    return self.plot.curveAtIndex(combobox_ind)

update_pv_select_box()

Populate the pv_select_box with all curves in the plot. This is called when the plot is updated.

Source code in trace/widgets/data_insight_tool.py
449
450
451
452
453
454
455
456
457
458
@Slot()
def update_pv_select_box(self) -> None:
    """Populate the pv_select_box with all curves in the plot. This is called
    when the plot is updated.
    """
    self.pv_select_box.blockSignals(True)
    self.pv_select_box.clear()
    self.pv_select_box.blockSignals(False)
    curve_names = [c.address for c in self.plot._curves if isinstance(c, ArchivePlotCurveItem)]
    self.pv_select_box.addItems(curve_names)

export_data_to_file()

Prompt the user to select a file to export data to then prompt the DataVisualizationModel to export its data to the selected file.

Source code in trace/widgets/data_insight_tool.py
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
@Slot()
def export_data_to_file(self) -> None:
    """Prompt the user to select a file to export data to then prompt the
    DataVisualizationModel to export its data to the selected file.
    """
    file_name, extension_filter = QFileDialog.getSaveFileName(
        self,
        "Export Archive Data",
        Path(".").name,
        "Comma-Separated Values File (*.csv);;MAT-File (*.mat);;JSON File (*.json)",
    )
    if not extension_filter:
        return
    extension = re.search(r"\*(.*?)\)", extension_filter).group(1)
    file_name = Path(file_name).with_suffix(extension)

    try:
        self.data_vis_model.export_data(file_name, extension)
    except (ValueError, IsADirectoryError) as e:
        logger.error(str(e))
        QMessageBox.critical(self, "Error", str(e))

get_data(combobox_index=-1)

Prompt the DataVisualizationModel to fetch and save the data for the curve chosen by the user for the time range on the associated plot.

Parameters:

Name Type Description Default
combobox_index int

The index in the pv_select_box for the user selected curve, by default -1

-1
Source code in trace/widgets/data_insight_tool.py
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
@Slot()
@Slot(int)
def get_data(self, combobox_index: int = -1) -> None:
    """Prompt the DataVisualizationModel to fetch and save the data for the
    curve chosen by the user for the time range on the associated plot.

    Parameters
    ----------
    combobox_index : int, optional
        The index in the pv_select_box for the user selected curve, by default -1
    """
    if self.pv_select_box.count() < 1:
        logger.warning("Curves must be added to the main display before data can be requested.")
        return

    curve_item = self.combobox_to_curve(combobox_index)
    x_range = self.plot.getXAxis().range

    self.data_vis_model.set_all_data(curve_item, x_range)
    self.set_meta_data()
    self.loading_label.show()