Skip to content

Backends

Backend

Bases: BaseModel, ABC

Abstract base class for Prefect backends. Backends handle Prefect interactions including running of flows, result handling, and flow registration with server backends.

Source code in lume_services/services/scheduling/backends/backend.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
class Backend(BaseModel, ABC):
    """Abstract interface for Prefect backends.

    A backend owns every interaction with Prefect: executing flows, collecting
    their results and, for backends connected to a server, creating projects
    and registering flows.

    """

    @abstractmethod
    def create_project(self, project_name: str) -> None:
        """Create a named Prefect project.

        Implementations that have no server connection should raise an error
        when this method is called.

        Args:
            project_name (str): Name of the Prefect project to create.

        """

    @abstractmethod
    def register_flow(
        self,
        flow: Flow,
        project_name: str,
        image: Optional[str],
    ) -> str:
        """Register a flow with Prefect.

        Implementations that have no server connection should raise an error
        when this method is called.

        Args:
            flow (Flow): Prefect flow to register.
            project_name (str): Name of the project the flow is registered under.
            image (Optional[str]): Name of the Docker image the flow runs inside.

        Returns:
            str: ID of the registered flow.

        """

    @abstractmethod
    def load_flow(self, flow_name: str, project_name: str) -> dict:
        """Load a Prefect flow object.

        Implementations that have no server connection should raise an error
        when this method is called.

        Args:
            flow_name (str): Name of the flow.
            project_name (str): Name of the project the flow is registered with.

        Returns:
            dict: Dictionary with keys "flow_id" and "flow".

        """

    @abstractmethod
    def run(
        self,
        parameters: Optional[Dict[str, Any]],
        run_config: Optional[RunConfig],
        **kwargs
    ) -> Union[str, None]:
        """Run a flow without returning its result.

        Implementations are responsible for building the run config from kwargs
        and for handling any backend-specific kwargs.

        Args:
            parameters (Optional[Dict[str, Any]]): Mapping of flow parameter name
                to value.
            run_config (Optional[RunConfig]): RunConfig object used to configure
                the flow run.
            **kwargs: Keyword arguments for RunConfig construction and
                backend-specific execution.

        Returns:
            Union[str, None]: The run_id for server backends; None for local
                execution.

        Raises:
            pydantic.ValidationError: The run configuration failed validation.
            ValueError: A value error occurred during the flow run.

        """

    @abstractmethod
    def run_and_return(
        self,
        parameters: Optional[Dict[str, Any]],
        run_config: Optional[RunConfig],
        task_name: Optional[str],
        **kwargs
    ) -> Any:
        """Run a flow and return its result.

        Implementations are responsible for building the run config from kwargs
        and for handling any backend-specific kwargs.

        Args:
            parameters (Optional[Dict[str, Any]]): Mapping of flow parameter name
                to value.
            run_config (Optional[RunConfig]): RunConfig object used to configure
                the flow run.
            task_name (Optional[str]): Name of the task whose result is returned.
                When omitted, the flow result is returned.
            **kwargs: Keyword arguments for RunConfig construction and
                backend-specific execution.

        Returns:
            Any: Result of the flow run.

        Raises:
            lume_services.errors.EmptyResultError: The flow has no associated
                result.
            pydantic.ValidationError: The run configuration failed validation.
            ValueError: A value error occurred during the flow run.

        """

create_project(project_name) abstractmethod

Create a Prefect project. Backend implementations without a server connection should raise errors when this method is called.

Parameters:

Name Type Description Default
project_name str

Create a named Prefect project.

required
Source code in lume_services/services/scheduling/backends/backend.py
42
43
44
45
46
47
48
49
50
51
@abstractmethod
def create_project(self, project_name: str) -> None:
    """Create a named Prefect project.

    Implementations that have no server connection should raise an error when
    this method is called.

    Args:
        project_name (str): Name of the Prefect project to create.

    """

load_flow(flow_name, project_name) abstractmethod

Load a Prefect flow object. Backend implementations without a server connection should raise errors when this method is called.

Parameters:

Name Type Description Default
flow_name str

Name of flow.

required
project_name str

Name of project flow is registered with.

required

Returns:

Name Type Description
dict dict

Dictionary with keys "flow_id" and "flow"

Source code in lume_services/services/scheduling/backends/backend.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@abstractmethod
def load_flow(self, flow_name: str, project_name: str) -> dict:
    """Load a Prefect flow object.

    Implementations that have no server connection should raise an error when
    this method is called.

    Args:
        flow_name (str): Name of the flow.
        project_name (str): Name of the project the flow is registered with.

    Returns:
        dict: Dictionary with keys "flow_id" and "flow".

    """

register_flow(flow, project_name, image) abstractmethod

Register a flow with Prefect. Backend implementations without a server connection should raise errors when this method is called.

Parameters:

Name Type Description Default
flow Flow

Prefect flow to register.

required
project_name str

Name of project to register flow to.

required
image str

Name of Docker image to run flow inside.

required

Returns:

Name Type Description
str str

ID of registered flow.

Source code in lume_services/services/scheduling/backends/backend.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@abstractmethod
def register_flow(
    self,
    flow: Flow,
    project_name: str,
    image: Optional[str],
) -> str:
    """Register a flow with Prefect.

    Implementations that have no server connection should raise an error when
    this method is called.

    Args:
        flow (Flow): Prefect flow to register.
        project_name (str): Name of the project the flow is registered under.
        image (Optional[str]): Name of the Docker image the flow runs inside.

    Returns:
        str: ID of the registered flow.

    """

run(parameters, run_config, **kwargs) abstractmethod

Run a flow. Does not return result. Implementations should cover instantiation of run_config from kwargs as well as backend-specific kwargs.

Parameters:

Name Type Description Default
parameters Optional[Dict[str, Any]]

Dictionary mapping flow parameter name to value

required
run_config Optional[RunConfig]

RunConfig object to configure the flow run.

required
**kwargs

Keyword arguments for RunConfig init and backend-specific execution.

{}

Returns:

Type Description
Union[str, None]

Union[str, None]: Return run_id in case of server backend, None in the case of local execution.

Raises:

Type Description
ValidationError

Error validating run configuration.

ValueError

Value error on flow run

Source code in lume_services/services/scheduling/backends/backend.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
@abstractmethod
def run(
    self,
    parameters: Optional[Dict[str, Any]],
    run_config: Optional[RunConfig],
    **kwargs
) -> Union[str, None]:
    """Run a flow without returning its result.

    Implementations are responsible for building the run config from kwargs and
    for handling any backend-specific kwargs.

    Args:
        parameters (Optional[Dict[str, Any]]): Mapping of flow parameter name to
            value.
        run_config (Optional[RunConfig]): RunConfig object used to configure the
            flow run.
        **kwargs: Keyword arguments for RunConfig construction and
            backend-specific execution.

    Returns:
        Union[str, None]: The run_id for server backends; None for local
            execution.

    Raises:
        pydantic.ValidationError: The run configuration failed validation.
        ValueError: A value error occurred during the flow run.
    """

run_and_return(parameters, run_config, task_name, **kwargs) abstractmethod

Run a flow and return result. Implementations should cover instantiation of run_config from kwargs as well as backend-specific kwargs.

Parameters:

Name Type Description Default
parameters Optional[Dict[str, Any]]

Dictionary mapping flow parameter name to value

required
run_config Optional[RunConfig]

RunConfig object to configure the flow run.

required
task_name Optional[str]

Name of task to return result. If no task slug is passed, will return the flow result.

required
**kwargs

Keyword arguments for RunConfig init and backend-specific execution.

{}

Returns:

Name Type Description
Any Any

Result of flow run.

Raises:

Type Description
EmptyResultError

No result is associated with the flow.

ValidationError

Error validating run configuration.

ValueError

Value error on flow run

Source code in lume_services/services/scheduling/backends/backend.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
@abstractmethod
def run_and_return(
    self,
    parameters: Optional[Dict[str, Any]],
    run_config: Optional[RunConfig],
    task_name: Optional[str],
    **kwargs
) -> Any:
    """Run a flow and return its result.

    Implementations are responsible for building the run config from kwargs and
    for handling any backend-specific kwargs.

    Args:
        parameters (Optional[Dict[str, Any]]): Mapping of flow parameter name to
            value.
        run_config (Optional[RunConfig]): RunConfig object used to configure the
            flow run.
        task_name (Optional[str]): Name of the task whose result is returned.
            When omitted, the flow result is returned.
        **kwargs: Keyword arguments for RunConfig construction and
            backend-specific execution.

    Returns:
        Any: Result of the flow run.

    Raises:
        lume_services.errors.EmptyResultError: The flow has no associated result.
        pydantic.ValidationError: The run configuration failed validation.
        ValueError: A value error occurred during the flow run.
    """

RunConfig

Bases: BaseModel, ABC

Pydantic representation of Prefect UniversalRunConfig: https://docs.prefect.io/api/latest/run_configs.html#universalrun

Attributes:

Name Type Description
labels Optional[List[str]]

A list of labels to apply to this run config. Labels are string identifiers used by Prefect Agents for selecting valid flow runs when polling for work.

env Optional[dict]

Additional environment variables to set on the job

Source code in lume_services/services/scheduling/backends/backend.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class RunConfig(BaseModel, ABC):
    """Pydantic model mirroring Prefect's UniversalRun run configuration.

    Reference: https://docs.prefect.io/api/latest/run_configs.html#universalrun

    Attributes:
        labels (Optional[List[str]]): A list of labels to apply to this run
            config. Prefect Agents use these string identifiers to select valid
            flow runs when polling for work.
        env (Optional[dict]): Additional environment variables to set on the job.

    """

    # Default label applied when the caller passes none.
    labels: List[str] = ["lume-services"]
    env: Optional[dict]

    @abstractmethod
    def build(self) -> PrefectRunConfig:
        """Convert this model into the corresponding Prefect RunConfig object.

        Returns:
            PrefectRunConfig

        """

build() abstractmethod

Method for converting object to Prefect RunConfig type.

Returns:

Type Description
RunConfig

PrefectRunConfig

Source code in lume_services/services/scheduling/backends/backend.py
24
25
26
27
28
29
30
31
32
@abstractmethod
def build(self) -> PrefectRunConfig:
    """Convert this model into the corresponding Prefect RunConfig object.

    Returns:
        PrefectRunConfig

    """

LocalBackend

Bases: Backend

Backend used for local execution. This backend will raise errors on any function calls requiring registration with the Prefect server.

Attributes:

Name Type Description
run_config Optional[LocalRunConfig]

Default configuration object for a given run.

Source code in lume_services/services/scheduling/backends/local.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
class LocalBackend(Backend):
    """Backend used for local execution. This backend will raise errors on any function
    calls requiring registration with the Prefect server.

    Server-type operations (``create_project``, ``register_flow`` and
    ``load_flow``) always raise ``LocalBackendError``.

    NOTE(review): earlier documentation listed a ``run_config`` attribute, but no
    such field is declared on this class; run configuration is supplied per call
    through ``run_config`` or ``**kwargs``.

    """

    def _apply_run_config(
        self,
        flow: Flow,
        run_config: Optional[LocalRunConfig],
        kwargs: Dict[str, Any],
        method_name: str,
    ) -> None:
        """Resolve the run configuration for a call and attach it to ``flow``.

        Args:
            flow (Flow): Prefect flow that will be executed.
            run_config (Optional[LocalRunConfig]): Explicit run configuration, if
                any.
            kwargs (Dict[str, Any]): Keyword arguments used to build a
                LocalRunConfig when ``run_config`` is None.
            method_name (str): Name of the calling public method, used in the
                warning message.

        Raises:
            pydantic.ValidationError: Error validating run configuration.

        """
        if run_config is not None and kwargs:
            # An explicit run_config takes precedence; the kwargs are ignored.
            warnings.warn(
                f"Both run_config and kwargs passed to LocalBackend.{method_name}. "
                "Flow will execute using passed run_config."
            )

        if run_config is None:
            run_config = LocalRunConfig(**kwargs)

        # Convert to Prefect's LocalRun and apply it to the flow.
        flow.run_config = run_config.build()

    def run(
        self,
        data: Dict[str, Any],
        run_config: Optional[LocalRunConfig] = None,
        *,
        flow: Flow,
        **kwargs
    ) -> None:
        """Run flow execution. Does not return result.

        Args:
            data (Optional[Dict[str, Any]]): Dictionary mapping flow parameter name to
                value.
            run_config (Optional[LocalRunConfig]): LocalRunConfig object to configure
                the flow run. When omitted, one is built from ``**kwargs``.
            flow (Flow): Prefect flow to execute.
            **kwargs: Keyword arguments used to instantiate the LocalRunConfig when
                ``run_config`` is not passed, e.g. ``labels`` (Prefect agent
                labels), ``env`` (environment variables for the job) and
                ``working_dir``.

        Raises:
            pydantic.ValidationError: Error validating run configuration.

        """
        self._apply_run_config(flow, run_config, kwargs, "run")
        flow.run(parameters=data)

    def run_and_return(
        self,
        data: Dict[str, Any],
        run_config: Optional[LocalRunConfig] = None,
        task_name: Optional[str] = None,
        *,
        flow: Flow,
        **kwargs
    ) -> Any:
        """Run flow execution and return result.

        Args:
            data (Optional[Dict[str, Any]]): Dictionary mapping flow parameter name to
                value.
            run_config (Optional[LocalRunConfig]): LocalRunConfig object to configure
                the flow run. When omitted, one is built from ``**kwargs``.
            task_name (Optional[str]): Name of task to return result. If no task name
                is passed, a dict mapping every task slug to its result is returned.
            flow (Flow): Prefect flow to execute.
            **kwargs: Keyword arguments used to instantiate the LocalRunConfig.

        Returns:
            Any: The single task result when exactly one task matches
                ``task_name``, a list of results when several match, or a dict of
                task slug to result when ``task_name`` is None.

        Raises:
            pydantic.ValidationError: Error validating run configuration.
            FlowFailedError: The flow raised an exception or finished in a failed
                state.
            EmptyResultError: No result is associated with the flow.
            TaskNotCompletedError: Result reference task was not completed.
            TaskNotInFlowError: Provided task name not in flow.

        """
        self._apply_run_config(flow, run_config, kwargs, "run_and_return")

        try:
            flow_run = flow.run(parameters=data)
        except Exception as e:
            # Python 3 exceptions have no ``.message`` attribute; use str(e).
            logger.exception(str(e))
            raise FlowFailedError(
                flow_id="local_flow",
                flow_run_id="local_flow_run",
                exception_message=str(e),
            ) from e

        if flow_run.is_failed():
            # Not inside an except handler, so log without a (nonexistent) traceback.
            logger.error(flow_run.message)
            raise FlowFailedError(
                flow_id="local_flow",
                flow_run_id="local_flow_run",
                exception_message=flow_run.message,
            )

        result = flow_run.result
        if result is None:
            # Same positional signature as the per-task EmptyResultError below.
            raise EmptyResultError("local_flow", "local_flow_run", None)

        task_to_slug_map = dict(flow.slugs)

        # No task requested: return a dict of task slug to value.
        if task_name is None:
            return {
                slug: result[task].result for task, slug in task_to_slug_map.items()
            }

        tasks = flow.get_tasks(name=task_name)
        if not tasks:
            raise TaskNotInFlowError(
                flow_name=flow.name, project_name="local", task_name=task_name
            )

        results = []
        for task in tasks:
            slug = task_to_slug_map.get(task)
            state = result[task]
            if not state.is_successful():
                raise TaskNotCompletedError(
                    slug, flow_id="local_flow", flow_run_id="local_flow_run"
                )

            task_result = state.result
            if task_result is None:
                raise EmptyResultError("local_flow", "local_flow_run", slug)

            results.append(task_result)

        return results[0] if len(results) == 1 else results

    def create_project(self, *args, **kwargs) -> None:
        """Raise LocalBackendError for calls to the create_project server-type method.

        Raises:
            LocalBackendError: Indicates that a server-backend operation has been
                executed against the LocalBackend. Server-backend operations include
                flow registration and remote execution.

        """
        raise LocalBackendError()

    def register_flow(self, *args, **kwargs) -> None:
        """Raise LocalBackendError for calls to the register_flow server-type method.

        Raises:
            LocalBackendError: Indicates that a server-backend operation has been
                executed against the LocalBackend. Server-backend operations include
                flow registration and remote execution.

        """
        raise LocalBackendError()

    def load_flow(self, *args, **kwargs) -> None:
        """Raise LocalBackendError for calls to the load_flow server-type method.

        Raises:
            LocalBackendError: Indicates that a server-backend operation has been
                executed against the LocalBackend. Server-backend operations include
                flow registration and remote execution.

        """
        raise LocalBackendError()

create_project(*args, **kwargs)

Raise LocalBackendError for calls to the create_project server-type method.

Raises:

Type Description
LocalBackendError

Indicates that a server-backend operation has been executed against the LocalBackend. Server-backend operations include flow registration and remote execution.

Source code in lume_services/services/scheduling/backends/local.py
214
215
216
217
218
219
220
221
222
223
def create_project(self, *args, **kwargs) -> None:
    """Reject project creation, which requires a Prefect server.

    Any arguments are accepted and ignored.

    Raises:
        LocalBackendError: Always. Server-backend operations, such as flow
            registration and remote execution, cannot run against the
            LocalBackend.

    """
    raise LocalBackendError

load_flow(*args, **kwargs)

Raise LocalBackendError for calls to load_flow server-type method.

Raises:

Type Description
LocalBackendError

Indicates that a server-backend operation has been executed against the LocalBackend. Server-backend operations include flow registration and remote execution.

Source code in lume_services/services/scheduling/backends/local.py
237
238
239
240
241
242
243
244
245
246
def load_flow(self, *args, **kwargs) -> None:
    """Reject flow loading, which requires a Prefect server.

    Any arguments are accepted and ignored.

    Raises:
        LocalBackendError: Always. Server-backend operations, such as flow
            registration and remote execution, cannot run against the
            LocalBackend.

    """
    raise LocalBackendError

register_flow(*args, **kwargs)

Raise LocalBackendError for calls to register_flow server-type method.

Raises:

Type Description
LocalBackendError

Indicates that a server-backend operation has been executed against the LocalBackend. Server-backend operations include flow registration and remote execution.

Source code in lume_services/services/scheduling/backends/local.py
225
226
227
228
229
230
231
232
233
234
235
def register_flow(self, *args, **kwargs) -> None:
    """Reject flow registration, which requires a Prefect server.

    Any arguments are accepted and ignored.

    Raises:
        LocalBackendError: Always. Server-backend operations, such as flow
            registration and remote execution, cannot run against the
            LocalBackend.

    """
    raise LocalBackendError

run(data, run_config=None, *, flow, **kwargs)

Run flow execution. Does not return result.

Parameters:

Name Type Description Default
labels Optional[List[str]]

A list of labels to apply to this run config, passed through **kwargs. Labels are string identifiers used by Prefect Agents for selecting valid flow runs when polling for work.

optional (keyword argument)
env Optional[dict]

Additional environment variables to set on the job, passed through **kwargs.

optional (keyword argument)
data Optional[Dict[str, Any]]

Dictionary mapping flow parameter name to value.

required
run_config Optional[LocalRunConfig]

LocalRunConfig object to configure the flow run.

None
flow Flow

Prefect flow to execute.

required
**kwargs

Keyword arguments to instantiate the LocalRunConfig.

{}

Raises:

Type Description
ValidationError

Error validating run configuration.

Source code in lume_services/services/scheduling/backends/local.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def run(
    self,
    data: Dict[str, Any],
    run_config: LocalRunConfig = None,
    *,
    flow: Flow,
    **kwargs
) -> None:
    """Run flow execution. Does not return result.

    Args:
        data (Optional[Dict[str, Any]]): Dictionary mapping flow parameter name to
            value.
        run_config (Optional[LocalRunConfig]): LocalRunConfig object to configure
            the flow run. When omitted, one is built from ``**kwargs``.
        flow (Flow): Prefect flow to execute.
        **kwargs: Keyword arguments used to instantiate the LocalRunConfig when
            ``run_config`` is not passed, e.g. ``labels`` (Prefect agent labels),
            ``env`` (environment variables for the job) and ``working_dir``.

    Raises:
        pydantic.ValidationError: Error validating run configuration.

    """
    if run_config is not None and kwargs:
        # An explicit run_config takes precedence; the kwargs are ignored.
        # (The message is built from adjacent literals so it carries no stray
        # indentation, unlike a backslash-continued string literal.)
        warnings.warn(
            "Both run_config and kwargs passed to LocalBackend.run. "
            "Flow will execute using passed run_config."
        )

    if run_config is None:
        run_config = LocalRunConfig(**kwargs)

    # Convert to Prefect's LocalRun and apply it to the flow before running.
    flow.run_config = run_config.build()
    flow.run(parameters=data)

run_and_return(data, run_config=None, task_name=None, *, flow, **kwargs)

Run flow execution and return result.

Parameters:

Name Type Description Default
data Optional[Dict[str, Any]]

Dictionary mapping flow parameter name to value.

required
run_config Optional[LocalRunConfig]

LocalRunConfig object to configure the flow run.

None
task_name Optional[str]

Name of task to return result. If no task slug is passed, will return the flow result.

None
flow Flow

Prefect flow to execute.

required
**kwargs

Keyword arguments to instantiate the LocalRunConfig.

{}

Raises:

Type Description
ValidationError

Error validating run configuration.

EmptyResultError

No result is associated with the flow.

TaskNotCompletedError

Result reference task was not completed.

TaskNotInFlowError

Provided task slug not in flow.

Source code in lume_services/services/scheduling/backends/local.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def run_and_return(
    self,
    data: Dict[str, Any],
    run_config: LocalRunConfig = None,
    task_name: str = None,
    *,
    flow: Flow,
    **kwargs
) -> Any:
    """Run flow execution and return result.

    Args:
        data (Optional[Dict[str, Any]]): Dictionary mapping flow parameter name to
            value.
        run_config (Optional[LocalRunConfig]): LocalRunConfig object to configure
            the flow run. When omitted, one is built from ``**kwargs``.
        task_name (Optional[str]): Name of task to return result. If no task name
            is passed, a dict mapping every task slug to its result is returned.
        flow (Flow): Prefect flow to execute.
        **kwargs: Keyword arguments used to instantiate the LocalRunConfig.

    Returns:
        Any: The single task result when exactly one task matches ``task_name``,
            a list of results when several match, or a dict of task slug to
            result when ``task_name`` is None.

    Raises:
        pydantic.ValidationError: Error validating run configuration.
        FlowFailedError: The flow raised an exception or finished in a failed
            state.
        EmptyResultError: No result is associated with the flow.
        TaskNotCompletedError: Result reference task was not completed.
        TaskNotInFlowError: Provided task name not in flow.

    """
    if run_config is not None and kwargs:
        # An explicit run_config takes precedence; the kwargs are ignored.
        warnings.warn(
            "Both run_config and kwargs passed to LocalBackend.run_and_return. "
            "Flow will execute using passed run_config."
        )

    if run_config is None:
        run_config = LocalRunConfig(**kwargs)

    # Convert to Prefect's LocalRun and apply it to the flow.
    flow.run_config = run_config.build()

    try:
        flow_run = flow.run(parameters=data)
    except Exception as e:
        # Python 3 exceptions have no ``.message`` attribute; use str(e).
        logger.exception(str(e))
        raise FlowFailedError(
            flow_id="local_flow",
            flow_run_id="local_flow_run",
            exception_message=str(e),
        ) from e

    if flow_run.is_failed():
        # Not inside an except handler, so log without a (nonexistent) traceback.
        logger.error(flow_run.message)
        raise FlowFailedError(
            flow_id="local_flow",
            flow_run_id="local_flow_run",
            exception_message=flow_run.message,
        )

    result = flow_run.result
    if result is None:
        # Same positional signature as the per-task EmptyResultError below.
        raise EmptyResultError("local_flow", "local_flow_run", None)

    task_to_slug_map = dict(flow.slugs)

    # No task requested: return a dict of task slug to value.
    if task_name is None:
        return {slug: result[task].result for task, slug in task_to_slug_map.items()}

    tasks = flow.get_tasks(name=task_name)
    if not tasks:
        raise TaskNotInFlowError(
            flow_name=flow.name, project_name="local", task_name=task_name
        )

    results = []
    for task in tasks:
        slug = task_to_slug_map.get(task)
        state = result[task]
        if not state.is_successful():
            raise TaskNotCompletedError(
                slug, flow_id="local_flow", flow_run_id="local_flow_run"
            )

        task_result = state.result
        if task_result is None:
            raise EmptyResultError("local_flow", "local_flow_run", slug)

        results.append(task_result)

    return results[0] if len(results) == 1 else results

LocalRunConfig

Bases: RunConfig

Local run configuration. If no directory is found at the filepath passed as working_dir, an error will be raised.

Attributes:

Name Type Description
labels Optional[List[str]]

A list of labels to apply to this run config. Labels are string identifiers used by Prefect Agents for selecting valid flow runs when polling for work.

env Optional[Dict[str, str]]

Dictionary of environment variables to use for run

working_dir Optional[str]

Working directory

Source code in lume_services/services/scheduling/backends/local.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class LocalRunConfig(RunConfig):
    """Local run configuration. If no directory is found at the filepath passed as
    working_dir, an error will be raised.

    Attributes:
        labels (Optional[List[str]]): A list of labels to apply to this run
            config. Labels are string identifiers used by Prefect Agents for selecting
            valid flow runs when polling for work
        env (Optional[Dict[str, str]]): Dictionary of environment variables to use for
            run
        working_dir (Optional[str]): Working directory

    """

    env: Optional[Dict[str, str]]
    # NOTE: this default is evaluated once, when the class is defined (i.e. at
    # import time), so it is the process's working directory at that moment.
    working_dir: Optional[str] = str(os.getcwd())

    # Named ``validate_working_dir`` rather than ``validate`` so that it does not
    # shadow pydantic's ``BaseModel.validate`` classmethod, which pydantic calls
    # when this model is used as the type of a field on another model.
    @validator("working_dir", pre=True)
    def validate_working_dir(cls, v):
        """Pydantic validator checking working directory existence.

        Args:
            v (Optional[str]): Candidate working directory. None is passed through
                unchanged, matching the Optional[str] field type.

        Returns:
            Optional[str]: The validated value.

        Raises:
            FileNotFoundError: No directory exists at the given path.

        """
        if v is None:
            return v

        if not os.path.isdir(v):
            raise FileNotFoundError(f"No directory found at {v}")

        return v

    def build(self) -> LocalRun:
        """Method for converting to Prefect RunConfig type LocalRun.

        Fields set to None are omitted so that LocalRun applies its own defaults.

        Returns:
            LocalRun

        """
        return LocalRun(**self.dict(exclude_none=True))

build()

Method for converting to Prefect RunConfig type LocalRun.

Returns:

Type Description
LocalRun

LocalRun

Source code in lume_services/services/scheduling/backends/local.py
46
47
48
49
50
51
52
53
def build(self) -> LocalRun:
    """Convert this configuration into Prefect's ``LocalRun`` run config.

    Only fields with a non-``None`` value are forwarded to ``LocalRun``.

    Returns:
        LocalRun

    """
    settings = self.dict(exclude_none=True)
    return LocalRun(**settings)

validate(v)

Pydantic validator checking working directory existence

Source code in lume_services/services/scheduling/backends/local.py
38
39
40
41
42
43
44
@validator("working_dir", pre=True)
def validate(cls, v):
    """Pydantic validator checking working directory existence.

    Args:
        v: Candidate ``working_dir`` value. ``None`` is passed through unchanged
            because the field is optional and ``build`` omits ``None`` values.

    Returns:
        The unchanged value.

    Raises:
        FileNotFoundError: If ``v`` is not ``None`` and does not name an existing
            directory.

    """
    if v is not None and not os.path.isdir(v):
        # Exceptions do not perform %-interpolation; format the message here.
        raise FileNotFoundError(f"No directory found at {v}")

    return v

ServerBackend

Bases: Backend

Abstract backend used for connecting to a Prefect server.

Prefect manages its own contexts for the purpose of registering flow objects etc. This introduced issues with management of clients, namely that even after setting the prefect configuration in the PrefectConfig.apply method, the original cloud context was still being used to construct the client. For this reason, all clients are constructed inside a context constructed from the backend configuration.

Attributes:

Name Type Description
config PrefectConfig

Instantiated PrefectConfig object describing connection to Prefect server.

default_image str

Default image used for registering flow storage.

Source code in lume_services/services/scheduling/backends/server.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
class ServerBackend(Backend):
    """Abstract backend used for connecting to a Prefect server.

    Prefect manages its own contexts for the purpose of registering flow objects
    etc. This introduced issues with management of clients, namely that even after
    setting the prefect configuration in the PrefectConfig.apply method, the
    original cloud context was still being used to construct the client. For this
    reason, all clients are constructed inside a context constructed from the backend
    configuration.


    Attributes:
        config (PrefectConfig): Instantiated PrefectConfig object describing connection
            to Prefect server.
        default_image (str): Default image used for registering flow storage. It is
            not declared as a field on this class; ``register_flow`` reads it with
            ``getattr`` so subclasses may supply it.

    """

    config: PrefectConfig

    class Config:
        underscore_attrs_are_private = True

    @abstractproperty
    def run_config_type(self) -> PrefectRunConfig:
        """Abstract property that must return the RunConfig type pertinent to
        the Backend implementation.

        The returned class is instantiated with keyword arguments (``image`` in
        ``register_flow``; ``env`` plus caller kwargs in ``run``/``run_and_return``)
        and its instances must provide ``build()`` returning a Prefect run config.

        """
        ...

    def create_project(self, project_name: str) -> None:
        """Create a Prefect project.

        Args:
            project_name (str): Create a named Prefect project.

        Raises:
            prefect.errors.ClientError: if the GraphQL query is bad for any reason

        """
        with prefect.context(config=self.config.apply()):
            client = Client()
            client.create_project(project_name=project_name)

    def register_flow(
        self,
        flow: Flow,
        project_name: str,
        image: str = None,
        labels: List[str] = None,
        idempotency_key: str = None,
        version_group_id: str = None,
        build: bool = True,
        no_url: bool = False,
        set_schedule_active: bool = True,
    ) -> str:
        """Register a flow with Prefect.

        Args:
            flow (Flow): Prefect flow to register
            project_name (str): Name of project to register flow to
            image (str): Name of Docker image to run flow inside. If not specified,
                the backend's ``default_image`` attribute is used when present.
            build (bool): Whether the flow's storage should be built prior to
                serialization. By default lume-services flows use the same
                image for execution with additional packages passed for installation
                configured at runtime.
            labels (Optional[List[str]]): A list of labels to add to this Flow.
                When given, they replace the labels on the built run config.
            idempotency_key (Optional[str]): a key that, if matching the most recent
                registration call for this flow group, will prevent the creation of
                another flow version and return the existing flow id instead.
            version_group_id (Optional[str]): The UUID version group ID to use for
                versioning this Flow in Cloud. If not provided, the version group ID
                associated with this Flow's project and name will be used.
            no_url (Optional[bool]): If True, the stdout from this function will not
                contain the URL link to the newly-registered flow in the UI
            set_schedule_active (Optional[bool]): If False, will set the schedule to
                inactive in the database to prevent auto-scheduling runs (if the Flow
                has a schedule)

        Returns:
            str: ID of registered flow

        Notes:
            prefect registration idempotency key omitted and version group...

        Raises:
            prefect.errors.ClientError: if the GraphQL query is bad for any reason

        """
        if not image:
            # ``default_image`` is not declared on this class; fall back to None
            # instead of raising AttributeError when a subclass does not define it.
            image = getattr(self, "default_image", None)

        # configure run config for backend
        run_config = self.run_config_type(image=image)
        flow.run_config = run_config.build()
        if labels is not None:
            logger.info(
                "Labels provided. Replacing existing run config labels with: %s",
                labels,
            )
            flow.run_config.labels = set(labels)

        flow.run_config.image_tag = image

        with prefect.context(config=self.config.apply()):
            flow_id = flow.register(
                project_name=project_name,
                build=build,
                set_schedule_active=set_schedule_active,
                version_group_id=version_group_id,
                no_url=no_url,
                idempotency_key=idempotency_key,
            )

        return flow_id

    def load_flow(self, flow_name: str, project_name: str) -> dict:
        """Load a Prefect flow object.

        Args:
            flow_name (str): Name of flow.
            project_name (str): Name of project flow is registered with.

        Returns:
            dict: Dictionary with keys "flow_id" and "flow"

        Raises:
            prefect.errors.ClientError: if the GraphQL query is bad for any reason

        """
        # Query only inside the backend-configured context; a query outside it
        # would use Prefect's ambient client configuration (see class docstring).
        with prefect.context(config=self.config.apply()):
            flow_view = FlowView.from_flow_name(
                flow_name, project_name=project_name, last_updated=True
            )
            return {"flow_id": flow_view.flow_id, "flow": flow_view.flow}

    def run(
        self,
        parameters: Dict[str, Any] = None,
        run_config: RunConfig = None,
        *,
        flow_id: str,
        **kwargs,
    ) -> str:
        """Create a flow run for a flow.

        Args:
            flow_id (str): Flow identifier
            parameters (Optional[Dict[str, Any]]): Dictionary mapping flow parameter
                name to value
            run_config (Optional[RunConfig]): RunConfig object to configure the flow
                run.
            **kwargs: Keyword arguments used to instantiate the RunConfig. Ignored
                when ``run_config`` is given.

        Returns:
            str: ID of flow run

        Raises:
            prefect.errors.ClientError: if the GraphQL query is bad for any reason
            docker.errors.DockerException: Run configuration error for docker api.
            pydantic.ValidationError: Error validating run configuration.
            ValueError: Value error on flow run
        """
        if run_config is not None and kwargs:
            warnings.warn(
                "Both run_config and kwargs passed to Backend.run. Flow "
                "will execute using passed run_config."
            )

        with prefect.context(config=self.config.apply()):
            client = Client()

            flow_view = FlowView.from_flow_id(flow_id)

            # convert LUME-services run config to appropriate Prefect RunConfig object
            if run_config is None:
                run_config = self.run_config_type(
                    env={"PREFECT__CONTEXT__PROJECT_NAME": flow_view.project_name},
                    **kwargs,
                )

            prefect_run_config = run_config.build()

            flow_run_id = client.create_flow_run(
                flow_id=flow_id, parameters=parameters, run_config=prefect_run_config
            )

        return flow_run_id

    def run_and_return(
        self,
        parameters: Dict[str, Any] = None,
        run_config: RunConfig = None,
        task_name: str = None,
        *,
        flow_id: str,
        timeout: timedelta = timedelta(minutes=1),
        cancel_on_timeout: bool = True,
        **kwargs,
    ):
        """Create a flow run for a flow and return the result.

        Args:
            parameters (Optional[Dict[str, Any]]): Dictionary mapping flow parameter
                name to value
            run_config (Optional[RunConfig]): RunConfig object to configure the flow
                run.
            task_name (Optional[str]): Name of task to return result. If no task name
                is passed, will return the flow result.
            flow_id (str): ID of flow to run.
            timeout (timedelta): Time before stopping flow execution.
            cancel_on_timeout (bool): Whether to cancel execution on timeout
                error.
            **kwargs: Keyword arguments used to instantiate the RunConfig. Ignored
                when ``run_config`` is given.

        Returns:
            If ``task_name`` is None, a dict mapping every task slug to its result
            (``None`` where no result location is set). Otherwise, if exactly one
            task slug contains ``task_name``, that task's result; if several slugs
            match, a dict mapping each matching slug to its result.

        Raises:
            FlowFailedError: The flow run finished in a failed state.
            EmptyResultError: No result is associated with the flow.
            TaskNotCompletedError: Result reference task was not completed.
            RuntimeError: Flow did not complete within given timeout.
            prefect.errors.ClientError: if the GraphQL query is bad for any reason
            docker.errors.DockerException: Run configuration error for docker api.
            pydantic.ValidationError: Error validating run configuration.
            TaskNotInFlowError: Provided task slug not in flow.
            ValueError: Value error on flow run
        """
        if run_config is not None and kwargs:
            warnings.warn(
                "Both run_config and kwargs passed to Backend.run_and_return. Flow "
                "will execute using passed run_config."
            )

        with prefect.context(config=self.config.apply()):
            client = Client()

            flow_view = FlowView.from_flow_id(flow_id)

            # convert LUME-services run config to appropriate Prefect RunConfig object
            if run_config is None:
                run_config = self.run_config_type(
                    env={"PREFECT__CONTEXT__PROJECT_NAME": flow_view.project_name},
                    **kwargs,
                )

            logger.info(
                "Creating Prefect flow run for %s with parameters %s and run_config %s",
                flow_id,
                parameters,
                run_config.json(),
            )

            prefect_run_config = run_config.build()

            flow_run_id = client.create_flow_run(
                flow_id=flow_id, parameters=parameters, run_config=prefect_run_config
            )

            # watch flow run and stream logs until timeout
            try:
                for log in watch_flow_run(
                    flow_run_id,
                    stream_states=True,
                    stream_logs=True,
                    max_duration=timeout,
                ):
                    logger.info(log)
            except RuntimeError:
                # per the Raises section, RuntimeError signals the timeout elapsed
                if cancel_on_timeout:
                    client.cancel_flow_run(flow_run_id=flow_run_id)
                raise

            logger.debug("Watched flow completed.")
            flow_run = FlowRunView.from_flow_run_id(flow_run_id)

            # check state
            if flow_run.state.is_failed():
                # Not inside an exception handler, so logger.exception would only
                # attach a meaningless "NoneType: None" traceback.
                logger.error(flow_run.state.message)
                raise FlowFailedError(
                    flow_id=flow_run.flow_id,
                    flow_run_id=flow_run.flow_run_id,
                    exception_message=flow_run.state.message,
                )

            # populate results keyed by task slug
            results = {}
            for task_run in flow_run.get_all_task_runs():
                task_slug = task_run.task_slug
                if not task_run.state.is_successful():
                    raise TaskNotCompletedError(task_slug, flow_id, flow_run_id)

                try:
                    res = task_run.get_result()
                # location is not set, no result
                except ValueError:
                    res = None

                results[task_slug] = res

        # assume flow result, return all results
        if task_name is None:
            return results

        # filter tasks based on name
        matching = {
            slug: res for slug, res in results.items() if task_name in slug
        }
        logger.debug(matching)

        if not matching:
            raise TaskNotInFlowError(
                flow_name=flow_view.name,
                project_name=flow_view.project_name,
                task_name=task_name,
            )

        if len(matching) > 1:
            return matching

        # Report the slug of the matched task, not a loop variable left over from
        # populating ``results`` (which would be the last task run).
        matched_slug, res = next(iter(matching.items()))
        if res is None:
            raise EmptyResultError(flow_id, flow_run_id, matched_slug)

        return res

create_project(project_name)

Create a Prefect project.

Parameters:

Name Type Description Default
project_name str

Create a named Prefect project.

required

Raises:

Type Description
ClientError

if the GraphQL query is bad for any reason

Source code in lume_services/services/scheduling/backends/server.py
110
111
112
113
114
115
116
117
118
119
120
121
122
def create_project(self, project_name: str) -> None:
    """Create a named Prefect project on the server described by ``self.config``.

    The client is built inside a Prefect context derived from the backend
    configuration so that it targets the configured server.

    Args:
        project_name (str): Create a named Prefect project.

    Raises:
        prefect.errors.ClientError: if the GraphQL query is bad for any reason

    """
    prefect_config = self.config.apply()
    with prefect.context(config=prefect_config):
        Client().create_project(project_name=project_name)

load_flow(flow_name, project_name)

Load a Prefect flow object.

Parameters:

Name Type Description Default
flow_name str

Name of flow.

required
project_name str

Name of project flow is registered with.

required

Returns:

Name Type Description
dict dict

Dictionary with keys "flow_id" and "flow"

Raises:

Type Description
ClientError

if the GraphQL query is bad for any reason

Source code in lume_services/services/scheduling/backends/server.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def load_flow(self, flow_name: str, project_name: str) -> dict:
    """Load a Prefect flow object.

    Args:
        flow_name (str): Name of flow.
        project_name (str): Name of project flow is registered with.

    Returns:
        dict: Dictionary with keys "flow_id" and "flow"

    Raises:
        prefect.errors.ClientError: if the GraphQL query is bad for any reason

    """
    # Query only inside the backend-configured context; a query outside it
    # would use Prefect's ambient client configuration rather than self.config.
    with prefect.context(config=self.config.apply()):
        flow_view = FlowView.from_flow_name(
            flow_name, project_name=project_name, last_updated=True
        )
        return {"flow_id": flow_view.flow_id, "flow": flow_view.flow}

register_flow(flow, project_name, image=None, labels=None, idempotency_key=None, version_group_id=None, build=True, no_url=False, set_schedule_active=True)

Register a flow with Prefect.

Parameters:

Name Type Description Default
flow Flow

Prefect flow to register

required
project_name str

Name of project to register flow to

required
image str

Name of Docker image to run flow inside. If not specified, this will use the default image packaged with this repository.

None
build bool

Whether the flow's storage should be built prior to serialization. By default, lume-services flows use the same image for execution, with additional packages for installation configured at runtime.

True
labels Optional[List[str]]

A list of labels to add to this Flow.

None
idempotency_key Optional[str]

a key that, if matching the most recent registration call for this flow group, will prevent the creation of another flow version and return the existing flow id instead.

None
version_group_id Optional[str]

The UUID version group ID to use for versioning this Flow in Cloud. If not provided, the version group ID associated with this Flow's project and name will be used.

None
no_url Optional[bool]

If True, the stdout from this function will not contain the URL link to the newly-registered flow in the UI

False
set_schedule_active Optional[bool]

If False, will set the schedule to inactive in the database to prevent auto-scheduling runs (if the Flow has a schedule)

True

Returns:

Name Type Description
str str

ID of registered flow

Notes

prefect registration idempotency key omitted and version group...

Raises:

Type Description
ClientError

if the GraphQL query is bad for any reason

Source code in lume_services/services/scheduling/backends/server.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def register_flow(
    self,
    flow: Flow,
    project_name: str,
    image: str = None,
    labels: List[str] = None,
    idempotency_key: str = None,
    version_group_id: str = None,
    build: bool = True,
    no_url: bool = False,
    set_schedule_active: bool = True,
) -> str:
    """Register a flow with Prefect.

    Args:
        flow (Flow): Prefect flow to register
        project_name (str): Name of project to register flow to
        image (str): Name of Docker image to run flow inside. If not specified,
            the backend's ``default_image`` attribute is used when present.
        build (bool): Whether the flow's storage should be built prior to
            serialization. By default lume-services flows use the same
            image for execution with additional packages passed for installation
            configured at runtime.
        labels (Optional[List[str]]): A list of labels to add to this Flow.
            When given, they replace the labels on the built run config.
        idempotency_key (Optional[str]): a key that, if matching the most recent
            registration call for this flow group, will prevent the creation of
            another flow version and return the existing flow id instead.
        version_group_id (Optional[str]): The UUID version group ID to use for
            versioning this Flow in Cloud. If not provided, the version group ID
            associated with this Flow's project and name will be used.
        no_url (Optional[bool]): If True, the stdout from this function will not
            contain the URL link to the newly-registered flow in the UI
        set_schedule_active (Optional[bool]): If False, will set the schedule to
            inactive in the database to prevent auto-scheduling runs (if the Flow
            has a schedule)

    Returns:
        str: ID of registered flow

    Notes:
        prefect registration idempotency key omitted and version group...

    Raises:
        prefect.errors.ClientError: if the GraphQL query is bad for any reason

    """
    if not image:
        # ``default_image`` is not a declared field on ServerBackend; fall back to
        # None instead of raising AttributeError when a subclass omits it.
        image = getattr(self, "default_image", None)

    # configure run config for backend
    run_config = self.run_config_type(image=image)
    flow.run_config = run_config.build()
    if labels is not None:
        logger.info(
            "Labels provided. Replacing existing run config labels with: %s",
            labels,
        )
        flow.run_config.labels = set(labels)

    flow.run_config.image_tag = image

    with prefect.context(config=self.config.apply()):
        flow_id = flow.register(
            project_name=project_name,
            build=build,
            set_schedule_active=set_schedule_active,
            version_group_id=version_group_id,
            no_url=no_url,
            idempotency_key=idempotency_key,
        )

    return flow_id

run(parameters=None, run_config=None, *, flow_id, **kwargs)

Create a flow run for a flow.

Parameters:

Name Type Description Default
flow_id str

Flow identifier

required
parameters Optional[Dict[str, Any]]

Dictionary mapping flow parameter name to value

None
run_config Optional[RunConfig]

RunConfig object to configure the flow run.

None
**kwargs

Keyword arguments used to instantiate the RunConfig.

{}

Returns:

Name Type Description
str str

ID of flow run

Raises:

Type Description
ClientError

if the GraphQL query is bad for any reason

DockerException

Run configuration error for docker api.

ValidationError

Error validating run configuration.

ValueError

Value error on flow run

Source code in lume_services/services/scheduling/backends/server.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def run(
    self,
    parameters: Dict[str, Any] = None,
    run_config: RunConfig = None,
    *,
    flow_id: str,
    **kwargs,
) -> str:
    """Create a flow run for a flow.

    Args:
        flow_id (str): Flow identifier
        parameters (Optional[Dict[str, Any]]): Dictionary mapping flow parameter
            name to value
        run_config (Optional[RunConfig]): RunConfig object to configure the flow run.
        **kwargs: Keyword arguments used to instantiate the RunConfig. Ignored when
            ``run_config`` is given.

    Returns:
        str: ID of flow run

    Raises:
        prefect.errors.ClientError: if the GraphQL query is bad for any reason
        docker.errors.DockerException: Run configuration error for docker api.
        pydantic.ValidationError: Error validating run configuration.
        ValueError: Value error on flow run
    """
    if run_config is not None and kwargs:
        # Implicit concatenation avoids embedding source indentation in the message.
        warnings.warn(
            "Both run_config and kwargs passed to Backend.run. Flow "
            "will execute using passed run_config."
        )

    with prefect.context(config=self.config.apply()):
        client = Client()

        flow_view = FlowView.from_flow_id(flow_id)

        # convert LUME-services run config to appropriate Prefect RunConfig object
        if run_config is None:
            run_config = self.run_config_type(
                env={"PREFECT__CONTEXT__PROJECT_NAME": flow_view.project_name},
                **kwargs,
            )

        prefect_run_config = run_config.build()

        flow_run_id = client.create_flow_run(
            flow_id=flow_id, parameters=parameters, run_config=prefect_run_config
        )

    return flow_run_id

run_and_return(parameters=None, run_config=None, task_name=None, *, flow_id, timeout=timedelta(minutes=1), cancel_on_timeout=True, **kwargs)

Create a flow run for a flow and return the result.

Parameters:

Name Type Description Default
parameters Optional[Dict[str, Any]]

Dictionary mapping flow parameter name to value

None
run_config Optional[RunConfig]

RunConfig object to configure the flow run.

None
task_name Optional[str]

Name of task to return result. If no task slug is passed, will return the flow result.

None
flow_id str

ID of flow to run.

required
timeout timedelta

Time before stopping flow execution.

timedelta(minutes=1)
cancel_on_timeout bool

Whether to cancel execution on timeout error.

True
**kwargs

Keyword arguments used to instantiate the RunConfig.

{}

Raises:

Type Description
EmptyResultError

No result is associated with the flow.

TaskNotCompletedError

Result reference task was not completed.

RuntimeError

Flow did not complete within given timeout.

ClientError

if the GraphQL query is bad for any reason

DockerException

Run configuration error for docker api.

ValidationError

Error validating run configuration.

TaskNotInFlowError

Provided task slug not in flow.

ValueError

Value error on flow run

Source code in lume_services/services/scheduling/backends/server.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
def run_and_return(
    self,
    parameters: Dict[str, Any] = None,
    run_config: RunConfig = None,
    task_name: str = None,
    *,
    flow_id: str,
    timeout: timedelta = timedelta(minutes=1),
    cancel_on_timeout: bool = True,
    **kwargs,
):
    """Create a flow run for a flow and return the result.

    Args:
        parameters (Optional[Dict[str, Any]]): Dictionary mapping flow parameter
            name to value
        run_config (Optional[RunConfig]): RunConfig object to configure the flow run.
        task_name (Optional[str]): Name of task to return result. If no task name
            is passed, will return the flow result.
        flow_id (str): ID of flow to run.
        timeout (timedelta): Time before stopping flow execution.
        cancel_on_timeout (bool): Whether to cancel execution on timeout
            error.
        **kwargs: Keyword arguments used to instantiate the RunConfig. Ignored when
            ``run_config`` is given.

    Returns:
        If ``task_name`` is None, a dict mapping every task slug to its result
        (``None`` where no result location is set). Otherwise, if exactly one task
        slug contains ``task_name``, that task's result; if several slugs match, a
        dict mapping each matching slug to its result.

    Raises:
        FlowFailedError: The flow run finished in a failed state.
        EmptyResultError: No result is associated with the flow.
        TaskNotCompletedError: Result reference task was not completed.
        RuntimeError: Flow did not complete within given timeout.
        prefect.errors.ClientError: if the GraphQL query is bad for any reason
        docker.errors.DockerException: Run configuration error for docker api.
        pydantic.ValidationError: Error validating run configuration.
        TaskNotInFlowError: Provided task slug not in flow.
        ValueError: Value error on flow run
    """
    if run_config is not None and kwargs:
        warnings.warn(
            "Both run_config and kwargs passed to Backend.run_and_return. Flow "
            "will execute using passed run_config."
        )

    with prefect.context(config=self.config.apply()):
        client = Client()

        flow_view = FlowView.from_flow_id(flow_id)

        # convert LUME-services run config to appropriate Prefect RunConfig object
        if run_config is None:
            run_config = self.run_config_type(
                env={"PREFECT__CONTEXT__PROJECT_NAME": flow_view.project_name},
                **kwargs,
            )

        logger.info(
            "Creating Prefect flow run for %s with parameters %s and run_config %s",
            flow_id,
            parameters,
            run_config.json(),
        )

        prefect_run_config = run_config.build()

        flow_run_id = client.create_flow_run(
            flow_id=flow_id, parameters=parameters, run_config=prefect_run_config
        )

        # watch flow run and stream logs until timeout
        try:
            for log in watch_flow_run(
                flow_run_id,
                stream_states=True,
                stream_logs=True,
                max_duration=timeout,
            ):
                logger.info(log)
        except RuntimeError:
            # per the Raises section, RuntimeError signals the timeout elapsed
            if cancel_on_timeout:
                client.cancel_flow_run(flow_run_id=flow_run_id)
            raise

        logger.debug("Watched flow completed.")
        flow_run = FlowRunView.from_flow_run_id(flow_run_id)

        # check state
        if flow_run.state.is_failed():
            # Not inside an exception handler, so logger.exception would only
            # attach a meaningless "NoneType: None" traceback.
            logger.error(flow_run.state.message)
            raise FlowFailedError(
                flow_id=flow_run.flow_id,
                flow_run_id=flow_run.flow_run_id,
                exception_message=flow_run.state.message,
            )

        # populate results keyed by task slug
        results = {}
        for task_run in flow_run.get_all_task_runs():
            task_slug = task_run.task_slug
            if not task_run.state.is_successful():
                raise TaskNotCompletedError(task_slug, flow_id, flow_run_id)

            try:
                res = task_run.get_result()
            # location is not set, no result
            except ValueError:
                res = None

            results[task_slug] = res

    # assume flow result, return all results
    if task_name is None:
        return results

    # filter tasks based on name
    matching = {
        slug: res for slug, res in results.items() if task_name in slug
    }
    logger.debug(matching)

    if not matching:
        raise TaskNotInFlowError(
            flow_name=flow_view.name,
            project_name=flow_view.project_name,
            task_name=task_name,
        )

    if len(matching) > 1:
        return matching

    # Report the slug of the matched task, not a loop variable left over from
    # populating ``results`` (which would be the last task run).
    matched_slug, res = next(iter(matching.items()))
    if res is None:
        raise EmptyResultError(flow_id, flow_run_id, matched_slug)

    return res

run_config_type()

Abstract property that must return the Prefect RunConfig type pertinent to the Backend implementation.

Source code in lume_services/services/scheduling/backends/server.py
102
103
104
105
106
107
108
@abstractproperty
def run_config_type(self) -> PrefectRunConfig:
    """Abstract property: the run configuration type used by this backend.

    Concrete Backend implementations must override this to return the RunConfig
    class pertinent to them. The server backend instantiates that class with
    keyword arguments and calls ``build()`` on the instance.

    """

DockerBackend

Bases: ServerBackend

Implementation of Backend used for interacting with prefect deployed in cluster of Docker containers, as with docker-compose.

Attributes:

Name Type Description
config PrefectConfig

Instantiated PrefectConfig object describing connection to Prefect server.

_client Client

Prefect client connection created on instantiation.

_run_config_type type

Type used to compose Prefect run configuration.

Source code in lume_services/services/scheduling/backends/docker.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class DockerBackend(ServerBackend):
    """Backend for a Prefect server that runs inside a cluster of Docker
    containers (for example, one started with docker-compose).

    Attributes:
        config (PrefectConfig): Instantiated PrefectConfig object describing the
            connection to the Prefect server.
        _client (Client): Prefect client connection created on instantiation.
        _run_config_type (type): Class used to compose the Prefect run
            configuration for this backend (DockerRunConfig).

    """

    # Run configuration class handed out through ``run_config_type``.
    _run_config_type: type = DockerRunConfig

    @property
    def run_config_type(self):
        """Return the run configuration class used by this backend."""
        config_cls = self._run_config_type
        return config_cls

DockerRunConfig

Bases: RunConfig

Pydantic representation of a Docker Prefect run configuration: https://docs.prefect.io/api/latest/run_configs.html#dockerrun

Attributes:

Name Type Description
labels Optional[List[str]]

A list of labels to apply to this run config. Labels are string identifiers used by Prefect Agents for selecting valid flow runs when polling for work.

env Optional[dict]

Additional environment variables to set on the job

image str

Tag of image in which flow should run.

host_config Optional[Dict[str, Any]]

Dictionary representing runtime args to be passed to Docker agent. Full documentation of args can be found here: https://docker-py.readthedocs.io/en/stable/api.html#docker.api.container.ContainerApiMixin.create_host_config

ports Optional[List[int]]

A list of port numbers to expose on the container.

Source code in lume_services/services/scheduling/backends/docker.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class DockerRunConfig(RunConfig):
    """Pydantic representation of a Docker Prefect run configuration:
    https://docs.prefect.io/api/latest/run_configs.html#dockerrun

    Attributes:
        labels (Optional[List[str]]): A list of labels to apply to this run
            config. Labels are string identifiers used by Prefect Agents for selecting
            valid flow runs when polling for work.
        env (Optional[dict]): Additional environment variables to set on the job.
        image (str): Tag of image in which flow should run.
        host_config (Optional[Dict[str, Any]]): Dictionary representing runtime args
            to be passed to Docker agent. Full documentation of args can be found here:
            https://docker-py.readthedocs.io/en/stable/api.html#docker.api.container.ContainerApiMixin.create_host_config
        ports (Optional[List[int]]): A list of port numbers to expose on the
            container.

    """  # noqa

    image: str
    # The default is None, so the annotation says Optional explicitly.
    host_config: Optional[Dict[str, Any]] = None
    ports: Optional[List[int]]

    @validator("host_config", pre=True)
    def validate_host_config(cls, v):
        """Check that a dict of host-configuration values can be composed into a
        docker ``HostConfig`` for the current Docker API version.

        The ``HostConfig`` built here is only a check and is discarded; the value
        is returned unchanged. Non-dict values are passed through untouched.

        Raises:
            Exception: Whatever ``HostConfig`` or ``docker_api_version`` raises;
                it is logged before being re-raised.

        """
        if isinstance(v, dict):
            # test host config composition using api version
            try:
                HostConfig(version=docker_api_version(), **v)
            except Exception as e:
                logger.exception(e)
                # Bare raise re-raises the active exception with its original
                # traceback, without appending this frame again.
                raise

        return v

    def build(self) -> DockerRun:
        """Method for converting to Prefect RunConfig type DockerRun.

        Fields set to None are omitted, so DockerRun's own defaults apply to them.

        Returns:
            DockerRun

        """
        return DockerRun(**self.dict(exclude_none=True))

build()

Method for converting to Prefect RunConfig type DockerRun.

Returns:

Type Description
DockerRun

DockerRun

Source code in lume_services/services/scheduling/backends/docker.py
54
55
56
57
58
59
60
61
def build(self) -> DockerRun:
    """Convert this model into Prefect's ``DockerRun`` run configuration.

    Fields whose value is None are left out of the keyword arguments, so they
    fall back to DockerRun's own defaults.

    Returns:
        DockerRun

    """
    run_kwargs = self.dict(exclude_none=True)
    return DockerRun(**run_kwargs)

validate_host_config(v)

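Checks that a dictionary of host-configuration values can be composed into a Docker HostConfig for the current Docker API version. The composed HostConfig is only a check; the value itself is returned unchanged.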

Source code in lume_services/services/scheduling/backends/docker.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@validator("host_config", pre=True)
def validate_host_config(cls, v):
    """Check that a dict of host-configuration values can be composed into a
    docker ``HostConfig`` for the current Docker API version.

    The ``HostConfig`` built here is only a check and is discarded; the value is
    returned unchanged. Non-dict values are passed through untouched.

    Raises:
        Exception: Whatever ``HostConfig`` or ``docker_api_version`` raises; it
            is logged before being re-raised.

    """
    if isinstance(v, dict):
        # test host config composition using api version
        try:
            HostConfig(version=docker_api_version(), **v)
        except Exception as e:
            logger.exception(e)
            # Bare raise re-raises the active exception with its original
            # traceback, without appending this frame again.
            raise

    return v

KubernetesBackend

Bases: ServerBackend

Implementation of Backend used for interacting with Prefect deployed in a Kubernetes (K8s) cluster.

Attributes:

Name Type Description
config PrefectConfig

Instantiated PrefectConfig object describing connection to Prefect server.

_client Client

Prefect client connection created on instantiation.

_run_config_type type

Type used to compose run configuration.

Source code in lume_services/services/scheduling/backends/kubernetes.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
class KubernetesBackend(ServerBackend):
    """Backend for a Prefect server deployed in a Kubernetes (K8s) cluster.

    Attributes:
        config (PrefectConfig): Instantiated PrefectConfig object describing the
            connection to the Prefect server.
        _client (Client): Prefect client connection created on instantiation.
        _run_config_type (type): Class used to compose the run configuration for
            this backend (KubernetesRunConfig).

    """

    # Run configuration class handed out through ``run_config_type``.
    _run_config_type: type = KubernetesRunConfig

    @property
    def run_config_type(self):
        """Return the run configuration class used by this backend."""
        config_cls = self._run_config_type
        return config_cls

KubernetesRunConfig

Bases: RunConfig

Pydantic representation of args to: https://docs.prefect.io/api/latest/run_configs.html#kubernetesrun https://kubernetes.io/docs/concepts/configuration/overview/#container-images

Attributes:

Name Type Description
labels Optional[List[str]]

A list of labels to apply to this run config. Labels are string identifiers used by Prefect Agents for selecting valid flow runs when polling for work.

env Optional[dict]

Additional environment variables to set on the job

image Optional[str]

The image to use. Can also be specified via job template.

job_template_path Optional[str]

Path to a job template to use. If a local path (no file scheme, or a file/local scheme), the job template will be loaded on initialization and stored on the KubernetesRun object as the job_template field. Otherwise the job template will be loaded at runtime on the agent. Supported runtime file schemes include (s3, gcs, and agent (for paths local to the runtime agent)).

job_template Optional[dict]

An in-memory job template to use.

cpu_limit Union[float, str]

The CPU limit to use for the job

cpu_request Union[float, str]

The CPU request to use for the job

memory_limit Optional[Union[str, int]]

The memory limit to use for the job

memory_request Optional[Union[str, int]]

The memory request to use for the job

service_account_name Optional[str]

A service account name to use for this job. If present, overrides any service account configured on the agent or in the job template.

image_pull_secrets Optional[List[str]]

A list of image pull secrets to use for this job. If present, overrides any image pull secrets configured on the agent or in the job template.

image_pull_policy Literal["Always", "IfNotPresent", "Never"] (default "IfNotPresent")

The imagePullPolicy to use for the job.

Source code in lume_services/services/scheduling/backends/kubernetes.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class KubernetesRunConfig(RunConfig):
    """Pydantic representation of args to:
    https://docs.prefect.io/api/latest/run_configs.html#kubernetesrun
    https://kubernetes.io/docs/concepts/configuration/overview/#container-images

    Attributes:
        labels (Optional[List[str]]): A list of labels to apply to this run
            config. Labels are string identifiers used by Prefect Agents for selecting
            valid flow runs when polling for work.
        env (Optional[dict]): Additional environment variables to set on the job.
        image (Optional[str]): The image to use. Can also be specified via job
            template.
        job_template_path (Optional[str]): Path to a job template to use. If a local
            path (no file scheme, or a file/local scheme), the job template will be
            loaded on initialization and stored on the KubernetesRun object as the
            job_template field. Otherwise the job template will be loaded at runtime
            on the agent. Supported runtime file schemes include (s3, gcs, and agent
            (for paths local to the runtime agent)).
        job_template (Optional[dict]): An in-memory job template to use. If neither
            this nor ``job_template_path`` is set, ``build`` passes the packaged
            ``KUBERNETES_JOB_TEMPLATE`` instead.
        cpu_limit (Union[float, str]): The CPU limit to use for the job.
        cpu_request (Union[float, str]): The CPU request to use for the job.
        memory_limit (Optional[Union[str, int]]): The memory limit to use for the
            job.
        memory_request (Optional[Union[str, int]]): The memory request to use for
            the job.
        service_account_name (Optional[str]): A service account name to use for this
            job. If present, overrides any service account configured on the agent or
            in the job template.
        image_pull_secrets (Optional[List[str]]): A list of image pull secrets to use
            for this job. If present, overrides any image pull secrets configured on
            the agent or in the job template.
        image_pull_policy (Literal["Always", "IfNotPresent", "Never"]): The
            imagePullPolicy to use for the job. Defaults to "IfNotPresent".

    """

    image: Optional[str]
    image_pull_secrets: Optional[List[str]]
    job_template: Optional[dict]
    job_template_path: Optional[str]
    service_account_name: Optional[str]
    image_pull_policy: Literal["Always", "IfNotPresent", "Never"] = "IfNotPresent"
    cpu_limit: Union[float, str] = 1.0
    cpu_request: Union[float, str] = 0.5
    memory_limit: Optional[Union[str, int]] = None
    memory_request: Optional[Union[str, int]] = None

    @validator("memory_limit", "memory_request")
    def validate_memory(cls, v):
        """Validate a Kubernetes memory quantity.

        Accepts an int, or a string made of a non-negative fixed-point number
        (e.g. "1024", "1.5", ".5") optionally followed by exactly one suffix from
        ``KUBERNETES_REQUEST_SUFFIXES`` at the end (e.g. "512Mi", "1.5G"). Per the
        Kubernetes resource format these are E, P, T, G, M, k or the power-of-two
        equivalents Ei, Pi, Ti, Gi, Mi, Ki.

        Returns:
            The value, unchanged.

        Raises:
            ValueError: If the value is neither str nor int, or the string is not a
                valid quantity.

        """
        import re

        if isinstance(v, int):
            return v

        if not isinstance(v, str):
            raise ValueError("Must provide string or int to request")

        # Remove at most one suffix, and only from the END of the value. Trying
        # longer suffixes first ensures "Mi" is stripped whole rather than "M".
        # (Replacing substrings anywhere used to accept values like "1G2".)
        number = v
        for suffix in sorted(KUBERNETES_REQUEST_SUFFIXES, key=len, reverse=True):
            if suffix and v.endswith(suffix):
                number = v[: -len(suffix)]
                break

        # A bare number (no suffix) is a valid byte count in Kubernetes; note that
        # pydantic may also have coerced an int input to str for this Union field.
        if not re.fullmatch(r"\d+(?:\.\d*)?|\.\d+", number, flags=re.ASCII):
            logger.error("Kubernetes resource request invalid: %s", v)
            raise ValueError(f"Kubernetes resource request invalid: {v}")

        return v

    def build(self) -> KubernetesRun:
        """Method for converting to Prefect RunConfig type KubernetesRun.

        Fields set to None are omitted. If neither ``job_template`` nor
        ``job_template_path`` is set, the packaged ``KUBERNETES_JOB_TEMPLATE`` is
        passed as ``job_template``; the model itself is not modified.

        Returns:
            KubernetesRun

        """
        run_kwargs = self.dict(exclude_none=True)

        # if job template and job template path missing, use packaged template.
        # Applied to the kwargs only, so building has no side effect on the model.
        if self.job_template is None and self.job_template_path is None:
            run_kwargs["job_template"] = KUBERNETES_JOB_TEMPLATE

        return KubernetesRun(**run_kwargs)

build()

Method for converting to Prefect RunConfig type KubernetesRun.

Returns:

Type Description
KubernetesRun

KubernetesRun

Source code in lume_services/services/scheduling/backends/kubernetes.py
115
116
117
118
119
120
121
122
123
124
125
126
def build(self) -> KubernetesRun:
    """Method for converting to Prefect RunConfig type KubernetesRun.

    Fields set to None are omitted. If neither ``job_template`` nor
    ``job_template_path`` is set, the packaged ``KUBERNETES_JOB_TEMPLATE`` is
    passed as ``job_template``; the model itself is not modified.

    Returns:
        KubernetesRun

    """
    run_kwargs = self.dict(exclude_none=True)

    # if job template and job template path missing, use packaged template.
    # Applied to the kwargs only, so building has no side effect on the model.
    if self.job_template is None and self.job_template_path is None:
        run_kwargs["job_template"] = KUBERNETES_JOB_TEMPLATE

    return KubernetesRun(**run_kwargs)

validate_memory(v)

Validate w.r.t. Kubernetes resource formats: int, fixed-point number using quantity suffixes: E, P, T, G, M, k or power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki

Source code in lume_services/services/scheduling/backends/kubernetes.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@validator("memory_limit", "memory_request")
def validate_memory(cls, v):
    """Validate a Kubernetes memory quantity.

    Accepts an int, or a string made of a non-negative fixed-point number
    (e.g. "1024", "1.5", ".5") optionally followed by exactly one suffix from
    ``KUBERNETES_REQUEST_SUFFIXES`` at the end (e.g. "512Mi", "1.5G"). Per the
    Kubernetes resource format these are E, P, T, G, M, k or the power-of-two
    equivalents Ei, Pi, Ti, Gi, Mi, Ki.

    Returns:
        The value, unchanged.

    Raises:
        ValueError: If the value is neither str nor int, or the string is not a
            valid quantity.

    """
    import re

    if isinstance(v, int):
        return v

    if not isinstance(v, str):
        raise ValueError("Must provide string or int to request")

    # Remove at most one suffix, and only from the END of the value. Trying
    # longer suffixes first ensures "Mi" is stripped whole rather than "M".
    # (Replacing substrings anywhere used to accept values like "1G2".)
    number = v
    for suffix in sorted(KUBERNETES_REQUEST_SUFFIXES, key=len, reverse=True):
        if suffix and v.endswith(suffix):
            number = v[: -len(suffix)]
            break

    # A bare number (no suffix) is a valid byte count in Kubernetes; note that
    # pydantic may also have coerced an int input to str for this Union field.
    if not re.fullmatch(r"\d+(?:\.\d*)?|\.\d+", number, flags=re.ASCII):
        logger.error("Kubernetes resource request invalid: %s", v)
        raise ValueError(f"Kubernetes resource request invalid: {v}")

    return v