#-----------------------------------------------------------------------------
# Company : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# PyRogue base module - Device Class
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
# https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
from __future__ import annotations
import collections
import functools as ft
import threading
from typing import Any, Callable, Iterable, Literal
import pyrogue as pr
import rogue.interfaces.memory as rim
class EnableVariable(pr.BaseVariable):
    """Enable flag variable with dependency-aware reporting.

    The stored flag is a plain boolean. The value reported by ``get`` also
    folds in the state of the dependency variables and the enable variable
    of the device above the owning device.

    Parameters
    ----------
    enabled : bool
        Initial enable state.
    deps : iterable, optional
        Dependency variables that can disable this one.
    """

    def __init__(self, *, enabled: bool, deps: Iterable[pr.BaseVariable] | None = None) -> None:
        """Initialize the enable-state variable.

        Parameters
        ----------
        enabled : bool
            Initial enable state.
        deps : iterable, optional
            Dependency variables. Any iterable is accepted; it is copied
            into a list so it can be re-evaluated on every update.
        """
        pr.BaseVariable.__init__(
            self,
            description='Determines if device is enabled for hardware access',
            name='enable',
            mode='RW',
            value=enabled,
            groups='Enable',
            disp={False: 'False', True: 'True', 'parent': 'ParentFalse', 'deps': 'ExtDepFalse'})

        # Local state and lock exist before this object is registered as a
        # listener on any dependency.
        self._value = enabled
        self._lock = threading.Lock()

        # Materialize the iterable: it is walked here and again in _doUpdate,
        # and its length is tested there, so a one-shot generator would break.
        self._deps = [] if deps is None else list(deps)

        # With dependencies present, report 'deps' until the first update
        # evaluates them. With none, there is nothing that could ever clear
        # the flag, so it starts (and stays) cleared.
        self._depDis = bool(self._deps)

        for d in self._deps:
            d.addListener(self)

    @pr.expose
    def get(self, read: bool = False, index: int = -1) -> Any:
        """Return the effective enable state.

        Parameters
        ----------
        read : bool, optional (default = False)
            Unused for enable evaluation.
        index : int, optional (default = -1)
            Unused for enable evaluation.

        Returns
        -------
        Any
            ``False`` when the local flag is ``False``, ``'deps'`` when the
            dependency-disable flag is set, ``'parent'`` when the enable of
            the device above the owning device is not ``True``, otherwise
            the local flag / ``True``.
        """
        with self._lock:
            if self._value is False:
                return False

            if self._depDis:
                return 'deps'

            # Owning device is the root: nothing above it to consult
            if self._parent is self._root:
                return self._value

            if self._parent._parent.enable.value() is not True:
                return 'parent'

            return True

    @pr.expose
    def set(
        self,
        value: bool | Literal['parent', 'deps'],
        write: bool = True,
        index: int = -1,
    ) -> None:
        """Set the enable value.

        The status strings ``'parent'`` and ``'deps'`` are never stored;
        passing one of them only queues an update notification.

        Parameters
        ----------
        value : bool | 'parent' | 'deps'
            New enable value.
        write : bool, optional (default = True)
            Unused for enable evaluation.
        index : int, optional (default = -1)
            Unused for enable evaluation.
        """
        if value != 'parent' and value != 'deps':
            old = self.value()

            with self._lock:
                self._value = value

            # Only propagate when the previously reported state was a real
            # boolean that differs from the new value.
            if old != value and old != 'parent' and old != 'deps':
                self.parent._updateBlockEnable()
                self.parent.enableChanged(value)

        # Listeners are notified through the queued update rather than by
        # calling _doUpdate() on them directly; the direct approach caused
        # lock contention in practice (epics4 as an example).
        with self.parent.root.updateGroup():
            self._queueUpdate()

    def _doUpdate(self) -> Any:
        """Update dependency state before notifying listeners."""
        if len(self._deps) != 0:
            oldEn = (self.value() is True)

            with self._lock:
                self._depDis = not all(x.value() for x in self._deps)

            newEn = (self.value() is True)

            if oldEn != newEn:
                self.parent._updateBlockEnable()
                self.parent.enableChanged(newEn)

        return super()._doUpdate()

    def _rootAttached(self, parent: Any, root: Any) -> None:
        """Attach enable listeners when rooted."""
        pr.Node._rootAttached(self,parent,root)

        # Follow the enable variable of the device above the owning device,
        # unless the owning device is the root itself.
        if parent is not root:
            parent._parent.enable.addListener(self)
class DeviceError(Exception):
    """Raised for device-level errors."""
class Device(pr.Node,rim.Hub):
    """Base class for PyRogue devices.

    Parameters
    ----------
    name : str, optional
        Device name, defaults to the class name.
    description : str, optional (default = "")
        Human-readable description.
    memBase : rim.MemorySlave, optional
        Optional memory interface base, inherited from parent if not provided.
    offset : int, optional (default = 0)
        Memory offset for the device.
    hidden : bool, optional (default = False)
        If True, add the device to the ``Hidden`` group.
    groups : list[str], optional
        Groups to assign.
    expand : bool, optional (default = False)
        Default GUI expand state.
    enabled : bool, optional (default = True)
        Initial enable state.
    defaults : dict, optional
        Default variable values keyed by name.
    enableDeps : iterable, optional
        Enable dependency variables.
    hubMin : int, optional (default = 0)
        Hub minimum access size.
    hubMax : int, optional (default = 0)
        Hub maximum access size.
    guiGroup : str, optional
        GUI grouping label.
    """

    def __init__(
        self,
        *,
        name: str | None = None,
        description: str = '',
        memBase: rim.MemorySlave | None = None,
        offset: int = 0,
        hidden: bool = False,
        groups: list[str] | None = None,
        expand: bool = False,
        enabled: bool = True,
        defaults: dict | None = None,
        enableDeps: Iterable[pr.BaseVariable] | None = None,
        hubMin: int = 0,
        hubMax: int = 0,
        guiGroup: str | None = None,
    ) -> None:
        """Initialize the device node and associated interfaces."""
        if name is None:
            name = self.__class__.__name__

        # Hub.__init__ must be called first for _setSlave to work below
        rim.Hub.__init__(self,offset,hubMin,hubMax)

        # Blocks
        self._blocks = []
        self._custBlocks = []
        self._memBase = memBase
        self._memLock = threading.RLock()
        self._defaults = defaults if defaults is not None else {}
        self._ifAndProto = []
        self.forceCheckEach = False

        # Connect to memory slave
        if memBase:
            self._setSlave(memBase)

        # Node.__init__ can't be called until after self._memBase is created
        pr.Node.__init__(self, name=name, hidden=hidden, groups=groups, description=description, expand=expand, guiGroup=guiGroup)

        self._log.info("Making device {:s}".format(name))

        # Convenience methods
        self.addRemoteCommands = ft.partial(self.addNodes, pr.RemoteCommand)

        # Variable interface to enable flag
        self.add(EnableVariable(enabled=enabled, deps=enableDeps))

        # The command argument is forwarded as the recurse flag
        self.add(pr.LocalCommand(name='ReadDevice', value=False, hidden=True,
                                 function=lambda arg: self.readAndCheckBlocks(recurse=arg),
                                 description='Force read of device without recursion'))

        self.add(pr.LocalCommand(name='WriteDevice', value='', hidden=True,
                                 function=lambda arg: self.writeAndVerifyBlocks(force=True,recurse=arg),
                                 description='Force write of device without recursion'))

    @pr.expose
    @property
    def address(self) -> Any:
        """Return the device base address."""
        return self._getAddress()

    @pr.expose
    @property
    def offset(self) -> Any:
        """Return the device offset."""
        return self._getOffset()

    def addCustomBlock(self, block: Any) -> None:
        """Add a pre-defined memory block to the device.

        The list of pre-defined blocks is kept sorted by (offset, size).
        """
        self._custBlocks.append(block)
        self._custBlocks.sort(key=lambda x: (x.offset, x.size))

    def add(self, node: Any) -> None:
        """Add a child node to the device.

        A child ``Device`` that has no memory base of its own is attached
        to this device as its memory slave.
        """
        # Call node add
        pr.Node.add(self,node)

        # Adding device without a membase
        if isinstance(node,Device) and node._memBase is None:
            node._setSlave(self)

    def addInterface(self, *interfaces: Any | Iterable[Any]) -> None:
        """Add stream or memory interfaces to manage.

        Parameters
        ----------
        *interfaces : Any
            One or more interfaces or iterables of interfaces. Iterable
            arguments are flattened one level.
        """
        for interface in interfaces:
            if isinstance(interface, collections.abc.Iterable):
                self._ifAndProto.extend(interface)
            else:
                self._ifAndProto.append(interface)

    def addProtocol(self, *protocols: Any) -> None:
        """Add protocol entities (alias of ``addInterface``).

        Parameters
        ----------
        *protocols : Any
            One or more protocol objects or iterables of protocol objects.
        """
        # Forward the arguments unpacked. Passing the tuple itself would
        # store an iterable argument as a single entry, which _start/_stop
        # would then silently skip.
        self.addInterface(*protocols)

    def manage(self, *interfaces: Any) -> None:
        """Manage additional interfaces for start/stop.

        Unlike ``addInterface``, the arguments are stored as given and are
        not flattened.
        """
        self._ifAndProto.extend(interfaces)

    def _start(self) -> None:
        """ Called recursively from Root.start when starting """
        for intf in self._ifAndProto:
            if hasattr(intf,"_start"):
                intf._start()

        for d in self.devices.values():
            d._start()

    def _stop(self) -> None:
        """Called recursively from Root.stop when exiting"""
        for intf in self._ifAndProto:
            if hasattr(intf,"_stop"):
                intf._stop()

        for d in self.devices.values():
            d._stop()

    @property
    def running(self) -> bool:
        """Return True if the device is running."""
        return self.root is not None and self.root.running

    def addRemoteVariables(
        self,
        number: int,
        stride: int,
        pack: bool = False,
        **kwargs: Any,
    ) -> None:
        """Add a repeating block of remote variables.

        Parameters
        ----------
        number : int
            Number of variables to add.
        stride : int
            Byte stride between instances.
        pack : bool, optional (default = False)
            If True, add a packed link variable for all entries. The
            element variables are then always hidden; a ``hidden`` keyword,
            if given, applies to the link variable.
        **kwargs : Any
            Arguments forwarded to ``RemoteVariable``.
        """
        # Always take 'hidden' out of kwargs. Leaving it in place while also
        # passing hidden= explicitly raises a duplicate-keyword TypeError.
        hiddenGiven = 'hidden' in kwargs
        userHidden = kwargs.pop('hidden', False)
        elemHidden = True if pack else userHidden

        self.addNodes(pr.RemoteVariable, number, stride, hidden=elemHidden, **kwargs)

        if not pack:
            return

        # Create a linked variable to combine everything
        varList = getattr(self, kwargs['name']).values()

        def linkedSet(dev: Any, var: pr.LinkVariable, val: str, write: bool) -> None:
            """Split a packed display string and write each element variable."""
            if val == '':
                return
            values = reversed(val.split('_'))
            for variable, value in zip(varList, values):
                variable.setDisp(value, write=write)

        def linkedGet(dev: Any, var: pr.LinkVariable, read: bool) -> str:
            """Join element display values into one packed underscore string."""
            values = [v.getDisp(read=read) for v in varList]
            return '_'.join(reversed(values))

        name = kwargs.pop('name')
        kwargs.pop('value', None)

        # Hand an explicit user 'hidden' choice on to the link variable only
        if hiddenGiven:
            kwargs['hidden'] = userHidden

        lv = pr.LinkVariable(name=name, value='', dependencies=varList, linkedGet=linkedGet, linkedSet=linkedSet, **kwargs)
        self.add(lv)

    def setPollInterval(
        self,
        interval: float,
        variables: Iterable[Any] | None = None,
    ) -> None:
        """Set the poll interval for a group of variables.

        Parameters
        ----------
        interval : float
            Polling interval in seconds.
        variables : iterable, optional
            Iterable of variable names or variables. If ``None``, all
            variables with nonzero poll interval are updated.
        """
        if variables is None:
            variables = [k for k,v in self.variables.items() if v.pollInterval != 0]

        for x in variables:
            # Variable objects are used directly; anything else is a node name
            target = x if isinstance(x, pr.BaseVariable) else self.node(x)
            target.setPollInterval(interval)

    def hideVariables(
        self,
        hidden: bool,
        variables: Iterable[Any] | None = None,
    ) -> None:
        """Hide a list of variables or variable names.

        Parameters
        ----------
        hidden : bool
            True to hide variables, False to show them.
        variables : iterable, optional
            Iterable of variable names or variables. If ``None``, all
            variables of this device are updated. Entries that are neither
            a variable nor a string are ignored.
        """
        if variables is None:
            variables = self.variables.values()

        for v in variables:
            if isinstance(v, pr.BaseVariable):
                v.hidden = hidden
            # Test the current item, not variables[0]: the iterable may not
            # be indexable and may mix names with variable objects.
            elif isinstance(v, str):
                self.variables[v].hidden = hidden

    def initialize(self) -> None:
        """Call ``initialize`` on all child devices."""
        for dev in self.devices.values():
            dev.initialize()

    def hardReset(self) -> None:
        """Call ``hardReset`` on all child devices."""
        for dev in self.devices.values():
            dev.hardReset()

    def countReset(self) -> None:
        """Call ``countReset`` on all child devices."""
        for dev in self.devices.values():
            dev.countReset()

    def enableChanged(self, value: Any) -> None:
        """Hook for reacting to enable state changes.

        The base implementation does nothing.

        Parameters
        ----------
        value : Any
            New enable state passed by the enable variable.
        """

    def writeBlocks(
        self,
        *,
        force: bool = False,
        recurse: bool = True,
        variable: Any | None = None,
        checkEach: bool = False,
        index: int = -1,
        **kwargs: Any,
    ) -> None:
        """Write all blocks held by this device.

        Parameters
        ----------
        force : bool, optional (default = False)
            Force the write even if values are unchanged.
        recurse : bool, optional (default = True)
            If True, recurse into child devices.
        variable : object, optional
            Optional variable to write. When given, only its block is
            written and there is no recursion.
        checkEach : bool, optional (default = False)
            Perform per-variable verification checks.
        index : int, optional (default = -1)
            Optional index for array variables.
        **kwargs : Any
            Additional arguments passed through to the transaction.
        """
        checkEach = checkEach or self.forceCheckEach

        if variable is not None:
            pr.startTransaction(variable._block, type=rim.Write, forceWr=force, checkEach=checkEach, variable=variable, index=index, **kwargs)
            return

        for block in self._blocks:
            if block.bulkOpEn:
                pr.startTransaction(block, type=rim.Write, forceWr=force, checkEach=checkEach, **kwargs)

        if recurse:
            for dev in self.devices.values():
                dev.writeBlocks(force=force, recurse=True, checkEach=checkEach, **kwargs)

    def verifyBlocks(
        self,
        *,
        recurse: bool = True,
        variable: Any | None = None,
        checkEach: bool = False,
        **kwargs: Any,
    ) -> None:
        """Verify blocks in the background.

        Parameters
        ----------
        recurse : bool, optional (default = True)
            If True, recurse into child devices.
        variable : object, optional
            Optional variable to verify. When given, only its block is
            verified and there is no recursion.
        checkEach : bool, optional (default = False)
            Perform per-variable verification checks.
        **kwargs : Any
            Additional arguments passed through to the transaction.
        """
        checkEach = checkEach or self.forceCheckEach

        if variable is not None:
            # Verify range is set by previous write
            pr.startTransaction(variable._block, type=rim.Verify, checkEach=checkEach, **kwargs)
            return

        for block in self._blocks:
            if block.bulkOpEn:
                pr.startTransaction(block, type=rim.Verify, checkEach=checkEach, **kwargs)

        if recurse:
            for dev in self.devices.values():
                dev.verifyBlocks(recurse=True, checkEach=checkEach, **kwargs)

    def readBlocks(
        self,
        *,
        recurse: bool = True,
        variable: Any | None = None,
        checkEach: bool = False,
        index: int = -1,
        **kwargs: Any,
    ) -> None:
        """Read blocks in the background.

        Parameters
        ----------
        recurse : bool, optional (default = True)
            If True, recurse into child devices.
        variable : object, optional
            Optional variable to read. When given, only its block is read
            and there is no recursion.
        checkEach : bool, optional (default = False)
            Perform per-variable verification checks.
        index : int, optional (default = -1)
            Optional index for array variables.
        **kwargs : Any
            Additional arguments passed through to the transaction.
        """
        checkEach = checkEach or self.forceCheckEach

        if variable is not None:
            pr.startTransaction(variable._block, type=rim.Read, checkEach=checkEach, variable=variable, index=index, **kwargs)
            return

        for block in self._blocks:
            if block.bulkOpEn:
                pr.startTransaction(block, type=rim.Read, checkEach=checkEach, **kwargs)

        if recurse:
            for dev in self.devices.values():
                dev.readBlocks(recurse=True, checkEach=checkEach, **kwargs)

    def checkBlocks(
        self,
        *,
        recurse: bool = True,
        variable: Any | None = None,
        **kwargs: Any,
    ) -> None:
        """Check block transactions and notify variable listeners.

        Parameters
        ----------
        recurse : bool, optional (default = True)
            If True, recurse into child devices.
        variable : object, optional
            Optional variable to check. When given, only its block is
            checked and there is no recursion.
        **kwargs : Any
            Additional arguments passed through to the transaction.
        """
        if variable is not None:
            pr.checkTransaction(variable._block, **kwargs)
            return

        for block in self._blocks:
            pr.checkTransaction(block, **kwargs)

        if recurse:
            for dev in self.devices.values():
                dev.checkBlocks(recurse=True, **kwargs)

    def writeAndVerifyBlocks(
        self,
        force: bool = False,
        recurse: bool = True,
        variable: Any | None = None,
        checkEach: bool = False,
    ) -> None:
        """Write, verify, and check all blocks.

        Parameters
        ----------
        force : bool, optional (default = False)
            Force the write even if values are unchanged.
        recurse : bool, optional (default = True)
            If True, recurse into child devices.
        variable : object, optional
            Optional variable to write/verify.
        checkEach : bool, optional (default = False)
            Perform per-variable verification checks.
        """
        self.writeBlocks(force=force, recurse=recurse, variable=variable, checkEach=checkEach)
        self.verifyBlocks(recurse=recurse, variable=variable, checkEach=checkEach)
        self.checkBlocks(recurse=recurse, variable=variable)

    def readAndCheckBlocks(
        self,
        recurse: bool = True,
        variable: Any | None = None,
        checkEach: bool = False,
    ) -> None:
        """Read and check all blocks.

        Parameters
        ----------
        recurse : bool, optional (default = True)
            If True, recurse into child devices.
        variable : object, optional
            Optional variable to read.
        checkEach : bool, optional (default = False)
            Perform per-variable verification checks.
        """
        self.readBlocks(recurse=recurse, variable=variable, checkEach=checkEach)
        self.checkBlocks(recurse=recurse, variable=variable)

    def _updateBlockEnable(self) -> None:
        """Propagate effective enable state to this device's blocks."""
        for block in self._blocks:
            block.setEnable(self.enable.value() is True)

        for dev in self.devices.values():
            dev._updateBlockEnable()

    def _buildBlocks(self) -> None:
        """Build and attach memory blocks for local/remote variables.

        Raises
        ------
        pr.MemoryError
            If a variable does not fit in the pre-made block it lands in,
            or a block size is outside the min/max access range.
        """
        remVars = []

        # Use min block size, larger blocks can be pre-created
        blkSize = self._blkMinAccess()

        # Process all of the variables
        for k,n in self.nodes.items():

            # Local variables have a 1:1 block association
            if isinstance(n,pr.LocalVariable):
                self._blocks.append(n._block)

            # Align to min access, create list of remote variables
            elif isinstance(n,pr.RemoteVariable) and n.offset is not None:
                self._log.info(f"Before Shift variable {n.name} offset={n.offset} bitSize={n.bitSize} bytes={n.varBytes}")
                n._updatePath(n.path)
                n._shiftOffsetDown(n.offset % blkSize, blkSize)
                remVars += [n]

                self._log.info(f"Creating variable {n.name} offset={n.offset} bitSize={n.bitSize} bytes={n.varBytes}")

        # Sort var list by offset, size
        remVars.sort(key=lambda x: (x.offset, x.varBytes))
        blocks = []
        blk = None

        # Go through sorted variable list, look for overlaps, group into blocks
        for n in remVars:
            if blk is not None and ( (blk['offset'] + blk['size']) > n.offset):
                self._log.info("Overlap detected var offset={} block offset={} block bytes={}".format(n.offset,blk['offset'],blk['size']))
                n._shiftOffsetDown(n.offset - blk['offset'], blkSize)
                blk['vars'].append(n)

                if n.varBytes > blk['size']:
                    blk['size'] = n.varBytes

            # We need a new block for this variable
            else:
                blk = None

                # Look for pre-made block which overlaps
                for b in self._custBlocks:
                    if ( (n.offset >= b.offset) and ((b.offset + b.size) > n.offset)):

                        # Just in case a variable extends past the end of pre-made block, user mistake
                        if n.varBytes > b.size:
                            # f-string: without the prefix the placeholders
                            # would appear literally in the error message.
                            msg = f'Failed to add variable {n.name} to pre-made block with offset {b.offset} and size {b.size}'
                            raise pr.MemoryError(name=self.path, address=self.address, msg=msg)

                        blk = {'offset':b.offset, 'size':b.size, 'vars':[n], 'block':b}
                        break

                # Block not found
                if blk is None:
                    blk = {'offset':n.offset, 'size':n.varBytes, 'vars':[n], 'block':None}

                blocks.append(blk)

        # Clear pre-made list
        self._custBlocks = []

        # Create new blocks and add new and pre-made blocks to device
        # Add variables to the block
        for b in blocks:

            # Create new block
            if b['block'] is None:
                newBlock = rim.Block(b['offset'], b['size'])
                self._log.debug("Adding new block at offset {:#02x}, size {}".format(b['offset'], b['size']))
            else:
                newBlock = b['block']

            # Set memory slave
            newBlock._setSlave(self)

            # Verify the block is not too small or large for the memory interface
            if newBlock.size > self._blkMaxAccess() or newBlock.size < self._blkMinAccess():
                msg = f'Block size {newBlock.size} is not in the range: {self._blkMinAccess()} - {self._blkMaxAccess()}'
                raise pr.MemoryError(name=self.path, address=self.address, msg=msg)

            # Add variables to the block
            newBlock.addVariables(b['vars'])

            # Set variable block links
            for v in b['vars']:
                v._block = newBlock

            # Add to device
            self._blocks.append(newBlock)
            newBlock.setEnable(self.enable.value() is True)

    def _rootAttached(self, parent: pr.Node, root: pr.Root) -> None:
        """Attach this device into the rooted tree and build variable blocks."""
        pr.Node._rootAttached(self, parent, root)

        for value in self._nodes.values():
            value._rootAttached(self,root)

        self._buildBlocks()

        # Override defaults as dictated by the _defaults dict
        for varName, defValue in self._defaults.items():
            nodes,keys = self.nodeMatch(varName)

            if keys is None:
                for var in nodes:
                    var._default = defValue

    def _setTimeout(self, timeout: float) -> None:
        """Set transaction timeout on this device, blocks, and child devices.

        Parameters
        ----------
        timeout : float
            Timeout in seconds.
        """
        # Seconds scaled by 1e6 for the block and master interfaces
        scaled = int(timeout*1000000)

        for block in self._blocks:
            block._setTimeout(scaled)

        rim.Master._setTimeout(self, scaled)

        for value in self._nodes.values():
            if isinstance(value,Device):
                value._setTimeout(timeout)

    def command(self, **kwargs: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        """Decorator to add inline methods as commands.

        The decorated function is used as ``LocalCommand(function=...)``.
        The command wrapper supplies keyword arguments ``root``, ``dev``,
        ``cmd``, and ``arg``; the function may accept any subset.
        """
        def _decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            """Wrap and register a method as a LocalCommand."""
            # Build per-function arguments instead of mutating the captured
            # kwargs, so a reused decorator does not keep the first name.
            cmdArgs = {'name': func.__name__, **kwargs}
            self.add(pr.LocalCommand(function=func, **cmdArgs))
            return func
        return _decorator

    def linkVariableGet(self, **kwargs: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        """Decorator to add inline ``linkedGet`` functions.

        The decorated function is used as ``LinkVariable(linkedGet=...)``.
        The linked-get wrapper supplies keyword arguments ``dev``, ``var``,
        ``read``, ``index``, and ``check``; the function may accept any subset.
        """
        def _decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            """Wrap and register a function as LinkVariable linkedGet."""
            # Same non-mutating default-name handling as in ``command``
            varArgs = {'name': func.__name__, **kwargs}
            self.add(pr.LinkVariable(linkedGet=func, **varArgs))
            return func
        return _decorator

    def genDocuments(
        self,
        path: str,
        incGroups: str | list[str] | None = None,
        excGroups: str | list[str] | None = None,
    ) -> None:
        """Generate Sphinx documentation pages for this device.

        One ``.rst`` file is written for this device, and the method is
        invoked on every listed sub-device and process as well.

        Parameters
        ----------
        path : str
            Output directory path.
        incGroups : str or list[str], optional
            Group name or group names to include.
        excGroups : str or list[str], optional
            Group name or group names to exclude.
        """
        with open(path + '/' + self.path.replace('.','_') + '.rst','w') as file:
            out = ft.partial(print, file=file)

            # An RST title overline/underline must be at least as long as
            # the title text; keep the historic width of 28 as the minimum.
            titleBar = '*' * max(28, len(self.name))

            out(titleBar)
            out(self.name)
            out(titleBar)
            out('')
            out(pr.genDocDesc(self.description,0))
            out('')

            plist = self.getNodes(typ=pr.Process,incGroups=incGroups,excGroups=excGroups)
            dlist = self.devicesByGroup(incGroups=incGroups,excGroups=excGroups)
            vlist = self.variablesByGroup(incGroups=incGroups,excGroups=excGroups)
            clist = self.commandsByGroup(incGroups=incGroups,excGroups=excGroups)

            # Sub-devices that are not also listed as processes
            tlist = {k:v for k,v in dlist.items() if k not in plist}

            def tocTree(caption: str, nodes: Any) -> None:
                """Emit a toctree for ``nodes`` and generate each node's page."""
                if len(nodes) == 0:
                    return

                out(".. toctree::")
                out(" :maxdepth: 1")
                out(f" :caption: {caption}:")
                out('')

                for node in nodes.values():
                    out(' ' + node.path.replace('.','_'))
                    node.genDocuments(path,incGroups,excGroups)

                out('')

            tocTree('Sub Devices', tlist)
            tocTree('Processes', plist)

            out("Summary")
            out("#######")
            out('')

            if len(vlist):
                out("Variable List")
                out("*************")
                out('')

                for k in vlist:
                    out(f"* {k}")

                out('')

            if len(clist):
                out("Command List")
                out("*************")
                out('')

                for k in clist:
                    out(f"* {k}")

                out('')

            out("Details")
            out("#######")
            out('')

            if len(vlist):
                out("Variables")
                out("*********")
                out('')

                for v in vlist.values():
                    v._genDocs(file=file)
                    out('')

            if len(clist):
                out("Commands")
                out("********")
                out('')

                for v in clist.values():
                    v._genDocs(file=file)
                    out('')
class ArrayDevice(Device):
    """Device that instantiates an array of sub-devices.

    Parameters
    ----------
    arrayClass : object
        Class to instantiate for each element.
    number : int
        Number of devices in the array.
    stride : int, optional (default = 0)
        Address stride between array elements.
    arrayArgs : object, optional
        Per-element argument overrides. Either a single dict applied to
        every element, or a sequence holding one dict per element. A
        ``name`` entry overrides the default element name.
    **kwargs : Any
        Additional arguments forwarded to ``Device``.
    """

    def __init__(
        self,
        *,
        arrayClass: Any,
        number: int,
        stride: int = 0,
        arrayArgs: Any | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize and populate an array-style device container."""
        if 'name' not in kwargs:
            kwargs['name'] = f'{arrayClass.__name__}Array'

        super().__init__(**kwargs)

        if arrayArgs is None:
            arrayArgs = [{} for _ in range(number)]
        elif isinstance(arrayArgs, dict):
            arrayArgs = [arrayArgs for _ in range(number)]

        for i in range(number):
            # Work on a copy so the caller's dict is not modified by the
            # 'name' pop below.
            args = dict(arrayArgs[i])

            # Element i is named "<Class>[i]" unless overridden
            name = args.pop('name', f'{arrayClass.__name__}[{i}]')

            self.add(arrayClass(
                name=name,
                offset=i*stride,
                **args))