
Flows

FileMappedParameter

Bases: MappedParameter

FileMappedParameters describe files passed between different flows. Files are saved as json representations describing file type (and serialization) and filesystem information.

Attr

parent_flow_name (str): Parent flow holding origin of mapped parameter.
parent_task_name (str): Task whose result is mapped to the parameter.
map_type (Literal["file", "db", "raw"] = "file"): The "file" map type describes the file result to parameter map.

Source code in lume_services/flows/flow.py
class FileMappedParameter(MappedParameter):
    """FileMappedParameters describe files passed between different flows. Files are
    saved as json representations describing file type (and serialization) and
    filesystem information.

    Attr:
        parent_flow_name (str): Parent flow holding origin of mapped parameter.
        parent_task_name (str): Task whose result is mapped to the parameter.
        map_type (Literal["file", "db", "raw"] = "file"): The "file" map type describes
            the file result to parameter map.

    """

    map_type: str = Field("file", const=True)

Flow

Bases: BaseModel

Interface to a workflow object.

Attributes:

name (str): Name of flow.
flow_id (Optional[str]): ID of flow as registered with Prefect. If running locally, this will be null.
project_name (Optional[str]): Name of Prefect project with which the flow is registered. If running locally, this will be null.
parameters (Optional[Dict[str, Parameter]]): Dictionary of Prefect parameters associated with the flow.
mapped_parameters (Optional[Dict[str, MappedParameter]]): Parameters to be collected from results of other flows.
task_slugs (Optional[Dict[str, str]]): Slugs of tasks associated with the Prefect flow.
labels (List[str] = ["lume-services"]): List of labels to assign to the flow when registering with the Prefect backend. These labels are used to assign agents that will manage deployment.
image (str): Image inside which to run the flow if deploying to a remote backend.

Source code in lume_services/flows/flow.py
class Flow(BaseModel):
    """Interface to a workflow object.

    Attributes:
        name: Name of flow
        flow_id (Optional[str]): ID of flow as registered with Prefect. If running
            locally, this will be null.
        project_name (Optional[str]): Name of Prefect project with which the flow is
            registered. If running locally this will be null.
        parameters (Optional[Dict[str, Parameter]]): Dictionary of Prefect parameters
            associated with the flow.
        mapped_parameters (Optional[Dict[str, MappedParameter]]): Parameters to be
            collected from results of other flows.
        task_slugs (Optional[Dict[str, str]]): Slug of tasks associated with the
            Prefect flow.
        labels (List[str] = ["lume-services"]): List of labels to assign to flow when
            registering with Prefect backend. This label is used to assign agents that
            will manage deployment.
        image (str): Image inside which to run flow if deploying to remote backend.

    """

    name: str
    flow_id: Optional[str]
    project_name: Optional[str]
    prefect_flow: Optional[PrefectFlow]
    parameters: Optional[Dict[str, Parameter]]
    mapped_parameters: Optional[Dict[str, MappedParameter]]
    task_slugs: Optional[Dict[str, str]]
    labels: List[str] = ["lume-services"]
    image: str

    class Config:
        arbitrary_types_allowed = True
        validate_assignment = True

    @validator("mapped_parameters", pre=True)
    def validate_mapped_parameters(cls, v):

        if v is None:
            return v

        mapped_parameters = {}

        for param_name, param in v.items():
            # persist instantiated params
            if isinstance(param, (MappedParameter,)):
                mapped_parameters[param_name] = param

            elif isinstance(param, (dict,)):
                # default raw
                if not param.get("map_type"):
                    mapped_parameters[param_name] = RawMappedParameter(**param)

                else:
                    mapped_param_type = _get_mapped_parameter_type(param["map_type"])
                    mapped_parameters[param_name] = mapped_param_type(**param)

            else:
                raise ValueError(
                    "Mapped parameters must be passed as instantiated \
                    MappedParameters or dictionary"
                )

        return mapped_parameters

    @inject
    def load_flow(
        self,
        scheduling_service: SchedulingService = Provide[Context.scheduling_service],
    ) -> None:
        """Loads Prefect flow artifact from the backend.

        Args:
            scheduling_service (SchedulingService): Scheduling service. If not
                provided, uses injected service.
        """
        flow_dict = scheduling_service.load_flow(self.name, self.project_name)

        flow = flow_dict["flow"]

        # assign attributes
        self.prefect_flow = flow
        self.task_slugs = {task.name: task.slug for task in flow.get_tasks()}
        self.parameters = {parameter.name: parameter for parameter in flow.parameters()}
        self.flow_id = flow_dict["flow_id"]

    @inject
    def register(
        self,
        scheduling_service: SchedulingService = Provide[Context.scheduling_service],
    ) -> str:
        """Register flow with SchedulingService backend.

        Args:
            scheduling_service (SchedulingService): Scheduling service. If not
                provided, uses injected service.

        Returns:
            flow_id (str): ID of registered flow.

        """

        if self.prefect_flow is None:
            # attempt loading
            self.load_flow()

        self.flow_id = scheduling_service.register_flow(
            self.prefect_flow, self.project_name, labels=self.labels, image=self.image
        )

        self.parameters = {
            parameter.name: parameter for parameter in self.prefect_flow.parameters()
        }
        self.task_slugs = {
            task.name: task.slug for task in self.prefect_flow.get_tasks()
        }

        return self.flow_id

    def run(
        self,
        parameters,
        scheduling_service: SchedulingService = Provide[Context.scheduling_service],
        **kwargs
    ):
        """Run the flow.

        Args:
            kwargs: Arguments passed to run config construction

        """
        if isinstance(scheduling_service.backend, (LocalBackend,)):
            if self.prefect_flow is None:
                self.load_flow()

            scheduling_service.run(
                parameters=parameters, flow=self.prefect_flow, **kwargs
            )

        elif isinstance(scheduling_service.backend, (ServerBackend,)):
            scheduling_service.run(
                parameters=parameters, flow_id=self.flow_id, image=self.image, **kwargs
            )

    def run_and_return(
        self,
        parameters,
        task_name: Optional[str],
        scheduling_service: SchedulingService = Provide[Context.scheduling_service],
        **kwargs
    ):
        """Run flow and return result. Result will reference either passed task name or
        the result of all tasks.

        Args:
            kwargs: Arguments passed to run config construction


        """
        if isinstance(scheduling_service.backend, (LocalBackend,)):
            if self.prefect_flow is None:
                self.load_flow()

            return scheduling_service.run_and_return(
                parameters=parameters,
                flow=self.prefect_flow,
                task_name=task_name,
                image=self.image,
                **kwargs
            )

        elif isinstance(scheduling_service.backend, (ServerBackend,)):
            return scheduling_service.run_and_return(
                parameters=parameters,
                flow_id=self.flow_id,
                task_name=task_name,
                image=self.image,
                **kwargs
            )
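
The sketch below shows one way a Flow might be constructed. All names and the image are placeholders, and a configured SchedulingService is assumed to be available through Context (or passed explicitly to the methods documented below). Dictionary entries under mapped_parameters are coerced by the validate_mapped_parameters validator: entries without a map_type become RawMappedParameters, while "file" and "db" select the corresponding subclasses.

from lume_services.flows.flow import Flow

flow = Flow(
    name="downstream-flow",                      # placeholder flow name
    project_name="examples",                     # placeholder Prefect project
    image="my-registry/downstream-flow:latest",  # placeholder image
    mapped_parameters={
        # no "map_type" given -> coerced to a RawMappedParameter
        "input_value": {
            "parent_flow_name": "upstream-flow",
            "parent_task_name": "compute_value",
        },
        # "map_type": "file" -> coerced to a FileMappedParameter
        "input_file": {
            "parent_flow_name": "upstream-flow",
            "parent_task_name": "save_file",
            "map_type": "file",
        },
    },
)

The resulting instance is used with load_flow, register, run, and run_and_return in the method sections that follow.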

load_flow(scheduling_service=Provide[Context.scheduling_service])

Loads Prefect flow artifact from the backend.

Parameters:

scheduling_service (SchedulingService): Scheduling service. If not provided, uses injected service. Default: Provide[scheduling_service]
Source code in lume_services/flows/flow.py
@inject
def load_flow(
    self,
    scheduling_service: SchedulingService = Provide[Context.scheduling_service],
) -> None:
    """Loads Prefect flow artifact from the backend.

    Args:
        scheduling_service (SchedulingService): Scheduling service. If not
            provided, uses injected service.
    """
    flow_dict = scheduling_service.load_flow(self.name, self.project_name)

    flow = flow_dict["flow"]

    # assign attributes
    self.prefect_flow = flow
    self.task_slugs = {task.name: task.slug for task in flow.get_tasks()}
    self.parameters = {parameter.name: parameter for parameter in flow.parameters()}
    self.flow_id = flow_dict["flow_id"]
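
As a usage sketch, load_flow can rely on the injected service or take one explicitly; flow is the instance from the construction sketch above, and my_scheduling_service is an assumed, already configured SchedulingService.

# Uses the SchedulingService injected via Context.scheduling_service
flow.load_flow()

# Or pass a configured SchedulingService explicitly
flow.load_flow(scheduling_service=my_scheduling_service)

# After loading, the Prefect artifact and metadata are populated
print(flow.flow_id, list(flow.parameters), list(flow.task_slugs))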

register(scheduling_service=Provide[Context.scheduling_service])

Register flow with SchedulingService backend.

Parameters:

scheduling_service (SchedulingService): Scheduling service. If not provided, uses injected service. Default: Provide[scheduling_service]

Returns:

flow_id (str): ID of registered flow.

Source code in lume_services/flows/flow.py
@inject
def register(
    self,
    scheduling_service: SchedulingService = Provide[Context.scheduling_service],
) -> str:
    """Register flow with SchedulingService backend.

    Args:
        scheduling_service (SchedulingService): Scheduling service. If not
            provided, uses injected service.

    Returns:
        flow_id (str): ID of registered flow.

    """

    if self.prefect_flow is None:
        # attempt loading
        self.load_flow()

    self.flow_id = scheduling_service.register_flow(
        self.prefect_flow, self.project_name, labels=self.labels, image=self.image
    )

    self.parameters = {
        parameter.name: parameter for parameter in self.prefect_flow.parameters()
    }
    self.task_slugs = {
        task.name: task.slug for task in self.prefect_flow.get_tasks()
    }

    return self.flow_id
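
A short registration sketch, continuing from the construction example above. The scheduling service is assumed to point at a Prefect backend; register loads the Prefect flow first if it has not already been loaded.

# Uses the injected SchedulingService
flow_id = flow.register()

# Or pass a configured SchedulingService explicitly
flow_id = flow.register(scheduling_service=my_scheduling_service)

# The returned id is also stored on the object
assert flow_id == flow.flow_id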

run(parameters, scheduling_service=Provide[Context.scheduling_service], **kwargs)

Run the flow.

Parameters:

kwargs: Arguments passed to run config construction. Default: {}
Source code in lume_services/flows/flow.py
def run(
    self,
    parameters,
    scheduling_service: SchedulingService = Provide[Context.scheduling_service],
    **kwargs
):
    """Run the flow.

    Args:
        kwargs: Arguments passed to run config construction

    """
    if isinstance(scheduling_service.backend, (LocalBackend,)):
        if self.prefect_flow is None:
            self.load_flow()

        scheduling_service.run(
            parameters=parameters, flow=self.prefect_flow, **kwargs
        )

    elif isinstance(scheduling_service.backend, (ServerBackend,)):
        scheduling_service.run(
            parameters=parameters, flow_id=self.flow_id, image=self.image, **kwargs
        )
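
For illustration, a run call might look like the sketch below; the parameter names are placeholders and must match the registered Prefect parameters. With a LocalBackend the loaded Prefect flow object is executed directly, while with a ServerBackend the run is submitted by flow_id and image.

# Using the injected scheduling service
flow.run(parameters={"input_value": 1.0})

# Or with an explicitly provided, configured SchedulingService
flow.run(parameters={"input_value": 1.0}, scheduling_service=my_scheduling_service)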

run_and_return(parameters, task_name, scheduling_service=Provide[Context.scheduling_service], **kwargs)

Run flow and return result. Result will reference either passed task name or the result of all tasks.

Parameters:

kwargs: Arguments passed to run config construction. Default: {}
Source code in lume_services/flows/flow.py
def run_and_return(
    self,
    parameters,
    task_name: Optional[str],
    scheduling_service: SchedulingService = Provide[Context.scheduling_service],
    **kwargs
):
    """Run flow and return result. Result will reference either passed task name or
    the result of all tasks.

    Args:
        kwargs: Arguments passed to run config construction


    """
    if isinstance(scheduling_service.backend, (LocalBackend,)):
        if self.prefect_flow is None:
            self.load_flow()

        return scheduling_service.run_and_return(
            parameters=parameters,
            flow=self.prefect_flow,
            task_name=task_name,
            image=self.image,
            **kwargs
        )

    elif isinstance(scheduling_service.backend, (ServerBackend,)):
        return scheduling_service.run_and_return(
            parameters=parameters,
            flow_id=self.flow_id,
            task_name=task_name,
            image=self.image,
            **kwargs
        )
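
A sketch of retrieving results; the task name is a placeholder and must match an entry in task_slugs. Per the docstring, a null task_name returns the result of all tasks.

# Return the result of a single task
result = flow.run_and_return(
    parameters={"input_value": 1.0},
    task_name="save_output_file",   # placeholder task name
)

# Return the results of all tasks
all_results = flow.run_and_return(parameters={"input_value": 1.0}, task_name=None)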

MappedParameter

Bases: BaseModel

There are three types of mapped parameters: file, db, and raw.

file: File parameters are file outputs that will be loaded in downstream flows. Downstream loading must use the packaged load_file task in lume_services.tasks.file.

db: Database results ...

raw: Raw values are passed from task output to parameter input.

Attr

parent_flow_name (str): Parent flow holding origin of mapped parameter.
parent_task_name (str): Task whose result is mapped to the parameter.
map_type (Literal["file", "db", "raw"]): Type of mapping describing the parameters.

Source code in lume_services/flows/flow.py
class MappedParameter(BaseModel):
    """There are three types of mapped parameters: file, db, and raw.

    file: File parameters are file outputs that will be loaded in downstream flows.
    Downstream loading must use the packaged `load_file` task in
    `lume_services.tasks.file`.

    db: Database results ...

    raw: Raw values are passed from task output to parameter input.

    Attr:
        parent_flow_name (str): Parent flow holding origin of mapped parameter.
        parent_task_name (str): Task whose result is mapped to the parameter.
        map_type (Literal["file", "db", "raw"]): Type of mapping describing the
            parameters.

    """

    parent_flow_name: str
    parent_task_name: str
    map_type: Literal["file", "db", "raw"] = "raw"
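
A minimal sketch of the base class; the flow and task names are placeholders, and the import path is inferred from the source location above.

from lume_services.flows.flow import MappedParameter

param = MappedParameter(
    parent_flow_name="upstream-flow",   # placeholder
    parent_task_name="compute_value",   # placeholder
)

# The base class defaults to the "raw" map type
assert param.map_type == "raw"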

RawMappedParameter

Bases: MappedParameter

RawMappedParameters describe parameter mappings where the result of a task is used as the input to a parameter.

Attr

parent_flow_name (str): Parent flow holding origin of mapped parameter.
parent_task_name (str): Task whose result is mapped to the parameter.
map_type (Literal["file", "db", "raw"] = "raw"): The "raw" map type describes the one-to-one result to parameter map.

Source code in lume_services/flows/flow.py
class RawMappedParameter(MappedParameter):
    """RawMappedParameters describe parameter mappings where the result of a task is
    used as the input to a parameter.

    Attr:
        parent_flow_name (str): Parent flow holding origin of mapped parameter.
        parent_task_name (str): Task whose result is mapped to the parameter.
        map_type (Literal["file", "db", "raw"] = "raw"): The "raw" map type describes
            the one-to-one result to parameter map.

    """

    map_type: str = Field("raw", const=True)

FlowOfFlows

Bases: Flow

Source code in lume_services/flows/flow_of_flows.py
class FlowOfFlows(Flow):
    composing_flows: dict

    class Config:
        arbitrary_types_allowed = True

    @root_validator(pre=True)
    def validate(cls, values: dict):
        """Validate composing flow data against Prefect server."""
        flows = {}

        scheduling_service = None
        if "scheduling_service" in values:
            scheduling_service = values.pop("scheduling_service")

        # validate composing flow existence
        composing_flows = values.get("composing_flows")

        if isinstance(composing_flows, (dict,)):
            pass

        # iterate to create dict
        elif isinstance(composing_flows, (list,)):
            for flow in values["composing_flows"]:

                # compose flow objects
                flow_obj = Flow(
                    name=flow["name"],
                    project_name=flow["project_name"],
                    mapped_parameters=flow.get("mapped_parameters"),
                )

                # load Prefect parameters
                if scheduling_service is not None:
                    flow_obj.load_flow(scheduling_service=scheduling_service)
                else:
                    flow_obj.load_flow()

                flows[flow["name"]] = flow_obj

        # validate flow parameters
        for flow_name, flow in flows.items():
            if flow.mapped_parameters is not None:
                for parameter_name, parameter in flow.mapped_parameters.items():

                    # validate parameter is in flow spec
                    parameter_obj = flow.parameters.get(parameter_name)
                    if parameter_obj is None:
                        raise ParameterNotInFlowError(parameter_name, flow_name)

                    # validate parent flow is included in listed flows
                    parent_flow = flows.get(parameter.parent_flow_name)
                    if parent_flow is None:
                        raise ParentFlowNotInFlowsError(
                            parameter.parent_flow_name, list(flows.keys())
                        )

                    # validate task is in the parent flow
                    task = parent_flow.task_slugs.get(parameter.parent_task_name)

                    if task is None:
                        raise TaskNotInFlowError(
                            parameter.parent_flow_name, parameter.parent_task_name
                        )

        values["composing_flows"] = flows

        return values

    def compose(
        self,
        image_name: str,
        image_tag: str = "latest",
        local: bool = False,
        scheduling_service: SchedulingService = Provide[Context.scheduling_service],
    ) -> PrefectFlow:
        """Compose Prefect flow from FlowOfFlows object. Uses base image assigned to
        the FlowOfFlows Object and builds a new Docker image containing the composite
        flow.


        Args:
            image_name (str): Name of generated image.
            image_tag (str): Tag of generated image.
            local (bool=False): Whether to use local images for the base image.


        Returns:
            PrefectFlow

        """

        # compose flow of flows
        with PrefectFlow(
            self.name,
            storage=Docker(
                base_image=self.image,
                image_name=image_name,
                image_tag=image_tag,
                local_image=local,
            ),
        ) as composed_flow:

            flow_runs = {}
            flow_waits = {}
            params = {}

            for i, (flow_name, flow) in enumerate(self.composing_flows.items()):

                # begin by creating parameters for all flow parameters
                flow_params = {}
                for param_name, param in flow.parameters.items():

                    # update name and slug
                    param.name = f"{flow_name}-{param_name}"
                    param.slug = f"{flow_name}-{param_name}"
                    params[param.name] = param

                    # use original param name for flow config
                    flow_params[param_name] = param

                # set up entry task
                if i == 0:
                    flow_run = create_flow_run(
                        flow_id=flow.flow_id,
                        parameters=flow_params,
                        labels=flow.labels,
                    )

                # setup other tasks
                elif i > 0:

                    # create references to parameters
                    upstream_flows = set()
                    if flow.mapped_parameters is not None:

                        # update flow_params with mapping
                        for param_name, mapped_param in flow.mapped_parameters.items():
                            task_slug = self.composing_flows[
                                mapped_param.parent_flow_name
                            ].task_slugs[mapped_param.parent_task_name]

                            task_run_result = get_task_run_result(
                                flow_runs[mapped_param.parent_flow_name], task_slug
                            )

                            # raw results and file results use their values directly
                            if mapped_param.map_type in ["raw", "file"]:
                                flow.prefect_flow.replace(
                                    flow_params.pop(param_name), task_run_result
                                )

                            # handle database results
                            elif mapped_param.map_type == "db":
                                load_db_result = LoadDBResult()
                                db_result = load_db_result(
                                    task_run_result,
                                    attribute_index=mapped_param.attribute_index,
                                )
                                flow.prefect_flow.replace(
                                    flow_params.pop(param_name), db_result
                                )

                                # add db result parameters to the task and create edge
                                for param in load_db_result.parameters.values():
                                    flow.prefect_flow.add_task(param)
                                    flow.prefect_flow.add_edge(
                                        param, load_db_result, mapped=True
                                    )

                            else:
                                # should never reach if instantiating MappedParameter
                                mapped_param_types = get_args(
                                    MappedParameter.__fields__["map_type"].type_
                                )
                                raise ValueError(
                                    f"Task type {mapped_param.map_type} not in task. \
                                        Allowed types: {mapped_param_types}."
                                )

                            # add flow to upstream
                            upstream_flows.add(mapped_param.parent_flow_name)

                        # add creation of flow run to flow
                        flow_run = create_flow_run(
                            flow_id=flow.flow_id,
                            parameters=flow_params,
                            labels=flow.labels,
                        )

                    # configure upstreams if any
                    for upstream in upstream_flows:
                        flow_run.set_upstream(flow_waits[upstream])

                flow_wait = wait_for_flow_run(flow_run, raise_final_state=True)
                flow_runs[flow_name] = flow_run
                flow_waits[flow_name] = flow_wait

        # validate flow of flows
        composed_flow.validate()

        # assign to obj
        self.prefect_flow = composed_flow
        self.image = f"{image_name}:{image_tag}"

        return composed_flow

    def compose_and_register(self) -> str:
        """Compose flow and register with project.

        Returns:
            str: Registered flow id

        """

        flow = self.compose()
        self.prefect_flow = flow
        return self.register()  # register() reads self.project_name internally

    @classmethod
    @inject
    def from_yaml(
        cls,
        yaml_obj,
        scheduling_service: SchedulingService = Provide[Context.scheduling_service],
    ):
        if os.path.exists(yaml_obj):
            flow_of_flow_config = yaml.safe_load(open(yaml_obj))

        else:
            flow_of_flow_config = yaml_obj

        # now validate
        return cls(**flow_of_flow_config, scheduling_service=scheduling_service)

    def _compose_local(self):
        """

        Note:
            Prefect 1.0 does not allow subflow run without previous registration with
            the server. This function is a workaround, but will be massively simplified
            once moved to Prefect 2.0, which does support direct subflow run.
        """
        ...
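
Below is a rough sketch of the configuration shape consumed by the root validator (and, via a YAML file, by from_yaml). All names, projects, and images are placeholders; the import path is inferred from the source location, and field requirements may differ in practice. During validation each composing flow is loaded from the scheduling service and its mapped parameters are checked against the other listed flows.

from lume_services.flows.flow_of_flows import FlowOfFlows

config = {
    "name": "example-flow-of-flows",           # placeholder
    "project_name": "examples",                # placeholder
    "image": "my-registry/base-image:latest",  # placeholder base image
    "composing_flows": [
        {"name": "upstream-flow", "project_name": "examples"},
        {
            "name": "downstream-flow",
            "project_name": "examples",
            "mapped_parameters": {
                "input_file": {
                    "parent_flow_name": "upstream-flow",
                    "parent_task_name": "save_file",
                    "map_type": "file",
                },
            },
        },
    ],
}

# Equivalent in shape to FlowOfFlows.from_yaml(path_to_yaml); uses the
# injected SchedulingService during validation.
flow_of_flows = FlowOfFlows(**config)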

compose(image_name, image_tag='latest', local=False, scheduling_service=Provide[Context.scheduling_service])

Compose Prefect flow from FlowOfFlows object. Uses base image assigned to the FlowOfFlows Object and builds a new Docker image containing the composite flow.

Parameters:

image_name (str): Name of generated image. Required.
image_tag (str): Tag of generated image. Default: 'latest'
local (bool): Whether to use local images for the base image. Default: False

Returns:

PrefectFlow

Source code in lume_services/flows/flow_of_flows.py
def compose(
    self,
    image_name: str,
    image_tag: str = "latest",
    local: bool = False,
    scheduling_service: SchedulingService = Provide[Context.scheduling_service],
) -> PrefectFlow:
    """Compose Prefect flow from FlowOfFlows object. Uses base image assigned to
    the FlowOfFlows Object and builds a new Docker image containing the composite
    flow.


    Args:
        image_name (str): Name of generated image.
        image_tag (str): Tag of generated image.
        local (bool=False): Whether to use local images for the base image.


    Returns:
        PrefectFlow

    """

    # compose flow of flows
    with PrefectFlow(
        self.name,
        storage=Docker(
            base_image=self.image,
            image_name=image_name,
            image_tag=image_tag,
            local_image=local,
        ),
    ) as composed_flow:

        flow_runs = {}
        flow_waits = {}
        params = {}

        for i, (flow_name, flow) in enumerate(self.composing_flows.items()):

            # begin by creating parameters for all flow parameters
            flow_params = {}
            for param_name, param in flow.parameters.items():

                # update name and slug
                param.name = f"{flow_name}-{param_name}"
                param.slug = f"{flow_name}-{param_name}"
                params[param.name] = param

                # use original param name for flow config
                flow_params[param_name] = param

            # set up entry task
            if i == 0:
                flow_run = create_flow_run(
                    flow_id=flow.flow_id,
                    parameters=flow_params,
                    labels=flow.labels,
                )

            # setup other tasks
            elif i > 0:

                # create references to parameters
                upstream_flows = set()
                if flow.mapped_parameters is not None:

                    # update flow_params with mapping
                    for param_name, mapped_param in flow.mapped_parameters.items():
                        task_slug = self.composing_flows[
                            mapped_param.parent_flow_name
                        ].task_slugs[mapped_param.parent_task_name]

                        task_run_result = get_task_run_result(
                            flow_runs[mapped_param.parent_flow_name], task_slug
                        )

                        # raw results and file results use their values directly
                        if mapped_param.map_type in ["raw", "file"]:
                            flow.prefect_flow.replace(
                                flow_params.pop(param_name), task_run_result
                            )

                        # handle database results
                        elif mapped_param.map_type == "db":
                            load_db_result = LoadDBResult()
                            db_result = load_db_result(
                                task_run_result,
                                attribute_index=mapped_param.attribute_index,
                            )
                            flow.prefect_flow.replace(
                                flow_params.pop(param_name), db_result
                            )

                            # add db result parameters to the task and create edge
                            for param in load_db_result.parameters.values():
                                flow.prefect_flow.add_task(param)
                                flow.prefect_flow.add_edge(
                                    param, load_db_result, mapped=True
                                )

                        else:
                            # should never reach if instantiating MappedParameter
                            mapped_param_types = get_args(
                                MappedParameter.__fields__["map_type"].type_
                            )
                            raise ValueError(
                                f"Task type {mapped_param.map_type} not in task. \
                                    Allowed types: {mapped_param_types}."
                            )

                        # add flow to upstream
                        upstream_flows.add(mapped_param.parent_flow_name)

                    # add creation of flow run to flow
                    flow_run = create_flow_run(
                        flow_id=flow.flow_id,
                        parameters=flow_params,
                        labels=flow.labels,
                    )

                # configure upstreams if any
                for upstream in upstream_flows:
                    flow_run.set_upstream(flow_waits[upstream])

            flow_wait = wait_for_flow_run(flow_run, raise_final_state=True)
            flow_runs[flow_name] = flow_run
            flow_waits[flow_name] = flow_wait

    # validate flow of flows
    composed_flow.validate()

    # assign to obj
    self.prefect_flow = composed_flow
    self.image = f"{image_name}:{image_tag}"

    return composed_flow
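
A usage sketch for compose, continuing from the FlowOfFlows sketch above; the image name and tag are placeholders. The call builds Docker storage from the base image assigned to the object and returns the composed Prefect flow.

composed = flow_of_flows.compose(
    image_name="my-registry/composed-flow",  # placeholder image name
    image_tag="v0.1.0",                      # placeholder tag
    local=False,                             # do not use a local base image
)

# compose() also updates the object in place
assert flow_of_flows.prefect_flow is composed
assert flow_of_flows.image == "my-registry/composed-flow:v0.1.0"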

compose_and_register()

Compose flow and register with project.

Returns:

str: Registered flow id.

Source code in lume_services/flows/flow_of_flows.py
def compose_and_register(self) -> str:
    """Compose flow and register with project.

    Returns:
        str: Registered flow id

    """

    flow = self.compose()
    self.prefect_flow = flow
    return self.register()  # register() reads self.project_name internally

validate(values)

Validate composing flow data against Prefect server.

Source code in lume_services/flows/flow_of_flows.py
@root_validator(pre=True)
def validate(cls, values: dict):
    """Validate composing flow data against Prefect server."""
    flows = {}

    scheduling_service = None
    if "scheduling_service" in values:
        scheduling_service = values.pop("scheduling_service")

    # validate composing flow existence
    composing_flows = values.get("composing_flows")

    if isinstance(composing_flows, (dict,)):
        pass

    # iterate to create dict
    elif isinstance(composing_flows, (list,)):
        for flow in values["composing_flows"]:

            # compose flow objects
            flow_obj = Flow(
                name=flow["name"],
                project_name=flow["project_name"],
                mapped_parameters=flow.get("mapped_parameters"),
            )

            # load Prefect parameters
            if scheduling_service is not None:
                flow_obj.load_flow(scheduling_service=scheduling_service)
            else:
                flow_obj.load_flow()

            flows[flow["name"]] = flow_obj

    # validate flow parameters
    for flow_name, flow in flows.items():
        if flow.mapped_parameters is not None:
            for parameter_name, parameter in flow.mapped_parameters.items():

                # validate parameter is in flow spec
                parameter_obj = flow.parameters.get(parameter_name)
                if parameter_obj is None:
                    raise ParameterNotInFlowError(parameter_name, flow_name)

                # validate parent flow is included in listed flows
                parent_flow = flows.get(parameter.parent_flow_name)
                if parent_flow is None:
                    raise ParentFlowNotInFlowsError(
                        parameter.parent_flow_name, list(flows.keys())
                    )

                # validate task is in the parent flow
                task = parent_flow.task_slugs.get(parameter.parent_task_name)

                if task is None:
                    raise TaskNotInFlowError(
                        parameter.parent_flow_name, parameter.parent_task_name
                    )

    values["composing_flows"] = flows

    return values