Source code for pyrogue.utilities.fileio._StreamWriter

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
# Description:
# Module for writing stream data.
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------
import rogue.utilities
import rogue.utilities.fileio
import pyrogue
import rogue
from typing import Any


[docs] class StreamWriter(pyrogue.DataWriter): """Wrapper ``pyrogue.DataWriter`` around ``rogue.utilities.fileio.StreamWriter``. See Also -------- :ref:`utilities_fileio_format` Canonical Rogue file record format written by StreamWriter. :ref:`utilities_fileio_reading` Notes on how files are decoded back into stream frames. Parameters ---------- configStream : dict[int, rogue.interfaces.stream.Master], optional Mapping of stream channel index to configuration/status source object. writer : rogue.utilities.fileio.StreamWriter, optional Existing Rogue stream writer instance. When ``None``, a new one is created. rawMode : bool, optional (default = False) If ``False`` (default), each frame is written in Rogue file format with per-frame metadata headers (payload length, channel, error, and flags) followed by payload bytes. If ``True``, only raw frame payload bytes are written, with no per-frame Rogue headers. **kwargs : Any Additional keyword arguments forwarded to ``pyrogue.DataWriter``. """ def __init__( self, *, configStream: dict[int, rogue.interfaces.stream.Master] | None = None, writer: rogue.utilities.fileio.StreamWriter | None = None, rawMode: bool = False, **kwargs: Any, ) -> None: pyrogue.DataWriter.__init__(self, **kwargs) self._configStream = {} if configStream is None else configStream if writer is None: self._writer = rogue.utilities.fileio.StreamWriter() else: self._writer = writer if rawMode: self._writer.setRaw(True) # Connect configuration stream for k,v in self._configStream.items(): pyrogue.streamConnect(v, self._writer.getChannel(k)) def _open(self) -> None: """Open output file and emit initial configuration records.""" self._writer.open(self.DataFile.value()) # Dump config/status to file for k,v in self._configStream.items(): v.streamYaml() self.FrameCount.set(0) self.IsOpen.get() def _close(self) -> None: """Emit final configuration records and close output file.""" # Dump config/status to file for k,v in self._configStream.items(): v.streamYaml() self._writer.close() self.IsOpen.get() def _isOpen(self) -> bool: """Return whether the writer currently has an open file.""" return self._writer.isOpen() def _setBufferSize(self, value: int) -> None: """Update writer file buffer size.""" self._writer.setBufferSize(value) def _setMaxFileSize(self, value: int) -> None: """Update writer maximum file size.""" self._writer.setMaxSize(value) def _getCurrentSize(self) -> int: """Return current output file size in bytes.""" return self._writer.getCurrentSize() def _getTotalSize(self) -> int: """Return total bytes written across files.""" return self._writer.getTotalSize() def _getBandwidth(self) -> float: """Return file-write bandwidth in bytes/second over the recent ~1 second window.""" return self._writer.getBandwidth() def _getFrameCount(self) -> int: """Return total written frame count.""" return self._writer.getFrameCount() def _waitFrameCount(self, count: int, timeout: float = 0) -> bool: """Wait for total frame count to reach ``count`` within timeout seconds.""" return self._writer.waitFrameCount(count, timeout * 1000000) def _waitFrameChannelCount(self, chan: int, count: int, timeout: float = 0) -> bool: """Wait for channel frame count to reach ``count`` within timeout seconds.""" return self._writer.getChannel(chan).waitFrameCount(count, timeout * 1000000)
[docs] def getChannel(self, chan: int) -> rogue.interfaces.stream.Slave: """Return writer channel object by index.""" return self._writer.getChannel(chan)
[docs] def setDropErrors(self, drop: bool) -> None: """Enable or disable dropping errored frames.""" self._writer.setDropErrors(drop)