#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# Module containing epics support classes and routines
# TODO:
# Not clear on how to force a read on get
# -----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
# -----------------------------------------------------------------------------
import pyrogue
import time
import warnings
import re
import numpy as np
from typing import Any
try:
import p4p.server.thread
import p4p.nt
except Exception:
warnings.warn(
"P4P is not installed.\n\n"
"To install with Conda (preferred in Conda environments):\n"
" conda install -c conda-forge p4p\n\n"
"To install with pip in a Conda environment:\n"
" python -m pip install p4p\n"
"(Using 'python -m pip' ensures pip installs into the currently active Conda env.)\n\n"
"To install in a Python 3 virtualenv:\n"
" pip install p4p\n\n"
"Note: You may also need to ensure EPICS is set up on your system for P4P to work properly."
)
def EpicsConvStatus(varValue: pyrogue.VariableValue) -> int:
    """
    Translate a PyRogue alarm status string into an EPICS alarm status code.

    Parameters
    ----------
    varValue : object
        PyRogue variable value; only its ``status`` attribute is read.

    Returns
    -------
    int
        EPICS alarm status code (3=HiHi, 4=High, 5=LoLo, 6=Low), or 0 when
        ``status`` is none of the recognised alarm strings.
    """
    # (PyRogue status string, EPICS alarm status code)
    statusTable = (
        ("AlarmLoLo", 5),  # epicsAlarmLoLo
        ("AlarmLow", 6),   # epicsAlarmLow
        ("AlarmHiHi", 3),  # epicsAlarmHiHi
        ("AlarmHigh", 4),  # epicsAlarmHigh
    )

    for statusName, epicsCode in statusTable:
        if varValue.status == statusName:
            return epicsCode

    # Anything else is reported as "no alarm"
    return 0
def EpicsConvSeverity(varValue: pyrogue.VariableValue) -> int:
    """
    Translate a PyRogue alarm severity string into an EPICS severity code.

    Parameters
    ----------
    varValue : object
        PyRogue variable value; only its ``severity`` attribute is read.

    Returns
    -------
    int
        EPICS alarm severity code: 1 for ``"AlarmMinor"``, 2 for
        ``"AlarmMajor"`` and 0 (no alarm) for anything else.
    """
    level = varValue.severity

    if level == "AlarmMinor":
        return 1  # epicsSevMinor

    # epicsSevMajor, otherwise no alarm
    return 2 if level == "AlarmMajor" else 0
class EpicsPvHandler(p4p.server.thread.Handler):
    """Handler for EPICS PV put and RPC operations.

    Put requests are forwarded to a writable PyRogue variable; RPC requests
    invoke a PyRogue command. Every request is completed through ``op.done``,
    either successfully or with an error string.

    Parameters
    ----------
    valType : str
        EPICS value type string (for example ``'i'``, ``'f'``, ``'enum'``).
    var : pyrogue.BaseVariable
        PyRogue variable/command bound to the PV.
    log : object
        Logger instance.
    """

    def __init__(self, valType: str, var: pyrogue.BaseVariable, log: Any) -> None:
        self._valType = valType
        self._var = var
        self._log = log

    def put(self, pv: Any, op: Any) -> None:
        """Handle EPICS put requests for writable variables.

        Parameters
        ----------
        pv : object
            PV the request was issued against (unused).
        op : object
            Server operation carrying the new value; completed via
            ``op.done``.
        """
        # Only variables with mode 'RW' or 'WO' accept a put
        if not (self._var.isVariable and self._var.mode in ('RW', 'WO')):
            op.done(error='Put Not Supported On This Variable')
            return

        try:
            if self._valType == 'enum':
                self._var.setDisp(str(op.value()))
            elif self._valType == 's':
                self._var.setDisp(op.value().raw.value)
            elif self._valType == 'ndarray':
                # NOTE(review): original author flagged this as
                # "This seems wrong!" - it reaches through the private
                # ``op._op`` attribute; confirm against the P4P API.
                self._var.set(op._op.value().value.copy())
            else:
                self._var.set(op.value().raw.value)

            # Need enum processing
            op.done()

        except Exception as msg:
            self._log.error(f"Got error on put: {msg}")
            op.done(error=str(msg))

    def rpc(self, pv: Any, op: Any) -> None:
        """Handle EPICS RPC requests for command-backed PVs.

        The command is called with ``query.arg`` when the request carries an
        ``arg`` field and without arguments otherwise. The return value
        (``'None'`` when the command returns ``None``) is sent back in a
        structure with a single ``value`` field.

        Parameters
        ----------
        pv : object
            PV the request was issued against (unused).
        op : object
            Server operation carrying the request; completed via ``op.done``.
        """
        if not self._var.isCommand:
            op.done(error='Rpc Not Supported On Variables')
            return

        try:
            # Kept inside the try so that a malformed request is answered
            # with an error instead of leaving the operation uncompleted.
            val = op.value().query

            if 'arg' in val:
                ret = self._var(val.arg)
            else:
                ret = self._var()

            if ret is None:
                ret = 'None'

            v = p4p.Value(
                p4p.Type([('value', self._valType)]), {'value': ret})
            op.done(value=v)

        except Exception as msg:
            # Log and report the error as a string, matching put()
            self._log.error(f"Got error on rpc: {msg}")
            op.done(error=str(msg))

    def onFirstConnect(self, pv: Any) -> None:  # may be omitted
        """Called when the first client connects to the PV (no action)."""

    def onLastDisconnect(self, pv: Any) -> None:  # may be omitted
        """Called when the last client disconnects from the PV (no action)."""
class EpicsPvHolder(object):
    """Holds and manages a single EPICS PV backed by a PyRogue variable.

    On construction the holder selects a P4P value type for the variable,
    creates a ``SharedPV`` seeded with the variable's current value, adds it
    to ``provider`` and registers itself as a listener on the variable so
    that later updates are posted to the PV.

    Parameters
    ----------
    provider : object
        P4P static provider to add the PV to.
    name : str
        EPICS PV name.
    var : object
        PyRogue variable/command to expose.
    log : object
        Logger instance.

    Raises
    ------
    Exception
        If the initial value cannot be wrapped or the PV cannot be created.
    """

    # P4P type codes for integers of at most 8, 16, 32 and 64 bits
    _UNSIGNED_CODES = 'BHIL'
    _SIGNED_CODES = 'bhil'

    def __init__(self, provider: Any, name: str, var: Any, log: Any) -> None:
        self._var = var
        self._name = name
        self._log = log
        self._pv = None

        # Commands are typed by their return type, variables by their own
        if self._var.isCommand:
            typeStr = self._var.retTypeStr
        else:
            typeStr = self._var.typeStr

        # Convert valType
        if var.nativeType is np.ndarray:
            self._valType = 'ndarray'
        elif self._var.disp == 'enum':
            self._valType = 'enum'
        elif typeStr is None or var.nativeType is list or var.nativeType is dict:
            self._valType = 's'
        else:
            self._valType = self._scalarValType(typeStr)

        # Get initial value
        varVal = var.getVariableValue(read=False)

        # Override LinkerVariables with init=None
        if varVal.valueDisp is None:
            varVal.valueDisp = ''

        self._log.info(
            "Adding {} with type {} init={}".format(
                self._name,
                self._valType,
                varVal.valueDisp))

        try:
            if self._valType == 'ndarray':
                # If a 1D array is encountered, use a NTScalar. Note, if an
                # NTScalar is used, the values will be automatically
                # converted to doubles.
                if varVal.value.ndim == 1:
                    nt = p4p.nt.NTScalar('ad')
                else:
                    nt = p4p.nt.NTNDArray()
                iv = varVal.value

            elif self._valType == 'enum':
                nt = p4p.nt.NTEnum(
                    display=False,
                    control=False,
                    valueAlarm=False)
                enum = list(self._var.enum.values())
                iv = {'choices': enum, 'index': enum.index(varVal.valueDisp)}

            elif self._valType == 's':
                nt = p4p.nt.NTScalar(
                    self._valType,
                    display=False,
                    control=False,
                    valueAlarm=False)
                iv = nt.wrap(varVal.valueDisp)

            else:
                nt = p4p.nt.NTScalar(
                    self._valType,
                    display=True,
                    control=True,
                    valueAlarm=True)
                iv = nt.wrap(varVal.value)

        except Exception as e:
            raise Exception(
                "Failed to add {} with type {} ndtype={} init={}. Error={}".format(
                    self._name,
                    self._valType,
                    self._var.ndType,
                    varVal.valueDisp,
                    e)) from e

        # Setup variable
        try:
            handler = EpicsPvHandler(self._valType, self._var, self._log)
            self._pv = p4p.server.thread.SharedPV(
                queue=None, handler=handler, initial=iv, nt=nt, options={})
        except Exception as e:
            raise Exception(
                "Failed to start {} with type {} ndtype={} init={}. Error={}".format(
                    self._name,
                    self._valType,
                    self._var.ndType,
                    varVal.valueDisp,
                    e)) from e

        provider.add(self._name, self._pv)
        self._var.addListener(self._varUpdated)

        # Update fields in numeric types
        if self._valType not in ('enum', 's', 'ndarray'):
            curr = self._pv.current()
            curr.raw.value = varVal.value
            curr.raw.alarm.status = EpicsConvStatus(varVal)
            curr.raw.alarm.severity = EpicsConvSeverity(varVal)
            curr.raw.display.description = self._var.description

            # Optional metadata is only written when the variable defines it
            if self._var.units is not None:
                curr.raw.display.units = self._var.units
            if self._var.maximum is not None:
                curr.raw.display.limitHigh = self._var.maximum
            if self._var.minimum is not None:
                curr.raw.display.limitLow = self._var.minimum
            if self._var.lowWarning is not None:
                curr.raw.valueAlarm.lowWarningLimit = self._var.lowWarning
            if self._var.lowAlarm is not None:
                curr.raw.valueAlarm.lowAlarmLimit = self._var.lowAlarm
            if self._var.highWarning is not None:
                curr.raw.valueAlarm.highWarningLimit = self._var.highWarning
            if self._var.highAlarm is not None:
                curr.raw.valueAlarm.highAlarmLimit = self._var.highAlarm

            # Precision ?
            self._pv.post(curr)

    @staticmethod
    def _sizedIntCode(match: Any, codes: str) -> str:
        """Pick an integer type code from a bit-width regex match.

        ``codes`` holds the codes for at most 8, 16, 32 and 64 bits in that
        order. Without a usable bit width the 64-bit code is returned; widths
        above 64 bits fall back to string (``'s'``).
        """
        if match is None or match.lastindex != 1:
            return codes[3]

        bits = int(match[1])
        for limit, code in zip((8, 16, 32, 64), codes):
            if bits <= limit:
                return code
        return 's'

    @classmethod
    def _scalarValType(cls, typeStr: str) -> str:
        """Map a PyRogue scalar type string onto a P4P type code."""
        # Unsigned (must be tested first: 'UInt' also contains 'Int')
        if 'UInt' in typeStr:
            return cls._sizedIntCode(
                re.search(r'^UInt(\d+)\.*', typeStr), cls._UNSIGNED_CODES)

        # Signed
        if 'int' in typeStr or 'Int' in typeStr:
            return cls._sizedIntCode(
                re.search(r'^Int(\d+)\.*', typeStr), cls._SIGNED_CODES)

        # Float
        if 'float' in typeStr or 'Float32' in typeStr:
            return 'f'

        # Double
        if 'Double64' in typeStr or 'Float64' in typeStr:
            return 'd'

        # Default to string
        return 's'

    def _varUpdated(self, path: str, value: Any) -> None:
        """Post updated variable values into the backing EPICS PV.

        Parameters
        ----------
        path : str
            Path of the updated variable (unused).
        value : object
            Variable value providing ``value``, ``valueDisp``, ``status``
            and ``severity``.
        """
        if self._valType == 'enum' or self._valType == 's':
            self._pv.post(value.valueDisp)
        elif self._valType == 'ndarray':
            self._pv.post(value.value)
        else:
            curr = self._pv.current()
            curr.raw.value = value.value
            curr.raw.alarm.status = EpicsConvStatus(value)
            curr.raw.alarm.severity = EpicsConvSeverity(value)

            # Integer arithmetic keeps the full nanosecond resolution; going
            # through float() rounds time_ns() to the nearest double and
            # yields float seconds/nanoseconds.
            seconds, nanoseconds = divmod(time.time_ns(), 1_000_000_000)
            curr.raw['timeStamp.secondsPastEpoch'] = seconds
            curr.raw['timeStamp.nanoseconds'] = nanoseconds

            self._pv.post(curr)
class EpicsPvServer(object):
    """
    EPICS PV server that exposes PyRogue variables as EPICS process variables.

    Uses P4P to serve variables from a PyRogue tree over the EPICS protocol.

    Parameters
    ----------
    base : str
        Base string prepended to PV names.
    root : pyrogue.Root
        PyRogue root node containing variables to expose.
    incGroups : str | list[str] | None, optional
        Include only variables in these groups.
    excGroups : str | list[str] | None, optional
        Exclude variables in these groups. Defaults to ``['NoServe']``.
    pvMap : dict[str, str] | None, optional
        Explicit path-to-PV-name mapping. When omitted, every variable that
        passes the group filter is served as ``base:path`` with ``.``
        replaced by ``:``.
    """

    # NOTE: __init__ must stay above the ``list`` method; its annotations
    # use the builtin ``list`` which that method shadows in class scope.
    def __init__(
        self,
        *,
        base: str,
        root: pyrogue.Root,
        incGroups: str | list[str] | None = None,
        excGroups: str | list[str] | None = None,
        pvMap: dict[str, str] | None = None,
    ) -> None:
        if excGroups is None:
            excGroups = ['NoServe']

        self._root = root
        self._base = base
        self._log = pyrogue.logInit(cls=self)
        self._server = None
        self._incGroups = incGroups
        self._excGroups = excGroups
        self._pvMap = pvMap
        self._started = False
        self._provider = p4p.server.StaticProvider(__name__)

    def _stop(self) -> None:
        """Stop the EPICS PV server."""
        if self._server is not None:
            self._server.stop()

    def _start(self) -> None:
        """Start the EPICS PV server and register all variables.

        Does nothing when the server was already started.

        Raises
        ------
        Exception
            If the PyRogue tree is not running.
        """
        if self._started:
            return

        # Validate before marking the server as started, so that a call made
        # too early can be retried once the tree is running.
        if not self._root.running:
            raise Exception(
                "Epics can not be setup on a tree which is not started")

        self._started = True
        self._stop()
        self._list = {}

        # Figure out mapping mode
        doAll = self._pvMap is None
        if doAll:
            self._pvMap = {}

        # Create PVs
        for v in self._root.variableList:
            eName = None

            if doAll:
                if v.filterByGroup(self._incGroups, self._excGroups):
                    eName = self._base + ':' + v.path.replace('.', ':')
                    self._pvMap[v.path] = eName
            elif v.path in self._pvMap:
                eName = self._pvMap[v.path]

            if eName is not None:
                pvh = EpicsPvHolder(self._provider, eName, v, self._log)
                self._list[v] = pvh

        # Check for missing map entries. self._list is keyed by variable
        # object, so compare against the paths of the served variables.
        if len(self._pvMap) != len(self._list):
            served = {var.path for var in self._list}
            for k in self._pvMap:
                if k not in served:
                    self._log.error(
                        f"Failed to find {k} from P4P mapping in Rogue tree!")

        self._server = p4p.server.Server(providers=[self._provider])

    def list(self) -> dict[str, str]:
        """
        Get the PyRogue path to EPICS PV name mapping.

        Returns
        -------
        dict
            Mapping of variable paths to EPICS PV names.
        """
        return self._pvMap

    def dump(self, fname: str | None = None) -> None:
        """
        Print or write the PV mapping to file.

        Each line has the form ``<PV name> -> <variable path>``.

        Parameters
        ----------
        fname : str, optional
            If provided, write mapping to this file; otherwise print to stdout.

        Raises
        ------
        Exception
            If the mapping cannot be written to ``fname``.
        """
        if fname is not None:
            try:
                with open(fname, 'w') as f:
                    for k, v in self._pvMap.items():
                        print("{} -> {}".format(v, k), file=f)
            except Exception as e:
                raise Exception(
                    "Failed to dump epics map to {}".format(fname)) from e
        else:
            for k, v in self._pvMap.items():
                print("{} -> {}".format(v, k))