import logging
from enum import Enum, auto
from typing import Any, Dict, Iterable, List
import qtawesome as qta
from qtpy import QtCore
from squirrel.model import PV
from squirrel.type_hints import TagSet
logger = logging.getLogger(__name__)
NO_DATA = "--"
PAGE_SIZE = 100
# Must be added outside class def to avoid processing as an enum member
PV_BROWSER_HEADER._strings = {
PV_BROWSER_HEADER.DEVICE: "Device",
PV_BROWSER_HEADER.PV: "Setpoint Addr",
PV_BROWSER_HEADER.READBACK: "Readback Addr",
PV_BROWSER_HEADER.TAGS: "Tags",
PV_BROWSER_HEADER.DELETE: "",
}
[docs]
class PVBrowserTableModel(QtCore.QAbstractTableModel):
def __init__(self, client, parent=None):
super().__init__(parent=parent)
self.client = client
self._data = []
self._canFetchMore = True
self._token = ""
def rowCount(self, _=QtCore.QModelIndex()) -> int:
return len(self._data)
def columnCount(self, _=QtCore.QModelIndex()) -> int:
return len(PV_BROWSER_HEADER)
def canFetchMore(self, parent=QtCore.QModelIndex()) -> bool:
return self._canFetchMore
def fetchMore(self, parent=QtCore.QModelIndex()) -> None:
fetched, self._token = self.client.backend.get_paged_pvs(PAGE_SIZE, token=self._token)
self._canFetchMore = len(fetched) == PAGE_SIZE
self.beginInsertRows(QtCore.QModelIndex(), len(self._data), len(self._data) + len(fetched) - 1)
self._data.extend(fetched)
self.endInsertRows()
def headerData(
self,
section: int,
orientation: QtCore.Qt.Orientation,
role: QtCore.Qt.ItemDataRole = QtCore.Qt.DisplayRole
) -> Any:
if orientation == QtCore.Qt.Horizontal:
if role == QtCore.Qt.DisplayRole:
return PV_BROWSER_HEADER(section).display_string()
return None
def data(
self,
index: QtCore.QModelIndex,
role: QtCore.Qt.ItemDataRole = QtCore.Qt.DisplayRole
) -> Any:
column = PV_BROWSER_HEADER(index.column())
if not index.isValid():
return None
elif role == QtCore.Qt.TextAlignmentRole and index.data() == NO_DATA:
return QtCore.Qt.AlignCenter
elif role == QtCore.Qt.ToolTipRole:
entry = self._data[index.row()]
if column == PV_BROWSER_HEADER.PV:
return entry.setpoint
elif column == PV_BROWSER_HEADER.READBACK and entry.readback is not None:
return entry.readback
elif role == QtCore.Qt.DisplayRole:
entry = self._data[index.row()]
if column == PV_BROWSER_HEADER.DEVICE:
return entry.device or NO_DATA
elif column == PV_BROWSER_HEADER.PV:
return entry.setpoint
elif column == PV_BROWSER_HEADER.READBACK:
return entry.readback or NO_DATA
elif column == PV_BROWSER_HEADER.TAGS:
return entry.tags if entry.tags else {}
elif role == QtCore.Qt.DecorationRole:
if column == PV_BROWSER_HEADER.DELETE:
return qta.icon("msc.trash")
elif role == QtCore.Qt.UserRole:
# Return the full entry object for further processing
entry = self._data[index.row()]
return entry
return None
def add_pv(self, pv: PV):
i = len(self._data)
self.beginInsertRows(QtCore.QModelIndex(), i, i)
self._data.append(pv)
self.endInsertRows()
def add_pvs(self, pvs: Iterable[PV]):
start = len(self._data)
self.beginInsertRows(QtCore.QModelIndex(), start, start + len(pvs) - 1)
self._data.extend(pvs)
self.endInsertRows()
def removeRow(self, row, parent=None):
index = self.index(row, PV_BROWSER_HEADER.PV.value)
pv = self.data(index, QtCore.Qt.UserRole)
try:
self.client.backend.archive_pv(pv.uuid)
except Exception as e:
logger.exception(e)
else:
parent = parent or QtCore.QModelIndex()
self.beginRemoveRows(parent, row, row)
del self._data[row]
self.endRemoveRows()
def refetch_row(self, row):
index = self.index(row, PV_BROWSER_HEADER.PV.value)
pv = self.data(index, QtCore.Qt.UserRole)
pv_name = pv.setpoint or pv.readback
matches = self.client.backend.get_pvs(search_string=pv_name)
for match in matches:
if match.uuid == pv.uuid:
refetched = match
self._data[row] = refetched
self.dataChanged.emit(index, index)
[docs]
class PVBrowserFilterProxyModel(QtCore.QSortFilterProxyModel):
def __init__(self, parent=None, tag_set: TagSet = None):
super().__init__(parent=parent)
self._search_string = ""
self.tag_set = tag_set or {} # Initialize with an empty tag dict
@property
def search_string(self) -> str:
"""Get the current search string for filtering."""
return self._search_string
@search_string.setter
def search_string(self, value: str) -> None:
"""Set the search string for filtering. Apply filter to model
immediately. The value is converted to lowercase for case-insensitive
matching.
Parameters
----------
value : str
The string to filter entries by.
"""
self._search_string = value.lower()
self.invalidateFilter()
[docs]
def set_tag_set(self, tag_set: TagSet) -> None:
"""Set the tag set for filtering. Apply filter to model immediately.
Parameters
----------
tag_set : TagSet
The set of tags to filter entries by.
"""
self.tag_set = tag_set
logger.debug(f"Tag set updated: {self.tag_set}")
self.invalidateFilter()
[docs]
def is_tag_subset(self, entry_tags: TagSet) -> bool:
"""Check if the entry's tags are a subset of the filter's tag set.
Parameters
----------
entry_tags : TagSet
The tags of the entry to check.
Returns
-------
bool
True if the entry's tags are a subset of the filter's tag set, False otherwise.
"""
is_subset = all(self.tag_set[group].issubset(entry_tags.get(group, set())) for group in self.tag_set)
logger.debug(f"Tag values subset: {is_subset}")
return is_subset
[docs]
def search_accepts_entry(self, entry: PV) -> bool:
"""Check if the entry matches the current search string. Searches
the device, setpoint, and readback fields.
Parameters
----------
entry : PV
Entry to be searched
Returns
-------
bool
True if the entry matches the search string, False otherwise
"""
if not self.search_string:
return True
search_device = self.search_string in (entry.device or NO_DATA).lower()
search_setpoint = self.search_string in (entry.setpoint or "").lower()
search_readback = self.search_string in (entry.readback or NO_DATA).lower()
return search_device or search_setpoint or search_readback
def filterAcceptsRow(self, source_row: int, source_parent: QtCore.QModelIndex) -> bool:
row_index = self.sourceModel().index(source_row, 0, source_parent)
entry = self.sourceModel().data(row_index, QtCore.Qt.UserRole)
if not entry:
return False
logger.debug(f"Filtering row {source_row} with entry: {entry}")
return self.is_tag_subset(entry.tags) and self.search_accepts_entry(entry)
[docs]
class CSVTableModel(QtCore.QAbstractTableModel):
def __init__(self, csv_data: List[Dict[str, Any]], backend_tag_def=None, parent=None):
super().__init__(parent=parent)
self._data = csv_data
self.backend_tag_def = backend_tag_def or {}
self.tag_def = self._filter_to_existing_backend_groups()
self._headers = self._build_headers()
self.rejected_groups = []
self.rejected_values = {}
self.validation_summary = self._create_validation_summary()
def _build_headers(self) -> List[str]:
"""Build headers from the first row of data"""
if not self._data:
return []
headers = ['Setpoint', 'Readback', 'Device', 'Description', 'Tags']
return headers
def rowCount(self, parent=QtCore.QModelIndex()) -> int:
return len(self._data)
def columnCount(self, parent=QtCore.QModelIndex()) -> int:
return len(self._headers)
def headerData(self, section: int, orientation: QtCore.Qt.Orientation, role: int = QtCore.Qt.DisplayRole):
if orientation == QtCore.Qt.Horizontal and role == QtCore.Qt.DisplayRole:
if 0 <= section < len(self._headers):
return self._headers[section]
return None
def data(self, index: QtCore.QModelIndex, role: int = QtCore.Qt.DisplayRole):
if not index.isValid() or not (0 <= index.row() < len(self._data)):
return None
row_data = self._data[index.row()]
column_name = self._headers[index.column()]
if role == QtCore.Qt.DisplayRole:
if column_name == 'Setpoint':
return row_data.get('Setpoint', '')
elif column_name == 'Readback':
return row_data.get('Readback', '')
elif column_name == 'Device':
return row_data.get('Device', '')
elif column_name == 'Description':
return row_data.get('Description', '')
elif column_name == 'Tags':
return self._convert_groups_to_tagset(row_data.get('groups', {}))
elif role == QtCore.Qt.ToolTipRole:
if column_name == 'Setpoint':
return f"Setpoint: {row_data.get('Setpoint', '')}"
elif column_name == 'Readback':
return f"Readback: {row_data.get('Readback', '')}"
elif column_name == 'Device':
return f"Device: {row_data.get('Device', '')}"
elif column_name == 'Description':
return f"Description: {row_data.get('Description', '')}"
elif column_name == 'Tags':
groups = row_data.get('groups', {})
tooltip_text = "Tags:\n"
for group_name, values in groups.items():
if values:
tooltip_text += f"{group_name}: {', '.join(values)}\n"
return tooltip_text.strip()
elif role == QtCore.Qt.UserRole:
return row_data
return None
def _filter_to_existing_backend_groups(self) -> Dict:
"""Only include CSV groups that exist in backend"""
if not self._data or not self.backend_tag_def:
return {}
csv_groups = {}
for row in self._data:
for group_name, values in row.get('groups', {}).items():
if group_name not in csv_groups:
csv_groups[group_name] = set()
csv_groups[group_name].update(values)
backend_group_names = {details[0]: tag_group_id for tag_group_id, details in self.backend_tag_def.items()}
filtered_tag_def = {}
self.rejected_groups = []
for csv_group_name in csv_groups.keys():
if csv_group_name in backend_group_names:
backend_id = backend_group_names[csv_group_name]
filtered_tag_def[backend_id] = self.backend_tag_def[backend_id]
logger.debug(f"Accepted CSV group '{csv_group_name}' -> backend group_id {backend_id}")
else:
self.rejected_groups.append(csv_group_name)
logger.warn(f"Rejected CSV group '{csv_group_name}' - not found in backend")
return filtered_tag_def
def _convert_groups_to_tagset(self, csv_groups: Dict[str, List[str]]) -> Dict[int, set]:
"""Convert CSV groups to TagSet format with value-level validation"""
tagset = {}
row_rejected_values = {}
for tag_group_id, (group_name, desc, choices) in self.tag_def.items():
csv_group_values = csv_groups.get(group_name, [])
tag_ids = set()
rejected_values_for_group = []
backend_values = set(choices.values())
# Validate each CSV value against backend choices
for csv_value in csv_group_values:
if csv_value in backend_values:
for tag_id, tag_name in choices.items():
if tag_name == csv_value:
tag_ids.add(tag_id)
logger.debug(f"Accepted value '{csv_value}' -> tag_id {tag_id}")
break
else:
rejected_values_for_group.append(csv_value)
logger.warn(f"Rejected value '{csv_value}' (not in backend choices)")
if rejected_values_for_group:
if group_name not in self.rejected_values:
self.rejected_values[group_name] = set()
self.rejected_values[group_name].update(rejected_values_for_group)
row_rejected_values[group_name] = rejected_values_for_group
tagset[tag_group_id] = tag_ids
if row_rejected_values:
logger.debug(f"Row rejected values: {row_rejected_values}")
return tagset
def _create_validation_summary(self) -> str:
"""Create a summary of validation results"""
summary_parts = []
if self.rejected_groups:
summary_parts.append(f"Rejected groups: {', '.join(self.rejected_groups)}")
if self.rejected_values:
value_parts = []
for group_name, rejected_vals in self.rejected_values.items():
value_parts.append(f"{group_name}: {', '.join(sorted(rejected_vals))}")
summary_parts.append(f"Rejected values: {' | '.join(value_parts)}")
return " • ".join(summary_parts) if summary_parts else "All groups and values are valid"
[docs]
def get_validation_results(self) -> Dict:
"""Return comprehensive validation results"""
return {
'rejected_groups': self.rejected_groups,
'rejected_values': dict(self.rejected_values),
'summary': self.validation_summary
}