Skip to content

EPICS Server

The lume-epics server synchronizes process variables over Channel Access and pvAccess servers. Updates to input process variables are queued for model execution and the model output is queued for updates over both protocols.

Server Structure

Server

Server for EPICS process variables. Can be optionally initialized with only pvAccess or Channel Access protocols.

Attributes:

Name Type Description
model BaseModel

Instantiated model

input_variables Dict[str

InputVariable]): Model input variables

output_variables Dict[str

OutputVariable]): Model output variables

epics_config Optional[Dict]

...

_pva_fields List[str]

List of variables pointing to pvAccess fields

_protocols List[str]

List of protocols in use

in_queue multiprocessing.Queue
out_queues Dict[str, multiprocessing.Queue]

Queue updates to output variables to protocol servers.

exit_event multiprocessing.Event

Event triggering shutdown

_running_indicator multiprocessing.Value

Value indicating whether server is running

_process_exit_events List[multiprocessing.Event]

Exit events for each process

_model_exec_exit_event Event

Thread event for model execution exceptions

comm_thread Thread

Thread for model execution

ca_process multiprocessing.Process

Channel access server process

pva_process multiprocessing.Process

pvAccess server process

Source code in lume_epics/epics_server.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
class Server:
    """
    Server for EPICS process variables. Can be optionally initialized with only
    pvAccess or Channel Access protocols.

    Attributes:
        model (BaseModel): Instantiated model

        input_variables (Dict[str: InputVariable]): Model input variables

        output_variables (Dict[str: OutputVariable]): Model output variables

        epics_config (Optional[Dict]): ...

        _pva_fields (List[str]): List of variables pointing to pvAccess fields

        _protocols (List[str]): List of protocols in use

        in_queue (multiprocessing.Queue):

        out_queues (Dict[str, multiprocessing.Queue]): Queue updates to output
            variables to protocol servers.

        exit_event (multiprocessing.Event): Event triggering shutdown

        _running_indicator (multiprocessing.Value): Value indicating whether server is running

        _process_exit_events (List[multiprocessing.Event]): Exit events for each process

        _model_exec_exit_event (Event): Thread event for model execution exceptions

        comm_thread (Thread): Thread for model execution

        ca_process (multiprocessing.Process): Channel access server process

        pva_process (multiprocessing.Process): pvAccess server process

    """

    def __init__(
        self,
        model_class: Type[BaseModel],
        epics_config: dict,
        model_kwargs: dict = {},  # TODO DROP and use instantiated mode
        epics_env: dict = {},  # TODO drop hashable default. Should be Optional[dict]
    ) -> None:
        """Create model_class instance and configure both Channel Access and pvAccess
        servers for execution.

        Args:
            model_class (Type[BaseModel]): Model class to be instantiated

            epics_config (dict): Dictionary describing EPICS configuration for model
                variables.

            model_kwargs (dict): Kwargs to instantiate model.

            epics_env (dict): Environment variables for EPICS configuration.

        """

        # Update epics environment if programatically set
        for var in EPICS_ENV_VARS:
            if epics_env.get(var):
                os.environ[var] = epics_env[var]

        self.model = model_class(**model_kwargs)
        self.input_variables = self.model.input_variables
        self.output_variables = self.model.output_variables

        self._epics_config = epics_config

        # define programatic access to model summary
        self._pvname = None
        self._owner = None
        self._date_published = None
        self._description = None
        self._id = None

        # If configured, set up summary pv
        if "summary" in self._epics_config:
            self._pvname = self._epics_config["summary"].get("pvname")
            self._owner = self._epics_config["summary"].get("owner", "")
            self._date_published = self._epics_config["summary"].get(
                "date_published", ""
            )
            self._description = self._epics_config["summary"].get("description", "")
            self._id = self._epics_config["summary"].get("id", "")

        self._protocols = []

        ca_config = {
            var: self._epics_config[var]
            for var in self._epics_config
            if self._epics_config[var].get("protocol") in ["ca", "both"]
        }
        pva_config = {
            var: self._epics_config[var]
            for var in self._epics_config
            if self._epics_config[var].get("protocol") in ["pva", "both"]
            or var == "summary"
        }

        # track nested fields
        self._pva_fields = []
        for var, config in self._epics_config.items():
            if config.get("fields"):
                self._pva_fields += config["fields"]

        if len(ca_config) > 0:
            self._protocols.append("ca")

        if len(pva_config) > 0:
            self._protocols.append("pva")

        # set up protocol based queues
        self.in_queue = multiprocessing.Queue()
        self.out_queues = dict()
        for protocol in self._protocols:
            self.out_queues[protocol] = multiprocessing.Queue()

        # exit event for triggering shutdown
        self.exit_event = multiprocessing.Event()
        self._running_indicator = multiprocessing.Value("b", False)
        self._process_exit_events = []

        # event for shutdown on model execution exceptions
        self._model_exec_exit_event = Event()

        # we use the running marker to make sure pvs + ca don't just keep adding queue elements
        self.comm_thread = Thread(
            target=self.run_comm_thread,
            kwargs={
                "in_queue": self.in_queue,
                "out_queues": self.out_queues,
                "running_indicator": self._running_indicator,
            },
        )

        # initialize channel access server
        if "ca" in self._protocols:
            ca_input_vars = {
                var_name: var
                for var_name, var in self.model.input_variables.items()
                if var_name in ca_config
            }
            ca_output_vars = {
                var_name: var
                for var_name, var in self.model.output_variables.items()
                if var_name in ca_config
            }

            self.ca_process = CAServer(
                input_variables=ca_input_vars,
                output_variables=ca_output_vars,
                epics_config=ca_config,
                in_queue=self.in_queue,
                out_queue=self.out_queues["ca"],
                running_indicator=self._running_indicator,
            )

            self._process_exit_events.append(self.ca_process.exit_event)

        # initialize pvAccess server
        if "pva" in self._protocols:
            pva_input_vars = {
                var_name: var
                for var_name, var in self.input_variables.items()
                if var_name in pva_config
            }
            pva_output_vars = {
                var_name: var
                for var_name, var in self.output_variables.items()
                if var_name in pva_config
            }

            self.pva_process = PVAServer(
                input_variables=pva_input_vars,
                output_variables=pva_output_vars,
                epics_config=pva_config,
                in_queue=self.in_queue,
                out_queue=self.out_queues["pva"],
                running_indicator=self._running_indicator,
            )

            self._process_exit_events.append(self.pva_process.exit_event)

    def __enter__(self):
        """Handle server startup"""
        self.start(monitor=False)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Handle server shutdown"""
        self.stop()

    def run_comm_thread(
        self,
        *,
        running_indicator: multiprocessing.Value,
        in_queue: Optional[multiprocessing.Queue],
        out_queues: Optional[Dict[str, multiprocessing.Queue]],
    ):
        """Handles communications between pvAccess server, Channel Access server, and
             dmodel.

        Arguments:
            running_indicator (multiprocessing.Value): Indicates whether main server
                process active.

            in_queue (Optional[multiprocessing.Queue]): Queue receiving input variable
                inputs.

            out_queues (Optional[Dict[str, multiprocessing.Queue]]): Queue for communicating
                output vars with servers.

        """
        model = self.model
        inputs_initialized = 0

        while not self.exit_event.is_set():
            try:
                data = in_queue.get(timeout=0.1)

                # mark running
                running_indicator.value = True

                for var in data["vars"]:
                    self.input_variables[var] = data["vars"][var]

                # check no input values are None
                if not any(
                    [var.value is None for var in self.input_variables.values()]
                ):
                    inputs_initialized = 1

                # update output variable state
                if inputs_initialized:

                    # sync pva/ca if duplicated
                    for protocol, queue in out_queues.items():
                        if protocol != data["protocol"]:
                            inputs = {
                                var: self.input_variables[var]
                                for var in data["vars"]
                                if self._epics_config[var]["protocol"]
                                in [protocol, "both"]
                            }

                            if len(inputs):
                                queue.put({"input_variables": inputs})

                    model_input = self.input_variables

                    try:
                        predicted_output = model.evaluate(model_input)

                        for protocol, queue in out_queues.items():
                            outputs = {
                                var.name: var
                                for var in predicted_output.values()
                                if var.name in self._pva_fields
                                or self._epics_config[var.name]["protocol"]
                                in [protocol, "both"]
                            }
                            queue.put({"output_variables": outputs}, timeout=0.1)

                    except Exception as e:
                        traceback.print_exc()
                        self._model_exec_exit_event.set()

                running_indicator.value = False

            except Empty:
                continue

            except Full:
                logger.error(f"{protocol} queue is full.")

        logger.info("Stopping execution thread")

    def start(self, monitor: bool = True) -> None:
        """Starts server using set server protocol(s).

        Args:
            monitor (bool): Indicates whether to run the server in the background or to
                continually monitor. If monitor = False, the server must be explicitly
                stopped using server.stop()

        """
        self.comm_thread.start()

        if "ca" in self._protocols:
            self.ca_process.start()

        if "pva" in self._protocols:
            self.pva_process.start()

        if monitor:
            try:
                while not any(
                    [
                        exit_event.is_set()
                        for exit_event in self._process_exit_events
                        + [self._model_exec_exit_event]
                    ]
                ):
                    time.sleep(0.1)

                # shut down server if process exited.
                self.stop()

            except KeyboardInterrupt:
                self.stop()

    def stop(self) -> None:
        """Stops the server."""
        logger.info("Stopping server.")
        self.exit_event.set()
        self.comm_thread.join()

        if "ca" in self._protocols:
            self.ca_process.shutdown()

        if "pva" in self._protocols:
            self.pva_process.shutdown()

        logger.info("Server is stopped.")

    @property
    def summary(self):
        return {
            "pvname": self._pvname,
            "owner": self._owner,
            "date published": self._date_published,
            "description": self._description,
            "id": self._id,
        }

    @property
    def owner(self):
        return self._owner

    @property
    def summary_pvname(self):
        return self._pvname

    @property
    def date_published(self):
        return self._date_published

    @property
    def description(self):
        return self._description

    @property
    def id(self):
        return self._id

__enter__()

Handle server startup

Source code in lume_epics/epics_server.py
221
222
223
224
def __enter__(self):
    """Handle server startup"""
    self.start(monitor=False)
    return self

__exit__(exc_type, exc_val, exc_tb)

Handle server shutdown

Source code in lume_epics/epics_server.py
226
227
228
def __exit__(self, exc_type, exc_val, exc_tb):
    """Handle server shutdown"""
    self.stop()

__init__(model_class, epics_config, model_kwargs={}, epics_env={})

Create model_class instance and configure both Channel Access and pvAccess servers for execution.

Parameters:

Name Type Description Default
model_class Type[BaseModel]

Model class to be instantiated

required
epics_config dict

Dictionary describing EPICS configuration for model variables.

required
model_kwargs dict

Kwargs to instantiate model.

{}
epics_env dict

Environment variables for EPICS configuration.

{}
Source code in lume_epics/epics_server.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def __init__(
    self,
    model_class: Type[BaseModel],
    epics_config: dict,
    model_kwargs: dict = {},  # TODO DROP and use instantiated mode
    epics_env: dict = {},  # TODO drop hashable default. Should be Optional[dict]
) -> None:
    """Create model_class instance and configure both Channel Access and pvAccess
    servers for execution.

    Args:
        model_class (Type[BaseModel]): Model class to be instantiated

        epics_config (dict): Dictionary describing EPICS configuration for model
            variables.

        model_kwargs (dict): Kwargs to instantiate model.

        epics_env (dict): Environment variables for EPICS configuration.

    """

    # Update epics environment if programatically set
    for var in EPICS_ENV_VARS:
        if epics_env.get(var):
            os.environ[var] = epics_env[var]

    self.model = model_class(**model_kwargs)
    self.input_variables = self.model.input_variables
    self.output_variables = self.model.output_variables

    self._epics_config = epics_config

    # define programatic access to model summary
    self._pvname = None
    self._owner = None
    self._date_published = None
    self._description = None
    self._id = None

    # If configured, set up summary pv
    if "summary" in self._epics_config:
        self._pvname = self._epics_config["summary"].get("pvname")
        self._owner = self._epics_config["summary"].get("owner", "")
        self._date_published = self._epics_config["summary"].get(
            "date_published", ""
        )
        self._description = self._epics_config["summary"].get("description", "")
        self._id = self._epics_config["summary"].get("id", "")

    self._protocols = []

    ca_config = {
        var: self._epics_config[var]
        for var in self._epics_config
        if self._epics_config[var].get("protocol") in ["ca", "both"]
    }
    pva_config = {
        var: self._epics_config[var]
        for var in self._epics_config
        if self._epics_config[var].get("protocol") in ["pva", "both"]
        or var == "summary"
    }

    # track nested fields
    self._pva_fields = []
    for var, config in self._epics_config.items():
        if config.get("fields"):
            self._pva_fields += config["fields"]

    if len(ca_config) > 0:
        self._protocols.append("ca")

    if len(pva_config) > 0:
        self._protocols.append("pva")

    # set up protocol based queues
    self.in_queue = multiprocessing.Queue()
    self.out_queues = dict()
    for protocol in self._protocols:
        self.out_queues[protocol] = multiprocessing.Queue()

    # exit event for triggering shutdown
    self.exit_event = multiprocessing.Event()
    self._running_indicator = multiprocessing.Value("b", False)
    self._process_exit_events = []

    # event for shutdown on model execution exceptions
    self._model_exec_exit_event = Event()

    # we use the running marker to make sure pvs + ca don't just keep adding queue elements
    self.comm_thread = Thread(
        target=self.run_comm_thread,
        kwargs={
            "in_queue": self.in_queue,
            "out_queues": self.out_queues,
            "running_indicator": self._running_indicator,
        },
    )

    # initialize channel access server
    if "ca" in self._protocols:
        ca_input_vars = {
            var_name: var
            for var_name, var in self.model.input_variables.items()
            if var_name in ca_config
        }
        ca_output_vars = {
            var_name: var
            for var_name, var in self.model.output_variables.items()
            if var_name in ca_config
        }

        self.ca_process = CAServer(
            input_variables=ca_input_vars,
            output_variables=ca_output_vars,
            epics_config=ca_config,
            in_queue=self.in_queue,
            out_queue=self.out_queues["ca"],
            running_indicator=self._running_indicator,
        )

        self._process_exit_events.append(self.ca_process.exit_event)

    # initialize pvAccess server
    if "pva" in self._protocols:
        pva_input_vars = {
            var_name: var
            for var_name, var in self.input_variables.items()
            if var_name in pva_config
        }
        pva_output_vars = {
            var_name: var
            for var_name, var in self.output_variables.items()
            if var_name in pva_config
        }

        self.pva_process = PVAServer(
            input_variables=pva_input_vars,
            output_variables=pva_output_vars,
            epics_config=pva_config,
            in_queue=self.in_queue,
            out_queue=self.out_queues["pva"],
            running_indicator=self._running_indicator,
        )

        self._process_exit_events.append(self.pva_process.exit_event)

run_comm_thread(*, running_indicator, in_queue, out_queues)

Handles communications between pvAccess server, Channel Access server, and dmodel.

Parameters:

Name Type Description Default
running_indicator multiprocessing.Value

Indicates whether main server process active.

required
in_queue Optional[multiprocessing.Queue]

Queue receiving input variable inputs.

required
out_queues Optional[Dict[str, multiprocessing.Queue]]

Queue for communicating output vars with servers.

required
Source code in lume_epics/epics_server.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
def run_comm_thread(
    self,
    *,
    running_indicator: multiprocessing.Value,
    in_queue: Optional[multiprocessing.Queue],
    out_queues: Optional[Dict[str, multiprocessing.Queue]],
):
    """Handles communications between pvAccess server, Channel Access server, and
         dmodel.

    Arguments:
        running_indicator (multiprocessing.Value): Indicates whether main server
            process active.

        in_queue (Optional[multiprocessing.Queue]): Queue receiving input variable
            inputs.

        out_queues (Optional[Dict[str, multiprocessing.Queue]]): Queue for communicating
            output vars with servers.

    """
    model = self.model
    inputs_initialized = 0

    while not self.exit_event.is_set():
        try:
            data = in_queue.get(timeout=0.1)

            # mark running
            running_indicator.value = True

            for var in data["vars"]:
                self.input_variables[var] = data["vars"][var]

            # check no input values are None
            if not any(
                [var.value is None for var in self.input_variables.values()]
            ):
                inputs_initialized = 1

            # update output variable state
            if inputs_initialized:

                # sync pva/ca if duplicated
                for protocol, queue in out_queues.items():
                    if protocol != data["protocol"]:
                        inputs = {
                            var: self.input_variables[var]
                            for var in data["vars"]
                            if self._epics_config[var]["protocol"]
                            in [protocol, "both"]
                        }

                        if len(inputs):
                            queue.put({"input_variables": inputs})

                model_input = self.input_variables

                try:
                    predicted_output = model.evaluate(model_input)

                    for protocol, queue in out_queues.items():
                        outputs = {
                            var.name: var
                            for var in predicted_output.values()
                            if var.name in self._pva_fields
                            or self._epics_config[var.name]["protocol"]
                            in [protocol, "both"]
                        }
                        queue.put({"output_variables": outputs}, timeout=0.1)

                except Exception as e:
                    traceback.print_exc()
                    self._model_exec_exit_event.set()

            running_indicator.value = False

        except Empty:
            continue

        except Full:
            logger.error(f"{protocol} queue is full.")

    logger.info("Stopping execution thread")

start(monitor=True)

Starts server using set server protocol(s).

Parameters:

Name Type Description Default
monitor bool

Indicates whether to run the server in the background or to continually monitor. If monitor = False, the server must be explicitly stopped using server.stop()

True
Source code in lume_epics/epics_server.py
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def start(self, monitor: bool = True) -> None:
    """Starts server using set server protocol(s).

    Args:
        monitor (bool): Indicates whether to run the server in the background or to
            continually monitor. If monitor = False, the server must be explicitly
            stopped using server.stop()

    """
    self.comm_thread.start()

    if "ca" in self._protocols:
        self.ca_process.start()

    if "pva" in self._protocols:
        self.pva_process.start()

    if monitor:
        try:
            while not any(
                [
                    exit_event.is_set()
                    for exit_event in self._process_exit_events
                    + [self._model_exec_exit_event]
                ]
            ):
                time.sleep(0.1)

            # shut down server if process exited.
            self.stop()

        except KeyboardInterrupt:
            self.stop()

stop()

Stops the server.

Source code in lume_epics/epics_server.py
349
350
351
352
353
354
355
356
357
358
359
360
361
def stop(self) -> None:
    """Stops the server."""
    logger.info("Stopping server.")
    self.exit_event.set()
    self.comm_thread.join()

    if "ca" in self._protocols:
        self.ca_process.shutdown()

    if "pva" in self._protocols:
        self.pva_process.shutdown()

    logger.info("Server is stopped.")

CADriver

Bases: Driver

Class for handling read and write requests to Channel Access process variables.

Source code in lume_epics/epics_ca_server.py
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
class CADriver(Driver):
    """
    Class for handling read and write requests to Channel Access process variables.
    """

    def __init__(self, server) -> None:
        """Initialize the Channel Access driver. Store input state and output state."""
        super(CADriver, self).__init__()
        self.server = server

    def read(self, pvname: str) -> Union[float, np.ndarray]:
        """Method executed by server when clients read a Channel Access process
        variable.

        Args:
            pvname (str): Process variable name.

        """
        return self.getParam(pvname)

    def write(self, pvname: str, value: Union[float, np.ndarray]) -> bool:
        """Method executed by server when clients write to a Channel Access process
        variable.


        Args:
            pvname (str): Process variable name.

            value (Union[float, np.ndarray]): Value to assign to the process variable.

        """

        # handle area detector types
        model_var_name = self.server._pvname_to_varname_map.get(pvname)

        if pvname in self.server._child_to_parent_map:
            model_var_name = self.server._child_to_parent_map[pvname]

        if model_var_name in self.server._output_variables:
            logger.warning(
                "Cannot update variable %s. Output variables can only be updated via surrogate model callback.",
                pvname,
            )
            return False

        if value is None:
            logger.debug(f"None value provided for {pvname}")
            return False

        if model_var_name in self.server._input_variables:

            if self.server._input_variables[model_var_name].is_constant:
                logger.debug("Unable to update constant variable %s", model_var_name)

            else:
                self.setParam(pvname, value)
                self.updatePVs()
                logger.debug(
                    "Channel Access process variable %s updated with value %s",
                    pvname,
                    value,
                )

                self.server.update_pv(pvname=pvname, value=value)
                return True

        else:
            logger.error("%s not found in server variables.", pvname)
            return False

    def update_pvs(self, variables: List[Variable]) -> None:
        """Update output Channel Access process variables after model execution.

        Args:
            variables (List[Variable]): List of variables.
        """
        for variable in variables:
            pvname = self.server._varname_to_pvname_map[variable.name]
            if variable.name in self.server._input_variables and variable.is_constant:
                logger.debug(
                    "Cannot update constant variable %s, %s", variable.name, pvname
                )

            else:
                if variable.variable_type == "image":
                    logger.debug(
                        "Channel Access image process variable %s updated.",
                        pvname,
                    )
                    self.setParam(pvname + ":ArrayData_RBV", variable.value.flatten())
                    self.setParam(pvname + ":MinX_RBV", variable.x_min)
                    self.setParam(pvname + ":MinY_RBV", variable.y_min)
                    self.setParam(pvname + ":MaxX_RBV", variable.x_max)
                    self.setParam(pvname + ":MaxY_RBV", variable.y_max)

                elif variable.variable_type == "scalar":
                    logger.debug(
                        "Channel Access process variable %s updated wth value %s.",
                        pvname,
                        variable.value,
                    )
                    self.setParam(pvname, variable.value)

                elif variable.variable_type == "array":
                    logger.debug(
                        "Channel Access image process variable %s updated.",
                        pvname,
                    )

                    self.setParam(pvname + ":ArrayData_RBV", variable.value.flatten())

                else:
                    logger.debug(
                        "No instructions for handling variable %s of type %s",
                        variable.name,
                        variable.variable_type,
                    )

        self.updatePVs()

__init__(server)

Initialize the Channel Access driver. Store input state and output state.

Source code in lume_epics/epics_ca_server.py
569
570
571
572
def __init__(self, server) -> None:
    """Initialize the Channel Access driver. Store input state and output state."""
    super(CADriver, self).__init__()
    self.server = server

read(pvname)

Method executed by server when clients read a Channel Access process variable.

Parameters:

Name Type Description Default
pvname str

Process variable name.

required
Source code in lume_epics/epics_ca_server.py
574
575
576
577
578
579
580
581
582
def read(self, pvname: str) -> Union[float, np.ndarray]:
    """Method executed by server when clients read a Channel Access process
    variable.

    Args:
        pvname (str): Process variable name.

    """
    return self.getParam(pvname)

update_pvs(variables)

Update output Channel Access process variables after model execution.

Parameters:

Name Type Description Default
variables List[Variable]

List of variables.

required
Source code in lume_epics/epics_ca_server.py
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
def update_pvs(self, variables: List[Variable]) -> None:
    """Update output Channel Access process variables after model execution.

    Args:
        variables (List[Variable]): List of variables.
    """
    for variable in variables:
        pvname = self.server._varname_to_pvname_map[variable.name]
        if variable.name in self.server._input_variables and variable.is_constant:
            logger.debug(
                "Cannot update constant variable %s, %s", variable.name, pvname
            )

        else:
            if variable.variable_type == "image":
                logger.debug(
                    "Channel Access image process variable %s updated.",
                    pvname,
                )
                self.setParam(pvname + ":ArrayData_RBV", variable.value.flatten())
                self.setParam(pvname + ":MinX_RBV", variable.x_min)
                self.setParam(pvname + ":MinY_RBV", variable.y_min)
                self.setParam(pvname + ":MaxX_RBV", variable.x_max)
                self.setParam(pvname + ":MaxY_RBV", variable.y_max)

            elif variable.variable_type == "scalar":
                logger.debug(
                    "Channel Access process variable %s updated wth value %s.",
                    pvname,
                    variable.value,
                )
                self.setParam(pvname, variable.value)

            elif variable.variable_type == "array":
                logger.debug(
                    "Channel Access image process variable %s updated.",
                    pvname,
                )

                self.setParam(pvname + ":ArrayData_RBV", variable.value.flatten())

            else:
                logger.debug(
                    "No instructions for handling variable %s of type %s",
                    variable.name,
                    variable.variable_type,
                )

    self.updatePVs()

write(pvname, value)

Method executed by server when clients write to a Channel Access process variable.

Parameters:

Name Type Description Default
pvname str

Process variable name.

required
value Union[float, np.ndarray]

Value to assign to the process variable.

required
Source code in lume_epics/epics_ca_server.py
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
def write(self, pvname: str, value: Union[float, np.ndarray]) -> bool:
    """Method executed by server when clients write to a Channel Access process
    variable.


    Args:
        pvname (str): Process variable name.

        value (Union[float, np.ndarray]): Value to assign to the process variable.

    """

    # handle area detector types
    model_var_name = self.server._pvname_to_varname_map.get(pvname)

    if pvname in self.server._child_to_parent_map:
        model_var_name = self.server._child_to_parent_map[pvname]

    if model_var_name in self.server._output_variables:
        logger.warning(
            "Cannot update variable %s. Output variables can only be updated via surrogate model callback.",
            pvname,
        )
        return False

    if value is None:
        logger.debug(f"None value provided for {pvname}")
        return False

    if model_var_name in self.server._input_variables:

        if self.server._input_variables[model_var_name].is_constant:
            logger.debug("Unable to update constant variable %s", model_var_name)

        else:
            self.setParam(pvname, value)
            self.updatePVs()
            logger.debug(
                "Channel Access process variable %s updated with value %s",
                pvname,
                value,
            )

            self.server.update_pv(pvname=pvname, value=value)
            return True

    else:
        logger.error("%s not found in server variables.", pvname)
        return False

CAServer

Bases: CAProcess

Process-based implementation of Channel Access server.

Attributes:

Name Type Description
_ca_server SimpleServer

pcaspy SimpleServer instance

_ca_driver Driver

pcaspy Driver instance

_input_variables Dict[str, InputVariable]

Mapping of input variable name to variable

_output_variables Dict[str, InputVariable]

Mapping of output variable name to variable

_server_thread ServerThread

Thread for running the server

shutdown_event multiprocessing.Event

Event indicating shutdown

exit_event multiprocessing.Event

Event indicating early exit

_running_indicator multiprocessing.Value

Value indicating whether model execution ongoing

_epics_config dict

Dictionary describing EPICS configuration for model variables

_in_queue multiprocessing.Queue

Queue for pushing updated input variables to model execution

_out_queue multiprocessing.Queue

Process model output variables and sync with pvAccess server

Source code in lume_epics/epics_ca_server.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
class CAServer(CAProcess):
    """
    Process-based implementation of Channel Access server.

    Attributes:
        _ca_server (SimpleServer): pcaspy SimpleServer instance

        _ca_driver (Driver): pcaspy Driver instance

        _input_variables (Dict[str, InputVariable]): Mapping of input variable name to variable

        _output_variables (Dict[str, InputVariable]): Mapping of output variable name to variable

        _server_thread (ServerThread): Thread for running the server

        shutdown_event (multiprocessing.Event): Event indicating shutdown

        exit_event (multiprocessing.Event): Event indicating early exit

        _running_indicator (multiprocessing.Value): Value indicating whether model execution ongoing

        _epics_config (dict): Dictionary describing EPICS configuration for model variables

        _in_queue (multiprocessing.Queue): Queue for pushing updated input variables to model execution

        _out_queue (multiprocessing.Queue): Process model output variables and sync with pvAccess server

    """

    protocol = "ca"

    def __init__(
        self,
        input_variables: Dict[str, InputVariable],
        output_variables: Dict[str, OutputVariable],
        epics_config: dict,
        in_queue: multiprocessing.Queue,
        out_queue: multiprocessing.Queue,
        running_indicator: multiprocessing.Value,
        *args,
        **kwargs,
    ) -> None:
        """Initialize server process.

        Args:
            input_variables (Dict[str, InputVariable]): Dictionary mapping pvname to lume-model input variable.
            output_variables (Dict[str, OutputVariable]):Dictionary mapping pvname to lume-model output variable.
            epics_config (dict): Dictionary mapping pvname to EPICS configuration.
            in_queue (multiprocessing.Queue): Queue for tracking updates to input variables.
            out_queue (multiprocessing.Queue): Queue for tracking updates to output variables.
            running_indicator (multiprocessing.Value): Multiprocessing value for indicating if server running.

        """
        super().__init__(*args, **kwargs)
        self._ca_server = None
        self._ca_driver = None
        self._server_thread = None
        self._input_variables = input_variables
        self._output_variables = output_variables
        self._in_queue = in_queue
        self._out_queue = out_queue
        self._providers = {}
        self._running_indicator = running_indicator
        self._epics_config = epics_config
        self.exit_event = multiprocessing.Event()
        self.shutdown_event = multiprocessing.Event()

        # utility maps
        self._pvname_to_varname_map = {
            config["pvname"]: var_name for var_name, config in epics_config.items()
        }
        self._varname_to_pvname_map = {
            var_name: config["pvname"] for var_name, config in epics_config.items()
        }

        # cached pv values
        self._cached_values = {}
        self._monitors = {}

    def update_pv(self, pvname, value) -> None:
        """Adds update to input process variable to the input queue.

        Args:
            pvname (str): Name of process variable

            value (Union[np.ndarray, float]): Value to set

        """
        model_var_name = self._pvname_to_varname_map.get(pvname)
        if pvname in self._child_to_parent_map:
            model_var_name = self._child_to_parent_map[pvname]

        variable = self._input_variables[model_var_name]

        # check for already cached variable
        variable = self._cached_values.get(model_var_name, variable)

        # check for image variable and proper assignments
        if variable.variable_type == "image":

            attr_type = pvname.split(":")[-1]

            if attr_type == "ArrayData_RBV":
                value = np.array(value)
                value = value.reshape(variable.shape)
                variable.value = value

            if attr_type == "MinX_RBV":
                variable.x_min = value

            if attr_type == "MinY_RBV":
                variable.y_min = value

            if attr_type == "MaxX_RBV":
                variable.x_max = value

            if attr_type == "MaxY_RBV":
                variable.y_max = value

        # assign value
        else:
            variable.value = value

        self._cached_values[model_var_name] = variable

        # only update if not running
        if not self._running_indicator.value:
            self._in_queue.put({"protocol": "ca", "vars": self._cached_values})
            self._cached_values = {}

    def _monitor_callback(self, pvname=None, value=None, **kwargs) -> None:
        """Callback executed on value change events."""
        model_var_name = self._pvname_to_varname_map.get(pvname)

        variable = self._input_variables.get(model_var_name)
        if not variable:
            variable = self._output_variables.get(model_var_name)

        # check for already cached variable
        variable = self._cached_values.get(model_var_name, variable)

        # check for image variable and proper assignments
        if variable.variable_type == "image":

            attr_type = pvname.split(":")[-1]

            if attr_type == "ArrayData_RBV":
                value = value.reshape(variable.shape())
                variable.value = value

            if attr_type == "MinX_RBV":
                variable.x_min = value

            if attr_type == "MinY_RBV":
                variable.y_mix = value

            if attr_type == "MaxX_RBV":
                variable.x_max = value

            if attr_type == "MaxY_RBV":
                variable.y_max = value

        # assign value
        else:
            variable.value = value

        self._cached_values[model_var_name] = variable

        # only update if not running
        if not self._running_indicator.value:
            self._in_queue.put({"protocol": "ca", "vars": self._cached_values})
            self._cached_values = {}

    def _initialize_model(self):
        """Initialize model"""
        self._in_queue.put({"protocol": "ca", "vars": self._input_variables})

    def setup_server(self) -> None:
        """Configure and start server."""
        # ignore interrupt in subprocess
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        logger.info("Initializing CA server")

        # update value with stored defaults
        for var_name in self._input_variables:
            if self._epics_config[var_name]["serve"]:
                self._input_variables[var_name].value = self._input_variables[
                    var_name
                ].default

            else:
                pvname = self._varname_to_pvname_map[var_name]
                val = epics.caget(pvname)
                if val is None:
                    logger.error(
                        f"Unable to connect to {self._varname_to_pvname_map[var_name]}"
                    )
                    self.exit_event.set()
                    return False

                self._input_variables[var_name].value = val

        # initialize channel access server
        self._ca_server = SimpleServer()

        # update output variable values
        self._initialize_model()
        model_outputs = None
        while not self.shutdown_event.is_set() and model_outputs is None:

            try:
                model_outputs = self._out_queue.get(timeout=0.1)
            except Empty:
                pass

        if self.shutdown_event.is_set():
            pass

        model_output_vars = model_outputs.get("output_variables", {})
        self._output_variables.update(model_output_vars)

        # differentiate between values to serve and not to serve
        to_serve = []
        external = []
        variables = copy.deepcopy(self._input_variables)
        variables.update(self._output_variables)

        for var in variables:
            if var in self._epics_config:
                if self._epics_config[var]["serve"]:
                    to_serve.append(var)

                else:
                    external.append(var)

        # build pvdb and child to parent map for area detector scheme
        pvdb, self._child_to_parent_map = build_pvdb(
            [variables[var_name] for var_name in to_serve], self._epics_config
        )

        # for external variables create monitors
        for var_name in external:
            self._monitors[var_name] = epics.pv.get_pv(
                self._varname_to_pvname_map[var_name]
            )
            self._monitors[var_name].add_callback(self._monitor_callback)

        # Register pvs with server if serving
        if len(pvdb):
            self._ca_server.createPV("", pvdb)

            # set up driver for handing read and write requests to process variables
            self._ca_driver = CADriver(server=self)

            # start the server thread
            self._server_thread = CAServerThread(self._ca_server)
            self._server_thread.start()

        logger.info("CA server started")
        return True

    def update_pvs(
        self,
        input_variables: Dict[str, InputVariable],
        output_variables: Dict[str, OutputVariable],
    ) -> None:
        """Update process variables over Channel Access.

        Args:
            input_variables (Dict[str, InputVariable]): List of lume-epics output variables.

            output_variables (Dict[str, OutputVariable]): List of lume-model output variables.

        """
        variables = input_variables
        variables.update(output_variables)

        # update variables if the driver is running
        if self._ca_driver is not None:
            self._ca_driver.update_pvs(list(variables.values()))

    def run(self) -> None:
        """Start server process."""
        started = self.setup_server()
        if started:
            while not self.shutdown_event.is_set():
                try:
                    data = self._out_queue.get_nowait()
                    inputs = data.get("input_variables", {})
                    outputs = data.get("output_variables", {})
                    self.update_pvs(inputs, outputs)

                except Empty:
                    time.sleep(0.05)
                    logger.debug("out queue empty")

            # if server thread running
            if self._server_thread is not None:
                self._server_thread.stop()

            logger.info("Channel access server stopped.")
        else:
            logger.info("Unable to set up server. Shutting down.")

    def shutdown(self):
        """Safely shutdown the server process."""
        self.shutdown_event.set()

__init__(input_variables, output_variables, epics_config, in_queue, out_queue, running_indicator, *args, **kwargs)

Initialize server process.

Parameters:

Name Type Description Default
input_variables Dict[str, InputVariable]

Dictionary mapping pvname to lume-model input variable.

required
output_variables Dict[str, OutputVariable]

Dictionary mapping pvname to lume-model output variable.

required
epics_config dict

Dictionary mapping pvname to EPICS configuration.

required
in_queue multiprocessing.Queue

Queue for tracking updates to input variables.

required
out_queue multiprocessing.Queue

Queue for tracking updates to output variables.

required
running_indicator multiprocessing.Value

Multiprocessing value for indicating if server running.

required
Source code in lume_epics/epics_ca_server.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def __init__(
    self,
    input_variables: Dict[str, InputVariable],
    output_variables: Dict[str, OutputVariable],
    epics_config: dict,
    in_queue: multiprocessing.Queue,
    out_queue: multiprocessing.Queue,
    running_indicator: multiprocessing.Value,
    *args,
    **kwargs,
) -> None:
    """Initialize server process.

    Args:
        input_variables (Dict[str, InputVariable]): Dictionary mapping pvname to lume-model input variable.
        output_variables (Dict[str, OutputVariable]):Dictionary mapping pvname to lume-model output variable.
        epics_config (dict): Dictionary mapping pvname to EPICS configuration.
        in_queue (multiprocessing.Queue): Queue for tracking updates to input variables.
        out_queue (multiprocessing.Queue): Queue for tracking updates to output variables.
        running_indicator (multiprocessing.Value): Multiprocessing value for indicating if server running.

    """
    super().__init__(*args, **kwargs)
    self._ca_server = None
    self._ca_driver = None
    self._server_thread = None
    self._input_variables = input_variables
    self._output_variables = output_variables
    self._in_queue = in_queue
    self._out_queue = out_queue
    self._providers = {}
    self._running_indicator = running_indicator
    self._epics_config = epics_config
    self.exit_event = multiprocessing.Event()
    self.shutdown_event = multiprocessing.Event()

    # utility maps
    self._pvname_to_varname_map = {
        config["pvname"]: var_name for var_name, config in epics_config.items()
    }
    self._varname_to_pvname_map = {
        var_name: config["pvname"] for var_name, config in epics_config.items()
    }

    # cached pv values
    self._cached_values = {}
    self._monitors = {}

run()

Start server process.

Source code in lume_epics/epics_ca_server.py
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
def run(self) -> None:
    """Start server process."""
    started = self.setup_server()
    if started:
        while not self.shutdown_event.is_set():
            try:
                data = self._out_queue.get_nowait()
                inputs = data.get("input_variables", {})
                outputs = data.get("output_variables", {})
                self.update_pvs(inputs, outputs)

            except Empty:
                time.sleep(0.05)
                logger.debug("out queue empty")

        # if server thread running
        if self._server_thread is not None:
            self._server_thread.stop()

        logger.info("Channel access server stopped.")
    else:
        logger.info("Unable to set up server. Shutting down.")

setup_server()

Configure and start server.

Source code in lume_epics/epics_ca_server.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
def setup_server(self) -> None:
    """Configure and start server."""
    # ignore interrupt in subprocess
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    logger.info("Initializing CA server")

    # update value with stored defaults
    for var_name in self._input_variables:
        if self._epics_config[var_name]["serve"]:
            self._input_variables[var_name].value = self._input_variables[
                var_name
            ].default

        else:
            pvname = self._varname_to_pvname_map[var_name]
            val = epics.caget(pvname)
            if val is None:
                logger.error(
                    f"Unable to connect to {self._varname_to_pvname_map[var_name]}"
                )
                self.exit_event.set()
                return False

            self._input_variables[var_name].value = val

    # initialize channel access server
    self._ca_server = SimpleServer()

    # update output variable values
    self._initialize_model()
    model_outputs = None
    while not self.shutdown_event.is_set() and model_outputs is None:

        try:
            model_outputs = self._out_queue.get(timeout=0.1)
        except Empty:
            pass

    if self.shutdown_event.is_set():
        pass

    model_output_vars = model_outputs.get("output_variables", {})
    self._output_variables.update(model_output_vars)

    # differentiate between values to serve and not to serve
    to_serve = []
    external = []
    variables = copy.deepcopy(self._input_variables)
    variables.update(self._output_variables)

    for var in variables:
        if var in self._epics_config:
            if self._epics_config[var]["serve"]:
                to_serve.append(var)

            else:
                external.append(var)

    # build pvdb and child to parent map for area detector scheme
    pvdb, self._child_to_parent_map = build_pvdb(
        [variables[var_name] for var_name in to_serve], self._epics_config
    )

    # for external variables create monitors
    for var_name in external:
        self._monitors[var_name] = epics.pv.get_pv(
            self._varname_to_pvname_map[var_name]
        )
        self._monitors[var_name].add_callback(self._monitor_callback)

    # Register pvs with server if serving
    if len(pvdb):
        self._ca_server.createPV("", pvdb)

        # set up driver for handing read and write requests to process variables
        self._ca_driver = CADriver(server=self)

        # start the server thread
        self._server_thread = CAServerThread(self._ca_server)
        self._server_thread.start()

    logger.info("CA server started")
    return True

shutdown()

Safely shutdown the server process.

Source code in lume_epics/epics_ca_server.py
367
368
369
def shutdown(self):
    """Safely shutdown the server process."""
    self.shutdown_event.set()

update_pv(pvname, value)

Adds update to input process variable to the input queue.

Parameters:

Name Type Description Default
pvname str

Name of process variable

required
value Union[np.ndarray, float]

Value to set

required
Source code in lume_epics/epics_ca_server.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def update_pv(self, pvname, value) -> None:
    """Adds update to input process variable to the input queue.

    Args:
        pvname (str): Name of process variable

        value (Union[np.ndarray, float]): Value to set

    """
    model_var_name = self._pvname_to_varname_map.get(pvname)
    if pvname in self._child_to_parent_map:
        model_var_name = self._child_to_parent_map[pvname]

    variable = self._input_variables[model_var_name]

    # check for already cached variable
    variable = self._cached_values.get(model_var_name, variable)

    # check for image variable and proper assignments
    if variable.variable_type == "image":

        attr_type = pvname.split(":")[-1]

        if attr_type == "ArrayData_RBV":
            value = np.array(value)
            value = value.reshape(variable.shape)
            variable.value = value

        if attr_type == "MinX_RBV":
            variable.x_min = value

        if attr_type == "MinY_RBV":
            variable.y_min = value

        if attr_type == "MaxX_RBV":
            variable.x_max = value

        if attr_type == "MaxY_RBV":
            variable.y_max = value

    # assign value
    else:
        variable.value = value

    self._cached_values[model_var_name] = variable

    # only update if not running
    if not self._running_indicator.value:
        self._in_queue.put({"protocol": "ca", "vars": self._cached_values})
        self._cached_values = {}

update_pvs(input_variables, output_variables)

Update process variables over Channel Access.

Parameters:

Name Type Description Default
input_variables Dict[str, InputVariable]

List of lume-epics output variables.

required
output_variables Dict[str, OutputVariable]

List of lume-model output variables.

required
Source code in lume_epics/epics_ca_server.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
def update_pvs(
    self,
    input_variables: Dict[str, InputVariable],
    output_variables: Dict[str, OutputVariable],
) -> None:
    """Update process variables over Channel Access.

    Args:
        input_variables (Dict[str, InputVariable]): List of lume-epics output variables.

        output_variables (Dict[str, OutputVariable]): List of lume-model output variables.

    """
    variables = input_variables
    variables.update(output_variables)

    # update variables if the driver is running
    if self._ca_driver is not None:
        self._ca_driver.update_pvs(list(variables.values()))

CAServerThread

Bases: CAThread

A helper class to run server in a thread.

Source code in lume_epics/epics_ca_server.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class CAServerThread(CAThread):
    """
    A helper class to run server in a thread.
    """

    def __init__(self, server):
        """

        Args:
            server (pcaspy.SimpleServer): Pcaspy server to run in thread.
        """
        super(CAThread, self).__init__()
        self.server = server
        self.running = True

    def run(self):
        """
        Start the server processing
        """
        while self.running:
            self.server.process(0.1)

    def stop(self):
        """
        Stop the server processing
        """
        self.running = False

__init__(server)

Parameters:

Name Type Description Default
server pcaspy.SimpleServer

Pcaspy server to run in thread.

required
Source code in lume_epics/epics_ca_server.py
38
39
40
41
42
43
44
45
46
def __init__(self, server):
    """

    Args:
        server (pcaspy.SimpleServer): Pcaspy server to run in thread.
    """
    super(CAThread, self).__init__()
    self.server = server
    self.running = True

run()

Start the server processing

Source code in lume_epics/epics_ca_server.py
48
49
50
51
52
53
def run(self):
    """
    Start the server processing
    """
    while self.running:
        self.server.process(0.1)

stop()

Stop the server processing

Source code in lume_epics/epics_ca_server.py
55
56
57
58
59
def stop(self):
    """
    Stop the server processing
    """
    self.running = False

build_pvdb(variables, epics_config)

Utility function for building dictionary (pvdb) used to initialize the channel access server.

Parameters:

Name Type Description Default
variables List[Variable]

List of lume_model variables to be served with channel access server.

required
epics_config dict

Epics pvnames for each variable

required

Returns:

Name Type Description
tuple

pvdb (dict)

child_to_parent_map dict

Mapping of child pvs to parent model variables

Source code in lume_epics/epics_ca_server.py
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
def build_pvdb(variables: List[Variable], epics_config: dict) -> tuple:
    """Utility function for building dictionary (pvdb) used to initialize the channel
    access server.

    Args:
        variables (List[Variable]): List of lume_model variables to be served with
            channel access server.

        epics_config (dict): Epics pvnames for each variable

    Returns:
        pvdb (dict)
        child_to_parent_map (dict): Mapping of child pvs to parent model variables

    """
    pvdb = {}
    child_to_parent_map = {}

    for variable in variables:
        pvname = epics_config.get(variable.name)["pvname"]

        if variable.variable_type == "image":

            if variable.value is None:
                ndim = np.nan
                shape = np.nan
                array_size_x = np.nan
                array_size_y = np.nan
                array_size = np.nan
                array_data = np.nan
                count = np.nan

            else:
                ndim = variable.value.ndim
                shape = variable.value.shape
                array_size_x = variable.value.shape[0]
                array_size_y = variable.value.shape[1]
                array_size = int(np.prod(variable.value.shape))
                array_data = variable.value.flatten()
                count = int(np.prod(variable.value.shape))

            # infer color mode
            if ndim == 2:
                color_mode = 0

            elif ndim == 3:
                color_mode = 1

            else:
                logger.info("Color mode cannot be inferred from image shape %s.", ndim)
                color_mode = np.nan

            # assign default PVS
            pvdb.update(
                {
                    f"{pvname}:NDimensions_RBV": {
                        "type": "float",
                        "prec": variable.precision,
                        "value": ndim,
                    },
                    f"{pvname}:Dimensions_RBV": {
                        "type": "int",
                        "prec": variable.precision,
                        "count": ndim,
                        "value": shape,
                    },
                    f"{pvname}:ArraySizeX_RBV": {
                        "type": "int",
                        "value": array_size_x,
                    },
                    f"{pvname}:ArraySizeY_RBV": {
                        "type": "int",
                        "value": array_size_y,
                    },
                    f"{pvname}:ArraySize_RBV": {
                        "type": "int",
                        "value": array_size,
                    },
                    f"{pvname}:ArrayData_RBV": {
                        "type": "float",
                        "prec": variable.precision,
                        "count": count,
                        "value": array_data,
                    },
                    f"{pvname}:MinX_RBV": {
                        "type": "float",
                        "value": variable.x_min,
                    },
                    f"{pvname}:MinY_RBV": {
                        "type": "float",
                        "value": variable.y_min,
                    },
                    f"{pvname}:MaxX_RBV": {
                        "type": "float",
                        "value": variable.x_max,
                    },
                    f"{pvname}:MaxY_RBV": {
                        "type": "float",
                        "value": variable.y_max,
                    },
                    f"{pvname}:ColorMode_RBV": {
                        "type": "int",
                        "value": color_mode,
                    },
                }
            )

            child_to_parent_map.update(
                {
                    f"{pvname}:{child}": variable.name
                    for child in [
                        "NDimensions_RBV",
                        "Dimensions_RBV",
                        "ArraySizeX_RBV",
                        "ArraySizeY_RBV",
                        "ArraySize_RBV",
                        "ArrayData_RBV",
                        "MinX_RBV",
                        "MinY_RBV",
                        "MaxX_RBV",
                        "MaxY_RBV",
                        "ColorMode_RBV",
                    ]
                }
            )

            if "units" in variable.__fields_set__:
                pvdb[f"{pvname}:ArrayData_RBV"]["unit"] = variable.units

            # handle rgb arrays
            if ndim > 2:
                pvdb[f"{pvname}:ArraySizeZ_RBV"] = {
                    "type": "int",
                    "value": variable.value.shape[2],
                }

        elif variable.variable_type == "scalar":
            pvdb[pvname] = variable.dict(exclude_unset=True, by_alias=True)
            if variable.value_range is not None:
                pvdb[pvname]["hilim"] = variable.value_range[1]
                pvdb[pvname]["lolim"] = variable.value_range[0]

            if variable.units is not None:
                pvdb[pvname]["unit"] = variable.units

        elif variable.variable_type == "array":

            # assign default PVS
            pvdb.update(
                {
                    f"{pvname}:NDimensions_RBV": {
                        "type": "float",
                        "prec": variable.precision,
                        "value": variable.value.ndim,
                    },
                    f"{pvname}:Dimensions_RBV": {
                        "type": "int",
                        "prec": variable.precision,
                        "count": variable.value.ndim,
                        "value": variable.value.shape,
                    },
                    f"{pvname}:ArrayData_RBV": {
                        "type": variable.value_type,
                        "prec": variable.precision,
                        "count": int(np.prod(variable.value.shape)),
                        "value": variable.value.flatten(),
                    },
                    f"{pvname}:ArraySize_RBV": {
                        "type": "int",
                        "value": int(np.prod(variable.value.shape)),
                    },
                }
            )

            child_to_parent_map.update(
                {
                    f"{pvname}:{child}": variable.name
                    for child in [
                        "NDimensions_RBV",
                        "Dimensions_RBV",
                        "ArraySize_RBV",
                        "ArrayData_RBV",
                    ]
                }
            )

            if "units" in variable.__fields_set__:
                pvdb[f"{pvname}:ArrayData_RBV"]["unit"] = variable.units

    return pvdb, child_to_parent_map

PVAServer

Bases: multiprocessing.Process

Process-based implementation of Channel Access server.

Attributes:

Name Type Description
pva_server P4PServer

p4p server instance

exit_event multiprocessing.Event

Event indicating pvAccess server error and communicating to main

shutdown_event multiprocessing.Event

Event indicating shutdown

_input_variables List[InputVariable]

List of input variables

_output_variables List[OutputVariable]

List of output variables

_in_queue multiprocessing.Queue

input variable queue

_out_queue multiprocessing.Queue

output variable update queue

_providers dict

Dictionary mapping pvname to p4p provider

_running_indicator multiprocessing.Value

Boolean indicator of running model execution

_monitors dict

Dictionary of monitor objects for read-only server

_cached_values dict

Dict for caching values while model executes

_pvname_to_varname_map dict

Mapping of pvname to variable name

_varname_to_pvname_map dict

Mapping of variable name to pvame

Source code in lume_epics/epics_pva_server.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
class PVAServer(multiprocessing.Process):
    """
    Process-based implementation of Channel Access server.

    Attributes:
        pva_server (P4PServer): p4p server instance
        exit_event (multiprocessing.Event): Event indicating pvAccess server error and communicating to main
        shutdown_event (multiprocessing.Event): Event indicating shutdown
        _input_variables (List[InputVariable]): List of input variables
        _output_variables (List[OutputVariable]): List of output variables
        _in_queue (multiprocessing.Queue): input variable queue
        _out_queue (multiprocessing.Queue): output variable update queue
        _providers (dict): Dictionary mapping pvname to p4p provider
        _running_indicator (multiprocessing.Value): Boolean indicator of running model execution
        _monitors (dict): Dictionary of monitor objects for read-only server
        _cached_values (dict): Dict for caching values while model executes
        _pvname_to_varname_map (dict): Mapping of pvname to variable name
        _varname_to_pvname_map (dict): Mapping of variable name to pvame

    """

    protocol = "pva"

    def __init__(
        self,
        input_variables: List[InputVariable],
        output_variables: List[OutputVariable],
        epics_config: dict,
        in_queue: multiprocessing.Queue,
        out_queue: multiprocessing.Queue,
        running_indicator: multiprocessing.Value,
        *args,
        **kwargs,
    ) -> None:
        """Initialize server process.

        Args:
            input_variables (Dict[str, InputVariable]): Dictionary mapping pvname to lume-model input variable.

            output_variables (Dict[str, OutputVariable]):Dictionary mapping pvname to lume-model output variable.

            epics_config (dict): Dictionary describing EPICS configuration for model variables

            in_queue (multiprocessing.Queue): Queue for tracking updates to input variables

            out_queue (multiprocessing.Queue): Queue for tracking updates to output variables

            running_indicator (multiprocessing.Value): Boolean indicator indicating running model execution

        """

        super().__init__(*args, **kwargs)
        self.pva_server = None
        self.exit_event = multiprocessing.Event()
        self.shutdown_event = multiprocessing.Event()
        self._input_variables = input_variables
        self._output_variables = output_variables
        self._epics_config = epics_config
        self._in_queue = in_queue
        self._out_queue = out_queue
        self._providers = {}
        self._running_indicator = running_indicator
        # monitors for read only
        self._monitors = {}
        self._cached_values = {}
        self._field_to_parent_map = {}

        # utility maps
        self._pvname_to_varname_map = {
            config["pvname"]: var_name for var_name, config in epics_config.items()
        }
        self._varname_to_pvname_map = {
            var_name: config["pvname"] for var_name, config in epics_config.items()
        }

    def update_pv(self, pvname: str, value: Union[np.ndarray, float]) -> None:
        """Adds update to input process variable to the input queue.

        Args:
            pvname (str): Name of process variable

            value (Union[np.ndarray, float]): Value to set

        """
        # Hack for now to get the pickable value
        value = value.raw.value

        varname = self._pvname_to_varname_map[pvname]
        model_variable = self._input_variables[varname]

        # check for already cached variable
        model_variable = self._cached_values.get(varname, model_variable)

        if model_variable.variable_type == "image":
            model_variable.x_min = value.attrib["x_min"]
            model_variable.x_max = value.attrib["x_max"]
            model_variable.y_min = value.attrib["y_min"]
            model_variable.y_max = value.attrib["y_max"]
        else:
            model_variable.value = value

        self._cached_values[varname] = model_variable

        # only update if not running
        if not self._running_indicator.value:
            self._in_queue.put({"protocol": self.protocol, "vars": self._cached_values})
            self._cached_values = {}

    def _monitor_callback(self, pvname, V) -> None:
        """Callback function used for updating read_only process variables."""
        value = V.raw.value
        varname = self._pvname_to_varname_map[pvname]
        model_variable = self._input_variables[varname]

        if not model_variable:
            model_variable = self._output_variables[varname]

        # check for already cached variable
        model_variable = self._cached_values.get(varname, model_variable)

        if model_variable.variable_type == "image":
            model_variable.x_min = value.attrib["x_min"]
            model_variable.x_max = value.attrib["x_max"]
            model_variable.y_min = value.attrib["y_min"]
            model_variable.y_max = value.attrib["y_max"]

        self._cached_values[varname] = model_variable

        # only update if not running
        if not self._running_indicator.value:
            self._in_queue.put({"protocol": self.protocol, "vars": self._cached_values})
            self._cached_values = {}

    def _initialize_model(self):
        """Initialize model"""

        rep = {"protocol": "pva", "vars": self._input_variables}

        self._in_queue.put(rep)

    def setup_server(self) -> None:
        """Configure and start server."""

        self._context = Context()

        # update value with stored defaults
        for var_name in self._input_variables:
            if self._epics_config[var_name]["serve"]:
                self._input_variables[var_name].value = self._input_variables[
                    var_name
                ].default

            else:
                if self._context is None:
                    self._context = Context("pva")

                try:
                    val = self._context.get(self._varname_to_pvname_map[var_name])
                    val = val.raw.value

                except:
                    self.exit_event.set()
                    raise ValueError(
                        f"Unable to connect to {self._varname_to_pvname_map[var_name]}"
                    )

                self._input_variables[var_name].value = val

        # update output variable values
        self._initialize_model()
        model_outputs = None
        while not self.shutdown_event.is_set() and model_outputs is None:
            try:
                model_outputs = self._out_queue.get(timeout=0.1)
            except Empty:
                pass

        if self.shutdown_event.is_set():
            pass

        # if startup hasn't failed
        else:
            model_output_vars = model_outputs.get("output_variables", {})
            self._output_variables.update(model_output_vars)

            variables = copy.deepcopy(self._input_variables)
            variables.update(self._output_variables)

            # ignore interrupt in subprocess
            signal.signal(signal.SIGINT, signal.SIG_IGN)

            logger.info("Initializing pvAccess server")

            # initialize global inputs
            self._structures = {}
            self._structure_specs = {}
            for variable_name, config in self._epics_config.items():
                if config["serve"]:
                    fields = config.get("fields")
                    pvname = config.get("pvname")

                    if fields is not None:
                        spec = []
                        structure = {}

                        for field in fields:
                            # track fields in dict
                            self._field_to_parent_map[field] = variable_name

                            variable = variables[field]

                            if variable is None:
                                raise ValueError(
                                    f"Field {field} for {variable_name} not found in variable list"
                                )

                            if variable.variable_type == "scalar":
                                spec.append((field, "d"))
                                nt = NTScalar("d")
                                initial = variable.value

                            if variable.variable_type == "table":
                                spec.append((field, "v"))
                                table_rep = ()
                                for col in variable.columns:
                                    # here we assume double type in tables...
                                    table_rep += (col, "ad")

                                nt = NTTable(table_rep)
                                initial = nt.wrap(variable.value)

                            if variable.variable_type == "array":
                                spec.append((field, "v"))

                                if variable.value_type == "str":
                                    nt = NTScalar("s")
                                    initial = variable.value

                                else:
                                    nd_array = variable.value.view(NTNDArrayData)
                                    nt = NTNDArray()
                                    initial = nt.wrap(nd_array)

                            if variable.variable_type == "image":
                                spec.append((field, "v"))

                                nd_array = variable.value.view(NTNDArrayData)
                                nd_array.attrib = {
                                    "x_min": variable.x_min,
                                    "y_min": variable.y_min,
                                    "x_max": variable.x_max,
                                    "y_max": variable.y_max,
                                }

                                nt = NTNDArray()
                                initial = nt.wrap(nd_array)

                            structure[field] = initial

                        # assemble pv
                        self._structures[variable_name] = structure
                        self._structure_specs[variable_name] = spec
                        struct_type = Type(id=variable_name, spec=spec)
                        struct_value = Value(struct_type, structure)
                        pv = SharedPV(initial=struct_value)
                        self._providers[pvname] = pv

                    else:
                        variable = variables[variable_name]
                        # prepare scalar variable types
                        if variable.variable_type == "scalar":
                            nt = NTScalar("d")
                            initial = variable.value

                        # prepare image variable types
                        elif variable.variable_type == "image":
                            nd_array = variable.value.view(NTNDArrayData)
                            nd_array.attrib = {
                                "x_min": variable.x_min,
                                "y_min": variable.y_min,
                                "x_max": variable.x_max,
                                "y_max": variable.y_max,
                            }
                            nt = NTNDArray()
                            initial = nd_array

                        elif variable.variable_type == "table":
                            table_rep = ()
                            for col in variable.columns:
                                # here we assume double type in tables...
                                table_rep += (col, "ad")

                            nt = NTTable(table_rep)
                            initial = nt.wrap(variable.value)

                        elif variable.variable_type == "array":
                            if variable.value_type == "str":
                                nt = NTScalar("as")
                                initial = variable.value

                            else:
                                nd_array = variable.value.view(NTNDArrayData)
                                nt = NTNDArray()
                                initial = nd_array

                        else:
                            raise ValueError(
                                "Unsupported variable type provided: %s",
                                variable.variable_type,
                            )

                        if variable.name in self._input_variables:
                            handler = PVAccessInputHandler(
                                pvname=pvname,
                                is_constant=variable.is_constant,
                                server=self,
                            )

                            pv = SharedPV(handler=handler, nt=nt, initial=initial)

                        else:
                            pv = SharedPV(nt=nt, initial=initial)

                        self._providers[pvname] = pv

                # if not serving pv, set up monitor
                else:
                    if variable.name in self._input_variables:
                        self._monitors[pvname] = self._context.monitor(
                            pvname, partial(self._monitor_callback, pvname)
                        )

                    # in this case, externally hosted output variable
                    else:
                        self._providers[pvname] = None

            if "summary" in self._epics_config:
                pvname = self._epics_config["summary"].get("pvname")
                owner = self._epics_config["summary"].get("owner")
                date_published = self._epics_config["summary"].get("date_published")
                description = self._epics_config["summary"].get("description")
                id = self._epics_config["summary"].get("id")

                spec = [
                    ("id", "s"),
                    ("owner", "s"),
                    ("date_published", "s"),
                    ("description", "s"),
                    ("input_variables", "as"),
                    ("output_variables", "as"),
                ]
                values = {
                    "id": id,
                    "date_published": date_published,
                    "description": description,
                    "owner": owner,
                    "input_variables": [
                        self._epics_config[var]["pvname"]
                        for var in self._input_variables
                    ],
                    "output_variables": [
                        self._epics_config[var]["pvname"]
                        for var in self._input_variables
                    ],
                }

                pv_type = Type(id="summary", spec=spec)
                value = Value(pv_type, values)
                pv = SharedPV(initial=value)
                self._providers[pvname] = pv

            # initialize pva server
            self.pva_server = P4PServer(providers=[self._providers])

            logger.info("pvAccess server started")

    def update_pvs(
        self,
        input_variables: Dict[str, InputVariable],
        output_variables: Dict[str, OutputVariable],
    ) -> None:
        """Update process variables over pvAccess.

        Args:
            input_variables (Dict[str, InputVariable]): Dict of lume-epics output variables.

            output_variables (Dict[str, OutputVariable]): Dict of lume-model output variables.

        """
        variables = input_variables
        variables.update(output_variables)

        for variable in variables.values():
            parent = self._field_to_parent_map.get(variable.name)

            if variable.name in self._input_variables and variable.is_constant:
                logger.debug("Cannot update constant variable.")

            else:
                if variable.variable_type == "image":
                    logger.debug(
                        "pvAccess image process variable %s updated.", variable.name
                    )
                    nd_array = variable.value.view(NTNDArrayData)

                    # get dw and dh from model output
                    nd_array.attrib = {
                        "x_min": variable.x_min,
                        "y_min": variable.y_min,
                        "x_max": variable.x_max,
                        "y_max": variable.y_max,
                    }
                    value = nd_array

                elif variable.variable_type == "array":
                    logger.debug(
                        "pvAccess array process variable %s updated.", variable.name
                    )
                    if variable.value_type == "str":
                        value = variable.value

                    else:
                        value = variable.value.view(NTNDArrayData)

                # do not build attribute pvs
                else:
                    logger.debug(
                        "pvAccess process variable %s updated with value %s.",
                        variable.name,
                        variable.value,
                    )
                    value = variable.value

            # update structure or pv
            if parent:
                self._structures[parent][variable.name] = value
                struct_type = Type(id=parent, spec=self._structure_specs[parent])
                value = Value(struct_type, self._structures[parent])
                pvname = self._varname_to_pvname_map[parent]
                output_provider = self._providers[pvname]

            else:
                pvname = self._varname_to_pvname_map[variable.name]
                output_provider = self._providers[pvname]

            if output_provider:
                output_provider.post(value)

            # in this case externally hosted
            else:
                try:
                    self._context.put(pvname, value)
                except:
                    self.exit_event.set()
                    self.shutdown()

    def run(self) -> None:
        """Start server process."""
        self.setup_server()

        # mark running
        while not self.shutdown_event.is_set():
            try:
                data = self._out_queue.get_nowait()
                inputs = data.get("input_variables", {})
                outputs = data.get("output_variables", {})
                self.update_pvs(inputs, outputs)

                # check cached values
                if len(self._cached_values) > 0 and not self._running_indicator.value:
                    self._in_queue.put(
                        {"protocol": self.protocol, "vars": self._cached_values}
                    )

            except Empty:
                time.sleep(0.1)
                logger.debug("out queue empty")

        self._context.close()
        if self.pva_server is not None:
            self.pva_server.stop()

        logger.info("pvAccess server stopped.")

    def shutdown(self):
        """Safely shutdown the server process."""
        self.shutdown_event.set()

__init__(input_variables, output_variables, epics_config, in_queue, out_queue, running_indicator, *args, **kwargs)

Initialize server process.

Parameters:

Name Type Description Default
input_variables Dict[str, InputVariable]

Dictionary mapping pvname to lume-model input variable.

required
output_variables Dict[str, OutputVariable]

Dictionary mapping pvname to lume-model output variable.

required
epics_config dict

Dictionary describing EPICS configuration for model variables

required
in_queue multiprocessing.Queue

Queue for tracking updates to input variables

required
out_queue multiprocessing.Queue

Queue for tracking updates to output variables

required
running_indicator multiprocessing.Value

Boolean indicator indicating running model execution

required
Source code in lume_epics/epics_pva_server.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def __init__(
    self,
    input_variables: List[InputVariable],
    output_variables: List[OutputVariable],
    epics_config: dict,
    in_queue: multiprocessing.Queue,
    out_queue: multiprocessing.Queue,
    running_indicator: multiprocessing.Value,
    *args,
    **kwargs,
) -> None:
    """Initialize server process.

    Args:
        input_variables (Dict[str, InputVariable]): Dictionary mapping pvname to lume-model input variable.

        output_variables (Dict[str, OutputVariable]):Dictionary mapping pvname to lume-model output variable.

        epics_config (dict): Dictionary describing EPICS configuration for model variables

        in_queue (multiprocessing.Queue): Queue for tracking updates to input variables

        out_queue (multiprocessing.Queue): Queue for tracking updates to output variables

        running_indicator (multiprocessing.Value): Boolean indicator indicating running model execution

    """

    super().__init__(*args, **kwargs)
    self.pva_server = None
    self.exit_event = multiprocessing.Event()
    self.shutdown_event = multiprocessing.Event()
    self._input_variables = input_variables
    self._output_variables = output_variables
    self._epics_config = epics_config
    self._in_queue = in_queue
    self._out_queue = out_queue
    self._providers = {}
    self._running_indicator = running_indicator
    # monitors for read only
    self._monitors = {}
    self._cached_values = {}
    self._field_to_parent_map = {}

    # utility maps
    self._pvname_to_varname_map = {
        config["pvname"]: var_name for var_name, config in epics_config.items()
    }
    self._varname_to_pvname_map = {
        var_name: config["pvname"] for var_name, config in epics_config.items()
    }

run()

Start server process.

Source code in lume_epics/epics_pva_server.py
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
def run(self) -> None:
    """Start server process."""
    self.setup_server()

    # mark running
    while not self.shutdown_event.is_set():
        try:
            data = self._out_queue.get_nowait()
            inputs = data.get("input_variables", {})
            outputs = data.get("output_variables", {})
            self.update_pvs(inputs, outputs)

            # check cached values
            if len(self._cached_values) > 0 and not self._running_indicator.value:
                self._in_queue.put(
                    {"protocol": self.protocol, "vars": self._cached_values}
                )

        except Empty:
            time.sleep(0.1)
            logger.debug("out queue empty")

    self._context.close()
    if self.pva_server is not None:
        self.pva_server.stop()

    logger.info("pvAccess server stopped.")

setup_server()

Configure and start server.

Source code in lume_epics/epics_pva_server.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
def setup_server(self) -> None:
    """Configure and start server."""

    self._context = Context()

    # update value with stored defaults
    for var_name in self._input_variables:
        if self._epics_config[var_name]["serve"]:
            self._input_variables[var_name].value = self._input_variables[
                var_name
            ].default

        else:
            if self._context is None:
                self._context = Context("pva")

            try:
                val = self._context.get(self._varname_to_pvname_map[var_name])
                val = val.raw.value

            except:
                self.exit_event.set()
                raise ValueError(
                    f"Unable to connect to {self._varname_to_pvname_map[var_name]}"
                )

            self._input_variables[var_name].value = val

    # update output variable values
    self._initialize_model()
    model_outputs = None
    while not self.shutdown_event.is_set() and model_outputs is None:
        try:
            model_outputs = self._out_queue.get(timeout=0.1)
        except Empty:
            pass

    if self.shutdown_event.is_set():
        pass

    # if startup hasn't failed
    else:
        model_output_vars = model_outputs.get("output_variables", {})
        self._output_variables.update(model_output_vars)

        variables = copy.deepcopy(self._input_variables)
        variables.update(self._output_variables)

        # ignore interrupt in subprocess
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        logger.info("Initializing pvAccess server")

        # initialize global inputs
        self._structures = {}
        self._structure_specs = {}
        for variable_name, config in self._epics_config.items():
            if config["serve"]:
                fields = config.get("fields")
                pvname = config.get("pvname")

                if fields is not None:
                    spec = []
                    structure = {}

                    for field in fields:
                        # track fields in dict
                        self._field_to_parent_map[field] = variable_name

                        variable = variables[field]

                        if variable is None:
                            raise ValueError(
                                f"Field {field} for {variable_name} not found in variable list"
                            )

                        if variable.variable_type == "scalar":
                            spec.append((field, "d"))
                            nt = NTScalar("d")
                            initial = variable.value

                        if variable.variable_type == "table":
                            spec.append((field, "v"))
                            table_rep = ()
                            for col in variable.columns:
                                # here we assume double type in tables...
                                table_rep += (col, "ad")

                            nt = NTTable(table_rep)
                            initial = nt.wrap(variable.value)

                        if variable.variable_type == "array":
                            spec.append((field, "v"))

                            if variable.value_type == "str":
                                nt = NTScalar("s")
                                initial = variable.value

                            else:
                                nd_array = variable.value.view(NTNDArrayData)
                                nt = NTNDArray()
                                initial = nt.wrap(nd_array)

                        if variable.variable_type == "image":
                            spec.append((field, "v"))

                            nd_array = variable.value.view(NTNDArrayData)
                            nd_array.attrib = {
                                "x_min": variable.x_min,
                                "y_min": variable.y_min,
                                "x_max": variable.x_max,
                                "y_max": variable.y_max,
                            }

                            nt = NTNDArray()
                            initial = nt.wrap(nd_array)

                        structure[field] = initial

                    # assemble pv
                    self._structures[variable_name] = structure
                    self._structure_specs[variable_name] = spec
                    struct_type = Type(id=variable_name, spec=spec)
                    struct_value = Value(struct_type, structure)
                    pv = SharedPV(initial=struct_value)
                    self._providers[pvname] = pv

                else:
                    variable = variables[variable_name]
                    # prepare scalar variable types
                    if variable.variable_type == "scalar":
                        nt = NTScalar("d")
                        initial = variable.value

                    # prepare image variable types
                    elif variable.variable_type == "image":
                        nd_array = variable.value.view(NTNDArrayData)
                        nd_array.attrib = {
                            "x_min": variable.x_min,
                            "y_min": variable.y_min,
                            "x_max": variable.x_max,
                            "y_max": variable.y_max,
                        }
                        nt = NTNDArray()
                        initial = nd_array

                    elif variable.variable_type == "table":
                        table_rep = ()
                        for col in variable.columns:
                            # here we assume double type in tables...
                            table_rep += (col, "ad")

                        nt = NTTable(table_rep)
                        initial = nt.wrap(variable.value)

                    elif variable.variable_type == "array":
                        if variable.value_type == "str":
                            nt = NTScalar("as")
                            initial = variable.value

                        else:
                            nd_array = variable.value.view(NTNDArrayData)
                            nt = NTNDArray()
                            initial = nd_array

                    else:
                        raise ValueError(
                            "Unsupported variable type provided: %s",
                            variable.variable_type,
                        )

                    if variable.name in self._input_variables:
                        handler = PVAccessInputHandler(
                            pvname=pvname,
                            is_constant=variable.is_constant,
                            server=self,
                        )

                        pv = SharedPV(handler=handler, nt=nt, initial=initial)

                    else:
                        pv = SharedPV(nt=nt, initial=initial)

                    self._providers[pvname] = pv

            # if not serving pv, set up monitor
            else:
                if variable.name in self._input_variables:
                    self._monitors[pvname] = self._context.monitor(
                        pvname, partial(self._monitor_callback, pvname)
                    )

                # in this case, externally hosted output variable
                else:
                    self._providers[pvname] = None

        if "summary" in self._epics_config:
            pvname = self._epics_config["summary"].get("pvname")
            owner = self._epics_config["summary"].get("owner")
            date_published = self._epics_config["summary"].get("date_published")
            description = self._epics_config["summary"].get("description")
            id = self._epics_config["summary"].get("id")

            spec = [
                ("id", "s"),
                ("owner", "s"),
                ("date_published", "s"),
                ("description", "s"),
                ("input_variables", "as"),
                ("output_variables", "as"),
            ]
            values = {
                "id": id,
                "date_published": date_published,
                "description": description,
                "owner": owner,
                "input_variables": [
                    self._epics_config[var]["pvname"]
                    for var in self._input_variables
                ],
                "output_variables": [
                    self._epics_config[var]["pvname"]
                    for var in self._input_variables
                ],
            }

            pv_type = Type(id="summary", spec=spec)
            value = Value(pv_type, values)
            pv = SharedPV(initial=value)
            self._providers[pvname] = pv

        # initialize pva server
        self.pva_server = P4PServer(providers=[self._providers])

        logger.info("pvAccess server started")

shutdown()

Safely shutdown the server process.

Source code in lume_epics/epics_pva_server.py
514
515
516
def shutdown(self):
    """Safely shutdown the server process."""
    self.shutdown_event.set()

update_pv(pvname, value)

Adds update to input process variable to the input queue.

Parameters:

Name Type Description Default
pvname str

Name of process variable

required
value Union[np.ndarray, float]

Value to set

required
Source code in lume_epics/epics_pva_server.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def update_pv(self, pvname: str, value: Union[np.ndarray, float]) -> None:
    """Adds update to input process variable to the input queue.

    Args:
        pvname (str): Name of process variable

        value (Union[np.ndarray, float]): Value to set

    """
    # Hack for now to get the pickable value
    value = value.raw.value

    varname = self._pvname_to_varname_map[pvname]
    model_variable = self._input_variables[varname]

    # check for already cached variable
    model_variable = self._cached_values.get(varname, model_variable)

    if model_variable.variable_type == "image":
        model_variable.x_min = value.attrib["x_min"]
        model_variable.x_max = value.attrib["x_max"]
        model_variable.y_min = value.attrib["y_min"]
        model_variable.y_max = value.attrib["y_max"]
    else:
        model_variable.value = value

    self._cached_values[varname] = model_variable

    # only update if not running
    if not self._running_indicator.value:
        self._in_queue.put({"protocol": self.protocol, "vars": self._cached_values})
        self._cached_values = {}

update_pvs(input_variables, output_variables)

Update process variables over pvAccess.

Parameters:

Name Type Description Default
input_variables Dict[str, InputVariable]

Dict of lume-epics output variables.

required
output_variables Dict[str, OutputVariable]

Dict of lume-model output variables.

required
Source code in lume_epics/epics_pva_server.py
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
def update_pvs(
    self,
    input_variables: Dict[str, InputVariable],
    output_variables: Dict[str, OutputVariable],
) -> None:
    """Update process variables over pvAccess.

    Args:
        input_variables (Dict[str, InputVariable]): Dict of lume-epics output variables.

        output_variables (Dict[str, OutputVariable]): Dict of lume-model output variables.

    """
    variables = input_variables
    variables.update(output_variables)

    for variable in variables.values():
        parent = self._field_to_parent_map.get(variable.name)

        if variable.name in self._input_variables and variable.is_constant:
            logger.debug("Cannot update constant variable.")

        else:
            if variable.variable_type == "image":
                logger.debug(
                    "pvAccess image process variable %s updated.", variable.name
                )
                nd_array = variable.value.view(NTNDArrayData)

                # get dw and dh from model output
                nd_array.attrib = {
                    "x_min": variable.x_min,
                    "y_min": variable.y_min,
                    "x_max": variable.x_max,
                    "y_max": variable.y_max,
                }
                value = nd_array

            elif variable.variable_type == "array":
                logger.debug(
                    "pvAccess array process variable %s updated.", variable.name
                )
                if variable.value_type == "str":
                    value = variable.value

                else:
                    value = variable.value.view(NTNDArrayData)

            # do not build attribute pvs
            else:
                logger.debug(
                    "pvAccess process variable %s updated with value %s.",
                    variable.name,
                    variable.value,
                )
                value = variable.value

        # update structure or pv
        if parent:
            self._structures[parent][variable.name] = value
            struct_type = Type(id=parent, spec=self._structure_specs[parent])
            value = Value(struct_type, self._structures[parent])
            pvname = self._varname_to_pvname_map[parent]
            output_provider = self._providers[pvname]

        else:
            pvname = self._varname_to_pvname_map[variable.name]
            output_provider = self._providers[pvname]

        if output_provider:
            output_provider.post(value)

        # in this case externally hosted
        else:
            try:
                self._context.put(pvname, value)
            except:
                self.exit_event.set()
                self.shutdown()

PVAccessInputHandler

Handler object that defines the callbacks to execute on put operations to input process variables.

Source code in lume_epics/epics_pva_server.py
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
class PVAccessInputHandler:
    """
    Handler object that defines the callbacks to execute on put operations to input
    process variables.
    """

    def __init__(self, pvname: str, is_constant: bool, server: PVAServer):
        """
        Initialize the handler with prefix and image pv attributes

        Args:
            pvname (str): The PV being handled
            is_constant (bool): Indicator of constant variable
            server (PVAServer): Reference to the server holding this PV

        """
        self.is_constant = is_constant
        self.pvname = pvname
        self.server = server

    def put(self, pv: SharedPV, op: ServOpWrap) -> None:
        """Updates the global input process variable state, posts the input process
        variable value change, runs the thread local BaseModel instance
        using the updated global input process variable states, and posts the model
        output values to the output process variables.

        Args:
            pv (SharedPV): Input process variable on which the put operates.

            op (ServOpWrap): Server operation initiated by the put call.

        """
        # update input values and global input process variable state
        if not self.is_constant and op.value() is not None:
            pv.post(op.value())
            self.server.update_pv(pvname=self.pvname, value=op.value())
        # mark server operation as complete
        op.done()

__init__(pvname, is_constant, server)

Initialize the handler with prefix and image pv attributes

Parameters:

Name Type Description Default
pvname str

The PV being handled

required
is_constant bool

Indicator of constant variable

required
server PVAServer

Reference to the server holding this PV

required
Source code in lume_epics/epics_pva_server.py
525
526
527
528
529
530
531
532
533
534
535
536
537
def __init__(self, pvname: str, is_constant: bool, server: PVAServer):
    """
    Initialize the handler with prefix and image pv attributes

    Args:
        pvname (str): The PV being handled
        is_constant (bool): Indicator of constant variable
        server (PVAServer): Reference to the server holding this PV

    """
    self.is_constant = is_constant
    self.pvname = pvname
    self.server = server

put(pv, op)

Updates the global input process variable state, posts the input process variable value change, runs the thread local BaseModel instance using the updated global input process variable states, and posts the model output values to the output process variables.

Parameters:

Name Type Description Default
pv SharedPV

Input process variable on which the put operates.

required
op ServOpWrap

Server operation initiated by the put call.

required
Source code in lume_epics/epics_pva_server.py
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
def put(self, pv: SharedPV, op: ServOpWrap) -> None:
    """Updates the global input process variable state, posts the input process
    variable value change, runs the thread local BaseModel instance
    using the updated global input process variable states, and posts the model
    output values to the output process variables.

    Args:
        pv (SharedPV): Input process variable on which the put operates.

        op (ServOpWrap): Server operation initiated by the put call.

    """
    # update input values and global input process variable state
    if not self.is_constant and op.value() is not None:
        pv.post(op.value())
        self.server.update_pv(pvname=self.pvname, value=op.value())
    # mark server operation as complete
    op.done()