Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
c6153aa
WIP
schustmi Oct 17, 2025
98d115e
Dynamic step config
schustmi Oct 21, 2025
6b51cc5
misc
schustmi Oct 21, 2025
0906233
Optional entrypoint args
schustmi Oct 21, 2025
e217f58
misc
schustmi Oct 21, 2025
34aeb41
Merge branch 'develop' into feature/dynamic-pipelines
schustmi Oct 27, 2025
7956099
Dynamic config
schustmi Oct 27, 2025
226b65f
Maybe solution for step configs
schustmi Oct 27, 2025
d745345
DB migration
schustmi Oct 27, 2025
60e9358
Cleanup
schustmi Oct 27, 2025
b0b6162
Misc fixes
schustmi Oct 28, 2025
ec7bae4
Fix DAG
schustmi Oct 28, 2025
ddc9e32
Merge branch 'develop' into feature/dynamic-pipelines
schustmi Oct 28, 2025
f87f325
Output schema
schustmi Oct 28, 2025
0a27ff6
More fixes
schustmi Oct 28, 2025
137413b
More fixes
schustmi Oct 28, 2025
438b003
Threadsafe static pipeline compilation
schustmi Oct 29, 2025
70bf95b
Refactoring
schustmi Oct 29, 2025
dc9fa0f
Merge branch 'develop' into feature/dynamic-pipelines
schustmi Oct 31, 2025
f63e3e4
Big refactoring
schustmi Oct 31, 2025
5162911
Split futures
schustmi Oct 31, 2025
050970f
Error logs
schustmi Nov 3, 2025
22e7a9d
Mypy and docstrings
schustmi Nov 3, 2025
291787a
Bool flag on decorator
schustmi Nov 3, 2025
c246d2d
Experimental warning
schustmi Nov 3, 2025
1bb1cd3
Kubernetes orchestrator rework
schustmi Nov 3, 2025
b9224d6
Step context changes
schustmi Nov 3, 2025
eeeff39
Fix unit tests
schustmi Nov 3, 2025
aa6552e
Validate dynamic pipeline source resolving
schustmi Nov 3, 2025
71ae283
Integration test fixes
schustmi Nov 3, 2025
e902eaf
Linting
schustmi Nov 3, 2025
9646d24
Fix DB migration
schustmi Nov 3, 2025
23ca5eb
Docstring fix
schustmi Nov 3, 2025
c082950
Merge branch 'develop' into feature/dynamic-pipelines
schustmi Nov 3, 2025
e3653ce
Fix wrong test
schustmi Nov 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 51 additions & 29 deletions src/zenml/config/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,18 @@ def compile(
merge=False,
)

# If we're compiling a dynamic pipeline, the steps are only templates
# and might not have all inputs defined, so we skip the input
# validation.
skip_input_validation = pipeline.is_dynamic

steps = {
invocation_id: self._compile_step_invocation(
invocation=invocation,
stack=stack,
step_config=(run_configuration.steps or {}).get(invocation_id),
pipeline_configuration=pipeline.configuration,
skip_input_validation=skip_input_validation,
)
for invocation_id, invocation in self._get_sorted_invocations(
pipeline=pipeline
Expand All @@ -153,6 +159,7 @@ def compile(

snapshot = PipelineSnapshotBase(
run_name_template=run_name,
is_dynamic=pipeline.is_dynamic,
pipeline_configuration=pipeline.configuration,
step_configurations=steps,
client_environment=get_run_environment_dict(),
Expand Down Expand Up @@ -463,6 +470,7 @@ def _compile_step_invocation(
stack: "Stack",
step_config: Optional["StepConfigurationUpdate"],
pipeline_configuration: "PipelineConfiguration",
skip_input_validation: bool = False,
) -> Step:
"""Compiles a ZenML step.

Expand All @@ -471,6 +479,7 @@ def _compile_step_invocation(
stack: The stack on which the pipeline will be run.
step_config: Run configuration for the step.
pipeline_configuration: Configuration for the pipeline.
skip_input_validation: If True, will skip the input validation.

Returns:
The compiled step.
Expand All @@ -480,35 +489,41 @@ def _compile_step_invocation(
invocation.step = copy.deepcopy(invocation.step)

step = invocation.step
if step_config:
step._apply_configuration(
step_config, runtime_parameters=invocation.parameters
)
with step._suspend_dynamic_configuration():
if step_config:
step._apply_configuration(
step_config, runtime_parameters=invocation.parameters
)

convert_component_shortcut_settings_keys(
step.configuration.settings, stack=stack
)
step_spec = self._get_step_spec(invocation=invocation)
step_secrets = secret_utils.resolve_and_verify_secrets(
step.configuration.secrets
)
step_settings = self._filter_and_validate_settings(
settings=step.configuration.settings,
configuration_level=ConfigurationLevel.STEP,
stack=stack,
)
step.configure(
secrets=step_secrets,
settings=step_settings,
merge=False,
)
# Apply the dynamic configuration (which happened while executing the
# pipeline function) after all other step-specific configurations.
step._merge_dynamic_configuration()

parameters_to_ignore = (
set(step_config.parameters or {}) if step_config else set()
)
step_configuration_overrides = invocation.finalize(
parameters_to_ignore=parameters_to_ignore
)
convert_component_shortcut_settings_keys(
step.configuration.settings, stack=stack
)
step_spec = self._get_step_spec(invocation=invocation)
step_secrets = secret_utils.resolve_and_verify_secrets(
step.configuration.secrets
)
step_settings = self._filter_and_validate_settings(
settings=step.configuration.settings,
configuration_level=ConfigurationLevel.STEP,
stack=stack,
)
step.configure(
secrets=step_secrets,
settings=step_settings,
merge=False,
)

parameters_to_ignore = (
set(step_config.parameters or {}) if step_config else set()
)
step_configuration_overrides = invocation.finalize(
parameters_to_ignore=parameters_to_ignore,
skip_input_validation=skip_input_validation,
)
full_step_config = (
step_configuration_overrides.apply_pipeline_configuration(
pipeline_configuration=pipeline_configuration
Expand All @@ -533,8 +548,15 @@ def _get_sorted_invocations(
pipeline: The pipeline of which to sort the invocations

Returns:
The sorted steps.
The sorted step invocations.
"""
if pipeline.is_dynamic:
# In dynamic pipelines, we require the static invocations to be
# sorted the same way they were passed in `pipeline.depends_on`, as
# we index this list later to figure out the correct template for
# each step invocation.
return list(pipeline.invocations.items())

from zenml.orchestrators.dag_runner import reverse_dag
from zenml.orchestrators.topsort import topsorted_layers

Expand Down Expand Up @@ -634,7 +656,7 @@ def _compute_pipeline_spec(
Raises:
ValueError: If the pipeline has no steps.
"""
if not step_specs:
if not step_specs and not pipeline.is_dynamic:
raise ValueError(
f"Pipeline '{pipeline.name}' cannot be compiled because it has "
f"no steps. Please make sure that your steps are decorated "
Expand Down
9 changes: 9 additions & 0 deletions src/zenml/config/step_configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@ class StepConfigurationUpdate(FrozenBaseModel):
default=None,
description="The cache policy for the step.",
)
in_process: Optional[bool] = Field(
default=None,
description="Whether to run the step in process. This is only "
"applicable for dynamic pipelines. If not set, the step will by "
"default run in-process unless it requires a different Docker image "
"or has special resource requirements.",
)

outputs: Mapping[str, PartialArtifactConfiguration] = {}

Expand Down Expand Up @@ -254,6 +261,8 @@ class PartialStepConfiguration(StepConfigurationUpdate):
"""Class representing a partial step configuration."""

name: str
# TODO: maybe move to spec?
template: Optional[str] = None
parameters: Dict[str, Any] = {}
settings: Dict[str, SerializeAsAny[BaseSettings]] = {}
environment: Dict[str, str] = {}
Expand Down
33 changes: 23 additions & 10 deletions src/zenml/config/step_run_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

from zenml.config.frozen_base_model import FrozenBaseModel
from zenml.config.pipeline_configurations import PipelineConfiguration
from zenml.config.step_configurations import StepConfiguration
from zenml.config.step_configurations import StepConfiguration, StepSpec
from zenml.logger import get_logger
from zenml.models import PipelineSnapshotResponse

logger = get_logger(__name__)


class StepRunInfo(FrozenBaseModel):
Expand All @@ -30,7 +34,9 @@ class StepRunInfo(FrozenBaseModel):
pipeline_step_name: str

config: StepConfiguration
spec: StepSpec
pipeline: PipelineConfiguration
snapshot: PipelineSnapshotResponse

force_write_logs: Callable[..., Any]

Expand All @@ -46,15 +52,22 @@ def get_image(self, key: str) -> str:
Returns:
The image name or digest.
"""
from zenml.client import Client

run = Client().get_pipeline_run(self.run_id)
if not run.build:
if not self.snapshot.build:
raise RuntimeError(
f"Missing build for run {run.id}. This is probably because "
"the build was manually deleted."
f"Missing build for snapshot {self.snapshot.id}. This is "
"probably because the build was manually deleted."
)

return run.build.get_image(
component_key=key, step=self.pipeline_step_name
)
if self.snapshot.is_dynamic:
step_key = self.config.template
if not step_key:
logger.warning(
"Unable to find config template for step %s. Falling "
"back to the pipeline image.",
self.pipeline_step_name,
)
step_key = None
else:
step_key = self.pipeline_step_name

return self.snapshot.build.get_image(component_key=key, step=step_key)
5 changes: 0 additions & 5 deletions src/zenml/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,6 @@ def handle_int_env_var(var: str, default: int = 0) -> int:
# ZenML Analytics Server - URL
ANALYTICS_SERVER_URL = "https://analytics.zenml.io/"

# Container utils
SHOULD_PREVENT_PIPELINE_EXECUTION = handle_bool_env_var(
ENV_ZENML_PREVENT_PIPELINE_EXECUTION
)

# Repository and local store directory paths:
REPOSITORY_DIRECTORY_NAME = ".zen"
LOCAL_STORES_DIRECTORY_NAME = "local_stores"
Expand Down
8 changes: 4 additions & 4 deletions src/zenml/deployers/server/entrypoint_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# permissions and limitations under the License.
"""ZenML Pipeline Deployment Entrypoint Configuration."""

from typing import Any, List, Set
from typing import Any, Dict, List
from uuid import UUID

from zenml.client import Client
Expand All @@ -39,14 +39,14 @@ class DeploymentEntrypointConfiguration(BaseEntrypointConfiguration):
"""

@classmethod
def get_entrypoint_options(cls) -> Set[str]:
def get_entrypoint_options(cls) -> Dict[str, bool]:
"""Gets all options required for the deployment entrypoint.

Returns:
Set of required option names
"""
return {
DEPLOYMENT_ID_OPTION,
DEPLOYMENT_ID_OPTION: True,
}

@classmethod
Expand Down Expand Up @@ -113,7 +113,7 @@ def run(self) -> None:
raise RuntimeError(f"Deployment {deployment.id} has no snapshot")

# Download code if necessary (for remote execution environments)
self.download_code_if_necessary(snapshot=deployment.snapshot)
self.download_code_if_necessary()

app_runner = BaseDeploymentAppRunner.load_app_runner(deployment)
app_runner.run()
Loading
Loading