Python Reference¶
Auto-generated documentation from Python source code.
Modules¶
Configuration¶
Services¶
EPICS Service¶
app.services.epics_service
¶
EpicsService(enable_circuit_breaker=True)
¶
Native async EPICS service using aioca.
No thread pool is needed because aioca is natively async.
Features:
- Circuit breaker integration for failure isolation
- Per-IOC circuit breakers (extracted from PV name prefix)
- Automatic recovery when IOCs become healthy
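The per-IOC breaker idea listed above can be sketched as follows. This is a minimal illustration, not the service's actual implementation: the `ioc_prefix` rule (text before the first `:`), the `CircuitBreaker` class, its thresholds, and the `breaker_for` helper are all assumptions made for the example.

```python
import time
from typing import Optional


def ioc_prefix(pv_name: str) -> str:
    """Hypothetical rule: the IOC key is the text before the first ':'."""
    return pv_name.split(":", 1)[0]


class CircuitBreaker:
    """Minimal breaker: opens after max_failures, allows a retry after reset_after seconds."""

    def __init__(self, max_failures: int = 3, reset_after: float = 30.0) -> None:
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at: Optional[float] = None

    def allow(self, now: Optional[float] = None) -> bool:
        if self.opened_at is None:
            return True  # closed: calls pass through
        now = time.monotonic() if now is None else now
        # Open: only allow a trial call once the cool-down has elapsed.
        return now - self.opened_at >= self.reset_after

    def record_success(self) -> None:
        # A healthy response closes the breaker again (automatic recovery).
        self.failures = 0
        self.opened_at = None

    def record_failure(self, now: Optional[float] = None) -> None:
        self.failures += 1
        if self.failures >= self.max_failures:
            self.opened_at = time.monotonic() if now is None else now


# One breaker per IOC prefix, created on first use.
breakers: dict[str, CircuitBreaker] = {}


def breaker_for(pv_name: str) -> CircuitBreaker:
    return breakers.setdefault(ioc_prefix(pv_name), CircuitBreaker())
```

With this layout, repeated failures on one IOC's PVs open only that IOC's breaker, so reads against other IOCs are unaffected.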
connect_pv(pv_name)
async
¶
Pre-connect to a PV.
connect_many(pv_names)
async
¶
Pre-connect to multiple PVs.
get_single(pv_name, timeout=None)
async
¶
Read a single PV with metadata.
get_many(pv_names)
async
¶
Get values for multiple PVs in parallel.
aioca handles parallel connections internally when given a list.
get_many_with_progress(pv_names, progress_callback=None)
async
¶
Get values for multiple PVs with progress tracking.
Processes in batches to allow progress updates while maintaining aioca's efficient parallel connections within each batch.
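A rough sketch of batch processing with progress reporting is shown below. The batch size, the `(done, total)` callback signature, and the `fake_read_batch` stand-in for the EPICS read are assumptions for illustration; the real method's callback contract is not documented here.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def fake_read_batch(pv_names: list[str]) -> dict[str, float]:
    """Hypothetical stand-in for one parallel EPICS read of a batch."""
    await asyncio.sleep(0)
    return {name: float(len(name)) for name in pv_names}


async def read_with_progress(
    pv_names: list[str],
    batch_size: int = 2,
    progress_callback: Optional[Callable[[int, int], Awaitable[None]]] = None,
) -> dict[str, float]:
    results: dict[str, float] = {}
    total = len(pv_names)
    for start in range(0, total, batch_size):
        batch = pv_names[start:start + batch_size]
        # Each batch is read in one call, so parallelism is kept within the batch.
        results.update(await fake_read_batch(batch))
        if progress_callback is not None:
            # Report how many PVs have been processed so far.
            await progress_callback(min(start + batch_size, total), total)
    return results
```

Smaller batches give finer-grained progress updates at the cost of more round trips.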
put_single(pv_name, value)
async
¶
Write a value to a single PV.
put_many(values)
async
¶
Put values to multiple PVs.
When given lists, aioca's caput writes the values in sequence if the PVs are pre-connected.
shutdown()
async
¶
Cleanup resources.
get_epics_service()
¶
Get or create the EPICS service singleton.
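The get-or-create pattern used by the singleton accessors on this page can be sketched like this. The `Service` class and the `_service` module variable are placeholders invented for the example, not names from the source.

```python
from typing import Optional


class Service:
    """Placeholder for a service class such as the ones documented here."""


_service: Optional[Service] = None  # module-level cache for the single instance


def get_service() -> Service:
    global _service
    if _service is None:
        # First call creates the instance; later calls reuse it.
        _service = Service()
    return _service
```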
Redis Service¶
app.services.redis_service
¶
PVCacheEntry(value, connected, updated_at, status=None, severity=None, timestamp=None, units=None, error=None)
dataclass
¶
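For orientation, a dataclass with the fields from the signature above might look like the sketch below. The field types are borrowed from the set_pv_value parameter table; the type of `updated_at` and the JSON helper methods are assumptions, and the class is named `PVCacheEntrySketch` to make clear it is not the real one.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class PVCacheEntrySketch:
    value: Any
    connected: bool
    updated_at: float  # assumed: Unix time of the last cache write
    status: Optional[str] = None
    severity: Optional[int] = None
    timestamp: Optional[float] = None
    units: Optional[str] = None
    error: Optional[str] = None

    def to_json(self) -> str:
        # Serialize to a string suitable for storing as a cache value.
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> "PVCacheEntrySketch":
        return cls(**json.loads(raw))
```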
RedisService()
¶
Manages Redis connection and PV value cache with connection tracking.
Enhanced for 40k PV monitoring with:
- Per-PV connection state tracking (connected/disconnected)
- Heartbeat mechanism for system health monitoring
- Disconnected PV set for quick lookup
- Timestamps for stale data detection
connect()
async
¶
Establish Redis connection.
disconnect()
async
¶
Close Redis connection.
is_connected()
¶
Check if Redis is connected.
set_pv_value(pv_name, value, connected=True, status=None, severity=None, timestamp=None, units=None, error=None)
async
¶
Set a single PV value in the cache with full metadata.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pv_name | str | The PV name/address | required |
| value | Any | The PV value (can be scalar or array) | required |
| connected | bool | Whether the PV is currently connected | True |
| status | str \| None | EPICS alarm status string | None |
| severity | int \| None | EPICS alarm severity (0-3) | None |
| timestamp | float \| None | EPICS timestamp | None |
| units | str \| None | Engineering units | None |
| error | str \| None | Error message if disconnected | None |
set_pv_connected(pv_name, connected, error=None)
async
¶
Update only the connection state of a PV.
Called from connection callbacks when PV connects/disconnects.
set_pv_values_bulk(values)
async
¶
Set multiple PV values in the cache (atomic).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| values | dict[str, PVCacheEntry \| dict] | Dict of pv_name -> PVCacheEntry or dict | required |
get_pv_value(pv_name)
async
¶
Get a single PV value from the cache.
get_pv_values_bulk(pv_names)
async
¶
Get multiple PV values from the cache.
get_all_pv_values()
async
¶
Get all cached PV values.
get_all_pv_values_as_dict()
async
¶
Get all cached PV values as plain dicts (for JSON serialization).
delete_pv_value(pv_name)
async
¶
Delete a single PV value from the cache.
clear_all_pv_values()
async
¶
Clear all cached PV values.
get_cached_pv_count()
async
¶
Get the number of cached PV values.
get_disconnected_pvs()
async
¶
Get all PVs currently marked as disconnected.
get_disconnected_count()
async
¶
Get count of disconnected PVs.
get_stale_pvs(max_age_seconds=None)
async
¶
Get PVs that haven't been updated recently (potential staleness).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_age_seconds | float \| None | Maximum age in seconds. Defaults to config value. | None |
Returns:
| Type | Description |
|---|---|
| list[str] | List of PV names that are stale |
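Staleness detection of this kind can be sketched as a comparison between each entry's last-update time and a maximum age. The in-memory `updated_at` mapping and the `DEFAULT_MAX_AGE_SECONDS` value below are assumptions standing in for the Redis cache and the config value.

```python
import time
from typing import Optional

DEFAULT_MAX_AGE_SECONDS = 60.0  # hypothetical stand-in for the configured default


def find_stale_pvs(
    updated_at: dict[str, float],
    max_age_seconds: Optional[float] = None,
    now: Optional[float] = None,
) -> list[str]:
    """Return names whose last update is older than the allowed age."""
    max_age = DEFAULT_MAX_AGE_SECONDS if max_age_seconds is None else max_age_seconds
    now = time.time() if now is None else now
    return sorted(name for name, ts in updated_at.items() if now - ts > max_age)
```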
update_heartbeat()
async
¶
Update the monitor heartbeat timestamp.
get_heartbeat()
async
¶
Get the last heartbeat timestamp.
get_heartbeat_age()
async
¶
Get the age of the last heartbeat in seconds.
is_monitor_alive(max_age_seconds=5.0)
async
¶
Check if the monitor process is still alive.
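The heartbeat checks above reduce to simple timestamp arithmetic. The sketch below assumes the heartbeat is stored as a Unix timestamp and that a missing heartbeat means "not alive"; both are assumptions, and the function names are illustrative.

```python
import time
from typing import Optional


def heartbeat_age(last_heartbeat: Optional[float], now: Optional[float] = None) -> Optional[float]:
    """Seconds since the last heartbeat, or None if none was ever recorded."""
    if last_heartbeat is None:
        return None
    now = time.time() if now is None else now
    return max(0.0, now - last_heartbeat)


def is_alive(
    last_heartbeat: Optional[float],
    max_age_seconds: float = 5.0,
    now: Optional[float] = None,
) -> bool:
    age = heartbeat_age(last_heartbeat, now)
    # No heartbeat at all is treated as "not alive" in this sketch.
    return age is not None and age <= max_age_seconds
```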
publish_pv_update(pv_name, value=None)
async
¶
Publish a PV value update to the pub/sub channel.
If value is None, the current cached value will be fetched.
publish_pv_updates_bulk(pv_names)
async
¶
Publish multiple PV value updates.
Fetches current values from cache and publishes them.
subscribe_pv_updates(callback)
async
¶
Subscribe to PV value updates.
get_health_stats()
async
¶
Get comprehensive health statistics.
acquire_monitor_lock(instance_id)
async
¶
Acquire exclusive lock for monitor process (leader election).
Only one monitor instance should run at a time. This uses Redis SET with NX (only set if not exists) for atomic leader election.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| instance_id | str | Unique identifier for this monitor instance | required |
Returns:
| Type | Description |
|---|---|
| bool | True if lock acquired, False if another instance holds it |
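The SET-with-NX election can be illustrated without a running Redis server. `FakeStore` below is an in-memory stand-in that mimics only the NX and expiry behaviour of SET; the key name, the TTL value, and the `now` parameter (used to make time deterministic) are assumptions for the example. With redis-py, the corresponding call would be `set(key, value, nx=True, ex=ttl)`.

```python
import time
from typing import Optional


class FakeStore:
    """In-memory stand-in for the subset of Redis SET used here (NX + expiry)."""

    def __init__(self) -> None:
        self._data: dict[str, tuple[str, float]] = {}

    def set(self, key: str, value: str, nx: bool = False,
            ex: Optional[float] = None, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        current = self._data.get(key)
        if current is not None and current[1] <= now:
            current = None  # the previous value has expired
        if nx and current is not None:
            return False  # NX: refuse to overwrite a live key
        expires = now + ex if ex is not None else float("inf")
        self._data[key] = (value, expires)
        return True


LOCK_KEY = "monitor:lock"  # hypothetical key name
LOCK_TTL = 30.0            # hypothetical TTL in seconds


def acquire_monitor_lock(store: FakeStore, instance_id: str, now: Optional[float] = None) -> bool:
    # Only the first caller succeeds until the key expires or is released.
    return store.set(LOCK_KEY, instance_id, nx=True, ex=LOCK_TTL, now=now)
```

The expiry matters as much as NX: if the leader crashes without releasing the lock, the key times out and another instance can take over.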
renew_monitor_lock(instance_id)
async
¶
Renew lock if we still own it.
Must be called periodically, at an interval shorter than MONITOR_LOCK_TTL, to maintain leadership.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| instance_id | str | Our instance ID to verify ownership | required |
Returns:
| Type | Description |
|---|---|
| bool | True if renewed successfully, False if we lost leadership |
release_monitor_lock(instance_id)
async
¶
Release the monitor lock if we own it.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| instance_id | str | Our instance ID to verify ownership | required |
Returns:
| Type | Description |
|---|---|
| bool | True if released, False if we didn't own it |
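The ownership checks in renew and release follow a compare-then-act pattern: the operation only proceeds if the stored holder matches the caller. The `LockTable` class below is an in-memory illustration with explicit `now` values; it is an assumption, not the service's code. Against a real Redis server the compare and the update would need to happen atomically (for example in a server-side script), and how RedisService achieves that is not stated here.

```python
from typing import Optional


class LockTable:
    """In-memory model of a single expiring lock with an owner."""

    def __init__(self) -> None:
        self.holder: Optional[str] = None
        self.expires_at: float = 0.0

    def _current(self, now: float) -> Optional[str]:
        # An expired lock counts as having no holder.
        if self.holder is not None and now < self.expires_at:
            return self.holder
        return None

    def acquire(self, instance_id: str, ttl: float, now: float) -> bool:
        if self._current(now) is not None:
            return False
        self.holder, self.expires_at = instance_id, now + ttl
        return True

    def renew(self, instance_id: str, ttl: float, now: float) -> bool:
        if self._current(now) != instance_id:
            return False  # expired or taken over: leadership is lost
        self.expires_at = now + ttl
        return True

    def release(self, instance_id: str, now: float) -> bool:
        if self._current(now) != instance_id:
            return False  # never delete a lock owned by someone else
        self.holder = None
        return True
```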
get_monitor_lock_holder()
async
¶
Get the current monitor lock holder instance ID.
Returns:
| Type | Description |
|---|---|
| str \| None | Instance ID of current leader, or None if no leader |
get_monitor_heartbeat()
async
¶
Get the last monitor heartbeat timestamp.
Alias for get_heartbeat() for clearer API.
get_redis_service()
¶
Get or create the Redis service singleton.
PV Monitor¶
app.services.pv_monitor
¶
PVMonitor(redis_service)
¶
Background service that monitors all PVs and updates Redis cache.
Enhanced for 40k PV reliability with:
- Batched startup to prevent UDP flood
- Connection callbacks for tracking connect/disconnect events
- System heartbeat for health monitoring
- Proper handling of disconnection events
start(pv_addresses)
async
¶
Start monitoring all PVs with batched initialization.
Batching prevents UDP flood that can cause packet loss and dropped monitors when starting 40k+ PVs simultaneously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pv_addresses | list[str] | List of PV names to monitor | required |
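Batched initialization can be sketched as subscribing to one slice of the PV list at a time, with a short pause between slices to spread out network traffic. The batch size, the pause length, and the `subscribe` callable are assumptions chosen for the example; the values PVMonitor actually uses are not documented here.

```python
import asyncio
from typing import Awaitable, Callable


async def start_in_batches(
    pv_addresses: list[str],
    subscribe: Callable[[str], Awaitable[None]],
    batch_size: int = 500,
    pause_seconds: float = 0.1,
) -> int:
    """Subscribe to PVs batch by batch; returns the number of PVs started."""
    started = 0
    for i in range(0, len(pv_addresses), batch_size):
        batch = pv_addresses[i:i + batch_size]
        # Subscriptions within a batch run concurrently.
        await asyncio.gather(*(subscribe(pv) for pv in batch))
        started += len(batch)
        if i + batch_size < len(pv_addresses):
            # Pause between batches so search traffic is spread over time.
            await asyncio.sleep(pause_seconds)
    return started
```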
stop()
async
¶
Stop all monitors and cleanup.
restart_monitor(pv_name)
async
¶
Restart monitoring for a specific PV.
Called by watchdog when attempting to reconnect a disconnected PV.
refresh_pv_list(pv_addresses)
async
¶
Reload the PV list from the database (called when PVs are added or removed).
Adds new monitors and removes stale ones.
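Working out which monitors to add and which to remove is a set difference between the currently monitored names and the new list. The `plan_refresh` helper below is a hypothetical illustration of that step only; it does not start or stop any monitors.

```python
def plan_refresh(current: set[str], desired: list[str]) -> tuple[list[str], list[str]]:
    """Return (to_add, to_remove) given the monitored set and the new PV list."""
    wanted = set(desired)
    to_add = sorted(wanted - current)      # in the new list but not yet monitored
    to_remove = sorted(current - wanted)   # monitored but no longer in the list
    return to_add, to_remove
```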
get_monitored_count()
¶
Get the number of monitored PVs.
get_active_subscription_count()
¶
Get the number of active subscriptions.
is_running()
¶
Check if the monitor is running.
get_status()
¶
Get current monitor status.
get_pv_monitor(redis_service=None)
¶
Get or create the PV Monitor singleton.
Snapshot Service¶
app.services.snapshot_service
¶
SnapshotService(session, epics_service, redis_service=None)
¶
Service for snapshot operations with EPICS integration.
list_snapshots(title=None, tag_ids=None)
async
¶
List all snapshots, optionally filtered by title and/or tags.
get_by_id(snapshot_id, limit=None, offset=0)
async
¶
Get snapshot with values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| snapshot_id | str | The snapshot ID | required |
| limit | int \| None | Max number of values to return (None = all) | None |
| offset | int | Number of values to skip for pagination | 0 |
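The limit/offset semantics described in the table can be illustrated on a plain list. This is only a sketch of the semantics; the service most likely applies them in the database query rather than in Python, and the `paginate` helper is an assumption.

```python
from typing import Optional, Sequence, TypeVar

T = TypeVar("T")


def paginate(values: Sequence[T], limit: Optional[int] = None, offset: int = 0) -> list[T]:
    """Skip `offset` items, then return at most `limit` items (None = all)."""
    if limit is None:
        return list(values[offset:])
    return list(values[offset:offset + limit])
```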
update_snapshot_metadata(snapshot_id, title=None, description=None)
async
¶
Update snapshot title and/or description.
create_snapshot(data, created_by=None, progress_callback=None)
async
¶
Create a new snapshot by reading all PVs from EPICS.
This is the critical high-performance path.
create_snapshot_from_cache(data, created_by=None, progress_callback=None)
async
¶
Create a snapshot by reading values from the Redis cache rather than from EPICS.
This is much faster than direct EPICS reads when the cache is populated by the PV Monitor background service.
restore_snapshot(snapshot_id, request=None)
async
¶
Restore PV values from a snapshot to EPICS.
This is the critical high-performance path for writes.
compare_snapshots(snapshot1_id, snapshot2_id)
async
¶
Compare two snapshots and return differences.
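A comparison of this kind can be sketched as a diff over two mappings of PV name to value. The result shape below (`only_in_first`, `only_in_second`, `changed`) is an assumption for illustration and may not match the fields of ComparisonResultDTO.

```python
from typing import Any


def diff_snapshots(first: dict[str, Any], second: dict[str, Any]) -> dict[str, Any]:
    """Report PVs missing from either side and PVs whose values differ."""
    common = first.keys() & second.keys()
    return {
        "only_in_first": sorted(first.keys() - second.keys()),
        "only_in_second": sorted(second.keys() - first.keys()),
        # Map each changed PV to its (first, second) value pair.
        "changed": {
            pv: (first[pv], second[pv])
            for pv in sorted(common)
            if first[pv] != second[pv]
        },
    }
```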
delete_snapshot(snapshot_id)
async
¶
Delete a snapshot and all its values.
Models¶
PV Model¶
Snapshot Model¶
app.models.snapshot
¶
Tag Model¶
app.models.tag
¶
Job Model¶
app.models.job
¶
Schemas (DTOs)¶
PV Schemas¶
app.schemas.pv
¶
Snapshot Schemas¶
app.schemas.snapshot
¶
EpicsValueDTO
¶
Bases: BaseModel
EPICS value with metadata.
TagInfoDTO
¶
Bases: BaseModel
Lightweight tag info for PV values.
PVValueDTO
¶
Bases: BaseModel
PV value in a snapshot.
NewSnapshotDTO
¶
Bases: SnapshotBase
DTO for creating a snapshot. PVs are automatically included.
SnapshotSummaryDTO
¶
Bases: SnapshotBase
Summary DTO for listing snapshots.
SnapshotDTO
¶
UpdateSnapshotDTO
¶
Bases: BaseModel
DTO for updating snapshot metadata.
RestoreRequestDTO
¶
Bases: BaseModel
Request to restore PV values from a snapshot.
RestoreResultDTO
¶
Bases: BaseModel
Result of a restore operation.
ComparisonResultDTO
¶
Bases: BaseModel
Comparison between two snapshots.