Skip to content

Commit 08b3069

Browse files
committed
misc
1 parent 0906233 commit 08b3069

File tree

9 files changed

+73
-129
lines changed

9 files changed

+73
-129
lines changed

src/zenml/config/compiler.py

Lines changed: 11 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
from zenml.config.step_configurations import (
3636
InputSpec,
3737
Step,
38-
StepConfiguration,
3938
StepConfigurationUpdate,
4039
StepSpec,
4140
)
@@ -48,7 +47,6 @@
4847
if TYPE_CHECKING:
4948
from zenml.pipelines.pipeline_definition import Pipeline
5049
from zenml.stack import Stack, StackComponent
51-
from zenml.steps.base_step import BaseStep
5250
from zenml.steps.step_invocation import StepInvocation
5351

5452
from zenml.logger import get_logger
@@ -128,29 +126,17 @@ def compile(
128126
merge=False,
129127
)
130128

131-
if pipeline.is_dynamic:
132-
step_templates = {
133-
step.name: self._compile_config_template(
134-
step=step, stack=stack
135-
)
136-
for step in pipeline.depends_on
137-
}
138-
steps = {}
139-
else:
140-
step_templates = None
141-
steps = {
142-
invocation_id: self._compile_step_invocation(
143-
invocation=invocation,
144-
stack=stack,
145-
step_config=(run_configuration.steps or {}).get(
146-
invocation_id
147-
),
148-
pipeline_configuration=pipeline.configuration,
149-
)
150-
for invocation_id, invocation in self._get_sorted_invocations(
151-
pipeline=pipeline
152-
)
153-
}
129+
steps = {
130+
invocation_id: self._compile_step_invocation(
131+
invocation=invocation,
132+
stack=stack,
133+
step_config=(run_configuration.steps or {}).get(invocation_id),
134+
pipeline_configuration=pipeline.configuration,
135+
)
136+
for invocation_id, invocation in self._get_sorted_invocations(
137+
pipeline=pipeline
138+
)
139+
}
154140

155141
self._ensure_required_stack_components_exist(stack=stack, steps=steps)
156142

@@ -170,7 +156,6 @@ def compile(
170156
is_dynamic=pipeline.is_dynamic,
171157
pipeline_configuration=pipeline.configuration,
172158
step_configurations=steps,
173-
step_configuration_templates=step_templates,
174159
client_environment=get_run_environment_dict(),
175160
client_version=client_version,
176161
server_version=server_version,
@@ -536,48 +521,6 @@ def _compile_step_invocation(
536521
step_config_overrides=step_configuration_overrides,
537522
)
538523

539-
def _compile_config_template(
540-
self,
541-
step: "BaseStep",
542-
stack: "Stack",
543-
step_config: Optional["StepConfigurationUpdate"],
544-
) -> StepConfiguration:
545-
"""Compiles a ZenML step.
546-
547-
Args:
548-
invocation: The step invocation to compile.
549-
stack: The stack on which the pipeline will be run.
550-
step_config: Run configuration for the step.
551-
pipeline_configuration: Configuration for the pipeline.
552-
553-
Returns:
554-
The compiled step.
555-
"""
556-
if step_config:
557-
step._apply_configuration(step_config)
558-
559-
convert_component_shortcut_settings_keys(
560-
step.configuration.settings, stack=stack
561-
)
562-
step_secrets = secret_utils.resolve_and_verify_secrets(
563-
step.configuration.secrets
564-
)
565-
step_settings = self._filter_and_validate_settings(
566-
settings=step.configuration.settings,
567-
configuration_level=ConfigurationLevel.STEP,
568-
stack=stack,
569-
)
570-
step.configure(
571-
secrets=step_secrets,
572-
settings=step_settings,
573-
merge=False,
574-
)
575-
576-
# TODO: apply pipeline config
577-
return StepConfiguration.model_validate(
578-
step.configuration.model_dump()
579-
)
580-
581524
def _get_sorted_invocations(
582525
self,
583526
pipeline: "Pipeline",

src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -857,6 +857,7 @@ def run_dynamic_out_of_process_step(
857857
settings = cast(
858858
KubernetesOrchestratorSettings, self.get_settings(step_run_info)
859859
)
860+
# TODO: Find a way to fetch the correct docker image here.
860861
image_name = step_run_info.get_image(key=ORCHESTRATOR_DOCKER_IMAGE_KEY)
861862
command = StepOperatorEntrypointConfiguration.get_entrypoint_command()
862863
args = StepOperatorEntrypointConfiguration.get_entrypoint_arguments(

src/zenml/models/v2/core/pipeline_snapshot.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from zenml.config.pipeline_configurations import PipelineConfiguration
3232
from zenml.config.pipeline_run_configuration import PipelineRunConfiguration
3333
from zenml.config.pipeline_spec import PipelineSpec
34-
from zenml.config.step_configurations import Step, StepConfiguration
34+
from zenml.config.step_configurations import Step
3535
from zenml.constants import STR_FIELD_MAX_LENGTH, TEXT_FIELD_MAX_LENGTH
3636
from zenml.enums import ExecutionStatus, StackComponentType
3737
from zenml.models.v2.base.base import BaseUpdate, BaseZenModel
@@ -82,12 +82,6 @@ class PipelineSnapshotBase(BaseZenModel):
8282
step_configurations: Dict[str, Step] = Field(
8383
default={}, title="The step configurations for this snapshot."
8484
)
85-
step_configuration_templates: Optional[Dict[str, StepConfiguration]] = (
86-
Field(
87-
default=None,
88-
title="The step configuration templates of the snapshot.",
89-
)
90-
)
9185
client_environment: Dict[str, Any] = Field(
9286
default={}, title="The client environment for this snapshot."
9387
)

src/zenml/orchestrators/containerized_orchestrator.py

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -120,24 +120,6 @@ def get_docker_builds(
120120
builds.append(pipeline_build)
121121
included_pipeline_build = True
122122

123-
for name, step_config in snapshot.step_configuration_templates.items():
124-
step_settings = step_config.docker_settings
125-
126-
if step_settings != pipeline_settings:
127-
build = BuildConfiguration(
128-
key=ORCHESTRATOR_DOCKER_IMAGE_KEY,
129-
settings=step_settings,
130-
step_name=name,
131-
)
132-
builds.append(build)
133-
elif not included_pipeline_build:
134-
pipeline_build = BuildConfiguration(
135-
key=ORCHESTRATOR_DOCKER_IMAGE_KEY,
136-
settings=pipeline_settings,
137-
)
138-
builds.append(pipeline_build)
139-
included_pipeline_build = True
140-
141123
if not included_pipeline_build and self.should_build_pipeline_image(
142124
snapshot
143125
):

src/zenml/pipelines/dynamic/context.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,3 @@ def __enter__(self) -> Self:
6969
"Calling a pipeline within a dynamic pipeline is not allowed."
7070
)
7171
return super().__enter__()
72-
73-
74-
def get_dynamic_pipeline_run_context() -> Optional[DynamicPipelineRunContext]:
75-
return DynamicPipelineRunContext.get()

src/zenml/pipelines/dynamic/pipeline_definition.py

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,9 @@
3030
from zenml import ExternalArtifact
3131
from zenml.client import Client
3232
from zenml.logger import get_logger
33-
from zenml.logging.step_logging import setup_pipeline_logging
3433
from zenml.models import ArtifactVersionResponse, PipelineRunResponse
3534
from zenml.pipelines.pipeline_definition import Pipeline
3635
from zenml.pipelines.run_utils import (
37-
create_placeholder_run,
3836
should_prevent_pipeline_execution,
3937
)
4038
from zenml.steps.step_invocation import StepInvocation
@@ -54,7 +52,15 @@ class DynamicPipeline(Pipeline):
5452

5553
@property
5654
def depends_on(self) -> List["BaseStep"]:
57-
return []
55+
# TODO: Even with this, it will not be possible to define all potential
56+
# docker builds:
57+
# If a step is called multiple times, once without a step operator
58+
# and once with one, it might require multiple docker images. Even if this
59+
# list had two copies of the same step, how would we map at runtime
60+
# which one to use?
61+
# TODO: maybe this needs to be a dict, and steps can select which
62+
# "template" config (including the docker image) to use?
63+
return getattr(self, "_depends_on", [])
5864

5965
@property
6066
def is_dynamic(self) -> bool:
@@ -141,6 +147,18 @@ def pipeline_(param_name: str):
141147

142148
self._parameters = validated_args
143149
self._invocations = {}
150+
with self:
151+
for step in self.depends_on:
152+
self.add_step_invocation(
153+
step,
154+
input_artifacts={},
155+
external_artifacts={},
156+
model_artifacts_or_metadata={},
157+
client_lazy_loaders={},
158+
parameters={},
159+
default_parameters={},
160+
upstream_steps=set(),
161+
)
144162

145163
def add_dynamic_invocation(
146164
self,
@@ -179,28 +197,13 @@ def __call__(
179197
return
180198

181199
stack = Client().active_stack
182-
183200
if not stack.orchestrator.supports_dynamic_pipelines:
184201
raise RuntimeError(
185202
f"The {stack.orchestrator.__class__.__name__} does not support dynamic pipelines. "
186203
)
187204

188205
self.prepare(*args, **kwargs)
189-
snapshot = self._create_snapshot(**self._run_args)
190-
191-
with setup_pipeline_logging(
192-
source="client", snapshot=snapshot
193-
) as logs_request:
194-
run = create_placeholder_run(
195-
snapshot=snapshot,
196-
logs=logs_request,
197-
)
198-
stack.orchestrator.run(
199-
snapshot=snapshot,
200-
stack=stack,
201-
placeholder_run=run,
202-
)
203-
return Client().get_pipeline_run(run.id)
206+
return self._run()
204207

205208
def _call_entrypoint(self, *args: Any, **kwargs: Any) -> None:
206209
"""Calls the pipeline entrypoint function with the given arguments.
@@ -227,6 +230,9 @@ def _call_entrypoint(self, *args: Any, **kwargs: Any) -> None:
227230
"Check out the pydantic error above for more details."
228231
) from e
229232

233+
# Clear the invocations as they might still contain invocations from
234+
# the compilation phase.
235+
self._invocations = {}
230236
self.entrypoint(**validated_args)
231237

232238
def _compute_output_schema(self) -> Optional[Dict[str, Any]]:

src/zenml/pipelines/dynamic/runner.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from zenml import ExternalArtifact
1919
from zenml.client import Client
2020
from zenml.config.compiler import Compiler
21-
from zenml.config.step_configurations import Step
21+
from zenml.config.step_configurations import Step, StepConfiguration
2222
from zenml.exceptions import RunStoppedException
2323
from zenml.logger import get_logger
2424
from zenml.logging.step_logging import setup_pipeline_logging
@@ -174,9 +174,10 @@ def run_step_sync(
174174
"StepRunResultFuture", Sequence["StepRunResultFuture"], None
175175
] = None,
176176
) -> StepRunResult:
177+
step = step.copy()
177178
inputs, upstream_steps = _prepare_step_run(step, args, kwargs, after)
178179
compiled_step, _ = _compile_step(
179-
self.pipeline, step, id, upstream_steps, inputs
180+
self._snapshot, self.pipeline, step, id, upstream_steps, inputs
180181
)
181182
step_run = _run_step_sync(
182183
snapshot=self._snapshot,
@@ -197,9 +198,10 @@ def run_step_in_thread(
197198
"StepRunResultFuture", Sequence["StepRunResultFuture"], None
198199
] = None,
199200
) -> StepRunResultFuture:
201+
step = step.copy()
200202
inputs, upstream_steps = _prepare_step_run(step, args, kwargs, after)
201203
compiled_step, invocation_id = _compile_step(
202-
self.pipeline, step, id, upstream_steps, inputs
204+
self._snapshot, self.pipeline, step, id, upstream_steps, inputs
203205
)
204206

205207
def _run() -> StepRunResult:
@@ -266,6 +268,7 @@ def _await_and_validate_input(input: Any):
266268

267269

268270
def _compile_step(
271+
snapshot: "PipelineSnapshotResponse",
269272
pipeline: "DynamicPipeline",
270273
step: "BaseStep",
271274
id: Optional[str],
@@ -287,6 +290,9 @@ def _compile_step(
287290
else:
288291
external_artifacts[name] = ExternalArtifact(value=value)
289292

293+
if config := get_dynamic_step_configuration(snapshot, step):
294+
step._configuration = config
295+
290296
invocation_id = pipeline.add_dynamic_invocation(
291297
step=step,
292298
custom_id=id,
@@ -409,3 +415,16 @@ def _runs_in_process(step: "Step") -> bool:
409415
return False
410416

411417
return True
418+
419+
420+
def get_dynamic_step_configuration(
421+
snapshot: "PipelineSnapshotResponse",
422+
step: "BaseStep",
423+
) -> Optional["StepConfiguration"]:
424+
step_source = step.resolve().import_path
425+
426+
for compiled_step in snapshot.step_configurations.values():
427+
if compiled_step.spec.source.import_path == step_source:
428+
return compiled_step.config
429+
430+
return None

src/zenml/steps/base_step.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -200,14 +200,12 @@ def __init__(
200200
},
201201
)
202202

203-
self._configuration = PartialStepConfiguration(
204-
name=name,
203+
self._configuration = PartialStepConfiguration(name=name)
204+
self.configure(
205205
enable_cache=enable_cache,
206206
enable_artifact_metadata=enable_artifact_metadata,
207207
enable_artifact_visualization=enable_artifact_visualization,
208208
enable_step_logs=enable_step_logs,
209-
)
210-
self.configure(
211209
experiment_tracker=experiment_tracker,
212210
step_operator=step_operator,
213211
output_materializers=output_materializers,
@@ -1061,6 +1059,7 @@ def _finalize_configuration(
10611059
external_artifacts: Dict[str, "ExternalArtifactConfiguration"],
10621060
model_artifacts_or_metadata: Dict[str, "ModelVersionDataLazyLoader"],
10631061
client_lazy_loaders: Dict[str, "ClientLazyLoader"],
1062+
skip_input_validation: bool = False,
10641063
) -> "StepConfiguration":
10651064
"""Finalizes the configuration after the step was called.
10661065
@@ -1075,6 +1074,7 @@ def _finalize_configuration(
10751074
model_artifacts_or_metadata: The model artifacts or metadata of
10761075
this step.
10771076
client_lazy_loaders: The client lazy loaders of this step.
1077+
skip_input_validation: If True, will skip the input validation.
10781078
10791079
Raises:
10801080
StepInterfaceError: If explicit materializers were specified for an
@@ -1164,12 +1164,13 @@ def _finalize_configuration(
11641164

11651165
parameters = self._finalize_parameters()
11661166
self.configure(parameters=parameters, merge=False)
1167-
self._validate_inputs(
1168-
input_artifacts=input_artifacts,
1169-
external_artifacts=external_artifacts,
1170-
model_artifacts_or_metadata=model_artifacts_or_metadata,
1171-
client_lazy_loaders=client_lazy_loaders,
1172-
)
1167+
if not skip_input_validation:
1168+
self._validate_inputs(
1169+
input_artifacts=input_artifacts,
1170+
external_artifacts=external_artifacts,
1171+
model_artifacts_or_metadata=model_artifacts_or_metadata,
1172+
client_lazy_loaders=client_lazy_loaders,
1173+
)
11731174

11741175
values = dict_utils.remove_none_values({"outputs": outputs or None})
11751176
config = StepConfigurationUpdate(**values)

src/zenml/steps/step_invocation.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,6 @@ def finalize(self, parameters_to_ignore: Set[str]) -> "StepConfiguration":
119119
external_artifacts=external_artifacts,
120120
model_artifacts_or_metadata=self.model_artifacts_or_metadata,
121121
client_lazy_loaders=self.client_lazy_loaders,
122+
# Skip input validation for dynamic pipeline step templates
123+
skip_input_validation=self.pipeline.is_dynamic,
122124
)

0 commit comments

Comments
 (0)