Skip to content

Commit e217f58

Browse files
committed
misc
1 parent 0906233 commit e217f58

File tree

13 files changed

+122
-147
lines changed

13 files changed

+122
-147
lines changed

src/zenml/config/compiler.py

Lines changed: 11 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
from zenml.config.step_configurations import (
3636
InputSpec,
3737
Step,
38-
StepConfiguration,
3938
StepConfigurationUpdate,
4039
StepSpec,
4140
)
@@ -48,7 +47,6 @@
4847
if TYPE_CHECKING:
4948
from zenml.pipelines.pipeline_definition import Pipeline
5049
from zenml.stack import Stack, StackComponent
51-
from zenml.steps.base_step import BaseStep
5250
from zenml.steps.step_invocation import StepInvocation
5351

5452
from zenml.logger import get_logger
@@ -128,29 +126,17 @@ def compile(
128126
merge=False,
129127
)
130128

131-
if pipeline.is_dynamic:
132-
step_templates = {
133-
step.name: self._compile_config_template(
134-
step=step, stack=stack
135-
)
136-
for step in pipeline.depends_on
137-
}
138-
steps = {}
139-
else:
140-
step_templates = None
141-
steps = {
142-
invocation_id: self._compile_step_invocation(
143-
invocation=invocation,
144-
stack=stack,
145-
step_config=(run_configuration.steps or {}).get(
146-
invocation_id
147-
),
148-
pipeline_configuration=pipeline.configuration,
149-
)
150-
for invocation_id, invocation in self._get_sorted_invocations(
151-
pipeline=pipeline
152-
)
153-
}
129+
steps = {
130+
invocation_id: self._compile_step_invocation(
131+
invocation=invocation,
132+
stack=stack,
133+
step_config=(run_configuration.steps or {}).get(invocation_id),
134+
pipeline_configuration=pipeline.configuration,
135+
)
136+
for invocation_id, invocation in self._get_sorted_invocations(
137+
pipeline=pipeline
138+
)
139+
}
154140

155141
self._ensure_required_stack_components_exist(stack=stack, steps=steps)
156142

@@ -170,7 +156,6 @@ def compile(
170156
is_dynamic=pipeline.is_dynamic,
171157
pipeline_configuration=pipeline.configuration,
172158
step_configurations=steps,
173-
step_configuration_templates=step_templates,
174159
client_environment=get_run_environment_dict(),
175160
client_version=client_version,
176161
server_version=server_version,
@@ -536,48 +521,6 @@ def _compile_step_invocation(
536521
step_config_overrides=step_configuration_overrides,
537522
)
538523

539-
def _compile_config_template(
540-
self,
541-
step: "BaseStep",
542-
stack: "Stack",
543-
step_config: Optional["StepConfigurationUpdate"],
544-
) -> StepConfiguration:
545-
"""Compiles a ZenML step.
546-
547-
Args:
548-
invocation: The step invocation to compile.
549-
stack: The stack on which the pipeline will be run.
550-
step_config: Run configuration for the step.
551-
pipeline_configuration: Configuration for the pipeline.
552-
553-
Returns:
554-
The compiled step.
555-
"""
556-
if step_config:
557-
step._apply_configuration(step_config)
558-
559-
convert_component_shortcut_settings_keys(
560-
step.configuration.settings, stack=stack
561-
)
562-
step_secrets = secret_utils.resolve_and_verify_secrets(
563-
step.configuration.secrets
564-
)
565-
step_settings = self._filter_and_validate_settings(
566-
settings=step.configuration.settings,
567-
configuration_level=ConfigurationLevel.STEP,
568-
stack=stack,
569-
)
570-
step.configure(
571-
secrets=step_secrets,
572-
settings=step_settings,
573-
merge=False,
574-
)
575-
576-
# TODO: apply pipeline config
577-
return StepConfiguration.model_validate(
578-
step.configuration.model_dump()
579-
)
580-
581524
def _get_sorted_invocations(
582525
self,
583526
pipeline: "Pipeline",

src/zenml/config/step_run_info.py

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818

1919
from zenml.config.frozen_base_model import FrozenBaseModel
2020
from zenml.config.pipeline_configurations import PipelineConfiguration
21-
from zenml.config.step_configurations import StepConfiguration
21+
from zenml.config.step_configurations import StepConfiguration, StepSpec
22+
from zenml.logger import get_logger
23+
from zenml.models import PipelineSnapshotResponse
24+
25+
logger = get_logger(__name__)
2226

2327

2428
class StepRunInfo(FrozenBaseModel):
@@ -30,7 +34,9 @@ class StepRunInfo(FrozenBaseModel):
3034
pipeline_step_name: str
3135

3236
config: StepConfiguration
37+
spec: StepSpec
3338
pipeline: PipelineConfiguration
39+
snapshot: PipelineSnapshotResponse
3440

3541
force_write_logs: Callable[..., Any]
3642

@@ -46,15 +52,32 @@ def get_image(self, key: str) -> str:
4652
Returns:
4753
The image name or digest.
4854
"""
49-
from zenml.client import Client
50-
51-
run = Client().get_pipeline_run(self.run_id)
52-
if not run.build:
55+
if not self.snapshot.build:
5356
raise RuntimeError(
54-
f"Missing build for run {run.id}. This is probably because "
55-
"the build was manually deleted."
57+
f"Missing build for snapshot {self.snapshot.id}. This is "
58+
"probably because the build was manually deleted."
5659
)
5760

58-
return run.build.get_image(
59-
component_key=key, step=self.pipeline_step_name
60-
)
61+
if self.snapshot.is_dynamic:
62+
# TODO: better way for this
63+
for (
64+
invocation_id,
65+
compiled_step,
66+
) in self.snapshot.step_configurations.values():
67+
if (
68+
compiled_step.spec.source.import_path
69+
== self.spec.source.import_path
70+
):
71+
step_key = invocation_id
72+
break
73+
else:
74+
logger.warning(
75+
"Unable to find config template for step %s. Falling "
76+
"back to the pipeline image.",
77+
self.pipeline_step_name,
78+
)
79+
step_key = None
80+
else:
81+
step_key = self.pipeline_step_name
82+
83+
return self.snapshot.build.get_image(component_key=key, step=step_key)

src/zenml/models/v2/core/pipeline_run.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,10 @@ class PipelineRunUpdate(BaseUpdate):
175175
max_length=STR_FIELD_MAX_LENGTH,
176176
)
177177
end_time: Optional[datetime] = None
178+
completed: Optional[bool] = Field(
179+
default=None,
180+
title="Whether the pipeline run is completed.",
181+
)
178182
orchestrator_run_id: Optional[str] = None
179183
# TODO: we should maybe have a different update model here, the upper
180184
# three attributes should only be for internal use

src/zenml/models/v2/core/pipeline_snapshot.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from zenml.config.pipeline_configurations import PipelineConfiguration
3232
from zenml.config.pipeline_run_configuration import PipelineRunConfiguration
3333
from zenml.config.pipeline_spec import PipelineSpec
34-
from zenml.config.step_configurations import Step, StepConfiguration
34+
from zenml.config.step_configurations import Step
3535
from zenml.constants import STR_FIELD_MAX_LENGTH, TEXT_FIELD_MAX_LENGTH
3636
from zenml.enums import ExecutionStatus, StackComponentType
3737
from zenml.models.v2.base.base import BaseUpdate, BaseZenModel
@@ -82,12 +82,6 @@ class PipelineSnapshotBase(BaseZenModel):
8282
step_configurations: Dict[str, Step] = Field(
8383
default={}, title="The step configurations for this snapshot."
8484
)
85-
step_configuration_templates: Optional[Dict[str, StepConfiguration]] = (
86-
Field(
87-
default=None,
88-
title="The step configuration templates of the snapshot.",
89-
)
90-
)
9185
client_environment: Dict[str, Any] = Field(
9286
default={}, title="The client environment for this snapshot."
9387
)

src/zenml/orchestrators/containerized_orchestrator.py

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -120,24 +120,6 @@ def get_docker_builds(
120120
builds.append(pipeline_build)
121121
included_pipeline_build = True
122122

123-
for name, step_config in snapshot.step_configuration_templates.items():
124-
step_settings = step_config.docker_settings
125-
126-
if step_settings != pipeline_settings:
127-
build = BuildConfiguration(
128-
key=ORCHESTRATOR_DOCKER_IMAGE_KEY,
129-
settings=step_settings,
130-
step_name=name,
131-
)
132-
builds.append(build)
133-
elif not included_pipeline_build:
134-
pipeline_build = BuildConfiguration(
135-
key=ORCHESTRATOR_DOCKER_IMAGE_KEY,
136-
settings=pipeline_settings,
137-
)
138-
builds.append(pipeline_build)
139-
included_pipeline_build = True
140-
141123
if not included_pipeline_build and self.should_build_pipeline_image(
142124
snapshot
143125
):

src/zenml/orchestrators/publish_utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ def publish_successful_pipeline_run(
131131
run_update=PipelineRunUpdate(
132132
status=ExecutionStatus.COMPLETED,
133133
end_time=utc_now(),
134+
completed=True,
134135
),
135136
)
136137

@@ -151,6 +152,7 @@ def publish_failed_pipeline_run(
151152
run_update=PipelineRunUpdate(
152153
status=ExecutionStatus.FAILED,
153154
end_time=utc_now(),
155+
completed=True,
154156
),
155157
)
156158

src/zenml/orchestrators/step_launcher.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,12 +431,14 @@ def _run_step(
431431
"""
432432
step_run_info = StepRunInfo(
433433
config=self._step.config,
434+
spec=self._step.spec,
434435
pipeline=self._snapshot.pipeline_configuration,
435436
run_name=pipeline_run.name,
436437
pipeline_step_name=self._invocation_id,
437438
run_id=pipeline_run.id,
438439
step_run_id=step_run.id,
439440
force_write_logs=force_write_logs,
441+
snapshot=self._snapshot,
440442
)
441443

442444
output_artifact_uris = output_utils.prepare_output_artifact_uris(

src/zenml/pipelines/dynamic/context.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,3 @@ def __enter__(self) -> Self:
6969
"Calling a pipeline within a dynamic pipeline is not allowed."
7070
)
7171
return super().__enter__()
72-
73-
74-
def get_dynamic_pipeline_run_context() -> Optional[DynamicPipelineRunContext]:
75-
return DynamicPipelineRunContext.get()

src/zenml/pipelines/dynamic/pipeline_definition.py

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,9 @@
3030
from zenml import ExternalArtifact
3131
from zenml.client import Client
3232
from zenml.logger import get_logger
33-
from zenml.logging.step_logging import setup_pipeline_logging
3433
from zenml.models import ArtifactVersionResponse, PipelineRunResponse
3534
from zenml.pipelines.pipeline_definition import Pipeline
3635
from zenml.pipelines.run_utils import (
37-
create_placeholder_run,
3836
should_prevent_pipeline_execution,
3937
)
4038
from zenml.steps.step_invocation import StepInvocation
@@ -54,7 +52,15 @@ class DynamicPipeline(Pipeline):
5452

5553
@property
5654
def depends_on(self) -> List["BaseStep"]:
57-
return []
55+
# TODO: Even with this, it will not be possible to define all potential
56+
# docker builds:
57+
# If a step will be called multiple times, once without step operator
58+
# and once with, it might require multiple docker images. Even if this
59+
# list had two copies of the same step, how would we map at runtime
60+
# which one to use?
61+
# TODO: maybe this needs to be a dict, and steps can select which
62+
# "template" config (including the docker image) to use?
63+
return getattr(self, "_depends_on", [])
5864

5965
@property
6066
def is_dynamic(self) -> bool:
@@ -141,6 +147,18 @@ def pipeline_(param_name: str):
141147

142148
self._parameters = validated_args
143149
self._invocations = {}
150+
with self:
151+
for step in self.depends_on:
152+
self.add_step_invocation(
153+
step,
154+
input_artifacts={},
155+
external_artifacts={},
156+
model_artifacts_or_metadata={},
157+
client_lazy_loaders={},
158+
parameters={},
159+
default_parameters={},
160+
upstream_steps=set(),
161+
)
144162

145163
def add_dynamic_invocation(
146164
self,
@@ -179,28 +197,13 @@ def __call__(
179197
return
180198

181199
stack = Client().active_stack
182-
183200
if not stack.orchestrator.supports_dynamic_pipelines:
184201
raise RuntimeError(
185202
f"The {stack.orchestrator.__class__.__name__} does not support dynamic pipelines. "
186203
)
187204

188205
self.prepare(*args, **kwargs)
189-
snapshot = self._create_snapshot(**self._run_args)
190-
191-
with setup_pipeline_logging(
192-
source="client", snapshot=snapshot
193-
) as logs_request:
194-
run = create_placeholder_run(
195-
snapshot=snapshot,
196-
logs=logs_request,
197-
)
198-
stack.orchestrator.run(
199-
snapshot=snapshot,
200-
stack=stack,
201-
placeholder_run=run,
202-
)
203-
return Client().get_pipeline_run(run.id)
206+
return self._run()
204207

205208
def _call_entrypoint(self, *args: Any, **kwargs: Any) -> None:
206209
"""Calls the pipeline entrypoint function with the given arguments.
@@ -227,6 +230,9 @@ def _call_entrypoint(self, *args: Any, **kwargs: Any) -> None:
227230
"Check out the pydantic error above for more details."
228231
) from e
229232

233+
# Clear the invocations as they might still contain invocations from
234+
# the compilation phase.
235+
self._invocations = {}
230236
self.entrypoint(**validated_args)
231237

232238
def _compute_output_schema(self) -> Optional[Dict[str, Any]]:

0 commit comments

Comments
 (0)