#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# PyRogue base module - Run Control Device Class
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
import pyrogue as pr
import threading
import time
[docs]
class RunControl(pr.Device):
"""
Special base class to control runs.
Attributes
----------
attr1 :
pr.Device
"""
def __init__(self, *, hidden=True, rates=None, states=None, cmd=None, **kwargs):
"""
Initialize device class - test
Parameters
----------
param1 : *
param2 : hidden=True
param3 : rates=None
param4 : states=None
param5 : cmd=None
param6 : **kwargs
returns: none
----------
"""
if rates is None:
rates={1:'1 Hz', 10:'10 Hz'}
if states is None:
states={0:'Stopped', 1:'Running'}
pr.Device.__init__(self, hidden=hidden, **kwargs)
value = [k for k,v in states.items()][0]
self._thread = None
self._cmd = cmd
self.add(pr.LocalVariable(
name='runState',
value=value,
mode='RW',
groups=['NoConfig'],
disp=states,
localSet=self._setRunState,
description='Run state of the system.'))
value = [k for k,v in rates.items()][0]
self.add(pr.LocalVariable(
name='runRate',
value=value,
mode='RW',
disp=rates,
localSet=self._setRunRate,
description='Run rate of the system.'))
self.add(pr.LocalVariable(
name='runCount',
value=0,
typeStr='UInt32',
mode='RO',
pollInterval=1,
description='Run Counter updated by run thread.'))
def _setRunState(self,value,changed):
"""
Set run state. Re-implement in sub-class.
Enum of run states can also be overridden.
Underlying run control must update runCount variable.
Parameters
----------
param1: self
param2: value
param3: changed
returns:
-------
"""
if changed:
if self.runState.valueDisp() == 'Running':
#print("Starting run")
self._thread = threading.Thread(target=self._run)
self._thread.start()
elif self._thread is not None:
#print("Stopping run")
self._thread.join()
self._thread = None
def _setRunRate(self,value):
"""
Set run rate. Re-implement in sub-class if necessary.
Parameters
----------
param1: self
param2: value
returns:
-------
"""
pass
def _run(self):
"""
Parameters
----------
param1: self
----------
"""
#print("Thread start")
self.runCount.set(0)
with self.root.updateGroup(period=1.0):
while (self.runState.valueDisp() == 'Running'):
time.sleep(1.0 / float(self.runRate.value()))
if self._cmd is not None:
self._cmd()
with self.runCount.lock:
self.runCount.set(self.runCount.value() + 1)