Skip to content

Commit a83c54c

Browse files
committed
Cleanup
1 parent d745345 commit a83c54c

File tree

19 files changed

+381
-347
lines changed

19 files changed

+381
-347
lines changed

src/zenml/config/compiler.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
from zenml.exceptions import StackValidationError
4343
from zenml.models import PipelineSnapshotBase
4444
from zenml.pipelines.run_utils import get_default_run_name
45-
from zenml.steps.step_invocation import StepInvocation
4645
from zenml.utils import pydantic_utils, secret_utils, settings_utils
4746

4847
if TYPE_CHECKING:
@@ -127,12 +126,18 @@ def compile(
127126
merge=False,
128127
)
129128

129+
# If we're compiling a dynamic pipeline, the steps are only templates
130+
# and might not have all inputs defined, so we skip the input
131+
# validation.
132+
skip_input_validation = pipeline.is_dynamic
133+
130134
steps = {
131135
invocation_id: self._compile_step_invocation(
132136
invocation=invocation,
133137
stack=stack,
134138
step_config=(run_configuration.steps or {}).get(invocation_id),
135139
pipeline_configuration=pipeline.configuration,
140+
skip_input_validation=skip_input_validation,
136141
)
137142
for invocation_id, invocation in self._get_sorted_invocations(
138143
pipeline=pipeline
@@ -465,6 +470,7 @@ def _compile_step_invocation(
465470
stack: "Stack",
466471
step_config: Optional["StepConfigurationUpdate"],
467472
pipeline_configuration: "PipelineConfiguration",
473+
skip_input_validation: bool = False,
468474
) -> Step:
469475
"""Compiles a ZenML step.
470476
@@ -473,6 +479,7 @@ def _compile_step_invocation(
473479
stack: The stack on which the pipeline will be run.
474480
step_config: Run configuration for the step.
475481
pipeline_configuration: Configuration for the pipeline.
482+
skip_input_validation: If True, will skip the input validation.
476483
477484
Returns:
478485
The compiled step.
@@ -487,6 +494,10 @@ def _compile_step_invocation(
487494
step_config, runtime_parameters=invocation.parameters
488495
)
489496

497+
# Apply the dynamic configuration (which happened while executing the
498+
# pipeline function) after all other step-specific configurations.
499+
step._apply_dynamic_configuration()
500+
490501
convert_component_shortcut_settings_keys(
491502
step.configuration.settings, stack=stack
492503
)
@@ -509,7 +520,8 @@ def _compile_step_invocation(
509520
set(step_config.parameters or {}) if step_config else set()
510521
)
511522
step_configuration_overrides = invocation.finalize(
512-
parameters_to_ignore=parameters_to_ignore
523+
parameters_to_ignore=parameters_to_ignore,
524+
skip_input_validation=skip_input_validation,
513525
)
514526
full_step_config = (
515527
step_configuration_overrides.apply_pipeline_configuration(
@@ -535,9 +547,13 @@ def _get_sorted_invocations(
535547
pipeline: The pipeline of which to sort the invocations
536548
537549
Returns:
538-
The sorted steps.
550+
The sorted step invocations.
539551
"""
540552
if pipeline.is_dynamic:
553+
# In dynamic pipelines, we require the static invocations to be
554+
# sorted the same way they were passed in `pipeline.depends_on`, as
555+
# we index this list later to figure out the correct template for
556+
# each step invocation.
541557
return list(pipeline.invocations.items())
542558

543559
from zenml.orchestrators.dag_runner import reverse_dag

src/zenml/config/step_configurations.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ class PartialStepConfiguration(StepConfigurationUpdate):
261261
"""Class representing a partial step configuration."""
262262

263263
name: str
264+
# TODO: maybe move to spec?
264265
template: Optional[str] = None
265266
parameters: Dict[str, Any] = {}
266267
settings: Dict[str, SerializeAsAny[BaseSettings]] = {}

0 commit comments

Comments
 (0)