Python Reference

Auto-generated documentation from Python source code.

Modules

Configuration

app.config

Settings

Bases: BaseSettings

Services

EPICS Service

app.services.epics_service

EpicsService(enable_circuit_breaker=True)

Native async EPICS service using aioca.

No ThreadPool needed - aioca is natively async.

Features:
- Circuit breaker integration for failure isolation
- Per-IOC circuit breakers (extracted from PV name prefix)
- Automatic recovery when IOCs become healthy
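
The per-IOC breaker idea can be illustrated with a minimal sketch. It is not the service's actual implementation: the rule that the IOC is the text before the first ":" in the PV name, the class names, and the threshold and reset values are all assumptions made for illustration.

```python
import time
from typing import Optional


def ioc_prefix(pv_name: str) -> str:
    """Assumed convention: the IOC is the text before the first ':'."""
    return pv_name.split(":", 1)[0]


class CircuitBreaker:
    """Opens after `threshold` consecutive failures and permits a trial
    call again once `reset_after` seconds have elapsed."""

    def __init__(self, threshold: int = 3, reset_after: float = 30.0) -> None:
        self.threshold = threshold
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at: Optional[float] = None

    def allow(self, now: Optional[float] = None) -> bool:
        if self.opened_at is None:
            return True  # breaker is closed
        now = time.monotonic() if now is None else now
        return now - self.opened_at >= self.reset_after

    def record_success(self) -> None:
        # A healthy response closes the breaker again (automatic recovery).
        self.failures = 0
        self.opened_at = None

    def record_failure(self, now: Optional[float] = None) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.monotonic() if now is None else now


class BreakerRegistry:
    """Keeps one breaker per IOC prefix so a failing IOC is isolated."""

    def __init__(self) -> None:
        self._breakers: dict = {}

    def for_pv(self, pv_name: str) -> CircuitBreaker:
        prefix = ioc_prefix(pv_name)
        if prefix not in self._breakers:
            self._breakers[prefix] = CircuitBreaker()
        return self._breakers[prefix]
```

With this layout, repeated failures on one IOC's PVs stop further calls to that IOC only; PVs with a different prefix keep their own closed breaker.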

connect_pv(pv_name) async

Pre-connect to a PV.

connect_many(pv_names) async

Pre-connect to multiple PVs.

get_single(pv_name, timeout=None) async

Read a single PV with metadata.

get_many(pv_names) async

Get values for multiple PVs in parallel.

aioca handles parallel connections internally when given a list.

get_many_with_progress(pv_names, progress_callback=None) async

Get values for multiple PVs with progress tracking.

Processes in batches to allow progress updates while maintaining aioca's efficient parallel connections within each batch.
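
The batching behaviour described above might look roughly like the following sketch. The helper name `read_in_batches`, the `read_many` callable, the default batch size, and the `(done, total)` callback signature are assumptions; the real method's callback arguments are not documented here.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

ReadMany = Callable[[list], Awaitable[dict]]
ProgressCallback = Callable[[int, int], None]


async def read_in_batches(
    pv_names: list,
    read_many: ReadMany,
    batch_size: int = 500,
    progress_callback: Optional[ProgressCallback] = None,
) -> dict:
    """Read PVs batch by batch, reporting progress after each batch."""
    results: dict = {}
    total = len(pv_names)
    for start in range(0, total, batch_size):
        batch = pv_names[start:start + batch_size]
        # Each batch is read in one parallel call.
        results.update(await read_many(batch))
        if progress_callback is not None:
            progress_callback(min(start + batch_size, total), total)
    return results
```

Smaller batches give finer-grained progress at the cost of more round trips, so the batch size is a trade-off rather than a fixed rule.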

put_single(pv_name, value) async

Write a value to a single PV.

put_many(values) async

Put values to multiple PVs.

When given lists, aioca's caput writes the values in sequence, provided the PVs are pre-connected.

shutdown() async

Cleanup resources.

get_epics_service()

Get or create the EPICS service singleton.
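
A "get or create" accessor of this kind is commonly written as a module-level singleton. The sketch below shows the pattern only; `EpicsServiceSketch` and `get_epics_service_sketch` are placeholder names, not the real class or function.

```python
from typing import Optional


class EpicsServiceSketch:
    """Placeholder for the real service class."""

    def __init__(self, enable_circuit_breaker: bool = True) -> None:
        self.enable_circuit_breaker = enable_circuit_breaker


_instance: Optional[EpicsServiceSketch] = None


def get_epics_service_sketch() -> EpicsServiceSketch:
    """Create the service on first use, then return the same object."""
    global _instance
    if _instance is None:
        _instance = EpicsServiceSketch()
    return _instance
```

Every caller then shares one instance and its state.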

Redis Service

app.services.redis_service

PVCacheEntry(value, connected, updated_at, status=None, severity=None, timestamp=None, units=None, error=None) dataclass

Complete PV state stored in Redis.

Includes connection tracking and timestamps for health monitoring.

to_dict()

Convert to dictionary for JSON serialization.

from_dict(data) classmethod

Create from dictionary.
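
A round trip through `to_dict()` and `from_dict()` could work as in the sketch below. The field names follow the constructor signature above, and the field types follow the `set_pv_value` parameter table; the `float` type for `updated_at` and the use of `dataclasses.asdict` are assumptions.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class PVCacheEntrySketch:
    value: Any
    connected: bool
    updated_at: float  # assumed to be a Unix timestamp
    status: Optional[str] = None
    severity: Optional[int] = None
    timestamp: Optional[float] = None
    units: Optional[str] = None
    error: Optional[str] = None

    def to_dict(self) -> dict:
        """Convert to a plain dict suitable for json.dumps."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "PVCacheEntrySketch":
        """Rebuild an entry from a dict produced by to_dict()."""
        return cls(**data)


entry = PVCacheEntrySketch(value=1.5, connected=True, updated_at=1700000000.0, units="mA")
restored = PVCacheEntrySketch.from_dict(json.loads(json.dumps(entry.to_dict())))
```

Here `restored` compares equal to `entry`, because dataclass equality is field by field.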

RedisService()

Manages Redis connection and PV value cache with connection tracking.

Enhanced for 40k PV monitoring with:
- Per-PV connection state tracking (connected/disconnected)
- Heartbeat mechanism for system health monitoring
- Disconnected PV set for quick lookup
- Timestamps for stale data detection

connect() async

Establish Redis connection.

disconnect() async

Close Redis connection.

is_connected()

Check if Redis is connected.

set_pv_value(pv_name, value, connected=True, status=None, severity=None, timestamp=None, units=None, error=None) async

Set a single PV value in the cache with full metadata.

Parameters:

Name       Type          Description                            Default
pv_name    str           The PV name/address                    required
value      Any           The PV value (can be scalar or array)  required
connected  bool          Whether the PV is currently connected  True
status     str | None    EPICS alarm status string              None
severity   int | None    EPICS alarm severity (0-3)             None
timestamp  float | None  EPICS timestamp                        None
units      str | None    Engineering units                      None
error      str | None    Error message if disconnected          None

set_pv_connected(pv_name, connected, error=None) async

Update only the connection state of a PV.

Called from connection callbacks when PV connects/disconnects.

set_pv_values_bulk(values) async

Set multiple PV values in the cache (atomic).

Parameters:

Name    Type                            Description                              Default
values  dict[str, PVCacheEntry | dict]  Dict of pv_name -> PVCacheEntry or dict  required

get_pv_value(pv_name) async

Get a single PV value from the cache.

get_pv_values_bulk(pv_names) async

Get multiple PV values from the cache.

get_all_pv_values() async

Get all cached PV values.

get_all_pv_values_as_dict() async

Get all cached PV values as plain dicts (for JSON serialization).

delete_pv_value(pv_name) async

Delete a single PV value from the cache.

clear_all_pv_values() async

Clear all cached PV values.

get_cached_pv_count() async

Get the number of cached PV values.

get_disconnected_pvs() async

Get all PVs currently marked as disconnected.

get_disconnected_count() async

Get count of disconnected PVs.

get_stale_pvs(max_age_seconds=None) async

Get PVs that haven't been updated recently (potential staleness).

Parameters:

Name             Type          Description                                        Default
max_age_seconds  float | None  Maximum age in seconds. Defaults to config value.  None

Returns:

Type       Description
list[str]  List of PV names that are stale
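
The staleness check can be pictured as a comparison between each entry's last-update time and a maximum age. In this sketch the function name, the dict of update times, and the 60-second default are hypothetical; the real default comes from configuration and is not stated here.

```python
import time
from typing import Optional

DEFAULT_MAX_AGE_SECONDS = 60.0  # hypothetical stand-in for the config value


def find_stale_pvs(
    updated_at_by_pv: dict,
    max_age_seconds: Optional[float] = None,
    now: Optional[float] = None,
) -> list:
    """Return the PV names whose last update is older than the maximum age."""
    max_age = DEFAULT_MAX_AGE_SECONDS if max_age_seconds is None else max_age_seconds
    now = time.time() if now is None else now
    return [
        pv for pv, updated_at in updated_at_by_pv.items()
        if now - updated_at > max_age
    ]
```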

update_heartbeat() async

Update the monitor heartbeat timestamp.

get_heartbeat() async

Get the last heartbeat timestamp.

get_heartbeat_age() async

Get the age of the last heartbeat in seconds.

is_monitor_alive(max_age_seconds=5.0) async

Check if the monitor process is still alive.
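
The relationship between the heartbeat timestamp, its age, and the liveness check might be expressed as below. The function names are illustrative, and treating a missing heartbeat as "not alive" is an assumption about behaviour this page does not specify.

```python
import time
from typing import Optional


def heartbeat_age(last_heartbeat: Optional[float], now: Optional[float] = None) -> Optional[float]:
    """Seconds since the last heartbeat, or None if none was recorded."""
    if last_heartbeat is None:
        return None
    now = time.time() if now is None else now
    return max(0.0, now - last_heartbeat)


def monitor_alive(
    last_heartbeat: Optional[float],
    max_age_seconds: float = 5.0,
    now: Optional[float] = None,
) -> bool:
    """Alive when a heartbeat exists and is no older than max_age_seconds."""
    age = heartbeat_age(last_heartbeat, now)
    return age is not None and age <= max_age_seconds
```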

publish_pv_update(pv_name, value=None) async

Publish a PV value update to the pub/sub channel.

If value is None, the current cached value will be fetched.

publish_pv_updates_bulk(pv_names) async

Publish multiple PV value updates.

Fetches current values from cache and publishes them.

subscribe_pv_updates(callback) async

Subscribe to PV value updates.

get_health_stats() async

Get comprehensive health statistics.

acquire_monitor_lock(instance_id) async

Acquire exclusive lock for monitor process (leader election).

Only one monitor instance should run at a time. This uses Redis SET with NX (only set if not exists) for atomic leader election.

Parameters:

Name         Type  Description                                  Default
instance_id  str   Unique identifier for this monitor instance  required

Returns:

Type  Description
bool  True if lock acquired, False if another instance holds it

renew_monitor_lock(instance_id) async

Renew lock if we still own it.

Must be called periodically, at an interval shorter than MONITOR_LOCK_TTL, to maintain leadership.

Parameters:

Name         Type  Description                          Default
instance_id  str   Our instance ID to verify ownership  required

Returns:

Type  Description
bool  True if renewed successfully, False if we lost leadership

release_monitor_lock(instance_id) async

Release the monitor lock if we own it.

Parameters:

Name         Type  Description                          Default
instance_id  str   Our instance ID to verify ownership  required

Returns:

Type  Description
bool  True if released, False if we didn't own it
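
The acquire, renew, and release cycle can be modelled without a Redis server, as in this sketch. `InMemoryLockStore` is a stand-in for Redis, and the key name and TTL value are hypothetical. Against a real Redis, the acquire step corresponds to SET with NX plus an expiry, and the ownership checks in renew and release would need to run atomically (for example in a server-side script); how the service actually does this is not documented here.

```python
from typing import Optional

LOCK_KEY = "monitor:lock"  # hypothetical key name
LOCK_TTL = 30.0            # hypothetical stand-in for MONITOR_LOCK_TTL (seconds)


class InMemoryLockStore:
    """Stand-in for Redis: values carry an expiry time."""

    def __init__(self) -> None:
        self._data: dict = {}

    def get(self, key: str, now: float) -> Optional[str]:
        item = self._data.get(key)
        if item is None:
            return None
        value, expires_at = item
        if expires_at <= now:
            del self._data[key]  # expired entries behave as absent
            return None
        return value

    def set_nx(self, key: str, value: str, ttl: float, now: float) -> bool:
        """Set only if the key is absent (mirrors SET with NX and an expiry)."""
        if self.get(key, now) is not None:
            return False
        self._data[key] = (value, now + ttl)
        return True

    def set_expiry(self, key: str, ttl: float, now: float) -> None:
        value = self.get(key, now)
        if value is not None:
            self._data[key] = (value, now + ttl)

    def delete(self, key: str) -> None:
        self._data.pop(key, None)


def acquire(store: InMemoryLockStore, instance_id: str, now: float) -> bool:
    return store.set_nx(LOCK_KEY, instance_id, LOCK_TTL, now)


def renew(store: InMemoryLockStore, instance_id: str, now: float) -> bool:
    if store.get(LOCK_KEY, now) != instance_id:
        return False  # lock expired or taken by another instance
    store.set_expiry(LOCK_KEY, LOCK_TTL, now)
    return True


def release(store: InMemoryLockStore, instance_id: str, now: float) -> bool:
    if store.get(LOCK_KEY, now) != instance_id:
        return False  # never delete a lock owned by someone else
    store.delete(LOCK_KEY)
    return True
```

If the leader stops renewing, the entry expires after the TTL and another instance's acquire succeeds, which is what makes the scheme self-healing.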

get_monitor_lock_holder() async

Get the current monitor lock holder instance ID.

Returns:

Type        Description
str | None  Instance ID of current leader, or None if no leader

get_monitor_heartbeat() async

Get the last monitor heartbeat timestamp.

Alias for get_heartbeat() for clearer API.

get_redis_service()

Get or create the Redis service singleton.

PV Monitor

app.services.pv_monitor

PVMonitor(redis_service)

Background service that monitors all PVs and updates Redis cache.

Enhanced for 40k PV reliability with:
- Batched startup to prevent UDP flood
- Connection callbacks for tracking connect/disconnect events
- System heartbeat for health monitoring
- Proper handling of disconnection events

start(pv_addresses) async

Start monitoring all PVs with batched initialization.

Batching prevents UDP flood that can cause packet loss and dropped monitors when starting 40k+ PVs simultaneously.

Parameters:

Name          Type       Description                  Default
pv_addresses  list[str]  List of PV names to monitor  required
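
Batched initialization of this kind is often written as a loop that subscribes one chunk at a time with a short pause between chunks. The sketch below assumes a hypothetical `subscribe` coroutine and made-up batch size and pause values; the monitor's real settings are not given on this page.

```python
import asyncio
from typing import Awaitable, Callable


async def start_in_batches(
    pv_addresses: list,
    subscribe: Callable[[str], Awaitable[None]],
    batch_size: int = 1000,
    pause_seconds: float = 0.1,
) -> int:
    """Subscribe to PVs one batch at a time; return how many were started."""
    started = 0
    for start in range(0, len(pv_addresses), batch_size):
        batch = pv_addresses[start:start + batch_size]
        # Subscriptions within a batch are issued concurrently.
        await asyncio.gather(*(subscribe(pv) for pv in batch))
        started += len(batch)
        if start + batch_size < len(pv_addresses):
            # The pause spreads connection traffic out over time.
            await asyncio.sleep(pause_seconds)
    return started
```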

stop() async

Stop all monitors and cleanup.

restart_monitor(pv_name) async

Restart monitoring for a specific PV.

Called by watchdog when attempting to reconnect a disconnected PV.

refresh_pv_list(pv_addresses) async

Reload the PV list from the database (called when PVs are added or removed).

Adds new monitors and removes stale ones.
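
Working out which monitors to add and which to remove amounts to a set difference in each direction. The helper name and return shape below are illustrative only.

```python
def plan_refresh(current: set, desired: list) -> tuple:
    """Return (to_add, to_remove) given the monitored set and the new PV list."""
    desired_set = set(desired)
    to_add = sorted(desired_set - current)     # in the new list, not yet monitored
    to_remove = sorted(current - desired_set)  # monitored, but no longer listed
    return to_add, to_remove
```

PVs present in both sets are left untouched, so their existing subscriptions are not interrupted.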

get_monitored_count()

Get the number of monitored PVs.

get_active_subscription_count()

Get the number of active subscriptions.

is_running()

Check if the monitor is running.

get_status()

Get current monitor status.

get_pv_monitor(redis_service=None)

Get or create the PV Monitor singleton.

Snapshot Service

app.services.snapshot_service

SnapshotService(session, epics_service, redis_service=None)

Service for snapshot operations with EPICS integration.

list_snapshots(title=None, tag_ids=None) async

List all snapshots, optionally filtered by title and/or tags.

get_by_id(snapshot_id, limit=None, offset=0) async

Get snapshot with values.

Parameters:

Name         Type        Description                                  Default
snapshot_id  str         The snapshot ID                              required
limit        int | None  Max number of values to return (None = all)  None
offset       int         Number of values to skip for pagination      0
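
The `limit` and `offset` semantics can be shown on an in-memory list. This is only a model of the documented behaviour; the service presumably applies the same window in its database query, and `paginate` is a hypothetical helper.

```python
from typing import Optional, Sequence


def paginate(values: Sequence, limit: Optional[int] = None, offset: int = 0) -> list:
    """Skip `offset` values, then return at most `limit` (None = all remaining)."""
    if limit is None:
        return list(values[offset:])
    return list(values[offset:offset + limit])
```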

update_snapshot_metadata(snapshot_id, title=None, description=None) async

Update snapshot title and/or description.

create_snapshot(data, created_by=None, progress_callback=None) async

Create a new snapshot by reading all PVs from EPICS.

This is the critical high-performance path.

create_snapshot_from_cache(data, created_by=None, progress_callback=None) async

Create snapshot by reading from Redis cache (instant).

This is much faster than direct EPICS reads when the cache is populated by the PV Monitor background service.

restore_snapshot(snapshot_id, request=None) async

Restore PV values from a snapshot to EPICS.

This is the critical high-performance path for writes.

compare_snapshots(snapshot1_id, snapshot2_id) async

Compare two snapshots and return differences.
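
A comparison of two snapshots reduces to a diff over PV names. The sketch below treats each snapshot as a plain dict of PV name to value and invents its own result layout; the actual result is a ComparisonResultDTO whose fields are not listed on this page.

```python
from typing import Any


def diff_snapshots(first: dict, second: dict) -> dict:
    """Report changed values and PVs present in only one snapshot."""
    shared = first.keys() & second.keys()
    return {
        "changed": {
            pv: (first[pv], second[pv])
            for pv in sorted(shared)
            if first[pv] != second[pv]
        },
        "only_in_first": sorted(first.keys() - second.keys()),
        "only_in_second": sorted(second.keys() - first.keys()),
    }
```

The `!=` test suits scalars and plain lists; array-typed values would need an element-wise comparison instead.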

delete_snapshot(snapshot_id) async

Delete a snapshot and all its values.

Models

PV Model

app.models.pv

PV

Bases: Base, UUIDMixin, TimestampMixin

Process Variable definition.

Snapshot Model

app.models.snapshot

Snapshot

Bases: Base, UUIDMixin, TimestampMixin

Snapshot of all PV values at a point in time.

SnapshotValue

Bases: Base, UUIDMixin

Captured value of a single PV in a snapshot.

Tag Model

app.models.tag

TagGroup

Bases: Base, UUIDMixin, TimestampMixin

Tag group for organizing tags.

Tag

Bases: Base, UUIDMixin, TimestampMixin

Individual tag within a group.

Job Model

app.models.job

Job model for tracking async background tasks.

JobStatus

Bases: str, Enum

Status of a background job.

JobType

Bases: str, Enum

Type of background job.

Job

Bases: Base, UUIDMixin, TimestampMixin

Model for tracking async background tasks.

Schemas (DTOs)

PV Schemas

app.schemas.pv

NewPVElementDTO

Bases: PVBase

DTO for creating a new PV.

UpdatePVElementDTO

Bases: BaseModel

DTO for updating a PV.

PVElementDTO

Bases: PVBase

Full PV response DTO.

LivePVRequest

Bases: BaseModel

DTO for requesting live PV values via POST.

Snapshot Schemas

app.schemas.snapshot

EpicsValueDTO

Bases: BaseModel

EPICS value with metadata.

TagInfoDTO

Bases: BaseModel

Lightweight tag info for PV values.

PVValueDTO

Bases: BaseModel

PV value in a snapshot.

NewSnapshotDTO

Bases: SnapshotBase

DTO for creating a snapshot. PVs are automatically included.

SnapshotSummaryDTO

Bases: SnapshotBase

Summary DTO for listing snapshots.

SnapshotDTO

Bases: SnapshotSummaryDTO

Full snapshot with all values.

UpdateSnapshotDTO

Bases: BaseModel

DTO for updating snapshot metadata.

RestoreRequestDTO

Bases: BaseModel

Request to restore PV values from a snapshot.

RestoreResultDTO

Bases: BaseModel

Result of a restore operation.

ComparisonResultDTO

Bases: BaseModel

Comparison between two snapshots.

Tag Schemas

app.schemas.tag

TagGroupSummaryDTO

Bases: BaseModel

Summary DTO without full tag list.

TagGroupDTO

Bases: TagGroupBase

Full DTO with tags.

Job Schemas

app.schemas.job

DTOs for Job-related API operations.

JobDTO

Bases: BaseModel

Job status response.

JobCreatedDTO

Bases: BaseModel

Response when a job is created.