#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# Module containing EPICS V7 support classes and routines using pythonSoftIOC
# -----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
# -----------------------------------------------------------------------------
import pyrogue
import threading
import warnings
import hashlib
import numpy as np
# softioc (pythonSoftIOC) is treated as optional at import time: a failed import
# only emits a warning so that this module can still be imported.  In that case
# ``builder``, ``softioc`` and ``AsyncioDispatcher`` stay undefined and any later
# use of them raises NameError.
try:
    from softioc import builder, softioc
    from softioc.asyncio_dispatcher import AsyncioDispatcher
except Exception:
    warnings.warn(
        "softioc (pythonSoftIOC) is not installed.\n\n"
        "To install with pip:\n"
        " pip install softioc\n\n"
        "To install with Conda:\n"
        " conda install -c conda-forge softioc\n\n"
        "Note: softioc requires EPICS base to be available on your system."
    )

# p4p gets the same warn-and-continue treatment.
# NOTE(review): EpicsPvaLongNameHandler in this file names p4p.server.thread.Handler
# as its base class, so a missing p4p looks like it will still abort the import of
# this module with NameError right after the warning -- confirm this is intended.
try:
    import p4p.server
    import p4p.server.thread
    import p4p.nt
except Exception:
    warnings.warn(
        "PVAccess for Python (P4P) is not installed.\n\n"
        "To install with pip:\n"
        " pip install p4p\n\n"
        "To install with Conda:\n"
        " conda install -c conda-forge p4p\n\n"
        "Note: p4p requires EPICS base to be available on your system."
    )
# Process-wide guard: the EPICS IOC may be initialised only one time per process.
_ioc_started = False

# Longest full PV name (``base:suffix``, in characters) that is served without
# replacing the suffix by a hashed short form.
_EPICS_MAX_NAME_LEN = 60

# p4p structural type codes that can never back an NTScalar SharedPV:
# 'v' (variant), 'U' (union) and 'S' (struct) have no PyRogue scalar equivalent.
_PVA_UNSUPPORTED_TYPES = frozenset('vUS')
def _make_epicsV7_suffix(base, suffix, max_len=None):
    """
    Return a PV suffix whose full name fits within the name-length limit.

    The suffix is returned unchanged when ``base:suffix`` is at most
    ``max_len`` characters long.  Otherwise a deterministic short suffix of
    the form ``tail_XXXXXXXXXX`` is returned, where the ten lowercase hex
    characters are the start of the SHA-1 digest of the full name.

    Parameters
    ----------
    base : str
        The EPICS PV base name (device prefix).
    suffix : str
        The PV suffix derived from the pyrogue variable path.
    max_len : int, optional
        Maximum allowed length of ``base:suffix``.  Defaults to the
        module-wide ``_EPICS_MAX_NAME_LEN`` when omitted or None.

    Returns
    -------
    str
        The original suffix if the full name fits, or a hashed ``tail_`` suffix.
    """
    # Resolve the default lazily so an explicit limit never touches the global
    limit = _EPICS_MAX_NAME_LEN if max_len is None else max_len

    full_name = f"{base}:{suffix}"
    if len(full_name) <= limit:
        return suffix

    # SHA-1 is used purely as a stable name hash here, not for security
    digest = hashlib.sha1(full_name.encode()).hexdigest()
    return "tail_" + digest[:10]
def EpicsConvStatus(varValue: pyrogue.VariableValue) -> int:
    """
    Translate a PyRogue alarm status string into an EPICS alarm status code.

    Parameters
    ----------
    varValue : object
        PyRogue variable value exposing a ``status`` attribute.

    Returns
    -------
    int
        EPICS alarm status code: 5 (LoLo), 6 (Low), 3 (HiHi), 4 (High),
        or 0 (no alarm) for any other status.
    """
    status_table = (
        ("AlarmLoLo", 5),   # epicsAlarmLoLo
        ("AlarmLow", 6),    # epicsAlarmLow
        ("AlarmHiHi", 3),   # epicsAlarmHiHi
        ("AlarmHigh", 4),   # epicsAlarmHigh
    )
    for label, code in status_table:
        if varValue.status == label:
            return code
    return 0
def EpicsConvSeverity(varValue: pyrogue.VariableValue) -> int:
    """
    Translate a PyRogue alarm severity string into an EPICS severity code.

    Parameters
    ----------
    varValue : object
        PyRogue variable value exposing a ``severity`` attribute.

    Returns
    -------
    int
        EPICS alarm severity code: 1 (minor), 2 (major), or 0 (no alarm)
        for any other severity.
    """
    severity_table = (
        ("AlarmMinor", 1),  # epicsSevMinor
        ("AlarmMajor", 2),  # epicsSevMajor
    )
    for label, code in severity_table:
        if varValue.severity == label:
            return code
    return 0
def _epicsV7_to_pva_type(var):
    """
    Choose the p4p NTScalar type code used to serve a PyRogue variable.

    The checks run in a fixed order: ndarray, enum, string-like, exact
    integer/bool type names, then float name fragments; anything left over
    is served as a string.

    Parameters
    ----------
    var : pyrogue.BaseVariable
        PyRogue variable whose type is to be mapped.  Its ``typeStr``,
        ``nativeType``, ``disp`` and (for enums) ``enum`` attributes are read.

    Returns
    -------
    str
        A p4p scalar type character: ``'?'`` (bool),
        ``'b'``/``'B'``/``'h'``/``'H'``/``'i'``/``'I'``/``'l'``/``'L'`` (int8–int64 / uint8–uint64),
        ``'f'``/``'d'`` (float32/float64), ``'s'`` (string), ``'enum'``, or ``'ndarray'``.
    """
    type_name = '' if var.typeStr is None else var.typeStr

    # Arrays take precedence over every other classification
    if var.nativeType is np.ndarray:
        return 'ndarray'

    # Enums with more than 16 entries are served as strings instead
    if var.disp == 'enum':
        choices = list(var.enum.values())
        return 's' if len(choices) > 16 else 'enum'

    string_like = (
        var.nativeType is list
        or var.nativeType is dict
        or type_name in ('str', 'list', 'dict', 'NoneType')
        or type_name == ''
    )
    if string_like:
        return 's'

    # Type names that must match exactly, tested in this order
    exact_codes = (
        ('Bool', '?'),
        ('UInt64', 'L'),
        ('Int64', 'l'),
        ('UInt32', 'I'),
        ('UInt16', 'H'),
        ('UInt8', 'B'),
        ('Int32', 'i'),
        ('int', 'i'),
        ('Int16', 'h'),
        ('Int8', 'b'),
    )
    for name, code in exact_codes:
        if type_name == name:
            return code

    # Float types are recognised by a fragment of the type name
    if 'Float32' in type_name:
        return 'f'
    if type_name == 'float' or 'Float64' in type_name or 'Double64' in type_name:
        return 'd'

    return 's'
def _make_shared_pv(var, pva_type, handler):
    """
    Build the p4p SharedPV that serves a variable under its long PVA name.

    The SharedPV is seeded with the variable's current (cached) value so that
    the first pvget from a PVA client returns a valid reading rather than
    ``Disconnected`` (PVA-05).

    Parameters
    ----------
    var : pyrogue.BaseVariable
        PyRogue variable providing the initial value and, for numeric
        scalars, the display and alarm-limit metadata.
    pva_type : str
        Type character returned by ``_epicsV7_to_pva_type``.
    handler : p4p.server.thread.Handler
        Put handler that will receive PVA write requests.

    Returns
    -------
    p4p.server.thread.SharedPV

    Raises
    ------
    ValueError
        If ``pva_type`` is one of the structural codes in
        ``_PVA_UNSUPPORTED_TYPES``.
    """
    if pva_type in _PVA_UNSUPPORTED_TYPES:
        raise ValueError(
            f"p4p type '{pva_type}' (variant/union/struct) cannot be used as a SharedPV "
            f"scalar type; no PyRogue variable maps to this type"
        )

    # read=False: take the cached value, no hardware access
    snapshot = var.getVariableValue(read=False)

    if pva_type == 'ndarray':
        # Arrays are always published as double arrays
        nt = p4p.nt.NTScalar('ad')
        iv = snapshot.value
        if iv is None:
            iv = np.array([], dtype=np.float64)

    elif pva_type == 'enum':
        nt = p4p.nt.NTEnum(display=False, control=False, valueAlarm=False)
        choices = list(var.enum.values())
        shown = '' if snapshot.valueDisp is None else snapshot.valueDisp
        # An unknown display string falls back to the first choice
        position = choices.index(shown) if shown in choices else 0
        iv = {'choices': choices, 'index': position}

    elif pva_type == 's':
        nt = p4p.nt.NTScalar('s', display=False, control=False)
        iv = nt.wrap('' if snapshot.valueDisp is None else snapshot.valueDisp)

    else:
        # Numeric / bool scalar carrying alarm state, display info and limits
        nt = p4p.nt.NTScalar(pva_type, display=True, control=False, valueAlarm=True)
        fallback = False if pva_type == '?' else 0
        iv = nt.wrap(fallback if snapshot.value is None else snapshot.value)
        iv.alarm.severity = EpicsConvSeverity(snapshot)
        iv.alarm.status = EpicsConvStatus(snapshot)

        display_fields = (
            ('description', var.description),
            ('units', var.units),
            ('limitHigh', var.maximum),
            ('limitLow', var.minimum),
        )
        for field, setting in display_fields:
            if setting is not None:
                setattr(iv.display, field, setting)

        alarm_fields = (
            ('lowWarningLimit', var.lowWarning),
            ('lowAlarmLimit', var.lowAlarm),
            ('highWarningLimit', var.highWarning),
            ('highAlarmLimit', var.highAlarm),
        )
        for field, setting in alarm_fields:
            if setting is not None:
                setattr(iv.valueAlarm, field, setting)

    return p4p.server.thread.SharedPV(queue=None, handler=handler, initial=iv, nt=nt)
def _post_pv_long(pv_long, pva_type, var, value):
    """
    Publish a new variable value on the long-name SharedPV.

    Enum and string PVs post the display string (an empty string when it is
    None).  Array and numeric PVs post nothing when the value is None; numeric
    PVs additionally refresh alarm severity and status.

    Parameters
    ----------
    pv_long : p4p.server.thread.SharedPV
        The SharedPV serving the full long PVA name.
    pva_type : str
        Type character returned by ``_epicsV7_to_pva_type``.
    var : pyrogue.BaseVariable
        The backing PyRogue variable (not read here; kept for symmetry).
    value : pyrogue.VariableValue
        New variable value to publish.
    """
    if pva_type in ('enum', 's'):
        text = value.valueDisp
        if text is None:
            payload = ''
        elif pva_type == 's':
            payload = str(text)
        else:
            payload = text
        pv_long.post(payload)
        return

    # Arrays and numeric scalars: nothing to publish without a value
    if value.value is None:
        return

    if pva_type == 'ndarray':
        pv_long.post(value.value)
        return

    # Numeric scalar: update the current structure in place, then re-post it
    current = pv_long.current()
    current.raw.value = value.value
    current.raw.alarm.severity = EpicsConvSeverity(value)
    current.raw.alarm.status = EpicsConvStatus(value)
    pv_long.post(current)
class _RoguePvaHandler:
    """DynamicProvider handler mapping PVA channel names to their SharedPV.

    p4p 4.2.2's StaticProvider has a routing bug where all channel requests
    resolve to a single PV when multiple PVs share one provider or server.
    This handler does the name lookup in Python with a plain dict, so a
    single DynamicProvider + single Server can serve all long-name aliases.
    """

    def __init__(self):
        # long_name -> SharedPV
        self._pvs = dict()

    def add(self, name, pv):
        """Register ``pv`` under channel name ``name`` (replaces any previous entry)."""
        self._pvs[name] = pv

    def testChannel(self, name):
        """Return True when a SharedPV is registered under ``name``."""
        known = name in self._pvs
        return known

    def makeChannel(self, name, peer):
        """Return the SharedPV registered under ``name``; raises KeyError if unknown."""
        shared_pv = self._pvs[name]
        return shared_pv
class EpicsPvaLongNameHandler(p4p.server.thread.Handler):
    """Forwards PVA puts on a long-name alias to the owning EpicsPvHolder.

    Parameters
    ----------
    holder : EpicsPvHolder
        The holder instance that owns the softioc record and the ``_on_put``
        callback.
    """

    def __init__(self, holder):
        self._holder = holder

    def put(self, pv, op):
        """Unpack the written value by PVA type, hand it to the holder, then complete ``op``.

        Any exception raised along the way is reported to the client through
        ``op.done(error=...)``.
        """
        # CRITICAL: signature is (self, pv, op) — two args after self
        # Confirmed in p4p 4.2.2 p4p/server/raw.py:56
        try:
            kind = self._holder._pva_type
            incoming = op.value()
            if kind == 'enum':
                # Enum puts are forwarded as the integer index
                payload = int(incoming)
            elif kind == 'ndarray':
                # Copy so the holder does not share the array delivered by p4p
                payload = incoming.raw.value.copy()
            else:
                # Strings and numeric scalars: forward the raw value as-is
                payload = incoming.raw.value
            self._holder._on_put(payload)
            op.done()
        except Exception as e:
            op.done(error=str(e))
class EpicsPvHolder(object):
    """Holds and manages a single EPICS PV backed by a PyRogue variable.

    On construction the softioc record is created and a listener is attached
    to the variable so later PyRogue updates are pushed to the record (and to
    the optional long-name PVA alias).

    Parameters
    ----------
    var : pyrogue.BaseVariable
        PyRogue variable to expose as an EPICS PV.
    name : str
        Full EPICS PV name (base:suffix).
    suffix : str
        Record name handed to the softioc builder, relative to the device
        base name.
    provider : object, optional
        Not used by this class; accepted for call compatibility.
    long_name : str, optional
        Full PVA channel name to serve as an alias.  When given, a p4p
        SharedPV is created for it; if that fails the alias is silently
        disabled and only the softioc record is served.
    """

    # typeStr fragment -> waveform FTVL.  Order matters: 'UInt8' must be tested
    # before 'Int8' because the match is a substring test.
    _FTVL_BY_TYPE = (
        ('UInt8', 'UCHAR'), ('UInt16', 'USHORT'), ('UInt32', 'ULONG'),
        ('Int8', 'CHAR'), ('Int16', 'SHORT'), ('Int32', 'LONG'),
        ('Float32', 'FLOAT'), ('Float64', 'DOUBLE'), ('Double64', 'DOUBLE'),
    )

    # typeStr values served through longString records ('' covers typeStr None)
    _STRING_TYPES = ('str', 'list', 'dict', 'NoneType', '')

    # typeStr prefixes served through long records
    _LONG_PREFIXES = ('UInt8', 'UInt16', 'UInt32', 'Int8', 'Int16', 'Int32')

    def __init__(self, var, name: str, suffix: str, provider=None, long_name=None) -> None:
        self._var = var
        self._name = name
        self._suffix = suffix
        self._record = None

        # True for writable (Out) records; Out records have process=False support
        self._is_writable = var.isCommand or var.mode in ('RW', 'WO')

        # PVA long-name alias (only for hashed PVs).
        # _pv_long is the SharedPV; _long_name is the full PV name used as the PVA
        # channel name.  The SharedPV is registered by EpicsPvServer in one shared
        # DynamicProvider/Server, not here -- one server for all long-name aliases
        # avoids per-PV FD exhaustion.
        self._pv_long = None
        self._pva_type = None
        self._long_name = long_name
        if long_name is not None:
            try:
                self._pva_type = _epicsV7_to_pva_type(var)
                handler = EpicsPvaLongNameHandler(self)
                self._pv_long = _make_shared_pv(var, self._pva_type, handler)
            except Exception:
                # Best effort: without the alias the CA record is still served
                self._pv_long = None
                self._pva_type = None
                self._long_name = None

        self._createRecord()

        # Add listener to sync future updates from PyRogue to EPICS
        self._var.addListener(self._varUpdated)

    @classmethod
    def _isStringLike(cls, var, typeStr) -> bool:
        """Return True when the variable is served through a longString record."""
        return (var.nativeType is list or var.nativeType is dict or
                typeStr in cls._STRING_TYPES)

    def _buildRecord(self, inKind, outKind, *args, **kwargs):
        """Create the In or the Out flavour of a softioc record.

        Writable variables get ``builder.<outKind>`` with ``_on_put`` as the
        ``on_update`` callback; read-only ones get ``builder.<inKind>``.
        """
        if self._is_writable:
            return getattr(builder, outKind)(self._suffix, *args, **kwargs, on_update=self._on_put)
        return getattr(builder, inKind)(self._suffix, *args, **kwargs)

    def _createRecord(self):
        """Create the softioc record for this PV based on variable type and mode."""
        v = self._var

        # --- Command (TYPE-12) ---
        if v.isCommand:
            # always_update=True ensures callback fires even for value=0 (needed for no-arg commands)
            self._record = builder.longOut(self._suffix, on_update=self._on_put, always_update=True)
            return

        # Determine typeStr safely
        typeStr = v.typeStr if v.typeStr is not None else ''

        # --- ndarray (TYPE-08) ---
        if v.nativeType is np.ndarray:
            # Determine FTVL from typeStr; default DOUBLE
            ftvl = 'DOUBLE'
            for fragment, code in self._FTVL_BY_TYPE:
                if fragment in typeStr:
                    ftvl = code
                    break
            # The current value fixes the waveform length
            varVal = v.getVariableValue(read=False)
            length = varVal.value.size if varVal.value is not None else 1
            self._record = self._buildRecord('WaveformIn', 'WaveformOut', length=length, FTVL=ftvl)
            return

        # --- enum (TYPE-06) ---
        if v.disp == 'enum':
            enum_strings = list(v.enum.values())
            if len(enum_strings) <= 16:
                # mbbIn/mbbOut support at most 16 states (EPICS hard limit)
                self._record = self._buildRecord('mbbIn', 'mbbOut', *enum_strings)
            else:
                # Fall back to string record for large enums
                self._record = self._buildRecord('longStringIn', 'longStringOut')
            return

        # --- string / list / dict / None (TYPE-07) ---
        if self._isStringLike(v, typeStr):
            self._record = self._buildRecord('longStringIn', 'longStringOut')
            return

        # --- Bool (TYPE-09) ---
        if typeStr == 'Bool':
            self._record = self._buildRecord('longIn', 'longOut')
            return

        # --- UInt64 / Int64 -> int64 (TYPE-02) ---
        if typeStr in ('UInt64', 'Int64'):
            self._record = self._buildRecord('int64In', 'int64Out')
            return

        # --- UInt8/16/32, Int8/16/32, and plain 'int' -> long (TYPE-01, TYPE-03) ---
        if typeStr == 'int' or typeStr.startswith(self._LONG_PREFIXES):
            self._record = self._buildRecord('longIn', 'longOut')
            return

        # --- Float32 (TYPE-04), Float64 / Double64 and plain 'float' (TYPE-05) ---
        if ('Float32' in typeStr or typeStr == 'float' or
                'Float64' in typeStr or 'Double64' in typeStr):
            self._record = self._buildRecord('aIn', 'aOut')
            return

        # --- Fallback: string ---
        self._record = self._buildRecord('longStringIn', 'longStringOut')

    def _postLongName(self, value):
        """Mirror an update onto the long-name PVA alias, if one exists (PVA-03)."""
        # Read the attribute once: it is reset to None when the alias is shut down
        pv_long = self._pv_long
        if pv_long is not None:
            _post_pv_long(pv_long, self._pva_type, self._var, value)

    def _varUpdated(self, path, value):
        """Called when the backing PyRogue variable changes; pushes new value to EPICS record.

        Parameters
        ----------
        path : str
            Path of the variable that changed (not used).
        value : pyrogue.VariableValue
            New value; ``value``, ``valueDisp`` and ``severity`` are read.
        """
        if self._record is None:
            return
        try:
            v = self._var
            typeStr = v.typeStr if v.typeStr is not None else ''

            # ProcessDeviceSupportOut.set() accepts process=False to prevent on_update re-trigger.
            # ProcessDeviceSupportIn.set() has NO process parameter -- passing process= raises
            # TypeError which is silently swallowed, leaving In records stuck at 0.
            # Pass process=False only for writable (Out) records.
            set_kw = {'process': False} if self._is_writable else {}

            # --- ndarray ---
            if v.nativeType is np.ndarray:
                if value.value is not None:
                    self._record.set(value.value, **set_kw)
                self._postLongName(value)
                return

            # --- enum (VAR-01, VAR-04 push direction) ---
            if v.disp == 'enum':
                enum_strings = list(v.enum.values())
                disp = value.valueDisp if value.valueDisp is not None else ''
                if len(enum_strings) <= 16:
                    # mbb record: set by index (unknown strings map to index 0)
                    try:
                        idx = enum_strings.index(disp)
                    except ValueError:
                        idx = 0
                    self._record.set(idx, **set_kw)
                else:
                    # longString fallback: set by display string
                    self._record.set(str(disp), **set_kw)
                self._postLongName(value)
                return

            # --- string / list / dict / None (VAR-05 push direction) ---
            if self._isStringLike(v, typeStr):
                disp = value.valueDisp if value.valueDisp is not None else ''
                # longStringIn/Out supports arbitrary length strings
                self._record.set(str(disp), **set_kw)
                self._postLongName(value)
                return

            # --- All numeric types (int, float, bool) (VAR-06) ---
            if value.value is None:
                return
            if typeStr == 'Bool':
                if self._is_writable:
                    # Out records only take (value, process): adding severity= here
                    # raised TypeError, which the handler below swallowed, so
                    # writable Bool PVs were never updated.
                    self._record.set(int(value.value), **set_kw)
                else:
                    self._record.set(int(value.value), severity=EpicsConvSeverity(value))
            else:
                # Use value directly, not severity parameter (softioc 4.7+ doesn't
                # support severity on all record types)
                self._record.set(value.value, **set_kw)
            self._postLongName(value)
        except Exception:
            pass  # Listener callbacks must not raise; softioc may call this from IOC thread

    def _on_put(self, new_value):
        """Called by softioc when a CA/PVA client writes to this PV.

        Parameters
        ----------
        new_value : object
            Value delivered by the record: an index for mbb records, a string
            (or bytes) for longString records, a number otherwise.
        """
        try:
            v = self._var

            # Command (TYPE-12): value 0 -> no-arg call, non-zero -> pass as argument
            # always_update=True ensures callback fires for value=0 (needed for no-arg commands)
            if v.isCommand:
                if new_value == 0:
                    v()  # No-arg call
                else:
                    v(new_value)  # Call with argument
                return

            typeStr = v.typeStr if v.typeStr is not None else ''

            # Enum (VAR-04): softioc passes index int for mbb, or string for longString fallback
            if v.disp == 'enum':
                enum_strings = list(v.enum.values())
                if len(enum_strings) <= 16:
                    # mbb record: new_value is an index int; out-of-range is ignored
                    if 0 <= new_value < len(enum_strings):
                        v.setDisp(enum_strings[new_value])
                else:
                    # longString fallback: new_value is already the display string
                    if isinstance(new_value, (bytes, bytearray)):
                        new_value = new_value.decode('utf-8', errors='replace')
                    v.setDisp(str(new_value))
                return

            # String (VAR-05): decode bytes if needed.  Uses the same test as
            # _createRecord, so an empty/None typeStr (served as a longString
            # record) is written through setDisp as well.
            if self._isStringLike(v, typeStr):
                if isinstance(new_value, (bytes, bytearray)):
                    new_value = new_value.decode('utf-8', errors='replace')
                v.setDisp(str(new_value))
                return

            # Bool: convert to int
            if typeStr == 'Bool':
                v.set(int(new_value))
                return

            # All numeric types (VAR-03)
            v.set(new_value)
        except Exception:
            pass  # softioc on_update callbacks must not raise
class EpicsPvServer(object):
    """
    EPICS PV server that exposes PyRogue variables as EPICS process variables.

    Uses pythonSoftIOC (softioc) to serve variables from a PyRogue tree over
    the EPICS CA/PVA protocol. Records are updated when PyRogue variable
    listeners fire (e.g. via auto-polling or explicit reads); softioc does not
    invoke a Python callback on client caget/ctxt.get().

    Parameters
    ----------
    base : str
        Base string prepended to PV names (device name for SetDeviceName).
    root : pyrogue.Root
        PyRogue root node containing variables to expose.
    incGroups : str | list[str] | None, optional
        Include only variables in these groups.
    excGroups : str | list[str] | None, optional
        Exclude variables in these groups. Defaults to ['NoServe'].
    pvMap : dict[str, str] | None, optional
        Explicit rogue_path-to-EPICS_name mapping. If None, all variables
        passing incGroups/excGroups filter are auto-mapped.
    """

    def __init__(
        self,
        *,
        base: str,
        root,
        incGroups=None,
        excGroups=None,
        pvMap=None,
    ) -> None:
        if excGroups is None:
            excGroups = ['NoServe']

        self._base = base
        self._root = root
        self._log = pyrogue.logInit(cls=self)
        self._incGroups = incGroups
        self._excGroups = excGroups
        self._pvMap = pvMap if pvMap is not None else {}
        self._caNameMap = {}       # rogue path -> hashed CA name (only for PVs that were hashed)
        self._holders = []
        self._thread = None
        self._started = False      # Track if this instance has been started
        self._pva_provider = None  # single DynamicProvider for all long-name aliases
        self._pva_server = None    # single p4p.server.Server for all long-name aliases

        root.addProtocol(self)

    def list(self) -> dict:
        """
        Get the PyRogue path to EPICS PV name mapping.

        Returns
        -------
        dict
            Mapping of variable paths to EPICS PV names (the live internal
            dict, not a copy).
        """
        return self._pvMap

    def dump(self, fname=None) -> None:
        """
        Print or write the PV mapping to file.

        Each line has the form ``<pv name> -> <rogue path>``.  For PVs whose
        CA record name was hashed (name exceeded 60 chars), an annotation
        showing the CA short name is appended.

        Parameters
        ----------
        fname : str, optional
            If provided, write mapping to this file; otherwise print to stdout.

        Raises
        ------
        Exception
            If the file cannot be written; the original error is chained.
        """
        lines = []
        for path, pvName in self._pvMap.items():
            line = f"{pvName} -> {path}"
            if path in self._caNameMap:
                line += f" (CA: {self._caNameMap[path]})"
            lines.append(line)

        if fname is None:
            for line in lines:
                print(line)
            return

        try:
            with open(fname, 'w') as f:
                for line in lines:
                    print(line, file=f)
        except Exception as err:
            raise Exception("Failed to dump epics map to {}".format(fname)) from err

    def _runIocInit(self, init_fn) -> None:
        """Run ``init_fn`` in the 'epicsV7-ioc' daemon thread and wait for it to end.

        The completion event is set in a ``finally`` clause, so a failing
        ``init_fn`` can no longer leave the caller blocked forever; its
        exception is re-raised in the calling thread instead.

        Parameters
        ----------
        init_fn : callable
            Zero-argument callable performing the IOC initialisation.
        """
        finished = threading.Event()
        failures = []

        def _run_ioc():
            try:
                init_fn()
            except Exception as err:
                failures.append(err)
            finally:
                finished.set()

        self._thread = threading.Thread(target=_run_ioc, name='epicsV7-ioc', daemon=True)
        self._thread.start()

        # Block until the init thread signals completion (success or failure)
        finished.wait()
        if failures:
            raise failures[0]

    def _startLongNameServer(self) -> None:
        """Serve every long-name alias from one DynamicProvider and one p4p Server.

        One provider + one server for all aliases avoids per-PV FD exhaustion.
        p4p 4.2.2's StaticProvider has a routing bug (all requests resolve to one
        PV when multiple PVs share a provider or server), so a DynamicProvider
        with a plain Python dict is used for the name -> SharedPV dispatch.
        Failures are logged and leave the aliases unserved.
        """
        long_name_holders = [h for h in self._holders if h._pv_long is not None]
        if not long_name_holders:
            return
        try:
            handler = _RoguePvaHandler()
            for holder in long_name_holders:
                handler.add(holder._long_name, holder._pv_long)
            self._pva_provider = p4p.server.DynamicProvider('rogue-long-names', handler)
            self._pva_server = p4p.server.Server(providers=[self._pva_provider])
            self._log.info(f"epicsV7: Started PVA long-name server for {len(long_name_holders)} alias(es)")
        except Exception as e:
            self._pva_server = None
            self._pva_provider = None
            self._log.error(f"epicsV7: Failed to start PVA long-name server: {e}")

    def _start(self) -> None:
        """Start the EPICS IOC and register all variables.

        Calls builder.SetDeviceName before any record creation, creates all
        EpicsPvHolder instances (which create records), then calls
        builder.LoadDatabase() and runs iocInit in a background daemon thread,
        waiting for it to finish.  Afterwards the records are seeded with the
        current variable values and the long-name PVA server is started.

        Raises
        ------
        Exception
            If the tree is not running or ``_start`` was already called on
            this instance.  An exception raised by iocInit is re-raised here.
        RuntimeError
            If two variables hash to the same CA suffix (auto-map mode).
        """
        global _ioc_started

        if not self._root.running:
            raise Exception("epicsV7 cannot be set up on a tree that is not started")

        # Prevent duplicate _start() calls - softioc can only LoadDatabase once per process
        if self._started:
            raise Exception(f"epicsV7: Duplicate _start() call for base={self._base}. Protocol may have been added twice.")
        self._started = True
        self._log.info(f"epicsV7: First _start() call for base={self._base}")

        # Mapping mode: an empty pvMap means "serve everything that passes the group filter"
        doAll = not self._pvMap

        # CRITICAL: SetDeviceName MUST be called before any record creation
        builder.SetDeviceName(self._base)
        self._log.info(f"epicsV7: SetDeviceName({self._base}) called, creating {len(self._root.variableList)} PV holders")

        # Create all PV holders (record creation happens inside EpicsPvHolder._createRecord)
        # Collision detection: maps eSuffix -> original v.path, used to detect
        # two distinct variables ending up with the same CA suffix
        _hashCollisions = {}

        for v in self._root.variableList:
            eName = None
            eSuffix = None
            suffix = None

            if doAll:
                if v.filterByGroup(self._incGroups, self._excGroups):
                    # PV naming: base:path.replace('.', ':')
                    # suffix is the part AFTER base: (relative to SetDeviceName)
                    suffix = v.path.replace('.', ':')
                    eSuffix = _make_epicsV7_suffix(self._base, suffix)

                    # Full long name always stored in _pvMap (per HASH-05)
                    eName = self._base + ':' + suffix
                    self._pvMap[v.path] = eName

                    # Track CA name if it was hashed (per OPS-02)
                    if eSuffix != suffix:
                        self._caNameMap[v.path] = self._base + ':' + eSuffix

                    # Collision detection (per COLL-01): check BEFORE any builder call
                    if eSuffix in _hashCollisions:
                        raise RuntimeError(
                            f"epicsV7: Hash collision detected! "
                            f"Two variables map to the same CA suffix '{eSuffix}': "
                            f"'{_hashCollisions[eSuffix]}' and '{v.path}'"
                        )
                    _hashCollisions[eSuffix] = v.path

            elif v.path in self._pvMap:
                eName = self._pvMap[v.path]
                # Derive suffix: strip "base:" prefix
                suffix = eName[len(self._base) + 1:] if eName.startswith(self._base + ':') else eName
                eSuffix = _make_epicsV7_suffix(self._base, suffix)

                # Track CA name if it was hashed (explicit pvMap mode)
                if eSuffix != suffix:
                    self._caNameMap[v.path] = self._base + ':' + eSuffix

            if eName is not None:
                is_hashed = (eSuffix != suffix)
                holder = EpicsPvHolder(
                    v, eName, eSuffix,
                    long_name=eName if is_hashed else None,
                )
                self._holders.append(holder)

        # Verify all explicit pvMap entries were found
        if not doAll and len(self._pvMap) != len(self._holders):
            served = {h._name for h in self._holders}
            for path, pvName in self._pvMap.items():
                if pvName not in served:
                    self._log.error(f"Failed to find {path} from pvMap in Rogue tree!")

        # CRITICAL: LoadDatabase after ALL records created, before iocInit
        self._log.info(f"epicsV7: Created {len(self._holders)} PV holders, calling LoadDatabase()")
        builder.LoadDatabase()

        # Start IOC in background daemon thread (only once per process)
        if not _ioc_started:
            _ioc_started = True
            # AsyncioDispatcher creates and manages its own event loop
            dispatcher = AsyncioDispatcher()
            self._runIocInit(lambda: softioc.iocInit(dispatcher, enable_pva=True))
        else:
            self._log.warning("epicsV7: IOC already started in this process; skipping iocInit")

        # Initialize all record values from current PyRogue variable values
        # This must happen AFTER iocInit so records can accept updates
        self._log.info(f"epicsV7: Initializing {len(self._holders)} record values from PyRogue variables")
        for holder in self._holders:
            try:
                varVal = holder._var.getVariableValue(read=False)
                if varVal is not None:
                    holder._varUpdated(holder._var.path, varVal)
            except Exception as e:
                self._log.error(f"epicsV7: Failed to initialize {holder._name}: {e}")

        # Start the long-name PVA server AFTER the initial value sweep so SharedPVs
        # have current values before clients can connect (PVA-05).
        self._startLongNameServer()

    def _stop(self) -> None:
        """Stop the PVA long-name server and close its SharedPVs.

        softioc has no programmatic stop API; the IOC thread is a daemon and
        will exit automatically when the process exits. The shared PVA server
        is stopped, all SharedPVs are closed and the provider is released.
        Errors during shutdown are ignored.
        """
        if self._pva_server is not None:
            try:
                self._pva_server.stop()
            except Exception:
                pass
            self._pva_server = None

        for holder in self._holders:
            if holder._pv_long is not None:
                # Set to None BEFORE close to prevent _varUpdated from
                # posting to a closed PV (pitfall 4 prevention)
                pv = holder._pv_long
                holder._pv_long = None
                try:
                    pv.close(destroy=True)
                except Exception:
                    pass

        if self._pva_provider is not None:
            # NOTE(review): close() is called on the DynamicProvider on a
            # best-effort basis -- confirm the provider class offers it.
            try:
                self._pva_provider.close()
            except Exception:
                pass
            self._pva_provider = None