# Source code for pydm.data_plugins

"""
Loads all the data plugins available in the directories given by the
PYDM_DATA_PLUGINS_PATH environment variable and their subfolders. Plugin files
must follow the *_plugin.py naming pattern and contain classes that inherit
from the pydm.data_plugins.PyDMPlugin class.
"""
import inspect
import logging
import os
from collections import deque
from contextlib import contextmanager
from typing import Any, Dict, Generator, List, Optional, Type

import entrypoints
from qtpy.QtWidgets import QApplication

from .. import config
from ..utilities import import_module_by_filename, log_failures, parsed_address
from .plugin import PyDMPlugin

# Logger shared by every function in this module.
logger = logging.getLogger(__name__)

# Registry of instantiated plugins, keyed by the protocol each one serves.
plugin_modules: Dict[str, PyDMPlugin] = {}

# Read-only flag; changed through set_read_only(), read via is_read_only().
__read_only = False

# Becomes True the first time initialize_plugins_if_needed() runs.
__plugins_initialized = False

# Queue of channels waiting to be connected. None means no queue is active
# and establish_connection() connects channels right away.
__CONNECTION_QUEUE__ = None

# When True, leaving connection_queue() does not flush the queue.
__DEFER_CONNECTIONS__ = False


@contextmanager
def connection_queue(defer_connections=False):
    """
    Queue channel connections requested while the context is active.

    Inside the ``with`` body, ``establish_connection`` appends channels to a
    module-level queue instead of connecting them. When the context exits the
    queue is flushed through ``establish_queued_connections``, unless
    connections are being deferred.

    The flush also happens when the body raises. Otherwise the queue would
    stay active and every later ``establish_connection`` call would keep
    queueing channels that are never connected.

    Parameters
    ----------
    defer_connections : bool, optional
        If True, leave the queued channels pending on exit so that a later
        explicit call to ``establish_queued_connections`` connects them.
        Only honored when this call creates the queue; it is ignored when a
        queue is already active.
    """
    global __CONNECTION_QUEUE__
    global __DEFER_CONNECTIONS__
    if __CONNECTION_QUEUE__ is None:
        __CONNECTION_QUEUE__ = deque()
        __DEFER_CONNECTIONS__ = defer_connections
    try:
        yield
    finally:
        # Runs on both normal exit and exceptions so the module never gets
        # stuck in queueing mode.
        if not __DEFER_CONNECTIONS__:
            establish_queued_connections()


def establish_queued_connections():
    """
    Connect every queued channel, oldest first, then deactivate the queue.

    Does nothing when no queue is active. After each channel is connected the
    Qt application processes its pending events. The queue and the defer flag
    are reset when the function finishes, including when a connection raises.
    An ``IndexError`` raised while draining is suppressed.
    """
    global __DEFER_CONNECTIONS__
    global __CONNECTION_QUEUE__
    if __CONNECTION_QUEUE__ is None:
        return
    try:
        # The global is re-read on every pass: the loop stops as soon as the
        # queue is empty or has been reset to None.
        # NOTE(review): presumably processEvents() can re-enter this module
        # and reset the queue - confirm.
        while __CONNECTION_QUEUE__:
            pending = __CONNECTION_QUEUE__.popleft()
            establish_connection_immediately(pending)
            QApplication.instance().processEvents()
    except IndexError:
        pass
    finally:
        __CONNECTION_QUEUE__, __DEFER_CONNECTIONS__ = None, False


def establish_connection(channel):
    """
    Connect a channel now, or queue it if a connection queue is active.

    Parameters
    ----------
    channel
        The channel to connect. It is appended to the pending queue while a
        queue is active; otherwise it is connected immediately.
    """
    if __CONNECTION_QUEUE__ is None:
        # No queue is active: connect straight away.
        establish_connection_immediately(channel)
        return
    __CONNECTION_QUEUE__.append(channel)


def establish_connection_immediately(channel):
    """
    Connect a channel to the plugin that handles its address, bypassing
    any active connection queue.

    Parameters
    ----------
    channel
        The channel to connect. Its ``address`` attribute selects the plugin.

    Notes
    -----
    If no plugin can be found for the address the channel is left
    unconnected. ``plugin_for_address`` returns None in that case and has
    already logged the reason. Returning here avoids an ``AttributeError``
    on None, which would abort the flush of a whole connection queue.
    """
    plugin = plugin_for_address(channel.address)
    if plugin is None:
        return
    plugin.add_connection(channel)


def plugin_for_address(address: str) -> Optional[PyDMPlugin]:
    """
    Look up the data plugin that serves a channel address.

    The protocol is the scheme of the parsed address. If none can be
    determined, ``config.DEFAULT_PROTOCOL`` is used when it is set.

    Parameters
    ----------
    address : str
        The channel address.

    Returns
    -------
    plugin : PyDMPlugin, optional
        The plugin registered for the protocol, or None (after logging an
        error) when no protocol is available or no plugin is registered
        for it.
    """
    # An AttributeError while reading the scheme means no protocol could be
    # extracted from the address.
    try:
        scheme = parsed_address(address).scheme
    except AttributeError:
        scheme = None

    # Fall back to the configured default protocol, if there is one.
    if scheme is None and config.DEFAULT_PROTOCOL is not None:
        logger.debug("Using default protocol %s for %s", config.DEFAULT_PROTOCOL, address)
        scheme = config.DEFAULT_PROTOCOL

    if scheme:
        # Plugins are loaded lazily, on the first lookup that needs them.
        initialize_plugins_if_needed()
        try:
            found = plugin_modules[str(scheme)]
        except KeyError:
            logger.exception("Could not find protocol for %r", address)
        else:
            return found

    # Reached when there is no protocol or no plugin registered for it.
    message = (
        "Channel {addr} did not specify a valid protocol "
        "and no default protocol is defined. This channel "
        "will receive no data. To specify a default protocol, "
        "set the PYDM_DEFAULT_PROTOCOL environment variable."
    ).format(addr=address)
    logger.error(message)
    return None


def add_plugin(plugin: Type[PyDMPlugin]) -> Optional[PyDMPlugin]:
    """
    Add a PyDM plugin to the global registry of protocol vs. plugins

    Parameters
    ----------
    plugin : PyDMPlugin type
        The class of plugin to instantiate

    Returns
    -------
    plugin : PyDMPlugin, optional
        The instantiated PyDMPlugin. If instantiation failed, will return None.
    """
    try:
        instance = plugin()
    except Exception:
        logger.exception(
            "Data plugin: %s failed to load and will not be available for use!",
            plugin,
        )
        return None

    # Warn users if we are overwriting a protocol which already has a plugin.
    # Done only after instantiation succeeds, so the warning is never emitted
    # for a replacement that does not happen. The existing plugin is named
    # first because it is the one being replaced.
    if plugin.protocol in plugin_modules:
        logger.warning(
            "Replacing %s plugin with %s for use with protocol %s",
            plugin_modules[plugin.protocol],
            plugin,
            plugin.protocol,
        )

    plugin_modules[plugin.protocol] = instance
    return instance


@log_failures(
    logger,
    explanation=("Unable to import plugin file: {args[0]}. " "This plugin will be skipped."),
    include_traceback=True,
)
def _get_plugins_from_source(source_filename: str) -> List[Type[PyDMPlugin]]:
    """
    For a given source filename, find PyDMPlugin classes.

    Parameters
    ----------
    source_filename : str
        The source code filename.

    Returns
    -------
    plugins : list of PyDMPlugin classes
        The plugin classes, without duplicates, in the order reported by
        ``inspect.getmembers`` (sorted by member name).
    """
    module = import_module_by_filename(source_filename)
    candidates = (obj for _, obj in inspect.getmembers(module) if _is_valid_plugin_class(obj))
    # dict.fromkeys drops classes exposed under several names while keeping a
    # deterministic order; a set would make the order depend on hashing.
    return list(dict.fromkeys(candidates))


def find_plugins_from_path(
    path: str, token: str = config.DATA_PLUGIN_SUFFIX
) -> Generator[Type[PyDMPlugin], None, None]:
    """
    Yield all data plugins found in the provided path.

    Directories whose name starts with a double underscore are skipped.

    Parameters
    ----------
    path : str
        The path to look for plugins.

    token : str, optional
        The suffix that plugin files are expected to have.
    """
    for root, _, files in os.walk(path):
        if root.split(os.path.sep)[-1].startswith("__"):
            continue

        logger.debug("Looking for PyDM Data Plugins at: %s", root)
        for name in files:
            if name.endswith(token):
                # NOTE(review): log_failures presumably returns None when the
                # import fails - confirm. Fall back to an empty list so a
                # broken plugin file is skipped instead of raising TypeError.
                yield from _get_plugins_from_source(os.path.join(root, name)) or []


def find_plugins_from_entrypoints(
    key: str = config.ENTRYPOINT_DATA_PLUGIN,
) -> Generator[Type[PyDMPlugin], None, None]:
    """
    Yield all PyDMPlugin classes specified by entrypoints.

    Uses ``entrypoints`` to find packaged external tools in packages that
    configure the ``pydm.data_plugin`` entrypoint. Entries that fail to load
    or that are not valid plugin classes are logged and skipped.

    Parameters
    ----------
    key : str, optional
        The entrypoint key.
    """
    for entry in entrypoints.get_group_all(key):
        logger.debug("Found data plugin entrypoint: %s", entry.name)
        try:
            plugin_cls = entry.load()
        except Exception as ex:
            logger.exception("Failed to load %s entry %s: %s", key, entry.name, ex)
            continue

        if not _is_valid_plugin_class(plugin_cls):
            logger.warning("Invalid plugin class specified in entrypoint " "%s: %s", entry.name, plugin_cls)
            continue

        yield plugin_cls


def _is_valid_plugin_class(obj: Any) -> bool:
    """Is the object a data plugin class (a strict subclass of PyDMPlugin)?"""
    return inspect.isclass(obj) and issubclass(obj, PyDMPlugin) and obj is not PyDMPlugin


def load_plugins_from_entrypoints(key: str = config.ENTRYPOINT_DATA_PLUGIN) -> Dict[str, PyDMPlugin]:
    """
    Load plugins advertised through entrypoints.

    Plugin classes without a protocol are skipped with a warning.

    Parameters
    ----------
    key : str, optional
        The entrypoint key.

    Returns
    -------
    plugins : dict
        Dictionary of the plugins added from the entrypoints, keyed by
        protocol.
    """
    added_plugins = dict()
    for plugin in find_plugins_from_entrypoints(key):
        if not plugin.protocol:
            # Use the class name, not the class repr, so the message reads
            # "module.ClassName".
            logger.warning(
                "No protocol specified for data plugin: %s.%s",
                plugin.__module__,
                plugin.__name__,
            )
            continue

        added_plugin = add_plugin(plugin)
        if added_plugin is not None:
            added_plugins[plugin.protocol] = added_plugin

    return added_plugins


def load_plugins_from_path(locations: List[str], token: str = config.DATA_PLUGIN_SUFFIX) -> Dict[str, PyDMPlugin]:
    """
    Load plugins from file locations that match a specific token

    Plugin classes without a protocol are skipped with a warning.

    Parameters
    ----------
    locations : list of str
        List of file locations

    token : str
        Phrase that must match the end of the filename for it to be checked for
        PyDMPlugins

    Returns
    -------
    plugins : dict
        Dictionary of the plugins added from these locations, keyed by
        protocol
    """
    added_plugins = dict()
    for loc in locations:
        for plugin in find_plugins_from_path(loc, token=token):
            if not plugin.protocol:
                # Use the class name, not the class repr, so the message
                # reads "module.ClassName".
                logger.warning(
                    "No protocol specified for data plugin: %s.%s",
                    plugin.__module__,
                    plugin.__name__,
                )
                continue

            added_plugin = add_plugin(plugin)
            if added_plugin is not None:
                added_plugins[plugin.protocol] = added_plugin

    return added_plugins


def is_read_only():
    """
    Report whether the read only flag is set for the data plugins.

    Returns
    -------
    bool
        True if read only. False otherwise.
    """
    return __read_only


def set_read_only(read_only):
    """
    Store the read only flag used by the data plugins.

    Parameters
    ----------
    read_only : bool
        The new value of the flag. A truthy value is also announced in the
        log.
    """
    global __read_only
    __read_only = read_only
    if not read_only:
        return
    logger.info("Running PyDM in Read Only mode.")


def initialize_plugins_if_needed():
    """
    Load all data plugins the first time this is called; later calls do
    nothing.

    Plugins are loaded from this module's own directory first, then from each
    directory listed in the PYDM_DATA_PLUGINS_PATH environment variable, and
    finally from entrypoints.
    """
    global __plugins_initialized
    if __plugins_initialized:
        return
    # The flag is set before any loading starts.
    # NOTE(review): presumably so a lookup triggered while plugins are being
    # imported does not start a second load - confirm.
    __plugins_initialized = True

    # Load the data plugins from PYDM_DATA_PLUGINS_PATH
    banner = "*" * 80
    logger.debug(banner)
    logger.debug("* Loading PyDM Data Plugins")
    logger.debug(banner)

    env_value = os.getenv("PYDM_DATA_PLUGINS_PATH")
    extra_locations = [] if env_value is None else env_value.split(os.pathsep)

    # Ensure that we first visit the local data_plugins location
    local_dir = os.path.dirname(os.path.realpath(__file__))
    locations = [local_dir] + extra_locations

    load_plugins_from_path(locations)
    load_plugins_from_entrypoints(config.ENTRYPOINT_DATA_PLUGIN)