Source code for pydm.utilities

import functools
import importlib
import importlib.util
import logging
import ntpath
import os
import platform
import shlex
import sys
import types
import uuid
import errno

from typing import List, Optional

from qtpy import QtCore, QtGui, QtWidgets

from . import colors, macro, shortcuts
from .connection import close_widget_connections, establish_widget_connections
from .iconfont import IconFont
from .remove_protocol import protocol_and_address, remove_protocol, parsed_address
from .units import convert, find_unit_options, find_unittype

logger = logging.getLogger(__name__)


def is_ssh_session():
    """
    Report whether the current process runs inside an SSH session.

    The decision is based solely on whether the ``SSH_CONNECTION``
    environment variable is defined.

    Returns
    -------
    bool
        True if ``SSH_CONNECTION`` is defined, False otherwise.
    """
    return 'SSH_CONNECTION' in os.environ
def setup_renderer():
    """
    Switch Qt to software rendering when running in an SSH session.

    Outside of an SSH session this function does nothing.
    """
    if not is_ssh_session():
        return

    logger.info('Using PyDM via SSH. Reverting to Software Rendering.')

    # The Qt modules are only imported when the fallback is actually needed.
    from qtpy.QtCore import QCoreApplication, Qt
    from qtpy.QtQuick import QQuickWindow, QSGRendererInterface

    QCoreApplication.setAttribute(Qt.AA_UseSoftwareOpenGL)
    QQuickWindow.setSceneGraphBackend(QSGRendererInterface.Software)
def is_pydm_app(app=None):
    """
    Tell whether an application object is a ``PyDMApplication``.

    Parameters
    ----------
    app : QApplication, Optional
        The application to inspect. When omitted (``None``), the object
        returned by ``QApplication.instance()`` is inspected instead.

    Returns
    -------
    bool
        True if the inspected object is a PyDMApplication, False otherwise.
    """
    from qtpy.QtWidgets import QApplication
    from ..application import PyDMApplication

    candidate = QApplication.instance() if app is None else app
    return isinstance(candidate, PyDMApplication)
def is_qt_designer():
    """
    Tell whether the code is running inside Qt Designer.

    The check is whether ``DesignerHooks().form_editor`` has been set.

    Returns
    -------
    bool
        True if a form editor is available, False otherwise.
    """
    from ..qtdesigner import DesignerHooks

    hooks = DesignerHooks()
    return hooks.form_editor is not None
def get_designer_current_path():
    """
    Return the absolute directory of the form currently open in Qt Designer.

    The active form window is preferred; when there is none but the form
    window manager holds at least one window, the first one is used.

    Returns
    -------
    path : str, None
        The absolute path for the selected form, or None when not running
        inside Qt Designer, when no form window exists, or when the form
        reports no directory.
    """
    if not is_qt_designer():
        return None

    from ..qtdesigner import DesignerHooks

    manager = DesignerHooks().form_editor.formWindowManager()
    window = manager.activeFormWindow()
    if window is None and manager.formWindowCount() > 0:
        # No active form: fall back to the first one the manager knows.
        window = manager.formWindow(0)
    if window is None:
        return None

    directory = window.absoluteDir()
    if not directory:
        return None
    return directory.absolutePath()
def path_info(path_str):
    """
    Retrieve basic information about the given path.

    The string is first split into a directory part and a trailing part at
    the last path separator (``ntpath`` rules on Windows, ``os.path`` rules
    elsewhere). The trailing part is then tokenized with ``shlex.split``:
    the first token is the file name and the remaining tokens are arguments.

    Parameters
    ----------
    path_str : str
        The path from which to extract information.

    Returns
    -------
    tuple
        base dir, file name, list of args. When the path has no file
        component (empty string or a path ending with a separator) the file
        name is ``""`` and the list of args is empty.
    """
    if platform.system() == "Windows":
        os_path_mod = ntpath
    else:
        os_path_mod = os.path

    dir_name, other_parts = os_path_mod.split(path_str)
    tokens = shlex.split(other_parts)
    if not tokens:
        # Nothing after the last separator: report an empty file name
        # instead of failing with IndexError on the empty token list.
        return dir_name, "", []

    file_name, *args = tokens
    return dir_name, file_name, args
def _extensions(fname): name = os.path.basename(fname) MAX_ITER = 10 exts = [] for i in range(MAX_ITER): new_name, ext = os.path.splitext(name) if ext: exts.insert(0, ext) if name == new_name: break name = new_name return exts def _screen_file_extensions(preferred_extension): """ Return a prioritized list of screen file extensions. Include .ui & .py files (also .adl files if adl2pydm installed). Prefer extension as described by fname. """ extensions = [".py", ".ui"] # search for screens with these extensions try: import adl2pydm # proceed only if package is importable # noqa: F401 extensions.append(".adl") except ImportError: pass # don't search twice for preferred extension if preferred_extension in extensions: extensions.remove(preferred_extension) # search first for preferred extension extensions.insert(0, preferred_extension) return extensions
def find_file(fname, base_path=None, mode=None, extra_path=None, raise_if_not_found=False):
    """
    Look for files at the search paths common to PyDM.

    The search order is as follows:

    * The ``base_path`` argument
    * Qt Designer Path - the path for the current form as reported by the
      designer
    * The current working directory
    * Directories listed in ``extra_path``
    * Directories listed in the environment variable ``PYDM_DISPLAYS_PATH``

    These directories are handed to ``which`` as its ``extra_path``; since no
    explicit ``path`` is given, ``which`` also applies its own default search
    path after them.

    The extension of ``fname`` is tried first, followed by the other screen
    file extensions reported by ``_screen_file_extensions``; the first match
    wins.

    Parameters
    ----------
    fname : str
        The file name. Environment variables, ~ and ~user constructs are
        expanded before the search.
    base_path : str
        The directory name of a file pathname from a display, if any
    mode : int
        The mode required for the file, defaults to os.F_OK | os.R_OK.
        Which ensure that the file exists and we can read it.
    extra_path : list
        Additional paths to look for file. A single non-list, non-tuple
        value is treated as one path.
    raise_if_not_found : bool
        If True, raise a FileNotFoundError instead of returning None when
        the file is not found. Defaults to False.

    Returns
    -------
    file_path : str
        Returns the file path or None in case the file was not found

    Raises
    ------
    FileNotFoundError
        If the file was not found and ``raise_if_not_found`` is True.
    """
    fname = os.path.expanduser(os.path.expandvars(fname))

    if mode is None:
        mode = os.F_OK | os.R_OK

    x_path = []
    if base_path:
        x_path.append(os.path.abspath(base_path))

    if is_qt_designer():
        designer_path = get_designer_current_path()
        if designer_path:
            x_path.append(designer_path)

    # Current working directory
    x_path.append(os.getcwd())

    if extra_path:
        if not isinstance(extra_path, (list, tuple)):
            extra_path = [extra_path]
        x_path.extend(
            os.path.expanduser(os.path.expandvars(x)) for x in extra_path
        )

    pydm_search_path = os.getenv("PYDM_DISPLAYS_PATH", None)
    if pydm_search_path:
        x_path.extend(pydm_search_path.split(os.pathsep))

    # Expand variables and ~ in every search directory.
    x_path = [os.path.expanduser(os.path.expandvars(path)) for path in x_path]

    root, ext = os.path.splitext(fname)

    # Bound up front so the checks below are valid even if no extension
    # candidate is produced.
    file_path = None
    # loop through the possible screen file extensions
    for e in _screen_file_extensions(ext):
        file_path = which(str(root) + str(e), mode=mode, pathext=e, extra_path=x_path)
        if file_path is not None:
            break  # pick the first screen file found

    if raise_if_not_found and not file_path:
        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), fname)
    return file_path
def find_display_in_path(file, mode=None, path=None, pathext=None):
    """
    Look for a display file in a given path.

    Thin wrapper around ``which`` that fills in PyDM-specific defaults so
    callers do not have to look up ``PYDM_DISPLAYS_PATH`` themselves.

    Parameters
    ----------
    file : str
        The file name.
    mode : int
        The mode required for the file, defaults to os.F_OK | os.R_OK.
        Which ensure that the file exists and we can read it.
    path : str
        The PATH string. Defaults to the value of the
        ``PYDM_DISPLAYS_PATH`` environment variable.
    pathext : str
        Extension string forwarded to ``which``. Defaults to ``".ui"`` when
        ``sys.platform`` is ``"win32"``; otherwise it is forwarded as given.

    Returns
    -------
    str
        Returns the full path to the file or None in case it was not found.
    """
    if pathext is None and sys.platform == "win32":
        pathext = ".ui"

    search_path = os.getenv("PYDM_DISPLAYS_PATH", None) if path is None else path
    access_mode = (os.F_OK | os.R_OK) if mode is None else mode

    return which(file, access_mode, search_path, pathext=pathext)
def which(cmd, mode=os.F_OK | os.X_OK, path=None, pathext=None, extra_path=None):
    """
    Given a command, mode, and a PATH string, return the path which conforms
    to the given mode on the PATH, or None if there is no such file.

    Note: This function was backported from the Python 3 source code and
    modified to deal with the case in which we WANT to look at the path even
    with a relative path: a ``cmd`` with a directory part is still joined to
    every search directory instead of being checked on its own.

    Parameters
    ----------
    cmd : str
        The file (or command) name to look for.
    mode : int
        Access mode passed to ``os.access``. Defaults to
        os.F_OK | os.X_OK.
    path : str, optional
        Search path string, entries separated by ``os.pathsep``. Defaults to
        the result of os.environ.get("PATH"), falling back to ``os.defpath``.
    pathext : str, optional
        Only used when ``sys.platform`` is ``"win32"``: candidate file
        extensions separated by ``os.pathsep``. Defaults to the ``PATHEXT``
        environment variable.
    extra_path : list or tuple of str, optional
        Directories searched before the ones from ``path``. A single string
        is treated as one directory. These are searched even when ``path``
        is empty.

    Returns
    -------
    str
        The first matching file path, or None if nothing matched or there
        is no directory to search.
    """
    # Check that a given file can be accessed with the correct mode.
    # Additionally check that `file` is not a directory, as on Windows
    # directories pass the os.access check.
    def _access_check(fn, mode):
        return os.path.exists(fn) and os.access(fn, mode) and not os.path.isdir(fn)

    if path is None:
        path = os.environ.get("PATH", os.defpath)

    if isinstance(extra_path, str):
        extra_path = [extra_path]
    # Copy so the caller's sequence is never mutated; also accepts tuples.
    search_dirs = list(extra_path) if extra_path else []
    if path:
        search_dirs.extend(path.split(os.pathsep))
    if not search_dirs:
        # Neither PATH nor extra_path provides a directory to look into.
        return None

    if sys.platform == "win32":
        # The current directory takes precedence on Windows.
        if os.curdir not in search_dirs:
            search_dirs.insert(0, os.curdir)

        # PATHEXT is necessary to check on Windows.
        if pathext is None:
            pathext = os.environ.get("PATHEXT", "")
        pathext = pathext.split(os.pathsep)
        # See if the given file matches any of the expected path
        # extensions. This will allow us to short circuit when given
        # "python.exe". If it does match, only test that one, otherwise we
        # have to try others.
        if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
            files = [cmd]
        else:
            files = [cmd + ext for ext in pathext]
    else:
        # On other platforms you don't have things like PATHEXT to tell you
        # what file suffixes are executable, so just pass on cmd as-is.
        files = [cmd]

    seen = set()
    for dir_ in search_dirs:
        normdir = os.path.normcase(dir_)
        if normdir in seen:
            # Each directory is only probed once.
            continue
        seen.add(normdir)
        for thefile in files:
            name = os.path.join(dir_, thefile)
            if _access_check(name, mode):
                return name
    return None
def only_main_thread(func):
    """
    Decorator restricting a function to the Qt main thread.

    On every call the wrapper compares the current ``QThread`` with the
    thread of ``QApplication.instance()``. If they differ, an error message
    is logged and a RuntimeError is raised; otherwise the decorated function
    is called and its result returned.

    Parameters
    ----------
    func : callable
        The function to wrap

    Returns
    -------
    wrapper

    Raises
    ------
    ValueError
        If ``func`` is not callable.
    """
    if not callable(func):
        raise ValueError("Parameter must be a callable.")

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        gui_thread = QtWidgets.QApplication.instance().thread()
        if QtCore.QThread.currentThread() != gui_thread:
            msg = "{}.{} can only be invoked from the main Qt thread.".format(
                func.__module__, func.__name__
            )
            logger.error(msg)
            raise RuntimeError(msg)
        return func(*args, **kwargs)

    return wrapper
def log_failures(
    logger: logging.Logger,
    explanation: str = "Failed to run {func.__name__}",
    include_traceback: bool = False,
    level: int = logging.WARNING,
):
    """
    Decorator factory that logs exceptions raised by the wrapped function.

    Any ``Exception`` raised by the decorated function is caught, reported
    through ``logger`` and replaced by a ``None`` return value.

    Parameters
    ----------
    logger : logging.Logger
        The logger instance to log messages to.
    explanation : str, optional
        Message template, expanded with ``str.format``. The available
        format arguments are ``func``, ``args``, ``kwargs``, and the
        exception ``ex``.
    include_traceback : bool, optional
        Pass the exception as ``exc_info`` so the traceback is logged too.
    level : int, optional
        Logging level to use.
    """

    def wrapper(func: callable):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            try:
                outcome = func(*args, **kwargs)
            except Exception as ex:
                message = explanation.format(
                    func=func, args=args, kwargs=kwargs, ex=ex
                )
                # Only hand over exc_info when a traceback was requested.
                log_options = {"exc_info": ex} if include_traceback else {}
                logger.log(level, message, **log_options)
                outcome = None
            return outcome

        return wrapped

    return wrapper
def import_module_by_filename(
    source_filename: str, *, add_to_modules: bool = True
) -> types.ModuleType:
    """
    Import a Python source file as a module under a unique, generated name.

    The directory containing the file is appended to ``sys.path`` (if not
    already present). The module name is a random UUID string.

    Parameters
    ----------
    source_filename : str
        The source code filename.

    add_to_modules : bool, optional, keyword-only
        Add the imported module to ``sys.modules``.  Defaults to ``True``.
        The module is registered before its code runs, so code that looks
        itself up in ``sys.modules`` during import works; the entry is
        removed again if executing the module raises.

    Returns
    -------
    module : types.ModuleType
        The imported module.
    """
    module_dir = os.path.dirname(os.path.abspath(source_filename))
    if module_dir not in sys.path:
        sys.path.append(module_dir)

    module_name = str(uuid.uuid4())
    spec = importlib.util.spec_from_file_location(module_name, source_filename)
    module = importlib.util.module_from_spec(spec)

    if add_to_modules:
        # Register first, mirroring the import system's own ordering.
        sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialized module behind.
        sys.modules.pop(module_name, None)
        raise
    return module
def get_clipboard() -> Optional[QtGui.QClipboard]:
    """
    Get the clipboard instance. Requires a QApplication.

    Returns
    -------
    QClipboard or None
        The result of ``QApplication.clipboard()``, or None when
        ``QApplication.instance()`` is None.
    """
    if QtWidgets.QApplication.instance() is None:
        return None
    return QtWidgets.QApplication.clipboard()
def get_clipboard_modes() -> List[int]:
    """
    Get the clipboard modes for the current platform.

    Returns
    -------
    list of int
        Qt-specific modes to try for interacting with the clipboard, in
        order of preference. Empty when no clipboard is available.
    """
    if get_clipboard() is None:
        return []

    modes = [QtGui.QClipboard.Clipboard]
    if platform.system() == "Linux":
        # Mode selection is only valid for X11; it is tried first there.
        modes.insert(0, QtGui.QClipboard.Selection)
    return modes
def copy_to_clipboard(text: str, *, quiet: bool = False):
    """
    Copy ``text`` to the clipboard.

    The text is set for every mode reported by ``get_clipboard_modes``, and
    after each one a ``QEvent.Clipboard`` event is sent to the clipboard
    through the running application, if there is one. Nothing happens when
    no clipboard is available.

    Parameters
    ----------
    text : str
        The text to copy to the clipboard.

    quiet : bool, optional, keyword-only
        If quiet is set, do not log the copied text. Defaults to False.
    """
    target = get_clipboard()
    if target is None:
        return None

    for clip_mode in get_clipboard_modes():
        target.setText(text, mode=clip_mode)
        notification = QtCore.QEvent(QtCore.QEvent.Clipboard)
        qt_app = QtWidgets.QApplication.instance()
        if qt_app is not None:
            qt_app.sendEvent(target, notification)

    if quiet:
        return None

    logger.warning(
        (
            "Copied text to clipboard:\n"
            "-------------------------\n"
            "%s\n"
            "-------------------------\n"
        ),
        text
    )
def get_clipboard_text() -> str:
    """
    Get ``text`` from the clipboard.

    The modes from ``get_clipboard_modes`` are queried in order and the
    first non-empty text wins.

    Returns
    -------
    str
        The clipboard text, if available. Empty string when there is no
        clipboard or no mode holds any text.
    """
    source = get_clipboard()
    if source is None:
        return ""

    # Lazy generator: later modes are only queried if earlier ones are empty.
    contents = (source.text(mode=clip_mode) for clip_mode in get_clipboard_modes())
    return next((found for found in contents if found), "")