Skip to content

Commit f63e3e4

Browse files
committed
Big refactoring
1 parent dc9fa0f commit f63e3e4

File tree

17 files changed

+312
-162
lines changed

17 files changed

+312
-162
lines changed

src/zenml/entrypoints/entrypoint.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
ENTRYPOINT_CONFIG_SOURCE_OPTION,
2222
BaseEntrypointConfiguration,
2323
)
24-
from zenml.pipelines.run_utils import prevent_pipeline_execution
24+
from zenml.execution.pipeline.utils import prevent_pipeline_execution
2525
from zenml.utils import source_utils
2626

2727

src/zenml/execution/__init__.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Copyright (c) ZenML GmbH 2025. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at:
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12+
# or implied. See the License for the specific language governing
13+
# permissions and limitations under the License.
14+
"""Step and pipeline execution."""
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Copyright (c) ZenML GmbH 2025. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at:
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12+
# or implied. See the License for the specific language governing
13+
# permissions and limitations under the License.
14+
"""Pipeline execution."""
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Copyright (c) ZenML GmbH 2025. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at:
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12+
# or implied. See the License for the specific language governing
13+
# permissions and limitations under the License.
14+
"""Dynamic pipeline execution."""
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
# Copyright (c) ZenML GmbH 2025. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at:
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12+
# or implied. See the License for the specific language governing
13+
# permissions and limitations under the License.
14+
"""Dynamic pipeline execution outputs."""
15+
16+
from concurrent.futures import Future
17+
from typing import (
18+
TYPE_CHECKING,
19+
Any,
20+
Tuple,
21+
Union,
22+
)
23+
24+
from zenml.logger import get_logger
25+
from zenml.models import (
26+
ArtifactVersionResponse,
27+
)
28+
29+
30+
logger = get_logger(__name__)
31+
32+
33+
class OutputArtifact(ArtifactVersionResponse):
34+
"""Dynamic step run output artifact."""
35+
36+
output_name: str
37+
step_name: str
38+
39+
40+
StepRunOutputs = Union[
41+
None, OutputArtifact, Tuple[OutputArtifact, ...]
42+
]
43+
44+
45+
# TODO: maybe one future per artifact? But for a step that doesn't return anything, the user wouldn't have a future to wait for.
46+
# Or that step returns a future that returns None? Would be similar to a python function.
47+
class StepRunOutputsFuture:
48+
"""Future for a step run output."""
49+
50+
def __init__(
51+
self, wrapped: Future[StepRunOutputs], invocation_id: str
52+
) -> None:
53+
"""Initialize the future.
54+
55+
Args:
56+
wrapped: The wrapped future object.
57+
invocation_id: The invocation ID of the step run.
58+
"""
59+
self._wrapped = wrapped
60+
self._invocation_id = invocation_id
61+
62+
def wait(self) -> None:
63+
"""Wait for the future to complete."""
64+
self._wrapped.result()
65+
66+
def result(self) -> StepRunOutputs:
67+
"""Get the step run output artifacts.
68+
69+
Returns:
70+
The step run output artifacts.
71+
"""
72+
return self._wrapped.result()
73+
74+
def load(self) -> Any:
75+
"""Get the step run output artifact data.
76+
77+
Raises:
78+
ValueError: If the step run output is invalid.
79+
80+
Returns:
81+
The step run output artifact data.
82+
"""
83+
result = self.result()
84+
85+
if result is None:
86+
return None
87+
elif isinstance(result, ArtifactVersionResponse):
88+
return result.load()
89+
elif isinstance(result, tuple):
90+
return tuple(item.load() for item in result)
91+
else:
92+
raise ValueError(f"Invalid step run output: {result}")

src/zenml/pipelines/dynamic/run_context.py renamed to src/zenml/execution/pipeline/dynamic/run_context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
from zenml.utils import context_utils
2020

2121
if TYPE_CHECKING:
22+
from zenml.execution.pipeline.dynamic.runner import DynamicPipelineRunner
2223
from zenml.models import PipelineRunResponse, PipelineSnapshotResponse
2324
from zenml.pipelines.dynamic.pipeline_definition import DynamicPipeline
24-
from zenml.pipelines.dynamic.runner import DynamicPipelineRunner
2525

2626

2727
class DynamicPipelineRunContext(context_utils.BaseContext):

src/zenml/pipelines/dynamic/runner.py renamed to src/zenml/execution/pipeline/dynamic/runner.py

Lines changed: 14 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import contextvars
1717
import inspect
18-
from concurrent.futures import Future, ThreadPoolExecutor
18+
from concurrent.futures import ThreadPoolExecutor
1919
from typing import (
2020
TYPE_CHECKING,
2121
Any,
@@ -34,6 +34,14 @@
3434
from zenml.client import Client
3535
from zenml.config.compiler import Compiler
3636
from zenml.config.step_configurations import Step
37+
from zenml.execution.pipeline.dynamic.outputs import (
38+
OutputArtifact,
39+
StepRunOutputs,
40+
StepRunOutputsFuture,
41+
)
42+
from zenml.execution.pipeline.dynamic.run_context import (
43+
DynamicPipelineRunContext,
44+
)
3745
from zenml.execution.step.utils import launch_step
3846
from zenml.logger import get_logger
3947
from zenml.logging.step_logging import setup_pipeline_logging
@@ -47,7 +55,6 @@
4755
publish_successful_pipeline_run,
4856
)
4957
from zenml.pipelines.dynamic.pipeline_definition import DynamicPipeline
50-
from zenml.pipelines.dynamic.run_context import DynamicPipelineRunContext
5158
from zenml.pipelines.run_utils import create_placeholder_run
5259
from zenml.stack import Stack
5360
from zenml.steps.entrypoint_function_utils import StepArtifact
@@ -63,68 +70,6 @@
6370
logger = get_logger(__name__)
6471

6572

66-
class DynamicStepRunOutput(ArtifactVersionResponse):
67-
"""Dynamic step run output artifact."""
68-
69-
output_name: str
70-
step_name: str
71-
72-
73-
StepRunOutputs = Union[
74-
None, DynamicStepRunOutput, Tuple[DynamicStepRunOutput, ...]
75-
]
76-
77-
78-
# TODO: maybe one future per artifact? But for a step that doesn't return anything, the user wouldn't have a future to wait for.
79-
# Or that step returns a future that returns None? Would be similar to a python function.
80-
class StepRunOutputsFuture:
81-
"""Future for a step run output."""
82-
83-
def __init__(
84-
self, wrapped: Future[StepRunOutputs], invocation_id: str
85-
) -> None:
86-
"""Initialize the future.
87-
88-
Args:
89-
wrapped: The wrapped future object.
90-
invocation_id: The invocation ID of the step run.
91-
"""
92-
self._wrapped = wrapped
93-
self._invocation_id = invocation_id
94-
95-
def wait(self) -> None:
96-
"""Wait for the future to complete."""
97-
self._wrapped.result()
98-
99-
def result(self) -> StepRunOutputs:
100-
"""Get the step run output artifacts.
101-
102-
Returns:
103-
The step run output artifacts.
104-
"""
105-
return self._wrapped.result()
106-
107-
def load(self) -> Any:
108-
"""Get the step run output artifact data.
109-
110-
Raises:
111-
ValueError: If the step run output is invalid.
112-
113-
Returns:
114-
The step run output artifact data.
115-
"""
116-
result = self.result()
117-
118-
if result is None:
119-
return None
120-
elif isinstance(result, ArtifactVersionResponse):
121-
return result.load()
122-
elif isinstance(result, tuple):
123-
return tuple(item.load() for item in result)
124-
else:
125-
raise ValueError(f"Invalid step run output: {result}")
126-
127-
12873
class DynamicPipelineRunner:
12974
"""Dynamic pipeline runner."""
13075

@@ -352,14 +297,14 @@ def _await_and_validate_input(input: Any) -> Any:
352297
if (
353298
input
354299
and isinstance(input, tuple)
355-
and isinstance(input[0], DynamicStepRunOutput)
300+
and isinstance(input[0], OutputArtifact)
356301
):
357302
raise ValueError(
358303
"Passing multiple step run outputs to another step is not "
359304
"allowed."
360305
)
361306

362-
if isinstance(input, DynamicStepRunOutput):
307+
if isinstance(input, OutputArtifact):
363308
upstream_steps.add(input.step_name)
364309

365310
return input
@@ -383,7 +328,7 @@ def _await_and_validate_input(input: Any) -> Any:
383328
input_artifacts = {}
384329
external_artifacts = {}
385330
for name, value in validated_args.items():
386-
if isinstance(value, DynamicStepRunOutput):
331+
if isinstance(value, OutputArtifact):
387332
input_artifacts[name] = StepArtifact(
388333
invocation_id=value.step_name,
389334
output_name=value.output_name,
@@ -434,8 +379,8 @@ def _load_step_run_outputs(step_run_id: UUID) -> StepRunOutputs:
434379

435380
def _convert_output_artifact(
436381
output_name: str, artifact: ArtifactVersionResponse
437-
) -> DynamicStepRunOutput:
438-
return DynamicStepRunOutput(
382+
) -> OutputArtifact:
383+
return OutputArtifact(
439384
output_name=output_name,
440385
step_name=step_run.name,
441386
**artifact.model_dump(),

0 commit comments

Comments
 (0)