Source code for pyrogue.interfaces._SqlLogging

#-----------------------------------------------------------------------------
# Company    : SLAC National Accelerator Laboratory
#-----------------------------------------------------------------------------
#  Description:
#       PyRogue - SQL Logging Module
#-----------------------------------------------------------------------------
# This file is part of the rogue software platform. It is subject to
# the license terms in the LICENSE.txt file found in the top-level directory
# of this distribution and at:
#    https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html.
# No part of the rogue software platform, including this file, may be
# copied, modified, propagated, or distributed except according to the terms
# contained in the LICENSE.txt file.
#-----------------------------------------------------------------------------

import pyrogue as pr
import sqlalchemy
import threading
import queue
import json
from typing import Any


[docs] class SqlLogger(object): """Logs variable updates and system log entries to a SQL database. Parameters ---------- root : pr.Root PyRogue root node to monitor. url : str SQLAlchemy database URL. incGroups : object, optional Groups to include in variable updates. excGroups : object, optional Groups to exclude from variable updates. """
[docs] def __init__( self, *, root: pr.Root, url: str, incGroups: str | list[str] | None = None, excGroups: str | list[str] | None = ['NoSql'], ) -> None: self._log = pr.logInit(cls=self,name="SqlLogger",path=None) self._url = url self._engine = None self._thread = None self._queue = queue.Queue() self._thread = threading.Thread(target=self._worker) self._thread.start() self._sysLogPath = root.SystemLogLast.path root.addVarListener(func=self._varUpdate, done=None, incGroups=incGroups, excGroups=excGroups) try: engine = sqlalchemy.create_engine(self._url) #, isolation_level="AUTOCOMMIT") self._log.info("Opened database connection to {}".format(self._url)) except Exception as e: self._log.error("Failed to open database connection to {}: {}".format(self._url,e)) return #self._metadata = sqlalchemy.MetaData(engine) self._metadata = sqlalchemy.MetaData() self._varTable = sqlalchemy.Table( 'variables', self._metadata, sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True), sqlalchemy.Column('timestamp', sqlalchemy.DateTime(timezone=True), server_default=sqlalchemy.func.now()), sqlalchemy.Column('path', sqlalchemy.String), sqlalchemy.Column('enum', sqlalchemy.String), sqlalchemy.Column('disp', sqlalchemy.String), sqlalchemy.Column('value', sqlalchemy.String), sqlalchemy.Column('valueDisp', sqlalchemy.String), sqlalchemy.Column('severity', sqlalchemy.String), sqlalchemy.Column('status', sqlalchemy.String)) self._logTable = sqlalchemy.Table( 'syslog', self._metadata, sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True), sqlalchemy.Column('timestamp', sqlalchemy.DateTime(timezone=True), server_default=sqlalchemy.func.now()), sqlalchemy.Column('name', sqlalchemy.String), sqlalchemy.Column('message', sqlalchemy.String), sqlalchemy.Column('exception', sqlalchemy.String), sqlalchemy.Column('levelName', sqlalchemy.String), sqlalchemy.Column('levelNumber', sqlalchemy.Integer)) self._varTable.create(engine, checkfirst=True) self._logTable.create(engine, checkfirst=True) self._engine = engine
def _stop(self) -> None: """Stop the SQL logger worker thread and flush pending entries.""" if not self._queue.empty(): print("Waiting for sql logger to finish...") self._queue.put(None) self._thread.join() print('Sql logger stopped')
[docs] def insert_from_q(self, entry: tuple[str, Any], conn: sqlalchemy.Connection) -> None: """ Insert a single queue entry into the database. Parameters ---------- entry : tuple (path, value) for variables or (path, log_data) for syslog. conn : object SQLAlchemy connection for the transaction. """ # Syslog if entry[0] == self._sysLogPath: val = json.loads(entry[1]) ins = self._logTable.insert().values( name=val['name'], message=val['message'], exception=val['exception'], levelName=val['levelName'], levelNumber=val['levelNumber']) # Variable else: # Handle corner cases value = entry[1].value if isinstance(value, int) and value.bit_length() > 64: # Support >64 bit ints value = entry[1].valueDisp elif isinstance(value, tuple): # Support tuples value = str(value) ins = self._varTable.insert().values( path=entry[0], enum=str(entry[1].enum), disp=entry[1].disp, value=value, valueDisp=entry[1].valueDisp, severity=entry[1].severity, status=entry[1].status) conn.execute(ins)
def _worker(self) -> None: """Worker thread that processes queue entries and writes to the database.""" while True: # Block and wait for a queue entry to arrive entry = self._queue.get() # Exit thread if None entry received if entry is None: return # Do nothing with the data if db connection not present if self._engine is None: continue # If there are multiple entries in the queue # write them to the DB in a single transaction try: with self._engine.begin() as conn: while True: #Need to check for null again from loop if entry is None: return # Insert the entry self.insert_from_q(entry, conn) # Break from loop and close the transaction # when queue has been emptied if self._queue.qsize() == 0: break # Read the next queue entry and loop entry = self._queue.get() except Exception as e: self._engine = None pr.logException(self._log,e) self._log.error("Lost database connection to {}".format(self._url)) def _varUpdate(self, path: str, value: Any) -> None: """Queue a variable update for the worker to write to the database.""" self._queue.put((path,value))
[docs] class SqlReader(object): """Read variable and syslog data from a SQL database. Parameters ---------- url : str SQLAlchemy database URL. """
[docs] def __init__(self, url: str) -> None: self._log = pr.logInit(cls=self,name="SqlReader",path=None) self._url = url self._engine = None try: engine = sqlalchemy.create_engine(self._url) self._log.info("Opened database connection to {}".format(self._url)) except Exception as e: self._log.error("Failed to open database connection to {}: {}".format(self._url,e)) return self._metadata = sqlalchemy.MetaData(engine) self._varTable = sqlalchemy.Table('variables', self._metadata, autoload=True) self._logTable = sqlalchemy.Table('syslog', self._metadata, autoload=True) self._engine = engine
[docs] def getVariable(self) -> None: """Fetch and print all variable entries. Placeholder for future enhancement.""" r = self._conn.execute(sqlalchemy.select([self._varTable])) print(r.fetchall())
[docs] def getSyslog(self, syslogData: Any) -> None: """Fetch and print syslog entries. Placeholder for future enhancement.""" r = self._conn.execute(sqlalchemy.select([self._logTable])) print(r.fetchall())