Source code for pyrogue.protocols.epicsV4

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# Module containing epics support classes and routines
# TODO:
#   Not clear on to force a read on get
# -----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
# -----------------------------------------------------------------------------

import pyrogue
import time
import warnings
import re
import numpy as np

try:
    import p4p.server.thread
    import p4p.nt
except Exception:
    warnings.warn(
        "P4P is not installed.\n\n"
        "To install with Conda (preferred in Conda environments):\n"
        "    conda install -c conda-forge p4p\n\n"
        "To install with pip in a Conda environment:\n"
        "    python -m pip install p4p\n"
        "(Using 'python -m pip' ensures pip installs into the currently active Conda env.)\n\n"
        "To install in a Python 3 virtualenv:\n"
        "    pip install p4p\n\n"
        "Note: You may also need to ensure EPICS is set up on your system for P4P to work properly."
    )

def EpicsConvStatus(varValue):
    if varValue.status == "AlarmLoLo":
        return 5  # epicsAlarmLoLo
    elif varValue.status == "AlarmLow":
        return 6  # epicsAlarmLow
    elif varValue.status == "AlarmHiHi":
        return 3  # epicsAlarmHiHi
    elif varValue.status == "AlarmHigh":
        return 4  # epicsAlarmHigh
    else:
        return 0


def EpicsConvSeverity(varValue):
    if varValue.severity == "AlarmMinor":
        return 1  # epicsSevMinor
    elif varValue.severity == "AlarmMajor":
        return 2  # epicsSevMajor
    else:
        return 0


class EpicsPvHandler(p4p.server.thread.Handler):
    def __init__(self, valType, var, log):
        self._valType = valType
        self._var = var
        self._log = log

    def put(self, pv, op):
        if self._var.isVariable and (
                self._var.mode == 'RW' or self._var.mode == 'WO'):
            try:
                if self._valType == 'enum':
                    self._var.setDisp(str(op.value()))
                elif self._valType == 's':
                    self._var.setDisp(op.value().raw.value)
                elif self._valType == 'ndarray':
                    # This seems wrong!
                    self._var.set(op._op.value().value.copy())
                else:
                    self._var.set(op.value().raw.value)

                # Need enum processing
                op.done()

            except Exception as msg:
                self._log.error(f"Got error on put: {msg}")
                op.done(error=str(msg))

        else:
            op.done(error='Put Not Supported On This Variable')

    def rpc(self, pv, op):
        if self._var.isCommand:
            val = op.value().query

            try:
                if 'arg' in val:
                    ret = self._var(val.arg)
                else:
                    ret = self._var()

                if ret is None:
                    ret = 'None'

                v = p4p.Value(
                    p4p.Type([('value', self._valType)]), {'value': ret})
                op.done(value=(v))

            except Exception as msg:
                op.done(error=msg)

        else:
            op.done(error='Rpc Not Supported On Variables')

    def onFirstConnect(self, pv):  # may be omitted
        # print(f"PV First connect called pv={pv}")
        pass

    def onLastDisconnect(self, pv):  # may be omitted
        # print(f"PV Last Disconnect called pv={pv}")
        pass


[docs] class EpicsPvHolder(object): def __init__(self, provider, name, var, log): self._var = var self._name = name self._log = log self._pv = None if self._var.isCommand: typeStr = self._var.retTypeStr else: typeStr = self._var.typeStr # Convert valType if var.nativeType is np.ndarray: self._valType = 'ndarray' # self._valType = 's' elif self._var.disp == 'enum': self._valType = 'enum' elif typeStr is None or var.nativeType is list or var.nativeType is dict: self._valType = 's' else: # Unsigned if 'UInt' in typeStr: m = re.search('^UInt(\\d+)\\.*', typeStr) if m is not None and m.lastindex == 1: bits = int(m[1]) if bits <= 8: self._valType = 'B' elif bits <= 16: self._valType = 'H' elif bits <= 32: self._valType = 'I' elif bits <= 64: self._valType = 'L' else: self._valType = 's' else: self._valType = 'L' # Signed elif 'int' in typeStr or 'Int' in typeStr: m = re.search('^Int(\\d+)\\.*', typeStr) if m is not None and m.lastindex == 1: bits = int(m[1]) if bits <= 8: self._valType = 'b' elif bits <= 16: self._valType = 'h' elif bits <= 32: self._valType = 'i' elif bits <= 64: self._valType = 'l' else: self._valType = 's' else: self._valType = 'l' # Float elif 'float' in typeStr or 'Float32' in typeStr: self._valType = 'f' # Double elif 'Double64' in typeStr or 'Float64' in typeStr: self._valType = 'd' # Default to string else: self._valType = 's' # Get initial value varVal = var.getVariableValue(read=False) # Override LinkerVariables with init=None if varVal.valueDisp is None: varVal.valueDisp = '' self._log.info( "Adding {} with type {} init={}".format( self._name, self._valType, varVal.valueDisp)) try: if self._valType == 'ndarray': # If a 1D array is encountered, use a NTScalar. Note, if an # NTScalar is used, the values will be automatically converted # to doubles. if varVal.value.ndim == 1: nt = p4p.nt.NTScalar('ad') else: nt = p4p.nt.NTNDArray() iv = varVal.value elif self._valType == 'enum': nt = p4p.nt.NTEnum( display=False, control=False, valueAlarm=False) enum = list(self._var.enum.values()) iv = {'choices': enum, 'index': enum.index(varVal.valueDisp)} elif self._valType == 's': nt = p4p.nt.NTScalar( self._valType, display=False, control=False, valueAlarm=False) iv = nt.wrap(varVal.valueDisp) else: nt = p4p.nt.NTScalar( self._valType, display=True, control=True, valueAlarm=True) # print(f"Setting value {varVal.value} to {self._name}") iv = nt.wrap(varVal.value) except Exception as e: raise Exception( "Failed to add {} with type {} ndtype={} init={}. Error={}".format( self._name, self._valType, self._var.ndType, varVal.valueDisp, e)) # Setup variable try: self._pv = p4p.server.thread.SharedPV(queue=None, handler=EpicsPvHandler( self._valType, self._var, self._log), initial=iv, nt=nt, options={}) except Exception as e: raise Exception( "Failed to start {} with type {} ndtype={} init={}. Error={}".format( self._name, self._valType, self._var.ndType, varVal.valueDisp, e)) provider.add(self._name, self._pv) self._var.addListener(self._varUpdated) # Update fields in numeric types if self._valType != 'enum' and self._valType != 's' and self._valType != 'ndarray': curr = self._pv.current() curr.raw.value = varVal.value curr.raw.alarm.status = EpicsConvStatus(varVal) curr.raw.alarm.severity = EpicsConvSeverity(varVal) curr.raw.display.description = self._var.description if self._var.units is not None: curr.raw.display.units = self._var.units if self._var.maximum is not None: curr.raw.display.limitHigh = self._var.maximum if self._var.minimum is not None: curr.raw.display.limitLow = self._var.minimum if self._var.lowWarning is not None: curr.raw.valueAlarm.lowWarningLimit = self._var.lowWarning if self._var.lowAlarm is not None: curr.raw.valueAlarm.lowAlarmLimit = self._var.lowAlarm if self._var.highWarning is not None: curr.raw.valueAlarm.highWarningLimit = self._var.highWarning if self._var.highAlarm is not None: curr.raw.valueAlarm.highAlarmLimit = self._var.highAlarm # Precision ? self._pv.post(curr) def _varUpdated(self, path, value): if self._valType == 'enum' or self._valType == 's': self._pv.post(value.valueDisp) elif self._valType == 'ndarray': self._pv.post(value.value) else: curr = self._pv.current() curr.raw.value = value.value curr.raw.alarm.status = EpicsConvStatus(value) curr.raw.alarm.severity = EpicsConvSeverity(value) curr.raw['timeStamp.secondsPastEpoch'], curr.raw['timeStamp.nanoseconds'] = divmod( float(time.time_ns()), 1.0e9) self._pv.post(curr)
[docs] class EpicsPvServer(object): """ Class to contain an epics PV server """ def __init__(self, *, base, root, incGroups, excGroups, pvMap=None): self._root = root self._base = base self._log = pyrogue.logInit(cls=self) self._server = None self._incGroups = incGroups self._excGroups = excGroups self._pvMap = pvMap self._started = False self._provider = p4p.server.StaticProvider(__name__) def _stop(self): if self._server is not None: self._server.stop() def _start(self): if self._started: return self._started = True self._stop() self._list = {} if not self._root.running: raise Exception( "Epics can not be setup on a tree which is not started") # Figure out mapping mode if self._pvMap is None: doAll = True self._pvMap = {} else: doAll = False # Create PVs for v in self._root.variableList: eName = None if doAll: if v.filterByGroup(self._incGroups, self._excGroups): eName = self._base + ':' + v.path.replace('.', ':') self._pvMap[v.path] = eName elif v.path in self._pvMap: eName = self._pvMap[v.path] if eName is not None: pvh = EpicsPvHolder(self._provider, eName, v, self._log) self._list[v] = pvh # Check for missing map entries if len(self._pvMap) != len(self._list): for k, v in self._pvMap.items(): if k not in self._list: self._log.error( f"Failed to find {k} from P4P mapping in Rogue tree!") self._server = p4p.server.Server(providers=[self._provider]) def list(self): return self._pvMap def dump(self, fname=None): if fname is not None: try: with open(fname, 'w') as f: for k, v in self._pvMap.items(): print("{} -> {}".format(v, k), file=f) except Exception: raise Exception("Failed to dump epics map to {}".format(fname)) else: for k, v in self._pvMap.items(): print("{} -> {}".format(v, k))