Source code for pyrogue._RunControl

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
#  Description:
#       PyRogue base module - Run Control Device Class
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
import pyrogue as pr
import threading
import time


[docs] class RunControl(pr.Device): """ Special base class to control runs. Attributes ---------- attr1 : pr.Device """ def __init__(self, *, hidden=True, rates=None, states=None, cmd=None, **kwargs): """ Initialize device class - test Parameters ---------- param1 : * param2 : hidden=True param3 : rates=None param4 : states=None param5 : cmd=None param6 : **kwargs returns: none ---------- """ if rates is None: rates={1:'1 Hz', 10:'10 Hz'} if states is None: states={0:'Stopped', 1:'Running'} pr.Device.__init__(self, hidden=hidden, **kwargs) value = [k for k,v in states.items()][0] self._thread = None self._cmd = cmd self.add(pr.LocalVariable( name='runState', value=value, mode='RW', groups=['NoConfig'], disp=states, localSet=self._setRunState, description='Run state of the system.')) value = [k for k,v in rates.items()][0] self.add(pr.LocalVariable( name='runRate', value=value, mode='RW', disp=rates, localSet=self._setRunRate, description='Run rate of the system.')) self.add(pr.LocalVariable( name='runCount', value=0, typeStr='UInt32', mode='RO', pollInterval=1, description='Run Counter updated by run thread.')) def _setRunState(self,value,changed): """ Set run state. Re-implement in sub-class. Enum of run states can also be overridden. Underlying run control must update runCount variable. Parameters ---------- param1: self param2: value param3: changed returns: ------- """ if changed: if self.runState.valueDisp() == 'Running': #print("Starting run") self._thread = threading.Thread(target=self._run) self._thread.start() elif self._thread is not None: #print("Stopping run") self._thread.join() self._thread = None def _setRunRate(self,value): """ Set run rate. Re-implement in sub-class if necessary. Parameters ---------- param1: self param2: value returns: ------- """ pass def _run(self): """ Parameters ---------- param1: self ---------- """ #print("Thread start") self.runCount.set(0) with self.root.updateGroup(period=1.0): while (self.runState.valueDisp() == 'Running'): time.sleep(1.0 / float(self.runRate.value())) if self._cmd is not None: self._cmd() with self.runCount.lock: self.runCount.set(self.runCount.value() + 1)