Source code for pydm.data_plugins.plugin

import functools
import numpy as np
import weakref
import threading

from typing import Optional, Callable
from urllib.parse import ParseResult

from ..utilities.remove_protocol import parsed_address
from ..widgets import PyDMChannel
from qtpy.compat import isalive
from qtpy.QtCore import Signal, QObject, Qt
from qtpy.QtWidgets import QApplication
from .. import config


[docs]class PyDMConnection(QObject): new_value_signal = Signal((float,), (int,), (str,), (bool,), (object,)) connection_state_signal = Signal(bool) new_severity_signal = Signal(int) write_access_signal = Signal(bool) enum_strings_signal = Signal(tuple) unit_signal = Signal(str) prec_signal = Signal(int) upper_ctrl_limit_signal = Signal((float,), (int,)) lower_ctrl_limit_signal = Signal((float,), (int,)) upper_alarm_limit_signal = Signal((float,), (int,)) lower_alarm_limit_signal = Signal((float,), (int,)) upper_warning_limit_signal = Signal((float,), (int,)) lower_warning_limit_signal = Signal((float,), (int,)) timestamp_signal = Signal(float) def __init__(self, channel, address, protocol=None, parent=None): super().__init__(parent) self.protocol = protocol self.address = address self.connected = False self.value = None self.listener_count = 0 self.app = QApplication.instance() def add_listener(self, channel): self.listener_count = self.listener_count + 1 if channel.connection_slot is not None: self.connection_state_signal.connect(channel.connection_slot, Qt.QueuedConnection) if channel.value_slot is not None: for signal_type in (int, float, str, bool, object): try: self.new_value_signal[signal_type].connect(channel.value_slot, Qt.QueuedConnection) # If the signal exists (always does in this case since we define it for all 'signal_type' values above) # but doesn't match slot, TypeError is thrown. We also don't need to catch KeyError/IndexError here, # since those are only thrown when signal type doesn't exist. except TypeError: pass if channel.severity_slot is not None: self.new_severity_signal.connect(channel.severity_slot, Qt.QueuedConnection) if channel.write_access_slot is not None: self.write_access_signal.connect(channel.write_access_slot, Qt.QueuedConnection) if channel.enum_strings_slot is not None: self.enum_strings_signal.connect(channel.enum_strings_slot, Qt.QueuedConnection) if channel.unit_slot is not None: self.unit_signal.connect(channel.unit_slot, Qt.QueuedConnection) if channel.upper_ctrl_limit_slot is not None: self.upper_ctrl_limit_signal.connect(channel.upper_ctrl_limit_slot, Qt.QueuedConnection) if channel.lower_ctrl_limit_slot is not None: self.lower_ctrl_limit_signal.connect(channel.lower_ctrl_limit_slot, Qt.QueuedConnection) if channel.upper_alarm_limit_slot is not None: self.upper_alarm_limit_signal.connect(channel.upper_alarm_limit_slot, Qt.QueuedConnection) if channel.lower_alarm_limit_slot is not None: self.lower_alarm_limit_signal.connect(channel.lower_alarm_limit_slot, Qt.QueuedConnection) if channel.upper_warning_limit_slot is not None: self.upper_warning_limit_signal.connect(channel.upper_warning_limit_slot, Qt.QueuedConnection) if channel.lower_warning_limit_slot is not None: self.lower_warning_limit_signal.connect(channel.lower_warning_limit_slot, Qt.QueuedConnection) if channel.prec_slot is not None: self.prec_signal.connect(channel.prec_slot, Qt.QueuedConnection) if channel.timestamp_slot is not None: self.timestamp_signal.connect(channel.timestamp_slot, Qt.QueuedConnection)
[docs] def remove_listener(self, channel, destroying: Optional[bool] = False) -> None: """ Removes a listener from this PyDMConnection. If there are no more listeners remaining after removal, then the PyDMConnection will be closed. Parameters ---------- channel: PyDMChannel The PyDMChannel containing the signals/slots that were being used to listen to the connected address. destroying: bool, optional Should be set to True if this method is being invoked from a flow in which the PyDMWidget using this channel is being destroyed. Since Qt will automatically handle the disconnect of signals/slots when a QObject is destroyed, setting this to True ensures we do not try to do the disconnection a second time. If set to False, any active signals/slots on the channel will be manually disconnected here. """ if self._should_disconnect(channel.connection_slot, destroying): try: self.connection_state_signal.disconnect(channel.connection_slot) except TypeError: pass if self._should_disconnect(channel.value_slot, destroying): for signal_type in (int, float, str, bool, object): try: self.new_value_signal[signal_type].disconnect(channel.value_slot) # If the signal exists (always does in this case since we define it for all 'signal_type' earlier) # but doesn't match slot, TypeError is thrown. We also don't need to catch KeyError/IndexError here, # since those are only thrown when signal type doesn't exist. except TypeError: pass if self._should_disconnect(channel.severity_slot, destroying): try: self.new_severity_signal.disconnect(channel.severity_slot) except (KeyError, TypeError): pass if self._should_disconnect(channel.write_access_slot, destroying): try: self.write_access_signal.disconnect(channel.write_access_slot) except (KeyError, TypeError): pass if self._should_disconnect(channel.enum_strings_slot, destroying): try: self.enum_strings_signal.disconnect(channel.enum_strings_slot) except (KeyError, TypeError): pass if self._should_disconnect(channel.unit_slot, destroying): try: self.unit_signal.disconnect(channel.unit_slot) except (KeyError, TypeError): pass if self._should_disconnect(channel.upper_ctrl_limit_slot, destroying): try: self.upper_ctrl_limit_signal.disconnect(channel.upper_ctrl_limit_slot) except (KeyError, TypeError): pass if self._should_disconnect(channel.lower_ctrl_limit_slot, destroying): try: self.lower_ctrl_limit_signal.disconnect(channel.lower_ctrl_limit_slot) except (KeyError, TypeError): pass if self._should_disconnect(channel.upper_alarm_limit_slot, destroying): try: self.upper_alarm_limit_signal.disconnect(channel.upper_alarm_limit_slot) except (KeyError, TypeError): pass if self._should_disconnect(channel.lower_alarm_limit_slot, destroying): try: self.lower_alarm_limit_signal.disconnect(channel.lower_alarm_limit_slot) except (KeyError, TypeError): pass if self._should_disconnect(channel.upper_warning_limit_slot, destroying): try: self.upper_warning_limit_signal.disconnect(channel.upper_warning_limit_slot) except (KeyError, TypeError): pass if self._should_disconnect(channel.lower_warning_limit_slot, destroying): try: self.lower_warning_limit_signal.disconnect(channel.lower_warning_limit_slot) except (KeyError, TypeError): pass if self._should_disconnect(channel.prec_slot, destroying): try: self.prec_signal.disconnect(channel.prec_slot) except (KeyError, TypeError): pass if self._should_disconnect(channel.timestamp_slot, destroying): try: self.timestamp_signal.disconnect(channel.timestamp_slot) except (KeyError, TypeError): pass if not destroying and channel.value_signal is not None and hasattr(self, "put_value"): for signal_type in (str, int, float, np.ndarray, dict): try: channel.value_signal[signal_type].disconnect(self.put_value) # When signal type can't be found, PyQt5 throws KeyError here, but PySide6 index error. # If signal type exists but doesn't match the slot, TypeError gets thrown. except (KeyError, IndexError, TypeError): pass self.listener_count = self.listener_count - 1 if self.listener_count < 1: self.close()
@staticmethod def _should_disconnect(slot: Callable, destroying: bool): """Return True if the signal/slot should be disconnected, False otherwise""" if slot is None: # Nothing to do if the slot does not exist return False if not destroying: # If the PyDMWidget associated with this slot is not being destroyed, then we do need to # manually disconnect the signal/slot return True if isinstance(slot, functools.partial): # If the slot was created as a partial, we also need to manually disconnect it even if the PyDMWidget # is being destroyed since Qt does not handle automatic disconnection when a partial is used return True # This means we are destroying the PyDMWidget and the slot is not a partial, so let Qt # handle the disconnect for us return False def close(self): pass
[docs]class PyDMPlugin(object): protocol = None connection_class = PyDMConnection designer_online_by_default = False def __init__(self): self.connections = {} self.channels = weakref.WeakSet() self.lock = threading.Lock() @staticmethod def get_parsed_address(channel: PyDMChannel) -> ParseResult: parsed_addr = parsed_address(channel.address) return parsed_addr @staticmethod def get_full_address(channel: PyDMChannel) -> Optional[str]: parsed_addr = parsed_address(channel.address) if parsed_addr: full_addr = parsed_addr.netloc + parsed_addr.path else: full_addr = None return full_addr @staticmethod def get_address(channel: PyDMChannel) -> str: parsed_addr = parsed_address(channel.address) addr = parsed_addr.netloc return addr @staticmethod def get_subfield(channel: PyDMChannel) -> Optional[str]: parsed_addr = parsed_address(channel.address) if parsed_addr: subfield = parsed_addr.path if subfield != "": subfield = subfield[1:].split("/") else: subfield = None return subfield @staticmethod def get_connection_id(channel: PyDMChannel) -> Optional[str]: return PyDMPlugin.get_full_address(channel) def add_connection(self, channel: PyDMChannel) -> None: from pydm.utilities import is_qt_designer with self.lock: connection_id = self.get_connection_id(channel) address = self.get_address(channel) # If this channel is already connected to this plugin lets ignore if channel in self.channels: return if is_qt_designer() and not config.DESIGNER_ONLINE and not self.designer_online_by_default: return self.channels.add(channel) if connection_id in self.connections: self.connections[connection_id].add_listener(channel) else: self.connections[connection_id] = self.connection_class(channel, address, self.protocol) def remove_connection(self, channel: PyDMChannel, destroying: bool = False) -> None: with self.lock: connection_id = self.get_connection_id(channel) if connection_id in self.connections and channel in self.channels: self.connections[connection_id].remove_listener(channel, destroying=destroying) self.channels.remove(channel) if self.connections[connection_id].listener_count < 1: if isalive(self.connections[connection_id]): self.connections[connection_id].deleteLater() del self.connections[connection_id]