
Tasks

configure_lume_services()

Configure LUME-services using environment variables. This task must be included in any workflow using common database services.

Source code in lume_services/tasks/configure.py
@task(name="configure_lume_services")
def configure_lume_services():
    """Configure LUME-services using environment variables. This task must be included
    in any workflow using common database services.

    """
    logger.debug("Configuring environment using %s", json.dumps(dict(os.environ)))
    config.configure()

prepare_lume_model_variables(value_map, variables)

Utility task for translating parameter values to LUME-model variables.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| value_map | Dict[str, Any] | Dictionary mapping variable name to value. | required |
| variables | Dict[str, Variable] | Dictionary mapping variable name to LUME-model variable object. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| variables | Dict[str, Variable] | Formatted LUME-model variables. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | Variable name passed to value_map is not found in model variables. |

Source code in lume_services/tasks/configure.py
@task(name="prepare_lume_model_variables")
def prepare_lume_model_variables(
    value_map: Dict[str, Any], variables: Dict[str, Variable]
) -> Dict[str, Variable]:
    """Utility task for translating parameter values to LUME-model variables.

    Args:
        value_map (Dict[str, Any]): Dictionary mapping variable name to value.
        variables (Dict[str, Variable]): Dictionary mapping variable name to LUME-model
            variable object.

    Returns:
        variables (Dict[str, Variable]): Formatted LUME-model variables.

    Raises:
        ValueError: Variable name passed to value_map is not found in model variables.

    """
    # Deep-copy so the caller's (possibly global) variables are not mutated
    variables = copy.deepcopy(variables)

    for var_name in value_map:
        if var_name not in variables:
            # Format the message explicitly: unlike logging calls, exceptions do
            # not interpolate %s placeholders from extra positional arguments
            raise ValueError(
                f"Variable name passed to value_map {var_name} not found in model "
                f"variables. Model variables are {', '.join(variables.keys())}"
            )
        variables[var_name].value = value_map[var_name]

    missing_values = [
        var_name for var_name in variables.keys() if var_name not in value_map
    ]
    if len(missing_values):
        logger.warning(
            "No value provided for: %s. Will assign variable default to value.",
            ", ".join(missing_values),
        )
    for var_name in missing_values:
        variables[var_name].value = variables[var_name].default

    return variables
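
The copy, assign, and default-fallback steps above can be tried without LUME-model installed. The following is a minimal sketch, not the library's implementation: `Variable` here is a stand-in dataclass with only `value` and `default` fields (the real LUME-model class carries more), and `fill_variables` is a hypothetical helper name.

```python
import copy
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class Variable:
    """Stand-in for a LUME-model variable (assumption: only these two fields)."""

    default: Any = None
    value: Any = None


def fill_variables(
    value_map: Dict[str, Any], variables: Dict[str, Variable]
) -> Dict[str, Variable]:
    # Work on a copy so the caller's variable objects are left untouched.
    variables = copy.deepcopy(variables)

    unknown = [name for name in value_map if name not in variables]
    if unknown:
        raise ValueError(
            f"Unknown variable(s) {unknown}; model variables are {list(variables)}"
        )

    for name, var in variables.items():
        # Use the provided value, falling back to the variable's default.
        var.value = value_map.get(name, var.default)
    return variables


model_variables = {"x": Variable(default=1.0), "y": Variable(default=2.0)}
filled = fill_variables({"x": 5.0}, model_variables)
print(filled["x"].value, filled["y"].value)
print(model_variables["x"].value)
```

Here `x` takes the supplied value, `y` falls back to its default, and the original `model_variables` dictionary is unchanged because the function operates on a deep copy.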

LoadDBResult

Bases: Task

Load a result from the results database. All database results generate a minimally representative identifier that can be used to query the database and load the result. This identifier is JSON-serializable and therefore accessible outside of the workflow's scope. This task uses the identifier to query the database, load the result, and select data via the attribute_index parameter.

The attribute_index parameter gives the path to select within the loaded result. For example, selecting the first entry, toyota, from a dictionary of the form {"outputs": {"vehicle": {"car": ["toyota", "mini"], "boat": ["sail", "motor"]}}} would be accomplished by passing attribute_index=["outputs", "vehicle", "car", 0].
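
A minimal sketch of that selection, assuming the first entry of `attribute_index` names an attribute on the loaded result and the remaining entries are dictionary keys or list positions. `SimpleNamespace` stands in for a Result object, and `select` is a hypothetical helper name rather than part of LUME-services.

```python
from types import SimpleNamespace
from typing import Any, List


def select(result: Any, attribute_index: List[Any]) -> Any:
    # The first entry is looked up as an attribute on the result object...
    value = getattr(result, attribute_index[0])
    # ...and each remaining entry indexes into the nested dicts and lists.
    for index in attribute_index[1:]:
        value = value[index]
    return value


result = SimpleNamespace(
    outputs={"vehicle": {"car": ["toyota", "mini"], "boat": ["sail", "motor"]}}
)
selected = select(result, ["outputs", "vehicle", "car", 0])
print(selected)  # toyota
```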

This task is defined as a subclass of the Prefect Task object and accepts all Task arguments during initialization.

Examples:

from prefect import Flow, task
from lume_services.tasks import configure_lume_services, LoadDBResult


load_db_result_task = LoadDBResult(timeout=20)

@task
def print_result(value):
    print(value)


with Flow(
    "my_flow"
) as flow:
    # must first configure services because using injected results
    # database service
    configure_lume_services()

    # assume some other flow has saved a Result object to the database with the
    # resulting unique representation (Result.unique_rep):
    unique_rep = {
        "result_type_string": "lume_services.results.generic:Result",
        "query": {
            "inputs": {
                "input1": 1.0,
                "input2": 2.0
            },
            "outputs": {
                "output1": 1.0,
                "output2": 2.0
            },
            "flow_id": 1
        }
    }

    my_result = load_db_result_task(
        result_rep=unique_rep,
        attribute_index=["outputs", "output1"]
    )

    print_result(my_result) # Will print 1.0
Source code in lume_services/tasks/db.py
class LoadDBResult(Task):
    """Load a result from the results database. All database results generate a
    [minimally representative identifier][lume_services.results.generic.Result] that can
    be used to query the database and load the result. This identifier is
    JSON-serializable and therefore accessible outside of the workflow's scope. This
    task uses the identifier to query the database, load the result, and select data
    via the `attribute_index` parameter.

    The `attribute_index` parameter gives the path to select within the loaded result.
    For example, selecting the first entry, `toyota`, from a dictionary of the form
    `{"outputs": {"vehicle": {"car": ["toyota", "mini"], "boat": ["sail", "motor"]}}}`
    would be accomplished by passing `attribute_index=["outputs", "vehicle", "car", 0]`.

    This task is defined as a subclass of the Prefect [Task](https://docs-v1.prefect.io/api/latest/core/task.html#task-2)
    object and accepts all Task arguments during initialization.

    Examples:
        ```python
        from prefect import Flow, task
        from lume_services.tasks import configure_lume_services, LoadDBResult


        load_db_result_task = LoadDBResult(timeout=20)

        @task
        def print_result(value):
            print(value)


        with Flow(
            "my_flow"
        ) as flow:
            # must first configure services because using injected results
            # database service
            configure_lume_services()

            # assume some other flow has saved a Result object to the database with the
            # resulting unique representation (Result.unique_rep):
            unique_rep = {
                "result_type_string": "lume_services.results.generic:Result",
                "query": {
                    "inputs": {
                        "input1": 1.0,
                        "input2": 2.0
                    },
                    "outputs": {
                        "output1": 1.0,
                        "output2": 2.0
                    },
                    "flow_id": 1
                }
            }

            my_result = load_db_result_task(
                result_rep=unique_rep,
                attribute_index=["outputs", "output1"]
            )

            print_result(my_result) # Will print 1.0
        ```

    """  # noqa

    parameters = {
        "attribute_index": Parameter("attribute_index"),
        "result_rep": Parameter("result_rep"),
    }

    def __init__(self, **kwargs):
        """This task is defined as a subclass of the Prefect [Task](https://docs-v1.prefect.io/api/latest/core/task.html#task-2)
        object and accepts all Task arguments during initialization.

        Args:
            name (Optional[str]): The name of this task.
            slug (Optional[str]):  The slug for this task. Slugs provide a stable ID
                for tasks so that the Prefect API can identify task run states. If a
                slug is not provided, one will be generated automatically once the
                task is added to a Flow.
            tags (Optional[List[str]]): A list of tags for this task.
            max_retries (Optional[int]): The maximum amount of times this task can be
                retried
            retry_delay (Optional[datetime.timedelta]): The amount of time to wait
                until task is retried
            retry_on (Optional[Union[Exception, Iterable[Type[Exception]]]]): Exception
                types that will allow retry behavior to occur. If not set, all
                exceptions will allow retries. If set, retries will only occur if the
                exception is a subtype of the exception types provided.
            timeout (Optional[Union[int, timedelta]]): The amount of time (in seconds)
                to wait while running this task before a timeout occurs; note that
                sub-second resolution is not supported, even when passing in a
                timedelta.
            trigger (Optional[callable]):  a function that determines whether the task
                should run, based on the states of any upstream tasks.
            skip_on_upstream_skip (Optional[bool]): if True, if any immediately upstream
                tasks are skipped, this task will automatically be skipped as well,
                regardless of trigger. By default, this prevents tasks from attempting
                to use either state or data from tasks that didn't run. If False, the
                task's trigger will be called as normal, with skips considered
                successes. Defaults to True.
            cache_for (Optional[timedelta]): The amount of time to maintain a cache
                of the outputs of this task.  Useful for situations where the containing
                Flow will be rerun multiple times, but this task doesn't need to be.
            cache_validator (Optional[Callable]): Validator that will determine
                whether the cache for this task is still valid (only required if
                `cache_for` is provided; defaults to
                `prefect.engine.cache_validators.duration_only`)
            cache_key (Optional[str]): if provided, a `cache_key`
                serves as a unique identifier for this Task's cache, and can be shared
                across both Tasks _and_ Flows; if not provided, the Task's _name_ will
                be used if running locally, or the Task's database ID if running in
                Cloud
            checkpoint (Optional[bool]): if this Task is successful, whether to
                store its result using the configured result available during the run;
                Also note that checkpointing will only occur locally if
                `prefect.config.flows.checkpointing` is set to `True`
            result (Optional[Result]): the result instance used to retrieve and
                store task results during execution
            target (Optional[Union[str, Callable]]): location to check for task Result.
                If a result exists at that location then the task run will enter a
                cached state. `target` strings can be templated formatting strings
                which will be formatted at runtime with values from `prefect.context`.
                If a callable function is provided, it should have signature
                `callable(**kwargs) -> str` and at write time all formatting kwargs
                will be passed and a fully formatted location is expected as the return
                value. The callable can be used for string formatting logic that
                `.format(**kwargs)` doesn't support.
            state_handlers (Optional[Iterable[Callable]]): A list of state change
                handlers that will be called whenever the task changes state,
                providing an opportunity to inspect or modify the new state. The
                handler will be passed the task instance, the old (prior) state,
                and the new
                (current) state, with the following signature:
                    `state_handler(task: Task, old_state: State, new_state: State) ->
                    Optional[State]`
                If multiple functions are passed, then the `new_state` argument will
                be the result of the previous handler.
            on_failure (Optional[Callable]): A function with signature
                `fn(task: Task, state: State) -> None` that will be called anytime this
                Task enters a failure state
            log_stdout (Optional[bool]): Toggle whether or not to send stdout messages
                to the Prefect logger. Defaults to `True` for this task.
            task_run_name (Optional[Union[str, Callable]]): a name to set for this task
                at runtime. `task_run_name` strings can be templated formatting strings
                which will be formatted at runtime with values from task arguments,
                `prefect.context`, and flow parameters (in the case of a name conflict
                between these, earlier values take precedence). If a callable function
                is provided, it should have signature `callable(**kwargs) -> str` and
                at write time all formatting kwargs will be passed and a fully
                formatted location is expected as the return value. The callable can
                be used for string formatting logic that `.format(**kwargs)` doesn't
                support. **Note**: this only works for tasks running against a
                backend API.
            nout (Optional[int]): for tasks that return multiple results, the number of
                outputs to expect. If not provided, will be inferred from the task
                return annotation, if possible.  Note that `nout=1` implies the task
                returns a tuple of one value (leave as `None` for non-tuple return
                types).

        """  # noqa

        # apply some defaults but allow overrides; pop() removes each key from
        # kwargs so it is not passed to the parent constructor twice
        log_stdout = kwargs.pop("log_stdout", True)
        name = kwargs.pop("name", None) or "load_db_result"

        super().__init__(log_stdout=log_stdout, name=name, **kwargs)

    def run(
        self,
        result_rep: dict,
        attribute_index: Optional[list],
        results_db_service: ResultsDB = Provide[Context.results_db_service],
    ) -> Any:
        """Load a result from the database using a lume_services.Result representation.

        Args:
            result_rep (Union[dict, str]): Result representation containing
                result_type_string and query for selection. If string passed,
                will perform json loads to get dictionary.
            attribute_index (Optional[list]): Selection instructions from query.
                For example, selecting the first entry, `toyota`, from a dictionary
                of the form:
                `{"vehicle": {"car":  ["toyota", "mini"], "boat": ["sail", "motor"]}}`
                would be accomplished by passing
                `attribute_index=["vehicle", "car", 0]`.
            results_db_service (ResultsDB): Results database service. This is injected
                when using the LUME-service configuration toolset.

        Returns:
            Any: Returns selection of value from result if attribute_index is passed,
                otherwise returns Result object.

        """
        result_type = get_result_from_string(result_rep["result_type_string"])
        result = result_type.load_from_query(
            result_rep["project_name"],
            result_rep["query"],
            results_db_service=results_db_service,
        )

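        # no selection requested: return the full Result object
        if not attribute_index:
            return result

        # select first attribute
        attr_value = getattr(result, attribute_index[0], None)
        if attr_value is None:
            return result

        # walk the remaining keys/positions into the nested value
        for index in attribute_index[1:]:
            attr_value = attr_value[index]

        return attr_value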

__init__(**kwargs)

This task is defined as a subclass of the Prefect Task object and accepts all Task arguments during initialization.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | Optional[str] | The name of this task. | `"load_db_result"` |
| slug | Optional[str] | The slug for this task. Slugs provide a stable ID for tasks so that the Prefect API can identify task run states. If a slug is not provided, one will be generated automatically once the task is added to a Flow. | optional |
| tags | Optional[List[str]] | A list of tags for this task. | optional |
| max_retries | Optional[int] | The maximum number of times this task can be retried. | optional |
| retry_delay | Optional[timedelta] | The amount of time to wait until the task is retried. | optional |
| retry_on | Optional[Union[Exception, Iterable[Type[Exception]]]] | Exception types that will allow retry behavior to occur. If not set, all exceptions will allow retries. If set, retries will only occur if the exception is a subtype of the exception types provided. | optional |
| timeout | Optional[Union[int, timedelta]] | The amount of time (in seconds) to wait while running this task before a timeout occurs; note that sub-second resolution is not supported, even when passing in a timedelta. | optional |
| trigger | Optional[callable] | A function that determines whether the task should run, based on the states of any upstream tasks. | optional |
| skip_on_upstream_skip | Optional[bool] | If True, if any immediately upstream tasks are skipped, this task will automatically be skipped as well, regardless of trigger. By default, this prevents tasks from attempting to use either state or data from tasks that didn't run. If False, the task's trigger will be called as normal, with skips considered successes. Defaults to True. | optional |
| cache_for | Optional[timedelta] | The amount of time to maintain a cache of the outputs of this task. Useful for situations where the containing Flow will be rerun multiple times, but this task doesn't need to be. | optional |
| cache_validator | Optional[Callable] | Validator that will determine whether the cache for this task is still valid (only required if cache_for is provided; defaults to prefect.engine.cache_validators.duration_only). | optional |
| cache_key | Optional[str] | If provided, a cache_key serves as a unique identifier for this Task's cache, and can be shared across both Tasks and Flows; if not provided, the Task's name will be used if running locally, or the Task's database ID if running in Cloud. | optional |
| checkpoint | Optional[bool] | If this Task is successful, whether to store its result using the configured result available during the run. Checkpointing will only occur locally if prefect.config.flows.checkpointing is set to True. | optional |
| result | Optional[Result] | The result instance used to retrieve and store task results during execution. | optional |
| target | Optional[Union[str, Callable]] | Location to check for task Result. If a result exists at that location then the task run will enter a cached state. target strings can be templated formatting strings which will be formatted at runtime with values from prefect.context. If a callable function is provided, it should have signature `callable(**kwargs) -> str` and at write time all formatting kwargs will be passed and a fully formatted location is expected as the return value. The callable can be used for string formatting logic that `.format(**kwargs)` doesn't support. | optional |
| state_handlers | Optional[Iterable[Callable]] | A list of state change handlers that will be called whenever the task changes state, providing an opportunity to inspect or modify the new state. The handler will be passed the task instance, the old (prior) state, and the new (current) state, with the following signature: `state_handler(task: Task, old_state: State, new_state: State) -> Optional[State]`. If multiple functions are passed, then the new_state argument will be the result of the previous handler. | optional |
| on_failure | Optional[Callable] | A function with signature `fn(task: Task, state: State) -> None` that will be called anytime this Task enters a failure state. | optional |
| log_stdout | Optional[bool] | Toggle whether or not to send stdout messages to the Prefect logger. Defaults to True for this task. | `True` |
| task_run_name | Optional[Union[str, Callable]] | A name to set for this task at runtime. task_run_name strings can be templated formatting strings which will be formatted at runtime with values from task arguments, prefect.context, and flow parameters (in the case of a name conflict between these, earlier values take precedence). If a callable function is provided, it should have signature `callable(**kwargs) -> str` and at write time all formatting kwargs will be passed and a fully formatted location is expected as the return value. The callable can be used for string formatting logic that `.format(**kwargs)` doesn't support. Note: this only works for tasks running against a backend API. | optional |
| nout | Optional[int] | For tasks that return multiple results, the number of outputs to expect. If not provided, will be inferred from the task return annotation, if possible. Note that nout=1 implies the task returns a tuple of one value (leave as None for non-tuple return types). | optional |

Source code in lume_services/tasks/db.py
def __init__(self, **kwargs):
    """This task is defined as a subclass of the Prefect [Task](https://docs-v1.prefect.io/api/latest/core/task.html#task-2)
    object and accepts all Task arguments during initialization.

    Args:
        name (Optional[str]): The name of this task.
        slug (Optional[str]):  The slug for this task. Slugs provide a stable ID
            for tasks so that the Prefect API can identify task run states. If a
            slug is not provided, one will be generated automatically once the
            task is added to a Flow.
        tags (Optional[List[str]]): A list of tags for this task.
        max_retries (Optional[int]): The maximum amount of times this task can be
            retried
        retry_delay (Optional[datetime.timedelta]): The amount of time to wait
            until task is retried
        retry_on (Optional[Union[Exception, Iterable[Type[Exception]]]]): Exception
            types that will allow retry behavior to occur. If not set, all
            exceptions will allow retries. If set, retries will only occur if the
            exception is a subtype of the exception types provided.
        timeout (Optional[Union[int, timedelta]]): The amount of time (in seconds)
            to wait while running this task before a timeout occurs; note that
            sub-second resolution is not supported, even when passing in a
            timedelta.
        trigger (Optional[callable]):  a function that determines whether the task
            should run, based on the states of any upstream tasks.
        skip_on_upstream_skip (Optional[bool]): if True, if any immediately upstream
            tasks are skipped, this task will automatically be skipped as well,
            regardless of trigger. By default, this prevents tasks from attempting
            to use either state or data from tasks that didn't run. If False, the
            task's trigger will be called as normal, with skips considered
            successes. Defaults to True.
        cache_for (Optional[timedelta]): The amount of time to maintain a cache
            of the outputs of this task.  Useful for situations where the containing
            Flow will be rerun multiple times, but this task doesn't need to be.
        cache_validator (Optional[Callable]): Validator that will determine
            whether the cache for this task is still valid (only required if
            `cache_for` is provided; defaults to
            `prefect.engine.cache_validators.duration_only`)
        cache_key (Optional[str]): if provided, a `cache_key`
            serves as a unique identifier for this Task's cache, and can be shared
            across both Tasks _and_ Flows; if not provided, the Task's _name_ will
            be used if running locally, or the Task's database ID if running in
            Cloud
        checkpoint (Optional[bool]): if this Task is successful, whether to
            store its result using the configured result available during the run;
            Also note that checkpointing will only occur locally if
            `prefect.config.flows.checkpointing` is set to `True`
        result (Optional[Result]): the result instance used to retrieve and
            store task results during execution
        target (Optional[Union[str, Callable]]): location to check for task Result.
            If a result exists at that location then the task run will enter a
            cached state. `target` strings can be templated formatting strings
            which will be formatted at runtime with values from `prefect.context`.
            If a callable function is provided, it should have signature
            `callable(**kwargs) -> str` and at write time all formatting kwargs
            will be passed and a fully formatted location is expected as the return
            value. The callable can be used for string formatting logic that
            `.format(**kwargs)` doesn't support.
        state_handlers (Optional[Iterable[Callable]]): A list of state change
            handlers that will be called whenever the task changes state,
            providing an opportunity to inspect or modify the new state. The
            handler will be passed the task instance, the old (prior) state,
            and the new
            (current) state, with the following signature:
                `state_handler(task: Task, old_state: State, new_state: State) ->
                Optional[State]`
            If multiple functions are passed, then the `new_state` argument will
            be the result of the previous handler.
        on_failure (Optional[Callable]): A function with signature
            `fn(task: Task, state: State) -> None` that will be called anytime this
            Task enters a failure state
        log_stdout (Optional[bool]): Toggle whether or not to send stdout messages
            to the Prefect logger. Defaults to `True` for this task.
        task_run_name (Optional[Union[str, Callable]]): a name to set for this task
            at runtime. `task_run_name` strings can be templated formatting strings
            which will be formatted at runtime with values from task arguments,
            `prefect.context`, and flow parameters (in the case of a name conflict
            between these, earlier values take precedence). If a callable function
            is provided, it should have signature `callable(**kwargs) -> str` and
            at write time all formatting kwargs will be passed and a fully
            formatted location is expected as the return value. The callable can
            be used for string formatting logic that `.format(**kwargs)` doesn't
            support. **Note**: this only works for tasks running against a
            backend API.
        nout (Optional[int]): for tasks that return multiple results, the number of
            outputs to expect. If not provided, will be inferred from the task
            return annotation, if possible.  Note that `nout=1` implies the task
            returns a tuple of one value (leave as `None` for non-tuple return
            types).

    """  # noqa

    # apply some defaults but allow overrides; pop() removes each key from
    # kwargs so it is not passed to the parent constructor twice
    log_stdout = kwargs.pop("log_stdout", True)
    name = kwargs.pop("name", None) or "load_db_result"

    super().__init__(log_stdout=log_stdout, name=name, **kwargs)
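
The defaults-with-overrides step at the end of `__init__` can be sketched in isolation. The example below is an illustration rather than the library's code: `BaseTask` is a stand-in for the Prefect `Task` class and `LoadTask` is a hypothetical subclass. It relies on `dict.pop` with a default so that each keyword reaches the parent constructor exactly once, and so that an explicit `log_stdout=False` is respected.

```python
class BaseTask:
    """Stand-in for prefect.Task (assumption: accepts name and log_stdout)."""

    def __init__(self, name=None, log_stdout=False, **kwargs):
        self.name = name
        self.log_stdout = log_stdout
        self.extra = kwargs


class LoadTask(BaseTask):
    def __init__(self, **kwargs):
        # pop() removes the key from kwargs, so it is never passed twice.
        log_stdout = kwargs.pop("log_stdout", True)
        # Fall back to the default name when none (or an empty one) is given.
        name = kwargs.pop("name", None) or "load_db_result"
        super().__init__(log_stdout=log_stdout, name=name, **kwargs)


default_task = LoadTask()
custom_task = LoadTask(name="my_loader", log_stdout=False, timeout=20)
print(default_task.name, default_task.log_stdout)
print(custom_task.name, custom_task.log_stdout, custom_task.extra)
```

Reading a key with `kwargs.get` and then forwarding `**kwargs` unchanged would pass the same keyword twice and raise a `TypeError`, which is why the sketch pops each key instead.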

run(result_rep, attribute_index, results_db_service=Provide[Context.results_db_service])

Load a result from the database using a lume_services.Result representation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| result_rep | Union[dict, str] | Result representation containing result_type_string and query for selection. If string passed, will perform json loads to get dictionary. | required |
| attribute_index | Optional[list] | Selection instructions from query. For example, selecting the first entry, toyota, from a dictionary of the form {"vehicle": {"car": ["toyota", "mini"], "boat": ["sail", "motor"]}} would be accomplished by passing attribute_index=["vehicle", "car", 0]. | required |
| results_db_service | ResultsDB | Results database service. This is injected when using the LUME-service configuration toolset. | Provide[results_db_service] |

Returns:

| Type | Description |
| --- | --- |
| Any | Returns selection of value from result if attribute_index is passed, otherwise returns Result object. |
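
Because the representation is JSON-serializable, it may arrive either as a dictionary or as a JSON string. The following is a minimal sketch of normalizing the two forms before use. It checks only the two keys named in the parameter description (`result_type_string` and `query`), and `normalize_result_rep` is a hypothetical helper, not part of LUME-services.

```python
import json
from typing import Union


def normalize_result_rep(result_rep: Union[dict, str]) -> dict:
    # Accept either a dictionary or its JSON string form.
    if isinstance(result_rep, str):
        result_rep = json.loads(result_rep)
    missing = {"result_type_string", "query"} - set(result_rep)
    if missing:
        raise KeyError(f"result_rep is missing keys: {sorted(missing)}")
    return result_rep


unique_rep = {
    "result_type_string": "lume_services.results.generic:Result",
    "query": {
        "inputs": {"input1": 1.0, "input2": 2.0},
        "outputs": {"output1": 1.0, "output2": 2.0},
        "flow_id": 1,
    },
}

# Serialize to text, then recover the same dictionary.
as_text = json.dumps(unique_rep)
restored = normalize_result_rep(as_text)
print(restored == unique_rep)
```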

Source code in lume_services/tasks/db.py
def run(
    self,
    result_rep: dict,
    attribute_index: Optional[list],
    results_db_service: ResultsDB = Provide[Context.results_db_service],
) -> Any:
    """Load a result from the database using a lume_services.Result representation.

    Args:
        result_rep (Union[dict, str]): Result representation containing
            result_type_string and query for selection. If string passed,
            will perform json loads to get dictionary.
        attribute_index (Optional[list]): Selection instructions from query.
            For example, selecting the first entry, `toyota`, from a dictionary
            of the form:
            `{"vehicle": {"car":  ["toyota", "mini"], "boat": ["sail", "motor"]}}`
            would be accomplished by passing
            `attribute_index=["vehicle", "car", 0]`.
        results_db_service (ResultsDB): Results database service. This is injected
            when using the LUME-service configuration toolset.

    Returns:
        Any: Returns selection of value from result if attribute_index is passed,
            otherwise returns Result object.

    """
    result_type = get_result_from_string(result_rep["result_type_string"])
    result = result_type.load_from_query(
        result_rep["project_name"],
        result_rep["query"],
        results_db_service=results_db_service,
    )

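    # no selection requested: return the full Result object
    if not attribute_index:
        return result

    # select first attribute
    attr_value = getattr(result, attribute_index[0], None)
    if attr_value is None:
        return result

    # walk the remaining keys/positions into the nested value
    for index in attribute_index[1:]:
        attr_value = attr_value[index]

    return attr_value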

SaveDBResult

Bases: Task

Save a result to the results database. All database results generate a minimally representative identifier that can be used to query the database and load the result. This identifier is JSON-serializable and therefore accessible outside of the workflow's scope. This task uses either a passed or injected results database service to save the unique representation of the result to the database. Custom result subclasses may impose additional uniqueness constraints. To use this task with the backend, your flow must be registered with the backend, because the result's flow_id is inferred from the Prefect Context. Alternatively, for development purposes, flow_id can be passed directly.
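
The choice between an inferred and an explicitly passed flow_id can be pictured as follows. This is only a sketch under stated assumptions: a plain dictionary stands in for the Prefect context, the `"flow_id"` key name is assumed, and `resolve_flow_id` is a hypothetical helper, not a LUME-services function.

```python
from typing import Mapping, Optional


def resolve_flow_id(context: Mapping[str, str], flow_id: Optional[str] = None) -> str:
    # An explicitly passed flow_id wins (useful for local development).
    if flow_id is not None:
        return flow_id
    # Otherwise fall back to the run context populated by the backend.
    try:
        return context["flow_id"]
    except KeyError:
        raise ValueError(
            "No flow_id available: register the flow with the backend "
            "or pass flow_id explicitly."
        ) from None


print(resolve_flow_id({"flow_id": "abc123"}))
print(resolve_flow_id({}, flow_id="local-test"))
```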

This task is defined as a subclass of the Prefect Task object and accepts all Task arguments during initialization.

Examples:

from prefect import Flow, task
from lume_services.results import Result
from lume_services.tasks import configure_lume_services, SaveDBResult

# construct_task
save_db_result_task = SaveDBResult(timeout=20)

@task
def format_result_entry():
    inputs = {
        "input1": 1.0,
        "input2": 2.0,
    }

    outputs = {
        "output1" : 1.0,
        "output2": 2.0
    }

    return Result(
        inputs=inputs,
        outputs=outputs,
        flow_id="local-test"
    )

with Flow(
    "my_flow"
) as flow:
    # must first configure services because using injected results
    # database service
    configure_lume_services()

    result = format_result_entry()

    my_result = save_db_result_task(
        result
    )
Source code in lume_services/tasks/db.py
class SaveDBResult(Task):
    """Save a result to the results database. All database results generate a
    [minimally representative identifier][lume_services.results.generic.Result] that can
    be used to query the database and load the result. This identifier is jsonable and
    therefore accessible outside of the workflow's scope. This task uses either a
    passed or injected results database service to save the unique representation
    of the result to the database. Custom result subclasses may impose additional
    uniqueness constraints. In order to use this task with the backend, your
    flow must be registered with the backend as the result's `flow_id` is inferred from
    the Prefect Context. Alternatively, for development purposes, `flow_id` can be
    passed directly.

    This task is defined as a subclass of the Prefect [Task](https://docs-v1.prefect.io/api/latest/core/task.html#task-2)
    object and accepts all Task arguments during initialization.

    Examples:
        ```python
        from prefect import Flow, task
        from lume_services.results import Result
        from lume_services.tasks import configure_lume_services, SaveDBResult

        # construct_task
        save_db_result_task = SaveDBResult(timeout=20)

        @task
        def format_result_entry():
            inputs = {
                "input1": 1.0,
                "input2": 2.0,
            }

            outputs = {
                "output1" : 1.0,
                "output2": 2.0
            }

            return Result(
                inputs=inputs,
                outputs=outputs,
                flow_id="local-test"
            )

        with Flow(
            "my_flow"
        ) as flow:
            # must first configure services because using injected results
            # database service
            configure_lume_services()

            result = format_result_entry()

            my_result = save_db_result_task(
                result
            )

        ```

    """  # noqa

    def __init__(self, **kwargs):
        """This task is defined as a subclass of the Prefect [Task](https://docs-v1.prefect.io/api/latest/core/task.html#task-2)
        object and accepts all Task arguments during initialization.

        Args:
            name (Optional[str]): The name of this task.
            slug (Optional[str]):  The slug for this task. Slugs provide a stable ID
                for tasks so that the Prefect API can identify task run states. If a
                slug is not provided, one will be generated automatically once the
                task is added to a Flow.
            tags (Optional[List[str]]): A list of tags for this task.
            max_retries (Optional[int]): The maximum amount of times this task can be
                retried
            retry_delay (Optional[datetime.timedelta]): The amount of time to wait
                until task is retried
            retry_on (Optional[Union[Exception, Iterable[Type[Exception]]]]): Exception
                types that will allow retry behavior to occur. If not set, all
                exceptions will allow retries. If set, retries will only occur if the
                exception is a subtype of the exception types provided.
            timeout (Optional[Union[int, timedelta]]): The amount of time (in seconds)
                to wait while running this task before a timeout occurs; note that
                sub-second resolution is not supported, even when passing in a
                timedelta.
            trigger (Optional[callable]):  a function that determines whether the task
                should run, based on the states of any upstream tasks.
            skip_on_upstream_skip (Optional[bool]): if True, if any immediately upstream
                tasks are skipped, this task will automatically be skipped as well,
                regardless of trigger. By default, this prevents tasks from attempting
                to use either state or data from tasks that didn't run. If False, the
                task's trigger will be called as normal, with skips considered
                successes. Defaults to True.
            cache_for (Optional[timedelta]): The amount of time to maintain a cache
                of the outputs of this task.  Useful for situations where the containing
                Flow will be rerun multiple times, but this task doesn't need to be.
            cache_validator (Optional[Callable]): Validator that will determine
                whether the cache for this task is still valid (only required if
                `cache_for` is provided; defaults to
                `prefect.engine.cache_validators.duration_only`)
            cache_key (Optional[str]): if provided, a `cache_key`
                serves as a unique identifier for this Task's cache, and can be shared
                across both Tasks _and_ Flows; if not provided, the Task's _name_ will
                be used if running locally, or the Task's database ID if running in
                Cloud
            checkpoint (Optional[bool]): if this Task is successful, whether to
                store its result using the configured result available during the run;
                Also note that checkpointing will only occur locally if
                `prefect.config.flows.checkpointing` is set to `True`
            result (Optional[Result]): the result instance used to retrieve and
                store task results during execution
            target (Optional[Union[str, Callable]]): location to check for task Result.
                If a result exists at that location then the task run will enter a
                cached state. `target` strings can be templated formatting strings
                which will be formatted at runtime with values from `prefect.context`.
                If a callable function is provided, it should have signature
                `callable(**kwargs) -> str` and at write time all formatting kwargs
                will be passed and a fully formatted location is expected as the return
                value. The callable can be used for string formatting logic that
                `.format(**kwargs)` doesn't support.
            state_handlers (Optional[Iterable[Callable]]): A list of state change
                handlers that will be called whenever the task changes state,
                providing an opportunity to inspect or modify the new state. The
                handler will be passed the task instance, the old (prior) state,
                and the new
                (current) state, with the following signature:
                    `state_handler(task: Task, old_state: State, new_state: State) ->
                    Optional[State]`
                If multiple functions are passed, then the `new_state` argument will
                be the result of the previous handler.
            on_failure (Optional[Callable]): A function with signature
                `fn(task: Task, state: State) -> None` that will be called anytime this
                Task enters a failure state
            log_stdout (Optional[bool]): Toggle whether or not to send stdout messages
                to the Prefect logger. Defaults to `True` for this task.
            task_run_name (Optional[Union[str, Callable]]): a name to set for this task
                at runtime. `task_run_name` strings can be templated formatting strings
                which will be formatted at runtime with values from task arguments,
                `prefect.context`, and flow parameters (in the case of a name conflict
                between these, earlier values take precedence). If a callable function
                is provided, it should have signature `callable(**kwargs) -> str` and
                at write time all formatting kwargs will be passed and a fully
                formatted location is expected as the return value. The callable can
                be used for string formatting logic that `.format(**kwargs)` doesn't
                support. **Note**: this only works for tasks running against a
                backend API.
            nout (Optional[int]): for tasks that return multiple results, the number of
                outputs to expect. If not provided, will be inferred from the task
                return annotation, if possible.  Note that `nout=1` implies the task
                returns a tuple of one value (leave as `None` for non-tuple return
                types).

        """  # noqa

        # apply some defaults but allow overrides
        # pop() removes each key so it is not forwarded twice to Task.__init__
        log_stdout = kwargs.pop("log_stdout", True)
        name = kwargs.pop("name", None) or "save_db_result"

        result = kwargs.pop("result", None)
        if result is None:
            result = PrefectResult(location=_unique_db_location)

        super().__init__(log_stdout=log_stdout, name=name, result=result, **kwargs)

    @inject
    def run(
        self,
        result,
        results_db_service: ResultsDB = Provide[Context.results_db_service],
    ) -> dict:
        """Insert result into the results database service. Creates a PrefectResult that
        contains minimal representative information for reconstruction.

        Args:
            result (Result): Result object to save
            results_db_service (ResultsDB): Results database service


        Returns:
            dict: Unique representation for collecting results.

        """

        result.insert(results_db_service=results_db_service)
        return result.unique_rep()

__init__(**kwargs)

This task is defined as a subclass of the Prefect Task object and accepts all Task arguments during initialization.

Parameters:

Name Type Description Default
name Optional[str]

The name of this task.

required
slug Optional[str]

The slug for this task. Slugs provide a stable ID for tasks so that the Prefect API can identify task run states. If a slug is not provided, one will be generated automatically once the task is added to a Flow.

required
tags Optional[List[str]]

A list of tags for this task.

required
max_retries Optional[int]

The maximum amount of times this task can be retried

required
retry_delay Optional[timedelta]

The amount of time to wait until task is retried

required
retry_on Optional[Union[Exception, Iterable[Type[Exception]]]]

Exception types that will allow retry behavior to occur. If not set, all exceptions will allow retries. If set, retries will only occur if the exception is a subtype of the exception types provided.

required
timeout Optional[Union[int, timedelta]]

The amount of time (in seconds) to wait while running this task before a timeout occurs; note that sub-second resolution is not supported, even when passing in a timedelta.

required
trigger Optional[callable]

a function that determines whether the task should run, based on the states of any upstream tasks.

required
skip_on_upstream_skip Optional[bool]

if True, if any immediately upstream tasks are skipped, this task will automatically be skipped as well, regardless of trigger. By default, this prevents tasks from attempting to use either state or data from tasks that didn't run. If False, the task's trigger will be called as normal, with skips considered successes. Defaults to True.

required
cache_for Optional[timedelta]

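The amount of time to maintain a cache of the outputs of this task. Useful for situations where the containing Flow will be rerun multiple times, but this task doesn't need to be.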

required
cache_validator Optional[Callable]

Validator that will determine whether the cache for this task is still valid (only required if cache_for is provided; defaults to prefect.engine.cache_validators.duration_only)

required
cache_key Optional[str]

if provided, a cache_key serves as a unique identifier for this Task's cache, and can be shared across both Tasks and Flows; if not provided, the Task's name will be used if running locally, or the Task's database ID if running in Cloud

required
checkpoint Optional[bool]

if this Task is successful, whether to store its result using the configured result available during the run; Also note that checkpointing will only occur locally if prefect.config.flows.checkpointing is set to True

required
result Optional[Result]

the result instance used to retrieve and store task results during execution

required
target Optional[Union[str, Callable]]

location to check for task Result. If a result exists at that location then the task run will enter a cached state. target strings can be templated formatting strings which will be formatted at runtime with values from prefect.context. If a callable function is provided, it should have signature callable(**kwargs) -> str and at write time all formatting kwargs will be passed and a fully formatted location is expected as the return value. The callable can be used for string formatting logic that .format(**kwargs) doesn't support.

required
state_handlers Optional[Iterable[Callable]]

A list of state change handlers that will be called whenever the task changes state, providing an opportunity to inspect or modify the new state. The handler will be passed the task instance, the old (prior) state, and the new (current) state, with the following signature: state_handler(task: Task, old_state: State, new_state: State) -> Optional[State] If multiple functions are passed, then the new_state argument will be the result of the previous handler.

required
on_failure Optional[Callable]

A function with signature fn(task: Task, state: State) -> None that will be called anytime this Task enters a failure state

required
log_stdout Optional[bool]

Toggle whether or not to send stdout messages to the Prefect logger. Defaults to True for this task.

required
task_run_name Optional[Union[str, Callable]]

a name to set for this task at runtime. task_run_name strings can be templated formatting strings which will be formatted at runtime with values from task arguments, prefect.context, and flow parameters (in the case of a name conflict between these, earlier values take precedence). If a callable function is provided, it should have signature callable(**kwargs) -> str and at write time all formatting kwargs will be passed and a fully formatted location is expected as the return value. The callable can be used for string formatting logic that .format(**kwargs) doesn't support. Note: this only works for tasks running against a backend API.

required
nout Optional[int]

for tasks that return multiple results, the number of outputs to expect. If not provided, will be inferred from the task return annotation, if possible. Note that nout=1 implies the task returns a tuple of one value (leave as None for non-tuple return types).

required
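
The state_handlers description says that each handler receives the result of the previous one. A minimal sketch of that chaining follows. The `State` dataclass and `call_handlers` function are hypothetical stand-ins so the example runs without Prefect installed; treating a `None` return as "keep the current state" is an assumption based on the `Optional[State]` return type.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Optional


@dataclass
class State:
    """Hypothetical stand-in for a Prefect state; not the real class."""

    name: str
    message: str = ""


Handler = Callable[[object, State, State], Optional[State]]


def call_handlers(
    task: object, old_state: State, new_state: State, handlers: Iterable[Handler]
) -> State:
    """Run handlers in order, feeding each one the previous handler's result."""
    for handler in handlers:
        # Assumption: a handler returning None leaves the state unchanged.
        new_state = handler(task, old_state, new_state) or new_state
    return new_state


def annotate_failure(task, old_state, new_state):
    """Replace a bare failure with one that carries a message."""
    if new_state.name == "Failed":
        return State("Failed", f"{task} failed after {old_state.name}")
    return None


def log_transition(task, old_state, new_state):
    """Observe the transition without modifying it."""
    print(f"{task}: {old_state.name} -> {new_state.name} ({new_state.message})")
    return new_state


final_state = call_handlers(
    "save_db_result",
    State("Running"),
    State("Failed"),
    [annotate_failure, log_transition],
)
```

Because `log_transition` runs second, it sees the message that `annotate_failure` attached, which is the ordering behavior the parameter description refers to.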
Source code in lume_services/tasks/db.py
def __init__(self, **kwargs):
    """This task is defined as a subclass of the Prefect [Task](https://docs-v1.prefect.io/api/latest/core/task.html#task-2)
    object and accepts all Task arguments during initialization.

    Args:
        name (Optional[str]): The name of this task.
        slug (Optional[str]):  The slug for this task. Slugs provide a stable ID
            for tasks so that the Prefect API can identify task run states. If a
            slug is not provided, one will be generated automatically once the
            task is added to a Flow.
        tags (Optional[List[str]]): A list of tags for this task.
        max_retries (Optional[int]): The maximum amount of times this task can be
            retried
        retry_delay (Optional[datetime.timedelta]): The amount of time to wait
            until task is retried
        retry_on (Optional[Union[Exception, Iterable[Type[Exception]]]]): Exception
            types that will allow retry behavior to occur. If not set, all
            exceptions will allow retries. If set, retries will only occur if the
            exception is a subtype of the exception types provided.
        timeout (Optional[Union[int, timedelta]]): The amount of time (in seconds)
            to wait while running this task before a timeout occurs; note that
            sub-second resolution is not supported, even when passing in a
            timedelta.
        trigger (Optional[callable]):  a function that determines whether the task
            should run, based on the states of any upstream tasks.
        skip_on_upstream_skip (Optional[bool]): if True, if any immediately upstream
            tasks are skipped, this task will automatically be skipped as well,
            regardless of trigger. By default, this prevents tasks from attempting
            to use either state or data from tasks that didn't run. If False, the
            task's trigger will be called as normal, with skips considered
            successes. Defaults to True.
        cache_for (Optional[timedelta]): The amount of time to maintain a cache
            of the outputs of this task.  Useful for situations where the containing
            Flow will be rerun multiple times, but this task doesn't need to be.
        cache_validator (Optional[Callable]): Validator that will determine
            whether the cache for this task is still valid (only required if
            `cache_for` is provided; defaults to
            `prefect.engine.cache_validators.duration_only`)
        cache_key (Optional[str]): if provided, a `cache_key`
            serves as a unique identifier for this Task's cache, and can be shared
            across both Tasks _and_ Flows; if not provided, the Task's _name_ will
            be used if running locally, or the Task's database ID if running in
            Cloud
        checkpoint (Optional[bool]): if this Task is successful, whether to
            store its result using the configured result available during the run;
            Also note that checkpointing will only occur locally if
            `prefect.config.flows.checkpointing` is set to `True`
        result (Optional[Result]): the result instance used to retrieve and
            store task results during execution
        target (Optional[Union[str, Callable]]): location to check for task Result.
            If a result exists at that location then the task run will enter a
            cached state. `target` strings can be templated formatting strings
            which will be formatted at runtime with values from `prefect.context`.
            If a callable function is provided, it should have signature
            `callable(**kwargs) -> str` and at write time all formatting kwargs
            will be passed and a fully formatted location is expected as the return
            value. The callable can be used for string formatting logic that
            `.format(**kwargs)` doesn't support.
        state_handlers (Optional[Iterable[Callable]]): A list of state change
            handlers that will be called whenever the task changes state,
            providing an opportunity to inspect or modify the new state. The
            handler will be passed the task instance, the old (prior) state,
            and the new
            (current) state, with the following signature:
                `state_handler(task: Task, old_state: State, new_state: State) ->
                Optional[State]`
            If multiple functions are passed, then the `new_state` argument will
            be the result of the previous handler.
        on_failure (Optional[Callable]): A function with signature
            `fn(task: Task, state: State) -> None` that will be called anytime this
            Task enters a failure state
        log_stdout (Optional[bool]): Toggle whether or not to send stdout messages
            to the Prefect logger. Defaults to `True` for this task.
        task_run_name (Optional[Union[str, Callable]]): a name to set for this task
            at runtime. `task_run_name` strings can be templated formatting strings
            which will be formatted at runtime with values from task arguments,
            `prefect.context`, and flow parameters (in the case of a name conflict
            between these, earlier values take precedence). If a callable function
            is provided, it should have signature `callable(**kwargs) -> str` and
            at write time all formatting kwargs will be passed and a fully
            formatted location is expected as the return value. The callable can
            be used for string formatting logic that `.format(**kwargs)` doesn't
            support. **Note**: this only works for tasks running against a
            backend API.
        nout (Optional[int]): for tasks that return multiple results, the number of
            outputs to expect. If not provided, will be inferred from the task
            return annotation, if possible.  Note that `nout=1` implies the task
            returns a tuple of one value (leave as `None` for non-tuple return
            types).

    """  # noqa

    # apply some defaults but allow overrides
    # pop() removes each key so it is not forwarded twice to Task.__init__
    log_stdout = kwargs.pop("log_stdout", True)
    name = kwargs.pop("name", None) or "save_db_result"

    result = kwargs.pop("result", None)
    if result is None:
        result = PrefectResult(location=_unique_db_location)

    super().__init__(log_stdout=log_stdout, name=name, result=result, **kwargs)
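
The "apply defaults but allow overrides" pattern in this constructor can be illustrated on its own. This is a minimal sketch with hypothetical classes (`BaseTask`, `NamedTask`) standing in for the Prefect Task and its subclass. The point it shows is that each defaulted key should be popped from `kwargs`, since a key that is passed explicitly and also left in `**kwargs` raises a `TypeError`.

```python
class BaseTask:
    """Hypothetical stand-in for a base class that accepts keyword options."""

    def __init__(self, name=None, log_stdout=False, **kwargs):
        self.name = name
        self.log_stdout = log_stdout
        self.extra = kwargs


class NamedTask(BaseTask):
    """Subclass that supplies its own defaults but lets callers override them."""

    def __init__(self, **kwargs):
        # pop() returns the caller's value if present and removes the key,
        # so the same keyword is never forwarded twice.
        log_stdout = kwargs.pop("log_stdout", True)
        # An empty or missing name falls back to the subclass default.
        name = kwargs.pop("name", None) or "save_db_result"
        super().__init__(name=name, log_stdout=log_stdout, **kwargs)


default_task = NamedTask()
quiet_task = NamedTask(log_stdout=False, timeout=20)
renamed_task = NamedTask(name="archive_result")
```

Here `quiet_task.log_stdout` stays `False` because the explicit value is honored, and `timeout` passes through untouched to the base class.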

run(result, results_db_service=Provide[Context.results_db_service])

Insert result into the results database service. Creates a PrefectResult that contains minimal representative information for reconstruction.

Parameters:

Name Type Description Default
result Result

Result object to save

required
results_db_service ResultsDB

Results database service

Provide[results_db_service]

Returns:

Name Type Description
dict dict

Unique representation for collecting results.

Source code in lume_services/tasks/db.py
@inject
def run(
    self,
    result,
    results_db_service: ResultsDB = Provide[Context.results_db_service],
) -> dict:
    """Insert result into the results database service. Creates a PrefectResult that
    contains minimal representative information for reconstruction.

    Args:
        result (Result): Result object to save
        results_db_service (ResultsDB): Results database service


    Returns:
        dict: Unique representation for collecting results.

    """

    result.insert(results_db_service=results_db_service)
    return result.unique_rep()
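
To make the insert-then-return-representation flow concrete, here is a minimal sketch under stated assumptions. `InMemoryResultsDB` and `SketchResult` are hypothetical stand-ins, not LUME-services classes, and the contents of the representation (a `result_type_string`, a `project_name`, and a `query`) are modeled on the keys read by the loading code earlier on this page rather than on the library's actual output.

```python
import json
from dataclasses import dataclass
from typing import Dict, List


class InMemoryResultsDB:
    """Hypothetical stand-in for a results database service."""

    def __init__(self) -> None:
        self.documents: List[dict] = []

    def insert_one(self, document: dict) -> None:
        self.documents.append(document)

    def find(self, query: dict) -> List[dict]:
        # Return documents whose fields match every key in the query.
        return [
            doc
            for doc in self.documents
            if all(doc.get(key) == value for key, value in query.items())
        ]


@dataclass
class SketchResult:
    """Hypothetical result type with the two methods the task relies on."""

    flow_id: str
    inputs: Dict[str, float]
    outputs: Dict[str, float]
    project_name: str = "demo"

    def insert(self, results_db_service: InMemoryResultsDB) -> None:
        results_db_service.insert_one(
            {"flow_id": self.flow_id, "inputs": self.inputs, "outputs": self.outputs}
        )

    def unique_rep(self) -> dict:
        # Assumption: the representation carries just enough to find the entry.
        return {
            "result_type_string": type(self).__name__,
            "project_name": self.project_name,
            "query": {"flow_id": self.flow_id, "inputs": self.inputs},
        }


db = InMemoryResultsDB()
result = SketchResult(
    flow_id="local-test",
    inputs={"input1": 1.0, "input2": 2.0},
    outputs={"output1": 1.0, "output2": 2.0},
)

result.insert(results_db_service=db)
rep = result.unique_rep()

# The representation survives a JSON round trip, so it can leave the workflow.
restored_rep = json.loads(json.dumps(rep))
matches = db.find(restored_rep["query"])
```

The round trip through `json` mirrors the claim that the identifier is usable outside the workflow's scope: only plain dictionaries, strings, and numbers are involved.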

LoadFile

Bases: Task

Source code in lume_services/tasks/file.py
class LoadFile(Task):
    parameters = {
        "file_rep": Parameter("file_rep"),
    }

    def __init__(self, **kwargs):
        """This task is defined as a subclass of the Prefect [Task](https://docs-v1.prefect.io/api/latest/core/task.html#task-2)
        object and accepts all Task arguments during initialization.

        Args:
            name (Optional[str]): The name of this task.
            slug (Optional[str]):  The slug for this task. Slugs provide a stable ID
                for tasks so that the Prefect API can identify task run states. If a
                slug is not provided, one will be generated automatically once the
                task is added to a Flow.
            tags (Optional[List[str]]): A list of tags for this task.
            max_retries (Optional[int]): The maximum amount of times this task can be
                retried
            retry_delay (Optional[datetime.timedelta]): The amount of time to wait
                until task is retried
            retry_on (Optional[Union[Exception, Iterable[Type[Exception]]]]): Exception
                types that will allow retry behavior to occur. If not set, all
                exceptions will allow retries. If set, retries will only occur if the
                exception is a subtype of the exception types provided.
            timeout (Optional[Union[int, timedelta]]): The amount of time (in seconds)
                to wait while running this task before a timeout occurs; note that
                sub-second resolution is not supported, even when passing in a
                timedelta.
            trigger (Optional[callable]):  a function that determines whether the task
                should run, based on the states of any upstream tasks.
            skip_on_upstream_skip (Optional[bool]): if True, if any immediately upstream
                tasks are skipped, this task will automatically be skipped as well,
                regardless of trigger. By default, this prevents tasks from attempting
                to use either state or data from tasks that didn't run. If False, the
                task's trigger will be called as normal, with skips considered
                successes. Defaults to True.
            cache_for (Optional[timedelta]): The amount of time to maintain a cache
                of the outputs of this task.  Useful for situations where the containing
                Flow will be rerun multiple times, but this task doesn't need to be.
            cache_validator (Optional[Callable]): Validator that will determine
                whether the cache for this task is still valid (only required if
                `cache_for` is provided; defaults to
                `prefect.engine.cache_validators.duration_only`)
            cache_key (Optional[str]): if provided, a `cache_key`
                serves as a unique identifier for this Task's cache, and can be shared
                across both Tasks _and_ Flows; if not provided, the Task's _name_ will
                be used if running locally, or the Task's database ID if running in
                Cloud
            checkpoint (Optional[bool]): if this Task is successful, whether to
                store its result using the configured result available during the run;
                Also note that checkpointing will only occur locally if
                `prefect.config.flows.checkpointing` is set to `True`
            result (Optional[Result]): the result instance used to retrieve and
                store task results during execution
            target (Optional[Union[str, Callable]]): location to check for task Result.
                If a result exists at that location then the task run will enter a
                cached state. `target` strings can be templated formatting strings
                which will be formatted at runtime with values from `prefect.context`.
                If a callable function is provided, it should have signature
                `callable(**kwargs) -> str` and at write time all formatting kwargs
                will be passed and a fully formatted location is expected as the return
                value. The callable can be used for string formatting logic that
                `.format(**kwargs)` doesn't support.
            state_handlers (Optional[Iterable[Callable]]): A list of state change
                handlers that will be called whenever the task changes state,
                providing an opportunity to inspect or modify the new state. The
                handler will be passed the task instance, the old (prior) state,
                and the new
                (current) state, with the following signature:
                    `state_handler(task: Task, old_state: State, new_state: State) ->
                    Optional[State]`
                If multiple functions are passed, then the `new_state` argument will
                be the result of the previous handler.
            on_failure (Optional[Callable]): A function with signature
                `fn(task: Task, state: State) -> None` that will be called anytime this
                Task enters a failure state
            log_stdout (Optional[bool]): Toggle whether or not to send stdout messages
                to the Prefect logger. Defaults to `True` for this task.
            task_run_name (Optional[Union[str, Callable]]): a name to set for this task
                at runtime. `task_run_name` strings can be templated formatting strings
                which will be formatted at runtime with values from task arguments,
                `prefect.context`, and flow parameters (in the case of a name conflict
                between these, earlier values take precedence). If a callable function
                is provided, it should have signature `callable(**kwargs) -> str` and
                at write time all formatting kwargs will be passed and a fully
                formatted location is expected as the return value. The callable can
                be used for string formatting logic that `.format(**kwargs)` doesn't
                support. **Note**: this only works for tasks running against a
                backend API.
            nout (Optional[int]): for tasks that return multiple results, the number of
                outputs to expect. If not provided, will be inferred from the task
                return annotation, if possible.  Note that `nout=1` implies the task
                returns a tuple of one value (leave as `None` for non-tuple return
                types).

        """  # noqa

        # apply some defaults but allow overrides
        # pop() removes each key so it is not forwarded twice to Task.__init__
        log_stdout = kwargs.pop("log_stdout", True)
        name = kwargs.pop("name", None) or "load_file"

        super().__init__(log_stdout=log_stdout, name=name, **kwargs)

    @inject
    def run(
        self, file_rep: dict, file_service: FileService = Provide[Context.file_service]
    ) -> Any:
        """Load a file

        Args:
            file_rep (dict): File data representation
            file_service (FileService): File service for interacting with filesystems

        Returns:
            Any: Deserialized file object

        """

        file_type = get_file_from_serializer_string(file_rep["file_type_string"])
        file_result = file_type(**file_rep)

        return file_result.read(file_service=file_service)

__init__(**kwargs)

This task is defined as a subclass of the Prefect Task object and accepts all Task arguments during initialization.

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| `name` | `Optional[str]` | The name of this task. Defaults to `"load_file"`. |
| `slug` | `Optional[str]` | The slug for this task. Slugs provide a stable ID for tasks so that the Prefect API can identify task run states. If a slug is not provided, one will be generated automatically once the task is added to a Flow. |
| `tags` | `Optional[List[str]]` | A list of tags for this task. |
| `max_retries` | `Optional[int]` | The maximum number of times this task can be retried. |
| `retry_delay` | `Optional[timedelta]` | The amount of time to wait until the task is retried. |
| `retry_on` | `Optional[Union[Exception, Iterable[Type[Exception]]]]` | Exception types that will allow retry behavior to occur. If not set, all exceptions will allow retries. If set, retries will only occur if the exception is a subtype of the exception types provided. |
| `timeout` | `Optional[Union[int, timedelta]]` | The amount of time (in seconds) to wait while running this task before a timeout occurs; note that sub-second resolution is not supported, even when passing in a timedelta. |
| `trigger` | `Optional[callable]` | A function that determines whether the task should run, based on the states of any upstream tasks. |
| `skip_on_upstream_skip` | `Optional[bool]` | If `True` and any immediately upstream tasks are skipped, this task will automatically be skipped as well, regardless of trigger. By default, this prevents tasks from attempting to use either state or data from tasks that didn't run. If `False`, the task's trigger will be called as normal, with skips considered successes. Defaults to `True`. |
| `cache_for` | `Optional[timedelta]` | The amount of time to maintain a cache of the outputs of this task. Useful for situations where the containing Flow will be rerun multiple times, but this task doesn't need to be. |
| `cache_validator` | `Optional[Callable]` | Validator that will determine whether the cache for this task is still valid (only required if `cache_for` is provided; defaults to `prefect.engine.cache_validators.duration_only`). |
| `cache_key` | `Optional[str]` | If provided, a `cache_key` serves as a unique identifier for this Task's cache, and can be shared across both Tasks and Flows; if not provided, the Task's name will be used if running locally, or the Task's database ID if running in Cloud. |
| `checkpoint` | `Optional[bool]` | If this Task is successful, whether to store its result using the configured result available during the run. Checkpointing will only occur locally if `prefect.config.flows.checkpointing` is set to `True`. |
| `result` | `Optional[Result]` | The result instance used to retrieve and store task results during execution. |
| `target` | `Optional[Union[str, Callable]]` | Location to check for task Result. If a result exists at that location then the task run will enter a cached state. `target` strings can be templated formatting strings which will be formatted at runtime with values from `prefect.context`. If a callable function is provided, it should have signature `callable(**kwargs) -> str` and at write time all formatting kwargs will be passed and a fully formatted location is expected as the return value. The callable can be used for string formatting logic that `.format(**kwargs)` doesn't support. |
| `state_handlers` | `Optional[Iterable[Callable]]` | A list of state change handlers that will be called whenever the task changes state, providing an opportunity to inspect or modify the new state. The handler will be passed the task instance, the old (prior) state, and the new (current) state, with the following signature: `state_handler(task: Task, old_state: State, new_state: State) -> Optional[State]`. If multiple functions are passed, then the `new_state` argument will be the result of the previous handler. |
| `on_failure` | `Optional[Callable]` | A function with signature `fn(task: Task, state: State) -> None` that will be called anytime this Task enters a failure state. |
| `log_stdout` | `Optional[bool]` | Toggle whether or not to send stdout messages to the Prefect logger. Defaults to `True` for this task. |
| `task_run_name` | `Optional[Union[str, Callable]]` | A name to set for this task at runtime. `task_run_name` strings can be templated formatting strings which will be formatted at runtime with values from task arguments, `prefect.context`, and flow parameters (in the case of a name conflict between these, earlier values take precedence). If a callable function is provided, it should have signature `callable(**kwargs) -> str` and at write time all formatting kwargs will be passed and a fully formatted location is expected as the return value. The callable can be used for string formatting logic that `.format(**kwargs)` doesn't support. Note: this only works for tasks running against a backend API. |
| `nout` | `Optional[int]` | For tasks that return multiple results, the number of outputs to expect. If not provided, will be inferred from the task return annotation, if possible. Note that `nout=1` implies the task returns a tuple of one value (leave as `None` for non-tuple return types). |

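
The chaining rule described for `state_handlers` can be illustrated without Prefect installed. The sketch below is a stand-in under stated assumptions: `State`, `run_handlers`, and both handlers are hypothetical names invented for illustration, not Prefect APIs. Only the handler signature follows the description above, and treating a `None` return as "keep the current state" is an assumption based on the `Optional[State]` return type.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Optional


@dataclass
class State:
    """Hypothetical stand-in for a Prefect state object."""

    name: str
    message: str = ""


Handler = Callable[[object, State, State], Optional[State]]


def log_transition(task, old_state: State, new_state: State) -> Optional[State]:
    # Inspect only; returning None leaves the state unchanged (assumed).
    print(f"{task}: {old_state.name} -> {new_state.name}")
    return None


def annotate_failure(task, old_state: State, new_state: State) -> Optional[State]:
    # Modify the state: attach a message when the task fails.
    if new_state.name == "Failed":
        return State("Failed", message=f"{task} failed after {old_state.name}")
    return new_state


def run_handlers(
    task, old_state: State, new_state: State, handlers: Iterable[Handler]
) -> State:
    # Each handler receives the result of the previous one as new_state.
    for handler in handlers:
        new_state = handler(task, old_state, new_state) or new_state
    return new_state


final = run_handlers(
    "load_file", State("Running"), State("Failed"), [log_transition, annotate_failure]
)
```

Because the handlers run in order, `annotate_failure` sees whatever `log_transition` left in place, which is why handler order matters when more than one handler modifies the state.
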
Source code in lume_services/tasks/file.py
def __init__(self, **kwargs):
    """This task is defined as a subclass of the Prefect [Task](https://docs-v1.prefect.io/api/latest/core/task.html#task-2)
    object and accepts all Task arguments during initialization.

    Args:
        name (Optional[str]): The name of this task.
        slug (Optional[str]):  The slug for this task. Slugs provide a stable ID
            for tasks so that the Prefect API can identify task run states. If a
            slug is not provided, one will be generated automatically once the
            task is added to a Flow.
        tags (Optional[List[str]]): A list of tags for this task.
        max_retries (Optional[int]): The maximum amount of times this task can be
            retried
        retry_delay (Optional[datetime.timedelta]): The amount of time to wait
            until task is retried
        retry_on (Optional[Union[Exception, Iterable[Type[Exception]]]]): Exception
            types that will allow retry behavior to occur. If not set, all
            exceptions will allow retries. If set, retries will only occur if the
            exception is a subtype of the exception types provided.
        timeout (Optional[Union[int, timedelta]]): The amount of time (in seconds)
            to wait while running this task before a timeout occurs; note that
            sub-second resolution is not supported, even when passing in a
            timedelta.
        trigger (Optional[callable]):  a function that determines whether the task
            should run, based on the states of any upstream tasks.
        skip_on_upstream_skip (Optional[bool]): if True, if any immediately upstream
            tasks are skipped, this task will automatically be skipped as well,
            regardless of trigger. By default, this prevents tasks from attempting
            to use either state or data from tasks that didn't run. If False, the
            task's trigger will be called as normal, with skips considered
            successes. Defaults to True.
        cache_for (Optional[timedelta]): The amount of time to maintain a cache
            of the outputs of this task.  Useful for situations where the containing
            Flow will be rerun multiple times, but this task doesn't need to be.
        cache_validator (Optional[Callable]): Validator that will determine
            whether the cache for this task is still valid (only required if
            `cache_for` is provided; defaults to
            `prefect.engine.cache_validators.duration_only`)
        cache_key (Optional[str]): if provided, a `cache_key`
            serves as a unique identifier for this Task's cache, and can be shared
            across both Tasks _and_ Flows; if not provided, the Task's _name_ will
            be used if running locally, or the Task's database ID if running in
            Cloud
        checkpoint (Optional[bool]): if this Task is successful, whether to
            store its result using the configured result available during the run;
            Also note that checkpointing will only occur locally if
            `prefect.config.flows.checkpointing` is set to `True`
        result (Optional[Result]): the result instance used to retrieve and
            store task results during execution
        target (Optional[Union[str, Callable]]): location to check for task Result.
            If a result exists at that location then the task run will enter a
            cached state. `target` strings can be templated formatting strings
            which will be formatted at runtime with values from `prefect.context`.
            If a callable function is provided, it should have signature
            `callable(**kwargs) -> str` and at write time all formatting kwargs
            will be passed and a fully formatted location is expected as the return
            value. The callable can be used for string formatting logic that
            `.format(**kwargs)` doesn't support.
        state_handlers (Optional[Iterable[Callable]]): A list of state change
            handlers that will be called whenever the task changes state,
            providing an opportunity to inspect or modify the new state. The
            handler will be passed the task instance, the old (prior) state,
            and the new
            (current) state, with the following signature:
                `state_handler(task: Task, old_state: State, new_state: State) ->
                Optional[State]`
            If multiple functions are passed, then the `new_state` argument will
            be the result of the previous handler.
        on_failure (Optional[Callable]): A function with signature
            `fn(task: Task, state: State) -> None` that will be called anytime this
            Task enters a failure state
        log_stdout (Optional[bool]): Toggle whether or not to send stdout messages
            to the Prefect logger. Defaults to `True` for this task.
        task_run_name (Optional[Union[str, Callable]]): a name to set for this task
            at runtime. `task_run_name` strings can be templated formatting strings
            which will be formatted at runtime with values from task arguments,
            `prefect.context`, and flow parameters (in the case of a name conflict
            between these, earlier values take precedence). If a callable function
            is provided, it should have signature `callable(**kwargs) -> str` and
            at write time all formatting kwargs will be passed and a fully
            formatted location is expected as the return value. The callable can
            be used for string formatting logic that `.format(**kwargs)` doesn't
            support. **Note**: this only works for tasks running against a
            backend API.
        nout (Optional[int]): for tasks that return multiple results, the number of
            outputs to expect. If not provided, will be inferred from the task
            return annotation, if possible.  Note that `nout=1` implies the task
            returns a tuple of one value (leave as `None` for non-tuple return
            types).

    """  # noqa

    # apply some defaults but allow overrides; pop each key so it is not
    # passed to the parent constructor a second time through **kwargs
    log_stdout = kwargs.pop("log_stdout", None)
    if log_stdout is None:
        log_stdout = True

    name = kwargs.pop("name", None) or "load_file"

    super().__init__(log_stdout=log_stdout, name=name, **kwargs)
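
The "apply some defaults but allow overrides" step can be tried in isolation. This is a minimal sketch, assuming a parent class that accepts `name` and `log_stdout` as keywords; `BaseTask` and `FileTask` are hypothetical stand-ins, not Prefect or LUME-services classes. It uses `dict.pop` so that an explicit `False` survives and no keyword reaches the parent twice.

```python
class BaseTask:
    """Hypothetical stand-in for a parent class with keyword options."""

    def __init__(self, name=None, log_stdout=False, **kwargs):
        self.name = name
        self.log_stdout = log_stdout
        self.extra = kwargs


class FileTask(BaseTask):
    def __init__(self, **kwargs):
        # pop() removes the key from kwargs, so it cannot reach the parent
        # both explicitly and through **kwargs.
        log_stdout = kwargs.pop("log_stdout", None)
        if log_stdout is None:
            log_stdout = True  # subclass default

        name = kwargs.pop("name", None) or "load_file"

        super().__init__(name=name, log_stdout=log_stdout, **kwargs)


default_task = FileTask()
custom_task = FileTask(name="load_inputs", log_stdout=False, timeout=20)
```

With this pattern, `FileTask()` gets the subclass defaults, while `FileTask(log_stdout=False)` keeps the caller's choice and any remaining options (here `timeout`) pass through untouched.
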

run(file_rep, file_service=Provide[Context.file_service])

Load a file

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file_rep` | `dict` | File data representation | required |
| `file_service` | `FileService` | File service for interacting with filesystems | `Provide[Context.file_service]` |

Returns:

| Type | Description |
| --- | --- |
| `Any` | Deserialized file object |

Source code in lume_services/tasks/file.py
@inject
def run(
    self, file_rep: dict, file_service: FileService = Provide[Context.file_service]
) -> Any:
    """Load a file

    Args:
        file_rep (dict): File data representation
        file_service (FileService): File service for interacting with filesystems

    Returns:
        Any: Deserialized file object

    """

    file_type = get_file_from_serializer_string(file_rep["file_type_string"])
    file_result = file_type(**file_rep)

    return file_result.read(file_service=file_service)
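
The three steps in `run` (resolve the file class, rebuild the file object from its dictionary representation, read through the file service) can be mimicked with plain Python. Everything in this sketch is a hypothetical stand-in: `InMemoryFileService`, this `TextFile`, the `FILE_TYPES` lookup table, and the `"text"` serializer string are assumptions for illustration and do not reflect how LUME-services implements `get_file_from_serializer_string` or its file classes.

```python
from dataclasses import dataclass
from typing import Any, Dict


class InMemoryFileService:
    """Hypothetical file service: maps filesystem identifiers to dict-backed stores."""

    def __init__(self):
        self._stores: Dict[str, Dict[str, str]] = {"local": {}}

    def write(self, filesystem_identifier: str, filename: str, text: str) -> None:
        self._stores[filesystem_identifier][filename] = text

    def read(self, filesystem_identifier: str, filename: str) -> str:
        return self._stores[filesystem_identifier][filename]


@dataclass
class TextFile:
    """Hypothetical file type whose fields mirror the keys used on this page."""

    filename: str
    filesystem_identifier: str
    file_type_string: str = "text"

    def read(self, file_service: InMemoryFileService) -> str:
        return file_service.read(self.filesystem_identifier, self.filename)


# Assumed lookup table from serializer string to file class.
FILE_TYPES = {"text": TextFile}


def load_file(file_rep: Dict[str, Any], file_service: InMemoryFileService) -> Any:
    file_type = FILE_TYPES[file_rep["file_type_string"]]  # resolve the class
    file_result = file_type(**file_rep)  # rebuild the file object
    return file_result.read(file_service=file_service)  # read via the service


service = InMemoryFileService()
service.write("local", "greeting.txt", "hello world")
file_rep = {
    "filename": "greeting.txt",
    "filesystem_identifier": "local",
    "file_type_string": "text",
}
loaded = load_file(file_rep, service)
```

The point of the pattern is that the dictionary alone is enough to locate and reload the file, so it can travel between tasks as a plain result.
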

SaveFile

Bases: Task

This task is used to save a workflow file to a filesystem for subsequent retrieval. File saving supports all File objects described in LUME-services. Additional file types can be created by subclassing the base class in lume_services.files.file. The SaveFile task relies on a filesystem identifier, a string naming a filesystem configured with the File Service, to determine where the file is written.

Examples:

from prefect import Flow, task, Parameter
from lume_services.tasks import configure_lume_services, SaveFile
from lume_services.files import TextFile

# construct task with prefect Task options
save_file_task = SaveFile(timeout=20)

@task
def concatenate_text(text1, text2):
    # return the raw text; SaveFile wraps it in the requested file type
    return text1 + text2


with Flow(
    "my_flow"
) as flow:
    # services must be configured first because SaveFile uses the
    # injected file service
    configure_lume_services()

    text1 = Parameter("text1")
    text2 = Parameter("text2")

    my_text = concatenate_text(text1, text2)

    file_parameters = save_file_task.parameters

    my_result = save_file_task(
        my_text,
        filename=file_parameters["filename"],
        filesystem_identifier=file_parameters["filesystem_identifier"],
        file_type=TextFile,  # THIS MUST BE PASSED IN THE TASK CALL
    )
Source code in lume_services/tasks/file.py
class SaveFile(Task):
    """This task is used to save a workflow file to a filesystem for subsequent
    retrieval. File saving supports all File objects described in
    [LUME-services](https://slaclab.github.io/lume-services/api/files/files/).
    Additional file types can be created by subclassing the base class in
    `lume_services.files.file`. The `SaveFile` task relies on a filesystem
    identifier, a string naming a filesystem configured with the File Service,
    to determine where the file is written.

    Examples:
        ```python
        from prefect import Flow, task, Parameter
        from lume_services.tasks import configure_lume_services, SaveFile
        from lume_services.files import TextFile

        # construct task with prefect Task options
        save_file_task = SaveFile(timeout=20)

        @task
        def concatenate_text(text1, text2):
            # return the raw text; SaveFile wraps it in the requested file type
            return text1 + text2


        with Flow(
            "my_flow"
        ) as flow:
            # services must be configured first because SaveFile uses the
            # injected file service
            configure_lume_services()

            text1 = Parameter("text1")
            text2 = Parameter("text2")

            my_text = concatenate_text(text1, text2)

            file_parameters = save_file_task.parameters

            my_result = save_file_task(
                my_text,
                filename=file_parameters["filename"],
                filesystem_identifier=file_parameters["filesystem_identifier"],
                file_type=TextFile,  # THIS MUST BE PASSED IN THE TASK CALL
            )

        ```

    """

    parameters = {
        "filename": Parameter("filename"),
        "filesystem_identifier": Parameter("filesystem_identifier"),
    }

    def __init__(self, **kwargs):
        """This task is defined as a subclass of the Prefect [Task](https://docs-v1.prefect.io/api/latest/core/task.html#task-2)
        object and accepts all Task arguments during initialization.

        Args:
            name (Optional[str]): The name of this task.
            slug (Optional[str]):  The slug for this task. Slugs provide a stable ID
                for tasks so that the Prefect API can identify task run states. If a
                slug is not provided, one will be generated automatically once the
                task is added to a Flow.
            tags (Optional[List[str]]): A list of tags for this task.
            max_retries (Optional[int]): The maximum amount of times this task can be
                retried
            retry_delay (Optional[datetime.timedelta]): The amount of time to wait
                until task is retried
            retry_on (Optional[Union[Exception, Iterable[Type[Exception]]]]): Exception
                types that will allow retry behavior to occur. If not set, all
                exceptions will allow retries. If set, retries will only occur if the
                exception is a subtype of the exception types provided.
            timeout (Optional[Union[int, timedelta]]): The amount of time (in seconds)
                to wait while running this task before a timeout occurs; note that
                sub-second resolution is not supported, even when passing in a
                timedelta.
            trigger (Optional[callable]):  a function that determines whether the task
                should run, based on the states of any upstream tasks.
            skip_on_upstream_skip (Optional[bool]): if True, if any immediately upstream
                tasks are skipped, this task will automatically be skipped as well,
                regardless of trigger. By default, this prevents tasks from attempting
                to use either state or data from tasks that didn't run. If False, the
                task's trigger will be called as normal, with skips considered
                successes. Defaults to True.
            cache_for (Optional[timedelta]): The amount of time to maintain a cache
                of the outputs of this task.  Useful for situations where the containing
                Flow will be rerun multiple times, but this task doesn't need to be.
            cache_validator (Optional[Callable]): Validator that will determine
                whether the cache for this task is still valid (only required if
                `cache_for` is provided; defaults to
                `prefect.engine.cache_validators.duration_only`)
            cache_key (Optional[str]): if provided, a `cache_key`
                serves as a unique identifier for this Task's cache, and can be shared
                across both Tasks _and_ Flows; if not provided, the Task's _name_ will
                be used if running locally, or the Task's database ID if running in
                Cloud
            checkpoint (Optional[bool]): if this Task is successful, whether to
                store its result using the configured result available during the run;
                Also note that checkpointing will only occur locally if
                `prefect.config.flows.checkpointing` is set to `True`
            result (Optional[Result]): the result instance used to retrieve and
                store task results during execution
            target (Optional[Union[str, Callable]]): location to check for task Result.
                If a result exists at that location then the task run will enter a
                cached state. `target` strings can be templated formatting strings
                which will be formatted at runtime with values from `prefect.context`.
                If a callable function is provided, it should have signature
                `callable(**kwargs) -> str` and at write time all formatting kwargs
                will be passed and a fully formatted location is expected as the return
                value. The callable can be used for string formatting logic that
                `.format(**kwargs)` doesn't support.
            state_handlers (Optional[Iterable[Callable]]): A list of state change
                handlers that will be called whenever the task changes state,
                providing an opportunity to inspect or modify the new state. The
                handler will be passed the task instance, the old (prior) state,
                and the new
                (current) state, with the following signature:
                    `state_handler(task: Task, old_state: State, new_state: State) ->
                    Optional[State]`
                If multiple functions are passed, then the `new_state` argument will
                be the result of the previous handler.
            on_failure (Optional[Callable]): A function with signature
                `fn(task: Task, state: State) -> None` that will be called anytime this
                Task enters a failure state
            log_stdout (Optional[bool]): Toggle whether or not to send stdout messages
                to the Prefect logger. Defaults to `True` for this task.
            task_run_name (Optional[Union[str, Callable]]): a name to set for this task
                at runtime. `task_run_name` strings can be templated formatting strings
                which will be formatted at runtime with values from task arguments,
                `prefect.context`, and flow parameters (in the case of a name conflict
                between these, earlier values take precedence). If a callable function
                is provided, it should have signature `callable(**kwargs) -> str` and
                at write time all formatting kwargs will be passed and a fully
                formatted location is expected as the return value. The callable can
                be used for string formatting logic that `.format(**kwargs)` doesn't
                support. **Note**: this only works for tasks running against a
                backend API.
            nout (Optional[int]): for tasks that return multiple results, the number of
                outputs to expect. If not provided, will be inferred from the task
                return annotation, if possible.  Note that `nout=1` implies the task
                returns a tuple of one value (leave as `None` for non-tuple return
                types).

        """  # noqa

        # apply some defaults but allow overrides; pop each key so it is not
        # passed to the parent constructor a second time through **kwargs
        log_stdout = kwargs.pop("log_stdout", None)
        if log_stdout is None:
            log_stdout = True

        name = kwargs.pop("name", None) or "save_file"

        result = kwargs.pop("result", None) or PrefectResult(
            location=_unique_file_location
        )

        super().__init__(log_stdout=log_stdout, name=name, result=result, **kwargs)

    @inject
    def run(
        self,
        obj: Any,
        filename: str,
        filesystem_identifier: str,
        file_type: type,
        file_service: FileService = Provide[Context.file_service],
    ):
        """Save a file.

        Args:
            obj (Any): Object to be saved
            filename (str): File path to save
            filesystem_identifier (str): String identifier for filesystem configured
                with File Service
            file_type (type): Type of file to save as. This is not exposed as a
                task parameter and should be passed explicitly in the task call.
                See examples.
            file_service (FileService): File service for interacting with filesystems

        Returns:
            dict: JSON-serializable representation of the saved file

        """
        file = file_type(
            obj=obj, filesystem_identifier=filesystem_identifier, filename=filename
        )
        file.write(file_service=file_service)
        return file.jsonable_dict()
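
The save path (build the file object, write it through the file service, return a dictionary representation) can be sketched with the standard library alone. The names below are hypothetical stand-ins: `LocalDirService`, this `TextFile`, and the exact keys returned by `jsonable_dict` are assumptions for illustration, including the choice to leave the payload out of the returned dictionary. The real LUME-services classes may differ.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass


class LocalDirService:
    """Hypothetical file service: one directory per filesystem identifier."""

    def __init__(self, roots):
        self.roots = roots

    def path(self, filesystem_identifier, filename):
        return os.path.join(self.roots[filesystem_identifier], filename)


@dataclass
class TextFile:
    """Hypothetical file type taking the same keywords as the call in run()."""

    obj: str
    filesystem_identifier: str
    filename: str

    def write(self, file_service):
        target = file_service.path(self.filesystem_identifier, self.filename)
        with open(target, "w") as f:
            f.write(self.obj)

    def jsonable_dict(self):
        # Assumed shape: drop the payload and keep what is needed to find the file.
        rep = asdict(self)
        rep.pop("obj")
        rep["file_type_string"] = type(self).__name__
        return rep


def save_file(obj, filename, filesystem_identifier, file_type, file_service):
    file = file_type(
        obj=obj, filesystem_identifier=filesystem_identifier, filename=filename
    )
    file.write(file_service=file_service)
    return file.jsonable_dict()


with tempfile.TemporaryDirectory() as tmp:
    service = LocalDirService({"local": tmp})
    rep = save_file("hello world", "out.txt", "local", TextFile, service)
    encoded = json.dumps(rep)  # the representation holds plain strings only
    with open(os.path.join(tmp, "out.txt")) as f:
        written = f.read()
```

Returning a small JSON-friendly dictionary rather than the file contents keeps the task result cheap to store and lets a later task reload the file from the same filesystem.
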

__init__(**kwargs)

This task is defined as a subclass of the Prefect Task object and accepts all Task arguments during initialization.

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| `name` | `Optional[str]` | The name of this task. Defaults to `"save_file"`. |
| `slug` | `Optional[str]` | The slug for this task. Slugs provide a stable ID for tasks so that the Prefect API can identify task run states. If a slug is not provided, one will be generated automatically once the task is added to a Flow. |
| `tags` | `Optional[List[str]]` | A list of tags for this task. |
| `max_retries` | `Optional[int]` | The maximum number of times this task can be retried. |
| `retry_delay` | `Optional[timedelta]` | The amount of time to wait until the task is retried. |
| `retry_on` | `Optional[Union[Exception, Iterable[Type[Exception]]]]` | Exception types that will allow retry behavior to occur. If not set, all exceptions will allow retries. If set, retries will only occur if the exception is a subtype of the exception types provided. |
| `timeout` | `Optional[Union[int, timedelta]]` | The amount of time (in seconds) to wait while running this task before a timeout occurs; note that sub-second resolution is not supported, even when passing in a timedelta. |
| `trigger` | `Optional[callable]` | A function that determines whether the task should run, based on the states of any upstream tasks. |
| `skip_on_upstream_skip` | `Optional[bool]` | If `True` and any immediately upstream tasks are skipped, this task will automatically be skipped as well, regardless of trigger. By default, this prevents tasks from attempting to use either state or data from tasks that didn't run. If `False`, the task's trigger will be called as normal, with skips considered successes. Defaults to `True`. |
| `cache_for` | `Optional[timedelta]` | The amount of time to maintain a cache of the outputs of this task. Useful for situations where the containing Flow will be rerun multiple times, but this task doesn't need to be. |
| `cache_validator` | `Optional[Callable]` | Validator that will determine whether the cache for this task is still valid (only required if `cache_for` is provided; defaults to `prefect.engine.cache_validators.duration_only`). |
| `cache_key` | `Optional[str]` | If provided, a `cache_key` serves as a unique identifier for this Task's cache, and can be shared across both Tasks and Flows; if not provided, the Task's name will be used if running locally, or the Task's database ID if running in Cloud. |
| `checkpoint` | `Optional[bool]` | If this Task is successful, whether to store its result using the configured result available during the run. Checkpointing will only occur locally if `prefect.config.flows.checkpointing` is set to `True`. |
| `result` | `Optional[Result]` | The result instance used to retrieve and store task results during execution. Defaults to `PrefectResult(location=_unique_file_location)`. |
| `target` | `Optional[Union[str, Callable]]` | Location to check for task Result. If a result exists at that location then the task run will enter a cached state. `target` strings can be templated formatting strings which will be formatted at runtime with values from `prefect.context`. If a callable function is provided, it should have signature `callable(**kwargs) -> str` and at write time all formatting kwargs will be passed and a fully formatted location is expected as the return value. The callable can be used for string formatting logic that `.format(**kwargs)` doesn't support. |
| `state_handlers` | `Optional[Iterable[Callable]]` | A list of state change handlers that will be called whenever the task changes state, providing an opportunity to inspect or modify the new state. The handler will be passed the task instance, the old (prior) state, and the new (current) state, with the following signature: `state_handler(task: Task, old_state: State, new_state: State) -> Optional[State]`. If multiple functions are passed, then the `new_state` argument will be the result of the previous handler. |
| `on_failure` | `Optional[Callable]` | A function with signature `fn(task: Task, state: State) -> None` that will be called anytime this Task enters a failure state. |
| `log_stdout` | `Optional[bool]` | Toggle whether or not to send stdout messages to the Prefect logger. Defaults to `True` for this task. |
| `task_run_name` | `Optional[Union[str, Callable]]` | A name to set for this task at runtime. `task_run_name` strings can be templated formatting strings which will be formatted at runtime with values from task arguments, `prefect.context`, and flow parameters (in the case of a name conflict between these, earlier values take precedence). If a callable function is provided, it should have signature `callable(**kwargs) -> str` and at write time all formatting kwargs will be passed and a fully formatted location is expected as the return value. The callable can be used for string formatting logic that `.format(**kwargs)` doesn't support. Note: this only works for tasks running against a backend API. |
| `nout` | `Optional[int]` | For tasks that return multiple results, the number of outputs to expect. If not provided, will be inferred from the task return annotation, if possible. Note that `nout=1` implies the task returns a tuple of one value (leave as `None` for non-tuple return types). |

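
The difference between a templated string and a callable for `target` or `task_run_name` can be seen in a few lines of plain Python. This is an illustrative sketch only: `resolve_template` and `normalised_location` are hypothetical helpers, and the `context` values are made-up examples rather than anything Prefect produced.

```python
from typing import Callable, Union


def resolve_template(template: Union[str, Callable[..., str]], **kwargs) -> str:
    """Hypothetical helper: format a string template, or call a callable with the same kwargs."""
    if callable(template):
        return template(**kwargs)
    return template.format(**kwargs)


def normalised_location(**kwargs) -> str:
    # Logic that plain .format() cannot express: normalise the flow name.
    flow = kwargs["flow_name"].lower().replace(" ", "_")
    return f"{flow}/{kwargs['task_name']}.txt"


# Example formatting kwargs (illustrative values).
context = {"flow_name": "My Flow", "task_name": "save_file", "today": "2023-01-01"}

from_string = resolve_template("{task_name}-{today}", **context)
from_callable = resolve_template(normalised_location, **context)
```

A template string is enough when the name is a direct substitution of available values; a callable is the fallback when the value needs transforming first.
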
Source code in lume_services/tasks/file.py
def __init__(self, **kwargs):
    """This task is defined as a subclass of the Prefect [Task](https://docs-v1.prefect.io/api/latest/core/task.html#task-2)
    object and accepts all Task arguments during initialization.

    Args:
        name (Optional[str]): The name of this task.
        slug (Optional[str]):  The slug for this task. Slugs provide a stable ID
            for tasks so that the Prefect API can identify task run states. If a
            slug is not provided, one will be generated automatically once the
            task is added to a Flow.
        tags (Optional[List[str]]): A list of tags for this task.
        max_retries (Optional[int]): The maximum amount of times this task can be
            retried
        retry_delay (Optional[datetime.timedelta]): The amount of time to wait
            until task is retried
        retry_on (Optional[Union[Exception, Iterable[Type[Exception]]]]): Exception
            types that will allow retry behavior to occur. If not set, all
            exceptions will allow retries. If set, retries will only occur if the
            exception is a subtype of the exception types provided.
        timeout (Optional[Union[int, timedelta]]): The amount of time (in seconds)
            to wait while running this task before a timeout occurs; note that
            sub-second resolution is not supported, even when passing in a
            timedelta.
        trigger (Optional[callable]):  a function that determines whether the task
            should run, based on the states of any upstream tasks.
        skip_on_upstream_skip (Optional[bool]): if True, if any immediately upstream
            tasks are skipped, this task will automatically be skipped as well,
            regardless of trigger. By default, this prevents tasks from attempting
            to use either state or data from tasks that didn't run. If False, the
            task's trigger will be called as normal, with skips considered
            successes. Defaults to True.
        cache_for (Optional[timedelta]): The amount of time to maintain a cache
            of the outputs of this task.  Useful for situations where the containing
            Flow will be rerun multiple times, but this task doesn't need to be.
        cache_validator (Optional[Callable]): Validator that will determine
            whether the cache for this task is still valid (only required if
            `cache_for` is provided; defaults to
            `prefect.engine.cache_validators.duration_only`)
        cache_key (Optional[str]): if provided, a `cache_key`
            serves as a unique identifier for this Task's cache, and can be shared
            across both Tasks _and_ Flows; if not provided, the Task's _name_ will
            be used if running locally, or the Task's database ID if running in
            Cloud
        checkpoint (Optional[bool]): if this Task is successful, whether to
            store its result using the configured result available during the run;
            Also note that checkpointing will only occur locally if
            `prefect.config.flows.checkpointing` is set to `True`
        result (Optional[Result]): the result instance used to retrieve and
            store task results during execution
        target (Optional[Union[str, Callable]]): location to check for task Result.
            If a result exists at that location then the task run will enter a
            cached state. `target` strings can be templated formatting strings
            which will be formatted at runtime with values from `prefect.context`.
            If a callable function is provided, it should have signature
            `callable(**kwargs) -> str` and at write time all formatting kwargs
            will be passed and a fully formatted location is expected as the return
            value. The callable can be used for string formatting logic that
            `.format(**kwargs)` doesn't support.
        state_handlers (Optional[Iterable[Callable]]): A list of state change
            handlers that will be called whenever the task changes state,
            providing an opportunity to inspect or modify the new state. The
            handler will be passed the task instance, the old (prior) state,
            and the new (current) state, with the following signature:
                `state_handler(task: Task, old_state: State, new_state: State) ->
                Optional[State]`
            If multiple functions are passed, then the `new_state` argument will
            be the result of the previous handler.
        on_failure (Optional[Callable]): A function with signature
            `fn(task: Task, state: State) -> None` that will be called anytime this
            Task enters a failure state
        log_stdout (Optional[bool]): Toggle whether or not to send stdout messages
            to the Prefect logger. Defaults to `False`.
        task_run_name (Optional[Union[str, Callable]]): a name to set for this task
            at runtime. `task_run_name` strings can be templated formatting strings
            which will be formatted at runtime with values from task arguments,
            `prefect.context`, and flow parameters (in the case of a name conflict
            between these, earlier values take precedence). If a callable function
            is provided, it should have signature `callable(**kwargs) -> str` and
            at write time all formatting kwargs will be passed and a fully
            formatted location is expected as the return value. The callable can
            be used for string formatting logic that `.format(**kwargs)` doesn't
            support. **Note**: this only works for tasks running against a
            backend API.
        nout (Optional[int]): for tasks that return multiple results, the number of
            outputs to expect. If not provided, will be inferred from the task
            return annotation, if possible.  Note that `nout=1` implies the task
            returns a tuple of one value (leave as `None` for non-tuple return
            types).

    """  # noqa

    # Apply defaults, but let explicit keyword arguments override them. Each key
    # is popped so it is never passed to super().__init__ twice.
    log_stdout = kwargs.pop("log_stdout", None)
    if log_stdout is None:
        log_stdout = True

    name = kwargs.pop("name", None) or "save_file"

    result = kwargs.pop("result", None)
    if result is None:
        result = PrefectResult(location=_unique_file_location)

    super().__init__(log_stdout=log_stdout, name=name, result=result, **kwargs)
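The default-handling in `__init__` can be sketched in isolation. This is a minimal illustration, assuming that explicitly passed values (including `log_stdout=False`) should be honoured. `BaseTask` is a hypothetical stand-in for the Prefect `Task` class that only records its keyword arguments, and `SaveFileSketch` is likewise an illustrative name.

```python
from typing import Any, Optional


class BaseTask:
    """Hypothetical stand-in for prefect.Task; it only stores what it is given."""

    def __init__(
        self, name: Optional[str] = None, log_stdout: bool = False, **kwargs: Any
    ):
        self.name = name
        self.log_stdout = log_stdout
        self.extra = kwargs


class SaveFileSketch(BaseTask):
    def __init__(self, **kwargs: Any):
        # Pop each key so it cannot reach super().__init__ a second time
        # through **kwargs, which would raise a TypeError.
        log_stdout = kwargs.pop("log_stdout", None)
        if log_stdout is None:
            log_stdout = True  # default used only when nothing was passed
        name = kwargs.pop("name", None) or "save_file"
        super().__init__(log_stdout=log_stdout, name=name, **kwargs)


default_task = SaveFileSketch()
quiet_task = SaveFileSketch(log_stdout=False, name="save_quietly", max_retries=2)
```

`default_task` picks up both defaults, while `quiet_task` keeps its explicit values and forwards the remaining keyword argument untouched.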

run(obj, filename, filesystem_identifier, file_type, file_service=Provide[Context.file_service])

Save a file.

Parameters:

Name Type Description Default
obj Any

Object to be saved

required
filename str

File path to save

required
filesystem_identifier str

String identifier for filesystem configured with File Service

required
file_type type

Type of file to save as. This is not exposed as a task parameter and should be passed explicitly in the task run call. See examples.

required
file_service FileService

File service for interacting with filesystems

Provide[file_service]

Returns:

Name Type Description
dict

JSON-serializable dictionary representation of the saved file

Source code in lume_services/tasks/file.py
@inject
def run(
    self,
    obj: Any,
    filename: str,
    filesystem_identifier: str,
    file_type: type,
    file_service: FileService = Provide[Context.file_service],
):
    """Save a file.

    Args:
        obj (Any): Object to be saved
        filename (str): File path to save
        filesystem_identifier (str): String identifier for filesystem configured
            with File Service
        file_type (type): Type of file to save as. This is not exposed as a
            task parameter and should be passed explicitly in the task run call.
            See examples.
        file_service (FileService): File service for interacting with filesystems

    Returns:
        dict: JSON-serializable dictionary representation of the saved file

    """
    file = file_type(
        obj=obj, filesystem_identifier=filesystem_identifier, filename=filename
    )
    file.write(file_service=file_service)
    return file.jsonable_dict()
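The body of `run` follows three steps: build a `file_type` instance, write it through the file service, and return its dictionary form. The sketch below reproduces that flow without LUME-services installed. `InMemoryFileService`, `JSONTextFile`, the `save` function, and the keys returned by `jsonable_dict` are all hypothetical stand-ins chosen for illustration, not the library's actual classes or output format.

```python
import json
from typing import Any, Dict, Tuple


class InMemoryFileService:
    """Hypothetical stand-in for FileService that stores content in a dict."""

    def __init__(self) -> None:
        self.storage: Dict[Tuple[str, str], str] = {}

    def write(self, filesystem_identifier: str, filename: str, content: str) -> None:
        self.storage[(filesystem_identifier, filename)] = content


class JSONTextFile:
    """Hypothetical file type that serializes its object as JSON text."""

    def __init__(self, obj: Any, filesystem_identifier: str, filename: str) -> None:
        self.obj = obj
        self.filesystem_identifier = filesystem_identifier
        self.filename = filename

    def write(self, file_service: InMemoryFileService) -> None:
        file_service.write(
            self.filesystem_identifier, self.filename, json.dumps(self.obj)
        )

    def jsonable_dict(self) -> Dict[str, str]:
        # Assumed keys; the real representation may differ.
        return {
            "filename": self.filename,
            "filesystem_identifier": self.filesystem_identifier,
            "file_type": type(self).__name__,
        }


def save(obj, filename, filesystem_identifier, file_type, file_service):
    """Mirror of the run body: instantiate, write, return the dict form."""
    file = file_type(
        obj=obj, filesystem_identifier=filesystem_identifier, filename=filename
    )
    file.write(file_service=file_service)
    return file.jsonable_dict()


service = InMemoryFileService()
representation = save({"a": 1}, "out.json", "local", JSONTextFile, service)
```

Passing `file_type` as a class rather than an instance lets the same task save different kinds of files, which is why it has to be supplied explicitly in the run call.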