#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# PyRogue base module - Process Device Class
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
import threading
import time
import pyrogue as pr
class Process(pr.Device):
    """Special base class to execute processes.

    A ``Process`` is a device that runs a unit of work in a background
    thread and publishes its state through local variables (``Running``,
    ``Progress``, ``Message``, ``Step``, ``TotalSteps``).  The work is started
    with the ``Start`` command or by calling the device, and a stop is
    requested with the ``Stop`` command.

    The work itself is either the user supplied ``function`` or, when no
    function is given, a built-in example loop.  Sub-classes may instead
    override ``_process``.

    Parameters
    ----------
    argVariable : variable node, optional
        Variable whose value is read with ``get()`` and handed to ``function``
        as ``arg``.  It is added to this device if it is not already a child.
    returnVariable : variable node, optional
        Variable that receives the return value of ``function`` via ``set()``.
        It is added to this device if it is not already a child.
    function : callable, optional
        The work to execute.  It is invoked through ``pr.functionWrapper``
        with the candidate arguments ``root``, ``dev`` and ``arg``.
    **kwargs
        Forwarded unchanged to ``pr.Device.__init__``.
    """

    def __init__(self, *, argVariable=None, returnVariable=None, function=None, **kwargs):
        super().__init__(**kwargs)

        # Guards start/stop requests; the worker thread itself does not hold it.
        self._lock = threading.Lock()
        self._thread = None

        # Cooperative run flag: cleared by Stop, polled by the example process.
        self._runEn = False

        self._argVar = argVariable
        self._retVar = returnVariable
        self._function = function
        self._functionWrap = pr.functionWrapper(
            function=self._function,
            callArgs=['root', 'dev', 'arg'])

        self.add(pr.LocalCommand(
            name='Start',
            function=self._startProcess,
            description='Start process. No Args.'))

        self.add(pr.LocalCommand(
            name='Stop',
            function=self._stopProcess,
            description='Stop process. No Args.'))

        self.add(pr.LocalVariable(
            name='Running',
            mode='RO',
            value=False,
            pollInterval=1.0,
            description='Status values indicating if operation is running.'))

        # NOTE: the value stored here is the fraction Step/TotalSteps
        # (bounded by minimum/maximum below), not a 0-100 number.
        self.add(pr.LocalVariable(
            name='Progress',
            mode='RO',
            units='Pct',
            value=0.0,
            disp='{:1.2f}',
            minimum=0.0,
            maximum=1.0,
            pollInterval=1.0,
            description='Percent complete: 0 - 100 %.'))

        self.add(pr.LocalVariable(
            name='Message',
            mode='RO',
            value='',
            pollInterval=1.0,
            description="Process status message. Prefixed with 'Error:' if an error occurred."))

        self.add(pr.LocalVariable(
            name='Step',
            mode='RO',
            pollInterval=1.0,
            value=1,
            description="Current number of steps"))

        self.add(pr.LocalVariable(
            name='TotalSteps',
            mode='RO',
            value=1,
            description="Total number of loops steps for the process"))

        # Add arg variable if not already added
        if self._argVar is not None and self._argVar not in self:
            self.add(self._argVar)

        # Add return variable if not already added
        if self._retVar is not None and self._retVar not in self:
            self.add(self._retVar)

    def _incrementSteps(self, incr):
        """Advance ``Step`` by ``incr`` and refresh ``Progress``.

        The read-modify-write of ``Step`` and the ``Progress`` update are
        performed while holding ``Step``'s lock.
        """
        with self.Step.lock:
            self.Step.set(self.Step.value() + incr)
            self.Progress.set(self.Step.value() / self.TotalSteps.value())

    def _setSteps(self, value):
        """Set ``Step`` to ``value`` and refresh ``Progress`` accordingly."""
        self.Step.set(value)
        self.Progress.set(self.Step.value() / self.TotalSteps.value())

    def _launch(self):
        """Spawn the worker thread.  The caller must hold ``self._lock``.

        ``Running`` is raised here, before the thread starts, so that a second
        start request arriving before the worker gets scheduled is rejected
        instead of spawning a second worker.
        """
        self._runEn = True
        self.Running.set(True)
        self._thread = threading.Thread(target=self._run)
        try:
            self._thread.start()
        except Exception:
            # The worker never ran, so it cannot clear the flags itself.
            self._runEn = False
            self.Running.set(False)
            raise

    def _startProcess(self):
        """Handler of the ``Start`` command: launch the process if idle.

        Logs a warning and does nothing when a process is already running.
        """
        with self._lock:
            if self.Running.value() is False:
                self._launch()
            else:
                self._log.warning("Process already running!")

    def _stopProcess(self):
        """Handler of the ``Stop`` command: request a cooperative stop.

        Only clears the run flag; the running code has to poll ``_runEn``
        for the request to take effect.
        """
        with self._lock:
            self._runEn = False

    def _stop(self):
        """Request the process to stop, then run the ``pr.Device`` stop hook."""
        self._stopProcess()
        super()._stop()

    def __call__(self, arg=None):
        """Start the process, optionally setting the argument variable first.

        Parameters
        ----------
        arg : optional
            When not ``None`` and an argument variable was configured, it is
            applied to that variable with ``setDisp`` before the launch.

        Returns
        -------
        None
            The call returns immediately; the work continues in the
            background thread.
        """
        with self._lock:
            if self.Running.value() is False:
                if arg is not None and self._argVar is not None:
                    # _argVar is the variable node itself (see __init__ and
                    # _process), so use it directly rather than as a key.
                    self._argVar.setDisp(arg)
                self._launch()
            else:
                self._log.warning("Process already running!")

        return None

    def _run(self):
        """Thread body: execute ``_process`` and maintain ``Running``.

        Exceptions raised by the process are logged and reported through
        ``Message``; ``Running`` is cleared on every exit path.
        """
        self.Running.set(True)

        try:
            with self.root.updateGroup(period=1.0):
                self._process()
        except Exception as e:
            pr.logException(self._log, e)
            self.Message.setDisp("Stopped after error!")
        finally:
            self.Running.set(False)

    def _process(self):
        """Do the actual work; runs inside the worker thread.

        With a user function: reset the status variables, call the function
        with the current argument value, store its result in the return
        variable (if any) and mark the process as complete.

        Without a user function: run an example process that advances one
        step per second for up to 100 steps, or until a stop is requested.
        """
        # User has provided a function. Update status at start and end and call their function
        if self._function is not None:
            self.Message.setDisp("Running")
            self.Progress.set(0.0)
            self.Step.set(0)

            arg = self._argVar.get() if self._argVar is not None else None

            ret = self._functionWrap(function=self._function, root=self.root, dev=self, arg=arg)

            if self._retVar is not None:
                self._retVar.set(ret)

            self.Message.setDisp("Done")
            self.Progress.set(1.0)

        # No function, run example process
        else:
            total = 100
            self.Message.setDisp("Started")
            self.TotalSteps.set(total)
            self._setSteps(0)

            # Steps run 1..total so Progress ends at exactly 1.0.
            for step in range(1, total + 1):
                if self._runEn is False:
                    break
                time.sleep(1)
                self._setSteps(step)
                self.Message.setDisp(f"Running for {step} seconds.")

            self.Message.setDisp("Done")