Skip to content

Database

ResultsDB

Bases: ABC

Implementation of the database.

Source code in lume_services/services/results/db.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class ResultsDB(ABC):
    """Abstract base class declaring the operations of a results database."""

    @abstractmethod
    def __init__(self, db_config: ResultsDBConfig):
        """Build the database object from its configuration.

        Args:
            db_config (ResultsDBConfig): Configuration for the database.

        """

    @abstractmethod
    def insert_one(self, item: dict, **kwargs) -> str:
        """Store a single document.

        Args:
            item (dict): The document, given as a dictionary.
            **kwargs: Additional arguments accepted by the implementation.

        Returns:
            str: Id of the stored document.

        """

    @abstractmethod
    def insert_many(self, items: List[dict], **kwargs) -> List[str]:
        """Store several documents at once.

        Args:
            items (List[dict]): The documents, each given as a dictionary.
            **kwargs: Additional arguments accepted by the implementation.

        Returns:
            List[str]: Ids of the inserted documents.

        """

    @abstractmethod
    def find(self, *, query: dict, fields: List[str] = None, **kwargs) -> List[dict]:
        """Look up documents that match a query.

        All arguments are keyword-only.

        Args:
            query (dict): Fields to query on.
            fields (List[str]): Fields to return, if a subset is wanted.
            **kwargs (dict): Arguments specific to the DB implementation.

        Returns:
            List[dict]: The matching items as dictionaries.

        """

    @abstractmethod
    def find_all(self, **kwargs) -> List[dict]:
        """Return every document of a collection.

        Returns:
            List[dict]: The result items as dictionaries.
        """

    @abstractmethod
    def configure(self, **kwargs) -> None:
        """Configure the results database service."""

configure(**kwargs) abstractmethod

Configure the results db service.

Source code in lume_services/services/results/db.py
69
70
71
@abstractmethod
def configure(self, **kwargs) -> None:
    """Set up the results database service.

    Args:
        **kwargs: Arguments accepted by the concrete implementation.

    """

find(*, query, fields=None, **kwargs) abstractmethod

Find a document based on a query.

Parameters:

Name Type Description Default
query dict

fields to query on

required
fields List[str]

List of fields to return if any

None
**kwargs dict

DB implementation specific fields

{}

Returns:

Type Description
List[dict]

List[dict]: List of dict reps of found items.

Source code in lume_services/services/results/db.py
47
48
49
50
51
52
53
54
55
56
57
58
59
@abstractmethod
def find(self, *, query: dict, fields: List[str] = None, **kwargs) -> List[dict]:
    """Look up documents that match a query.

    All arguments are keyword-only.

    Args:
        query (dict): Fields to query on.
        fields (List[str]): Fields to return, if a subset is wanted.
        **kwargs (dict): Arguments specific to the DB implementation.

    Returns:
        List[dict]: The matching items as dictionaries.

    """

find_all(**kwargs) abstractmethod

Find all documents for a collection

Returns:

Type Description
List[dict]

List[dict]: List of result items represented as dict.

Source code in lume_services/services/results/db.py
61
62
63
64
65
66
67
@abstractmethod
def find_all(self, **kwargs) -> List[dict]:
    """Return every document of a collection.

    Args:
        **kwargs: Arguments accepted by the concrete implementation.

    Returns:
        List[dict]: The result items as dictionaries.
    """

insert_many(items, **kwargs) abstractmethod

Insert many documents into the database.

Parameters:

Name Type Description Default
items List[dict]

List of dictionary representations of items

required

Returns:

Type Description
List[str]

List[str]: List of inserted ids

Source code in lume_services/services/results/db.py
35
36
37
38
39
40
41
42
43
44
45
@abstractmethod
def insert_many(self, items: List[dict], **kwargs) -> List[str]:
    """Store several documents at once.

    Args:
        items (List[dict]): The documents, each given as a dictionary.
        **kwargs: Additional arguments accepted by the implementation.

    Returns:
        List[str]: Ids of the inserted documents.

    """

insert_one(item, **kwargs) abstractmethod

Insert document into the database.

Parameters:

Name Type Description Default
item dict

Dictionary representation of item

required

Returns:

Name Type Description
str str

Inserted item id

Source code in lume_services/services/results/db.py
23
24
25
26
27
28
29
30
31
32
33
@abstractmethod
def insert_one(self, item: dict, **kwargs) -> str:
    """Store a single document.

    Args:
        item (dict): The document, given as a dictionary.
        **kwargs: Additional arguments accepted by the implementation.

    Returns:
        str: Id of the stored document.

    """

MongodbResultsDB

Bases: ResultsDB

Source code in lume_services/services/results/mongodb.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
class MongodbResultsDB(ResultsDB):
    """Results database implemented on MongoDB through the PyMongo driver.

    The client and the per-collection metadata are kept in ``ContextVar``
    objects. The pid of the process that owns the connection is recorded so
    that a process with a different pid builds its own connection.

    """

    # Note: pymongo is threadsafe

    def __init__(self, db_config: MongodbResultsDBConfig):
        """Store the configuration; no connection is opened here.

        Args:
            db_config (MongodbResultsDBConfig): Connection configuration.

        """
        self.config = db_config

        # track pid to make multiprocessing safe
        self._pid = os.getpid()
        self._client = ContextVar("client", default=None)
        self._collections = ContextVar("collections", default={})

    def _connect(self) -> MongoClient:
        """Establish connection and set _client.

        Also records, for every collection currently in the configured
        database, a ``MongodbCollection`` holding its index information.

        Returns:
            MongoClient: The newly created client.

        """

        client = MongoClient(
            **self.config.dict(exclude_none=True, by_alias=True),
            password=self.config.password.get_secret_value(),
            **self.config.options
        )

        self._client.set(client)
        db = client[self.config.database]

        collections = {}

        for collection_name in db.list_collection_names():
            index_info = db[collection_name].index_information()
            collections[collection_name] = MongodbCollection(
                database=self.config.database, name=collection_name, indices=index_info
            )

        self._collections.set(collections)

        return client

    def _check_mp(self) -> None:
        """Check for multiprocessing. If the current PID differs from the recorded
        PID, create a new connection and record the current PID.

        """

        current_pid = os.getpid()

        if current_pid != self._pid:
            self._connect()
            # Record the new owner. Without this, every later call made in this
            # process would open another client and never close the last one.
            self._pid = current_pid

    @property
    def _current_connection(self) -> MongoClient:
        """Getter for current connection"""

        return self._client.get()

    @property
    def _currect_connection(self) -> MongoClient:
        """Misspelled alias of ``_current_connection``, kept for compatibility."""

        return self._client.get()

    def _disconnect(self):
        """Close the client, if any, and reset the stored client and collections."""
        client = self._client.get()
        if client is not None:
            client.close()
        self._client.set(None)
        self._collections.set(None)

    def disconnect(self):
        """Disconnect mongodb connection."""
        self._disconnect()

    @contextmanager
    def client(self) -> MongoClient:
        """Context manager for mongoclient. Will check for multiprocessing and restart
        accordingly.

        A connection opened by this context is closed again on exit; a connection
        that already existed on entry is left open.

        Yields:
            MongoClient: The active client.

        """
        self._check_mp()

        # get connection
        client = self._client.get()

        # only a connection opened here is torn down on exit
        cleanup = client is None
        if cleanup:
            client = self._connect()

        try:
            yield client

        finally:
            # _disconnect already resets the stored client and collections
            if cleanup and self._client.get() is not None:
                self._disconnect()

    def insert_one(self, collection: str, **kwargs) -> str:
        """Insert one document into the database.

        Args:
            collection (str): Name of collection for saving document
            **kwargs: Kwargs contain representation of document

        Returns:
            str: saved document id. The value is returned exactly as reported by
                the driver and is not converted.

        Raises:
            ValueError: If the driver reports no inserted id.

        """
        with self.client() as client:
            db_collection = client[self.config.database][collection]
            inserted_id = db_collection.insert_one(kwargs).inserted_id

            if inserted_id is None:
                raise ValueError(
                    f"Unable to insert database object {kwargs} into collection "
                    f"{collection}"
                )

        return inserted_id

    def insert_many(self, collection: str, items: List[dict]) -> List[str]:
        """Insert many documents into the database.

        Args:
            collection (str): Name of collection for saving the documents
            items (List[dict]): List of dictionary reps of documents to save to database

        Returns:
            List[str]: List of saved document ids, each converted with ``str``.

        """
        with self.client() as client:
            db_collection = client[self.config.database][collection]
            inserted_ids = db_collection.insert_many(items).inserted_ids

        # ids have no ``.str`` attribute in Python; ``str()`` gives the text form
        return [str(inserted_id) for inserted_id in inserted_ids]

    def find(
        self, collection: str, query: dict = None, fields: List[str] = None
    ) -> List[dict]:
        """Find documents based on a query.

        Args:
            collection (str): Name of collection to query
            query (dict): Query in dictionary form mapping fields to values
            fields (List[str]): List of fields for filtering result

        Returns:
            List[dict]: List of documents matching the query.

        """

        with self.client() as client:
            db = client[self.config.database]
            # projection=None is the driver default, i.e. no field filtering
            cursor = db[collection].find(query, projection=fields)

            # read all results before the client context can close the connection
            results = list(cursor)

        return results

    def find_all(self, collection: str) -> List[dict]:
        """Find all documents for a collection

        Args:
            collection (str): Collection name to query

        Returns:
            List[dict]: List of result documents.

        """
        return self.find(collection=collection)

    def configure(self, collections: Dict[str, List[str]]) -> None:
        """Configure the results database from collections and their indices.

        For each collection one unique index over the listed fields (descending)
        is created, and the resulting index information is recorded.

        Args:
            collections (Dict[str, List[str]]): Dictionary mapping collection to
                index rep.

        """

        collection_indices = {}

        with self.client() as client:
            db = client[self.config.database]

            for collection_name, index in collections.items():
                db_collection = db[collection_name]

                formatted_index = [(idx, DESCENDING) for idx in index]
                db_collection.create_index(formatted_index, unique=True)

                collection_indices[collection_name] = MongodbCollection(
                    database=self.config.database,
                    name=collection_name,
                    indices=db_collection.index_information(),
                )

        # set after leaving the client context: closing a connection opened by
        # that context resets the stored collections
        self._collections.set(collection_indices)

client()

Context manager for mongoclient. Will check for multiprocessing and restart accordingly.

Source code in lume_services/services/results/mongodb.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
@contextmanager
def client(self) -> MongoClient:
    """Yield the active mongo client, connecting first when none is stored.

    The multiprocessing check runs before the stored client is read. A
    connection opened by this context is closed again on exit; a connection that
    was already stored on entry is left open.

    """
    self._check_mp()

    active = self._client.get()

    # this context owns the connection only if it had to open one
    opened_here = active is None
    if opened_here:
        active = self._connect()

    try:
        yield active

    finally:
        if opened_here and self._client.get():
            self._disconnect()
            self._client.set(None)

configure(collections)

Configure the results database from collections and their indices.

Parameters:

Name Type Description Default
collections Dict[str, List[str]]

Dictionary mapping collection to index rep.

required
Source code in lume_services/services/results/mongodb.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def configure(self, collections: Dict[str, List[str]]) -> None:
    """Create the index of each given collection and record its index info.

    For each collection one unique index over the listed fields (descending) is
    created.

    Args:
        collections (Dict[str, List[str]]): Maps a collection name to the list
            of fields that make up its index.

    """

    registry = {}

    with self.client() as mongo:
        database = mongo[self.config.database]

        for name, keys in collections.items():
            target = database[name]
            target.create_index([(key, DESCENDING) for key in keys], unique=True)

            # read the index information back after the index was created
            described = target.index_information()

            registry[name] = MongodbCollection(
                database=self.config.database,
                name=name,
                indices=described,
            )

    self._collections.set(registry)

disconnect()

Disconnect mongodb connection.

Source code in lume_services/services/results/mongodb.py
119
120
121
def disconnect(self):
    """Close the mongodb connection by delegating to ``_disconnect``."""
    self._disconnect()

find(collection, query=None, fields=None)

Find a document based on a query.

Parameters:

Name Type Description Default
collection str

Document type to query

required
query dict

Query in dictionary form mapping fields to values

None
fields List[str]

List of fields for filtering result

None

Returns:

Type Description
List[dict]

List[dict]: List of documents matching the query.

Source code in lume_services/services/results/mongodb.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def find(
    self, collection: str, query: dict = None, fields: List[str] = None
) -> List[dict]:
    """Return the documents of a collection that match a query.

    Args:
        collection (str): Name of the collection to query
        query (dict): Query in dictionary form mapping fields to values
        fields (List[str]): Fields passed to the driver as the projection

    Returns:
        List[dict]: List of documents matching the query.

    """

    # a projection is only forwarded when specific fields were requested
    projection_args = {} if fields is None else {"projection": fields}

    with self.client() as mongo:
        cursor = mongo[self.config.database][collection].find(
            query, **projection_args
        )

        # read all results while still inside the client context
        documents = list(cursor)

    return documents

find_all(collection)

Find all documents for a collection

Parameters:

Name Type Description Default
collection str

Collection name to query

required

Returns:

Type Description
List[dict]

List[dict]: List of result documents.

Source code in lume_services/services/results/mongodb.py
227
228
229
230
231
232
233
234
235
236
237
def find_all(self, collection: str) -> List[dict]:
    """Return every document of a collection.

    Calls ``find`` with only the collection name, i.e. without a query.

    Args:
        collection (str): Collection name to query

    Returns:
        List[dict]: List of result documents.

    """
    documents = self.find(collection=collection)
    return documents

insert_many(collection, items)

Insert many documents into the database.

Parameters:

Name Type Description Default
collection str

Document type to query

required
items List[dict]

List of dictionary reps of documents to save to database

required

Returns:

Type Description
List[str]

List[str]: List of saved document ids.

Source code in lume_services/services/results/mongodb.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def insert_many(self, collection: str, items: List[dict]) -> List[str]:
    """Insert many documents into the database.

    Args:
        collection (str): Name of collection for saving the documents
        items (List[dict]): List of dictionary reps of documents to save to database

    Returns:
        List[str]: List of saved document ids, each converted with ``str``.

    """
    with self.client() as client:
        db_collection = client[self.config.database][collection]
        inserted_ids = db_collection.insert_many(items).inserted_ids

    # ids have no ``.str`` attribute in Python; ``str()`` gives the text form
    return [str(inserted_id) for inserted_id in inserted_ids]

insert_one(collection, **kwargs)

Insert one document into the database.

Parameters:

Name Type Description Default
collection str

Name of collection for saving document

required
**kwargs

Kwargs contain representation of document

{}

Returns:

Name Type Description
str str

saved document id

Source code in lume_services/services/results/mongodb.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def insert_one(self, collection: str, **kwargs) -> str:
    """Insert one document into the database.

    Args:
        collection (str): Name of collection for saving document
        **kwargs: Kwargs contain representation of document

    Returns:
        str: saved document id. The value is returned exactly as reported by
            the driver and is not converted.

    Raises:
        ValueError: If the driver reports no inserted id.

    """
    with self.client() as client:
        db_collection = client[self.config.database][collection]
        inserted_id = db_collection.insert_one(kwargs).inserted_id

        if inserted_id is None:
            # exceptions do not interpolate %s arguments, so format explicitly
            raise ValueError(
                f"Unable to insert database object {kwargs} into collection "
                f"{collection}"
            )

    return inserted_id

MongodbResultsDBConfig

Bases: ResultsDBConfig

Configuration for connecting to Mongodb using the PyMongo driver.

Attributes:

- database (Optional[str]): Database name used for storing results.
- host (str): Host name of mongodb service.
- username (str): Username string.
- password (SecretStr): Password stored as a Pydantic secret string. https://pydantic-docs.helpmanual.io/usage/types/#secret-types
- port (int): Host port of mongodb service endpoint.
- authMechanism (str): Auth mechanism supported by PyMongo driver. See https://pymongo.readthedocs.io/en/stable/api/pymongo/database.html#pymongo.auth.MECHANISMS.
- options (dict): Dictionary of additional connection options for MongoClient. https://pymongo.readthedocs.io/en/stable/api/pymongo/mongo_client.html#pymongo.mongo_client.MongoClient

Source code in lume_services/services/results/mongodb.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class MongodbResultsDBConfig(ResultsDBConfig):
    """Configuration for connecting to Mongodb using the PyMongo driver.

    The fields declared with ``exclude=True`` (``database``, ``password`` and
    ``options``) are left out when the model is serialized.

    Attributes:
        database (Optional[str]): Database name used for storing results.
        host (str): Host name of mongodb service.
        username (str): Username string.
        password (SecretStr): Password stored as a Pydantic secret string. https://pydantic-docs.helpmanual.io/usage/types/#secret-types
        port (int): Host port of mongodb service endpoint.
        authMechanism (str): Auth mechanism supported by PyMongo driver. Defaults to "DEFAULT". See https://pymongo.readthedocs.io/en/stable/api/pymongo/database.html#pymongo.auth.MECHANISMS.
        options (dict): Dictionary of additional connection options for MongoClient. Defaults to an empty dict. https://pymongo.readthedocs.io/en/stable/api/pymongo/mongo_client.html#pymongo.mongo_client.MongoClient

    """  # noqa

    # excluded in serialization bc not used to initialize cxn
    database: Optional[str] = Field(exclude=True)
    username: str
    host: str
    # excluded in serialization; NOTE(review): presumably so the secret is only
    # unwrapped explicitly where the client is created -- confirm
    password: SecretStr = Field(exclude=True)
    port: int
    authMechanism: str = "DEFAULT"
    # Pydantic literal parsing from env has issue with literals...
    # Literal["DEFAULT", 'GSSAPI', 'MONGODB-AWS', 'MONGODB-CR', 'MONGODB-X509',
    # 'PLAIN', 'SCRAM-SHA-1', 'SCRAM-SHA-256'] = "DEFAULT"
    # excluded in serialization, like database and password above
    options: dict = Field({}, exclude=True)

    class Config:
        # NOTE(review): pydantic setting; presumably lets fields be populated by
        # their field name as well as by an alias -- no aliases are declared here
        allow_population_by_field_name = True