Skip to content

Commit 137413b

Browse files
committed
More fixes
1 parent 0a27ff6 commit 137413b

File tree

4 files changed

+57
-39
lines changed

4 files changed

+57
-39
lines changed

src/zenml/config/compiler.py

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -489,40 +489,41 @@ def _compile_step_invocation(
489489
invocation.step = copy.deepcopy(invocation.step)
490490

491491
step = invocation.step
492-
if step_config:
493-
step._apply_configuration(
494-
step_config, runtime_parameters=invocation.parameters
495-
)
492+
with step._skip_dynamic_configuration():
493+
if step_config:
494+
step._apply_configuration(
495+
step_config, runtime_parameters=invocation.parameters
496+
)
496497

497-
# Apply the dynamic configuration (which happened while executing the
498-
# pipeline function) after all other step-specific configurations.
499-
step._apply_dynamic_configuration()
498+
# Apply the dynamic configuration (which happened while executing the
499+
# pipeline function) after all other step-specific configurations.
500+
step._apply_dynamic_configuration()
500501

501-
convert_component_shortcut_settings_keys(
502-
step.configuration.settings, stack=stack
503-
)
504-
step_spec = self._get_step_spec(invocation=invocation)
505-
step_secrets = secret_utils.resolve_and_verify_secrets(
506-
step.configuration.secrets
507-
)
508-
step_settings = self._filter_and_validate_settings(
509-
settings=step.configuration.settings,
510-
configuration_level=ConfigurationLevel.STEP,
511-
stack=stack,
512-
)
513-
step.configure(
514-
secrets=step_secrets,
515-
settings=step_settings,
516-
merge=False,
517-
)
502+
convert_component_shortcut_settings_keys(
503+
step.configuration.settings, stack=stack
504+
)
505+
step_spec = self._get_step_spec(invocation=invocation)
506+
step_secrets = secret_utils.resolve_and_verify_secrets(
507+
step.configuration.secrets
508+
)
509+
step_settings = self._filter_and_validate_settings(
510+
settings=step.configuration.settings,
511+
configuration_level=ConfigurationLevel.STEP,
512+
stack=stack,
513+
)
514+
step.configure(
515+
secrets=step_secrets,
516+
settings=step_settings,
517+
merge=False,
518+
)
518519

519-
parameters_to_ignore = (
520-
set(step_config.parameters or {}) if step_config else set()
521-
)
522-
step_configuration_overrides = invocation.finalize(
523-
parameters_to_ignore=parameters_to_ignore,
524-
skip_input_validation=skip_input_validation,
525-
)
520+
parameters_to_ignore = (
521+
set(step_config.parameters or {}) if step_config else set()
522+
)
523+
step_configuration_overrides = invocation.finalize(
524+
parameters_to_ignore=parameters_to_ignore,
525+
skip_input_validation=skip_input_validation,
526+
)
526527
full_step_config = (
527528
step_configuration_overrides.apply_pipeline_configuration(
528529
pipeline_configuration=pipeline_configuration

src/zenml/pipelines/dynamic/runner.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ def _await_and_validate_input(input: Any) -> Any:
318318

319319
# TODO: we can validate the type of the inputs that are passed as raw data
320320
signature = inspect.signature(step.entrypoint, follow_wrapped=True)
321-
validated_args = signature.bind(*args, **kwargs).arguments
321+
validated_args = signature.bind_partial(*args, **kwargs).arguments
322322

323323
return validated_args, upstream_steps
324324

@@ -351,16 +351,18 @@ def _compile_step(
351351
update={"template": template.spec.invocation_id}
352352
)
353353

354-
# TODO:
355-
# - differentiate between input artifacts and parameters?
356-
# - default parameters
357-
invocation_id = pipeline.add_dynamic_invocation(
354+
_, _, _, _, _, default_parameters = step._parse_call_args(**inputs)
355+
invocation_id = pipeline.add_step_invocation(
358356
step=step,
359357
custom_id=id,
360358
allow_id_suffix=not id,
361359
input_artifacts=input_artifacts,
362360
external_artifacts=external_artifacts,
363361
upstream_steps=upstream_steps,
362+
parameters={},
363+
default_parameters=default_parameters,
364+
model_artifacts_or_metadata={},
365+
client_lazy_loaders={},
364366
)
365367
compiled_step = Compiler()._compile_step_invocation(
366368
invocation=pipeline.invocations[invocation_id],

src/zenml/pipelines/pipeline_definition.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1361,7 +1361,7 @@ def add_step_invocation(
13611361
Returns:
13621362
The step invocation ID.
13631363
"""
1364-
if Pipeline.ACTIVE_PIPELINE != self:
1364+
if not self.is_dynamic and Pipeline.ACTIVE_PIPELINE != self:
13651365
raise RuntimeError(
13661366
"A step invocation can only be added to an active pipeline."
13671367
)

src/zenml/steps/base_step.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
import inspect
1919
from abc import abstractmethod
2020
from collections import defaultdict
21+
from contextlib import contextmanager
2122
from typing import (
2223
TYPE_CHECKING,
2324
Any,
2425
Dict,
26+
Generator,
2527
List,
2628
Mapping,
2729
Optional,
@@ -207,6 +209,7 @@ def __init__(
207209

208210
self._configuration = PartialStepConfiguration(name=name)
209211
self._dynamic_configuration: Optional["StepConfigurationUpdate"] = None
212+
self._should_skip_dynamic_configuration = False
210213
self.configure(
211214
enable_cache=enable_cache,
212215
enable_artifact_metadata=enable_artifact_metadata,
@@ -492,7 +495,9 @@ def __call__(
492495
if context := DynamicPipelineRunContext.get():
493496
after = cast(
494497
Union[
495-
"StepRunOutputsFuture", Sequence["StepRunOutputsFuture"], None
498+
"StepRunOutputsFuture",
499+
Sequence["StepRunOutputsFuture"],
500+
None,
496501
],
497502
after,
498503
)
@@ -910,6 +915,13 @@ def copy(self) -> "BaseStep":
910915

911916
return copy_
912917

918+
@contextmanager
919+
def _skip_dynamic_configuration(self) -> Generator[None, None, None]:
920+
"""Context manager to skip the dynamic configuration."""
921+
self._should_skip_dynamic_configuration = True
922+
yield
923+
self._should_skip_dynamic_configuration = False
924+
913925
def _apply_configuration(
914926
self,
915927
config: "StepConfigurationUpdate",
@@ -929,7 +941,10 @@ def _apply_configuration(
929941

930942
self._validate_configuration(config, runtime_parameters)
931943

932-
if DynamicPipelineRunContext.get():
944+
if (
945+
DynamicPipelineRunContext.get()
946+
and not self._should_skip_dynamic_configuration
947+
):
933948
if self._dynamic_configuration is None:
934949
self._dynamic_configuration = config
935950
else:

0 commit comments

Comments
 (0)