Skip to content

Database

ConnectionConfig

Bases: BaseModel

Configuration for creating sqlalchemy engine.

Parameters:

Name Type Description Default
pool_size int

Number of connections to maintain in the connection pool. Establishing connections is expensive and maintaining multiple connections in a pool allows for availability.

None
pool_pre_ping bool

Performs a liveness check and expires all existing connections if the database is unreachable.

True
Source code in lume_services/services/models/db/db.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class ConnectionConfig(BaseModel):
    """Configuration for creating sqlalchemy engine.

    Args:
        pool_size (Optional[int]): Number of connections to maintain in the
            connection pool. Establishing connections is expensive and maintaining
            multiple connections in a pool allows for availability. Defaults to
            None.
        pool_pre_ping (bool): Performs liveness check and expires all existing
            connections if the database is unreachable. Defaults to True.

    """

    # Explicit default: an Optional annotation alone does not make the field
    # optional under every pydantic major version, and this model is
    # instantiated with no arguments elsewhere in this module.
    pool_size: Optional[int] = None
    pool_pre_ping: bool = True

ModelDB

DBService client responsible for handling connections to the model database.

Source code in lume_services/services/models/db/db.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
class ModelDB:
    """DBService client responsible for handling connections to the model database.

    Holds a sqlalchemy engine built from a ``ModelDBConfig``, a context-local
    slot for the active connection, and a sessionmaker bound to that engine.
    The engine is rebuilt when the object is used from a process whose PID
    differs from the one that created it.

    """

    def __init__(self, config: ModelDBConfig):
        """Initialize client service.

        Args:
            config (ModelDBConfig): Connection configuration.

        """
        self.config = config
        self._create_engine()

    def _create_engine(self) -> None:
        """Create the sqlalchemy engine, connection slot, and sessionmaker.

        The connection URI has the form
        ``dialect://user:password@host:port/database``. The password is passed
        through ``quote_plus`` before being placed in the URI.

        """
        # remember the creating process so _check_mp can detect use from another
        self._pid = os.getpid()

        # since using a context manager, must have context-local managed vars
        self._connection = ContextVar("connection", default=None)

        password = quote_plus(self.config.password.get_secret_value())

        # Adjacent f-strings are used rather than a backslash continuation inside
        # one literal: a continuation keeps the next line's indentation in the
        # string and would embed whitespace before the port. Interpolating the
        # quoted password directly (no ``%`` formatting) also keeps a literal
        # ``%`` in any other component from being read as a format specifier.
        uri = (
            f"{self.config.dialect_str}://{self.config.user}:{password}"
            f"@{self.config.host}:{self.config.port}/{self.config.database}"
        )

        self.engine = create_engine(
            uri,
            **self.config.connection.dict(exclude_none=True),
        )

        # sessionmaker for orm operations
        self._sessionmaker = sessionmaker(bind=self.engine)

    def _connect(self) -> Connection:
        """Open a connection on the engine and store it in ``_connection``.

        Returns:
            Connection: The newly opened connection.

        """
        cxn = self.engine.connect()
        self._connection.set(cxn)

        return cxn

    def _check_mp(self) -> None:
        """Check for multiprocessing. If the current PID is different than the
        PID recorded when the engine was created, create a new engine.

        """

        if os.getpid() != self._pid:
            self._create_engine()

    @property
    def _current_connection(self) -> Connection:
        """Connection stored in the current context, or None if there is none."""

        return self._connection.get()

    @property
    def _currect_connection(self) -> Connection:
        """Misspelled alias of ``_current_connection``, kept for existing callers."""

        return self._connection.get()

    @contextmanager
    def connection(self) -> Connection:
        """Context manager for operations. Will clean up connections on exit of
        scope.

        If a connection is already stored for the current context it is reused
        and left open on exit; otherwise a new one is opened and closed on exit.

        Yields:
            Connection: The connection for the current context.

        """

        self._check_mp()

        # get connection
        cxn = self._connection.get()

        # only the scope that opened the connection is responsible for closing it
        cleanup = cxn is None
        if cleanup:
            cxn = self._connect()

        try:
            yield cxn

        finally:
            if cleanup:
                cxn = self._connection.get()

                if cxn is not None:
                    cxn.close()
                    self._connection.set(None)

    def session(self) -> Session:
        """Establishes Session with active connection.

        Note: Setting expire_on_commit to False allows us to access objects
        after session closing.

        The session comes from the sessionmaker bound to the engine. The
        ``connection()`` scope entered here ends when this method returns.

        Returns:
            Session: New session with ``expire_on_commit`` set to False.

        """
        logger.debug("ModelDB creating session.")
        with self.connection():
            session = self._sessionmaker()
            logger.debug("ModelDB session created.")
            session.expire_on_commit = False
            return session

    def execute(self, sql) -> list:
        """Execute sql inside a managed session.

        Args:
            sql (sqlalchemy.sql.base.Executable): SQL query to execute.

        Returns:
            The object returned by the session's ``execute`` call for ``sql``.

        """
        logger.info("ModelDB executing: %s", str(sql))
        with self.session() as session:

            res = session.execute(sql)
            session.commit()

        logger.info("ModelDB executed: %s", str(sql))

        return res

    def select(self, sql: Select) -> list:
        """Execute sql query inside a managed session.

        Args:
            sql (Select): Selection query to execute.

        Returns:
            list: Results of selection operation (``.scalars().all()`` of the
                executed query).

        """
        logger.info("ModelDB selecting: %s", str(sql))
        with self.session() as session:

            res = session.execute(sql).scalars().all()
            session.commit()

        return res

    def insert(self, sql: Insert):
        """Execute an insert operation inside a managed session.

        Args:
            sql (Insert): Sqlalchemy insert operation

        Returns:
            Union[str, int]: primary key returned from insert operation

        """
        logger.info("ModelDB inserting: %s", str(sql))
        with self.session() as session:

            res = session.execute(sql)
            session.commit()

        logger.info("Sucessfully executed: %s", str(sql))

        return res.inserted_primary_key

    def insert_many(self, sql: List[Insert]) -> List[Union[str, int]]:
        """Execute many inserts within a managed session.

        All statements are executed in one session and committed together.

        Args:
            sql (List[Insert]): Sqlalchemy insert operations to execute.

        Returns:
            List[Union[str, int]]: List of primary keys returned from insert operation

        """
        logger.info("ModelDB inserting many: %s", [str(statement) for statement in sql])
        with self.session() as session:

            results = [session.execute(stmt) for stmt in sql]

            session.commit()

        logger.info("Sucessfully executed: %s", [str(statement) for statement in sql])

        return [res.inserted_primary_key for res in results]

    @classmethod
    def from_config_init(cls, **kwargs) -> "ModelDB":
        """Initialize database handler from ModelDBConfig kwargs.

        Args:
            **kwargs: Keyword arguments forwarded to ``ModelDBConfig``.

        Returns:
            ModelDB: Client built from the resulting configuration.

        """
        config = ModelDBConfig(**kwargs)
        return cls(config=config)

__init__(config)

Initialize client service.

Parameters:

Name Type Description Default
config ModelDBConfig

Connection configuration.

required
Source code in lume_services/services/models/db/db.py
62
63
64
65
66
67
68
69
70
def __init__(self, config: ModelDBConfig):
    """Initialize client service.

    Stores the configuration on the instance, then builds the engine via
    ``_create_engine`` (which reads ``self.config``).

    Args:
        config (ModelDBConfig): Connection configuration.

    """
    # config must be assigned first: _create_engine reads it
    self.config = config
    self._create_engine()

connection()

Context manager for operations. Will clean up connections on exit of scope.

Source code in lume_services/services/models/db/db.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
@contextmanager
def connection(self) -> Connection:
    """Provide the context's connection, opening one if none is stored.

    A connection already stored for the current context is reused and left
    open on exit. Otherwise a new connection is opened, and on exit of the
    scope it is closed and the stored value reset to None.

    """
    self._check_mp()

    active = self._connection.get()

    # only the scope that opened the connection is responsible for closing it
    opened_here = active is None
    if opened_here:
        active = self._connect()

    try:
        yield active
    finally:
        if opened_here:
            # re-read the stored value rather than trusting the local
            stored = self._connection.get()
            if stored:
                stored.close()
                self._connection.set(None)

execute(sql)

Execute sql inside a managed session.

Parameters:

Name Type Description Default
sql Executable

SQL query to execute.

required
Results

list: Results of query operation

Source code in lume_services/services/models/db/db.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def execute(self, sql) -> list:
    """Run a statement inside a managed session and commit it.

    Args:
        sql (sqlalchemy.sql.base.Executable): SQL query to execute.

    Returns:
        The object returned by the session's ``execute`` call for ``sql``.

    """
    logger.info("ModelDB executing: %s", str(sql))

    with self.session() as active_session:
        outcome = active_session.execute(sql)
        active_session.commit()

    logger.info("ModelDB executed: %s", str(sql))
    return outcome

from_config_init(**kwargs) classmethod

Initialize database handler from ModelDBConfig kwargs.

Source code in lume_services/services/models/db/db.py
238
239
240
241
242
@classmethod
def from_config_init(cls, **kwargs) -> "ModelDB":
    """Build a database handler from keyword arguments for ModelDBConfig.

    The keyword arguments are forwarded unchanged to ``ModelDBConfig`` and the
    resulting configuration is handed to the class constructor.

    """
    return cls(config=ModelDBConfig(**kwargs))

insert(sql)

Execute an insert operation inside a managed session.

Parameters:

Name Type Description Default
sql Insert

Sqlalchemy insert operation

required

Returns:

Type Description

Union[str, int]: primary key returned from insert operation

Source code in lume_services/services/models/db/db.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
def insert(self, sql: Insert):
    """Run a single insert inside a managed session and commit it.

    Args:
        sql (Insert): Sqlalchemy insert operation

    Returns:
        Union[str, int]: primary key returned from insert operation

    """
    logger.info("ModelDB inserting: %s", str(sql))

    with self.session() as active_session:
        outcome = active_session.execute(sql)
        active_session.commit()

    logger.info("Sucessfully executed: %s", str(sql))

    primary_key = outcome.inserted_primary_key
    return primary_key

insert_many(sql)

Execute many inserts within a managed session.

Parameters:

Name Type Description Default
sql List[Insert]

Execute a sqlalchemy insert operation

required

Returns:

Type Description
List[Union[str, int]]

List[Union[str, int]]: List of primary keys returned from insert operation

Source code in lume_services/services/models/db/db.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
def insert_many(self, sql: List[Insert]) -> List[Union[str, int]]:
    """Run several inserts in one managed session with a single commit.

    Args:
        sql (List[Insert]): Sqlalchemy insert operations to execute, in order.

    Returns:
        List[Union[str, int]]: List of primary keys returned from insert operation

    """
    logger.info("ModelDB inserting many: %s", [str(statement) for statement in sql])

    with self.session() as active_session:
        # execute every statement first, then commit them together
        outcomes = [active_session.execute(statement) for statement in sql]
        active_session.commit()

    logger.info("Sucessfully executed: %s", [str(statement) for statement in sql])

    return [outcome.inserted_primary_key for outcome in outcomes]

select(sql)

Execute sql query inside a managed session.

Parameters:

Name Type Description Default
sql Select

Selection query to execute.

required
Results

list: Results of selection operation

Source code in lume_services/services/models/db/db.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def select(self, sql: Select) -> list:
    """Run a selection query inside a managed session.

    Args:
        sql (Select): Selection query to execute.

    Returns:
        list: Results of selection operation (``.scalars().all()`` of the
            executed query).

    """
    logger.info("ModelDB selecting: %s", str(sql))

    with self.session() as active_session:
        scalar_rows = active_session.execute(sql).scalars()
        rows = scalar_rows.all()
        active_session.commit()

    return rows

session()

Establishes Session with active connection.

Note: Setting expire_on_commit to False allows us to access objects after session closing.

Source code in lume_services/services/models/db/db.py
141
142
143
144
145
146
147
148
149
150
151
152
153
def session(self) -> Session:
    """Create a Session while a connection scope is active.

    Note: expire_on_commit is set to False so that objects remain
    accessible after the session has been closed.

    The ``connection()`` scope entered here has exited by the time the
    session is handed back to the caller.

    """
    logger.debug("ModelDB creating session.")

    with self.connection():
        new_session = self._sessionmaker()
        logger.debug("ModelDB session created.")
        new_session.expire_on_commit = False

    return new_session

ModelDBConfig

Bases: BaseModel

Configuration for SQL connection using sqlalchemy.

Parameters:

Name Type Description Default
host str

Host of SQL server.

required
port int

Port of SQL server on host.

required
user str

User for connecting to SQL server.

required
password SecretStr

Password for auth.

required
database str

Name of database.

required
connection ConnectionConfig

Configuration options for creating sqlalchemy engine.

ConnectionConfig()
Source code in lume_services/services/models/db/db.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class ModelDBConfig(BaseModel):
    """Configuration for SQL connection using sqlalchemy.

    Args:
        host (str): Host of SQL server.
        port (int): Port of SQL server on host.
        user (str): User for connecting to SQL server.
        password (SecretStr): Password for auth.
        database (str): Name of database.
        connection (ConnectionConfig): Configuration options for creating sqlalchemy
            engine. Defaults to a ``ConnectionConfig`` built with no arguments.
        dialect_str (str): Dialect (and driver) prefix placed before ``://`` in the
            connection URI. Defaults to ``"mysql+pymysql"``.

    """

    host: str
    port: int
    user: str
    # declared with Field(exclude=True); presumably keeps the secret out of
    # exported model data -- confirm against the pydantic version in use
    password: SecretStr = Field(exclude=True)
    database: str
    connection: ConnectionConfig = ConnectionConfig()
    dialect_str: str = "mysql+pymysql"