Basic Pyrogue Tree Tutorial
In this tutorial, we will go through all of the steps for creating a pyrogue tree.
Creating a Root Node
The first step is to create the Root Node, which is the base Node of the tree. To do this, we create a subclass of Root whose __init__ calls Root.__init__.
Basic Initialization
Steps:
Create the subclass of Root (ExampleRoot)
Define the needed specialized attributes (_scnt, _sdata, _fig, _ax)
Call Root.__init__
class ExampleRoot(pyrogue.Root):
    """Root node of the example tree."""

    def __init__(self, epics4En=False):
        # State used by the tutorial's local variables and plot helper.
        self._scnt = 0                                  # running sample counter
        self._sdata = np.zeros(100, dtype=np.float64)   # sample buffer, starts as 100 zeros
        self._fig = None                                # figure, created on first plot request
        self._ax = None                                 # axes belonging to the figure

        # Initialize the Root base class explicitly.
        pyrogue.Root.__init__(
            self,
            description="Example Root",
            timeout=2.0,
            pollEn=True,
        )
Create a Memory Space Emulator
Create the memory emulator.
class ExampleRoot(pyrogue.Root):
    def __init__(self, epics4En=False):
        ...

        # Stand-in for real hardware: a memory space emulator.
        # NOTE(review): the two arguments are presumably the minimum and
        # maximum transaction sizes -- confirm against the rogue docs.
        sim = rogue.interfaces.memory.Emulate(4, 0x1000)
        sim.setName("SimSlave")

        # Register the emulator as an interface of this root.
        self.addInterface(sim)
Add Your Devices
Add devices (see AxiVersion for an example device class)
class ExampleRoot(pyrogue.Root):
    def __init__(self, epics4En=False):
        ...

        # Device backed by the memory emulator (`sim`), mapped at offset 0.
        axi_version = pyrogue.examples.AxiVersion(
            memBase=sim,
            guiGroup='TestGroup',
            offset=0x0,
        )
        self.add(axi_version)

        # Second example device, placed in the same GUI group.
        large_device = pyrogue.examples.LargeDevice(guiGroup='TestGroup')
        self.add(large_device)
Configuration Stream
Create configuration stream Variable
class ExampleRoot(pyrogue.Root):
    def __init__(self, epics4En=False):
        ...

        # Configuration stream bound to this root; it is handed to the
        # stream writer as its `configStream` source.
        stream = pyrogue.interfaces.stream.Variable(root=self)
Add Special Devices
Create ppg card interfaces
Create a PRBS Transmitter (PrbsTx) device (see also the PrbsRx Device Class)
Create a Stream Writer device
Create a Data Receiver device
Create a Run Control device
class ExampleRoot(pyrogue.Root):
    def __init__(self, epics4En=False):
        ...

        # PRBS transmitter: the data source for the writer and receiver below.
        self._prbsTx = pyrogue.utilities.prbs.PrbsTx()
        self.add(self._prbsTx)

        # Data writer; the configuration stream goes to channel 1.
        self._fw = pyrogue.utilities.fileio.StreamWriter(
            configStream={1: stream},
            rawMode=True,
        )
        self.add(self._fw)

        # PRBS data is routed to writer channel 0.
        self._prbsTx >> self._fw.getChannel(0)

        # Data receiver, also fed by the PRBS transmitter.
        receiver = pyrogue.DataReceiver()
        self._prbsTx >> receiver
        self.add(receiver)

        # Run control.
        self.add(pyrogue.RunControl())
Add ZMQServer
Create a ZMQ Server interface
class ExampleRoot(pyrogue.Root):
    def __init__(self, epics4En=False):
        ...

        # ZMQ server exposing this root on the loopback address.
        # NOTE(review): port=0 presumably asks the server to pick a port
        # automatically -- confirm against the ZmqServer documentation.
        self.zmqServer = pyrogue.interfaces.ZmqServer(
            root=self,
            addr='127.0.0.1',
            port=0,
        )
        self.addInterface(self.zmqServer)
Add Process Controller
Add a Process Controller
class ExampleRoot(pyrogue.Root):
    def __init__(self, epics4En=False):
        ...

        # Process controller carrying two empty-string local variables.
        process = pyrogue.Process()
        for var_name in ('Test1', 'Test2'):
            process.add(pyrogue.LocalVariable(name=var_name, value=''))
        self.add(process)
Add Linked and Local Variables
class ExampleRoot(pyrogue.Root):
    def __init__(self, epics4En=False):
        ...

        # Read-only sine value, produced by self._mySin on each poll.
        test_plot = pyrogue.LocalVariable(
            name='TestPlot',
            mode='RO',
            pollInterval=1.0,
            localGet=self._mySin,
            minimum=-1.0,
            maximum=1.0,
            disp='{:1.2f}',
            value=0.0,
        )
        self.add(test_plot)

        # Read-only X-axis value, produced by self._myXAxis.
        test_x_axis = pyrogue.LocalVariable(
            name='TestXAxis',
            mode='RO',
            pollInterval=1.0,
            localGet=self._myXAxis,
            disp='{:1.2f}',
            value=1.0,
        )
        self.add(test_x_axis)

        # Read-only sample array, produced by self._myArray. No initial
        # `value` is passed (the original keeps that argument commented out:
        # value = np.zeros(100,dtype=np.float64)).
        test_array = pyrogue.LocalVariable(
            name='TestArray',
            mode='RO',
            pollInterval=1.0,
            localGet=self._myArray,
            disp='{:1.2f}',
        )
        self.add(test_array)

        # Linked variable rendering TestArray as a figure. It must be added
        # after TestArray because it references self.TestArray.
        test_plot_figure = pyrogue.LinkVariable(
            name='TestPlotFigure',
            mode='RO',
            dependencies=[self.TestArray],
            linkedGet=self._getPlot,
        )
        self.add(test_plot_figure)
Connect to EPICS if needed
If EPICS support is enabled (epics4En=True), create the EPICS protocol server and add it to the root
class ExampleRoot(pyrogue.Root):
    def __init__(self, epics4En=False):
        ...

        # The EPICS V4 server is only created when the caller asks for it.
        if epics4En:
            self._epics4 = pyrogue.protocols.epicsV4.EpicsPvServer(
                base="test",
                root=self,
                incGroups=None,
                excGroups=None,
            )
            self.addProtocol(self._epics4)
Create and Connect Memory Commands
class ExampleRoot(pyrogue.Root):
    def __init__(self, epics4En=False):
        ...

        # Remote memory command slave example: register the slave as an
        # interface, then add a master device that uses it as its memory base.
        os_slave = pyrogue.examples.OsMemSlave()
        os_slave.setName("OsSlave")
        self.addInterface(os_slave)

        os_master = pyrogue.examples.OsMemMaster(memBase=os_slave)
        self.add(os_master)
Override Necessary Functions
class ExampleRoot(pyrogue.Root):
    ...

    def _mySin(self):
        """Return the next sine sample and append it to the sample buffer."""
        # One full sine period spans 100 calls.
        sample = math.sin(2*math.pi*self._scnt / 100)
        # NOTE(review): the buffer grows by one element on every call and is
        # never trimmed -- confirm this is acceptable for long-running use.
        self._sdata = np.append(self._sdata, sample)
        self._scnt += 1
        return sample

    def _myXAxis(self):
        """Return the current sample count as a float."""
        return float(self._scnt)

    def _myArray(self):
        """Return the accumulated sample buffer."""
        return self._sdata

    def _getPlot(self, read):
        """Build and return a new figure plotting the TestArray variable.

        `read` is passed through to `TestArray.get`.
        """
        previous = self._fig
        if previous is not None:
            # Dispose of the previous figure before building a new one.
            plt.close(previous)
            previous.clf()
            del self._ax, self._fig

        self._fig = plt.Figure()
        self._ax = self._fig.add_subplot(111)
        self._ax.plot(self.TestArray.get(read=read))
        return self._fig