@schustmi schustmi commented Oct 21, 2025

Describe changes

This PR implements dynamic pipelines for the local and Kubernetes orchestrators.

Example

from zenml import step, pipeline

@step
def generate_int() -> int:
  return 3

@step
def do_something(index: int) -> None:
  ...

@pipeline(dynamic=True)
def dynamic_pipeline() -> None:
  count = generate_int()
  # `count` is an artifact, we now load the data
  count_data = count.load()

  for idx in range(count_data):
    # This will run sequentially, like regular python code would.
    # See the features below for an example on how to run steps
    # in parallel.
    do_something(idx)

if __name__ == "__main__":
  dynamic_pipeline()

Features

  • Dynamic configuration for steps:
@pipeline(dynamic=True)
def dynamic_pipeline():
  some_step.with_options(enable_cache=False)()
  • By default, all steps run inside the orchestration environment. If you specify in_process=False for a step, the orchestrator launches a separate container in which the step is executed (this only works with the Kubernetes orchestrator):
@step(in_process=False)
def some_step() -> None:
  ...
  • Execute multiple steps in parallel by using step.submit(...). Each call starts a new thread, which either executes the step directly or launches a new container for it:
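@step
def some_step(arg: int) -> int:
  ...

# Defined here so that the `downstream_step(future)` call below resolves.
@step
def downstream_step(value: int) -> None:
  ...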

@pipeline(dynamic=True)
def dynamic_pipeline():
  future = some_step.submit(arg=1)
  artifact = future.result()  # wait and get artifact response(s)
  data = future.load()  # wait and load artifact data
  downstream_step(future)  # pass the output to another step

  # Run multiple steps in parallel
  for idx in range(3):
    some_step.submit(arg=idx)
  • Specify config templates for steps using depends_on:
# config.yaml
steps:
  some_step:
    parameters:
      arg: 3
# run.py
@step
def some_step(arg: int) -> None:
  ...

@pipeline(dynamic=True, depends_on=[some_step])
def dynamic_pipeline():
  some_step()

if __name__ == "__main__":
  dynamic_pipeline.with_options(config_path="config.yaml")()
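
To make the submit/result pattern from the features above concrete, here is a minimal standard-library sketch of the same fan-out idea. It uses `concurrent.futures.ThreadPoolExecutor` rather than ZenML, so `square` and `fan_out` are hypothetical stand-ins and nothing here reflects how this PR implements step futures.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def square(arg: int) -> int:
    """Hypothetical stand-in for a step body."""
    return arg * arg


def fan_out(count: int) -> List[int]:
    """Submit `count` calls concurrently, then wait for each result."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        # All calls are submitted before any result is awaited.
        futures = [pool.submit(square, idx) for idx in range(count)]
        # Results are collected in submission order, not completion order.
        return [future.result() for future in futures]


if __name__ == "__main__":
    print(fan_out(3))
```

Collecting results in submission order keeps the output deterministic even when the underlying calls finish out of order.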

Known issues

  • Our logging storage isn't thread-safe yet, so the logs of steps running in parallel get mixed up.
  • When multiple steps run concurrently, a failure in one step does not stop the other threads; they continue executing until they finish.
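
The second issue resembles how plain Python thread pools behave: an exception in one task does not cancel its siblings. The sketch below illustrates this with the standard library only. `flaky` and `run_all` are hypothetical names, and this is not a claim about how the orchestrator schedules steps.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def flaky(idx: int) -> int:
    """Hypothetical task: index 0 fails, the others sleep briefly and succeed."""
    if idx == 0:
        raise RuntimeError("task 0 failed")
    time.sleep(0.05)
    return idx


def run_all(count: int) -> Tuple[List[int], List[str]]:
    """Run `count` tasks (count >= 1) and collect results and errors separately."""
    results: List[int] = []
    errors: List[str] = []
    with ThreadPoolExecutor(max_workers=count) as pool:
        futures = [pool.submit(flaky, idx) for idx in range(count)]
        for future in futures:
            try:
                results.append(future.result())
            except RuntimeError as exc:
                # The failure is only observed here; sibling tasks keep running.
                errors.append(str(exc))
    return results, errors


if __name__ == "__main__":
    print(run_all(3))
```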

Pre-requisites

Please ensure you have done the following:

  • I have read the CONTRIBUTING.md document.
  • I have added tests to cover my changes.
  • I have based my new branch on develop and the open PR is targeting develop. If your branch wasn't based on develop, read the contribution guide on rebasing a branch to develop.
  • IMPORTANT: I made sure that my changes are reflected properly in the following resources:
    • ZenML Docs
    • Dashboard: Needs to be communicated to the frontend team.
    • Templates: Might need adjustments (that are not reflected in the template tests) in case of non-breaking changes and deprecations.
    • Projects: Depending on the version dependencies, different projects might get affected.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Other (add details above)

@github-actions github-actions bot added internal To filter out internal PRs and issues enhancement New feature or request labels Oct 21, 2025
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch from 4c02511 to c6153aa Compare October 21, 2025 02:02
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch 3 times, most recently from 08b3069 to 250e5d3 Compare October 23, 2025 02:42
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch from 250e5d3 to e217f58 Compare October 23, 2025 08:49
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch from 575843d to 226b65f Compare October 27, 2025 05:23
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch 5 times, most recently from 597b9ce to a83c54c Compare October 28, 2025 02:25
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch 2 times, most recently from e82964c to 093ee9a Compare October 28, 2025 07:06
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch from 093ee9a to b0b6162 Compare October 28, 2025 07:43
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch from cffc807 to ec7bae4 Compare October 28, 2025 08:24
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch from b13c693 to 70bf95b Compare October 30, 2025 08:37
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch from 8c13c97 to 8162fd4 Compare November 3, 2025 06:07
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch from 8162fd4 to 1bb1cd3 Compare November 3, 2025 06:22
@schustmi schustmi marked this pull request as ready for review November 3, 2025 08:40
github-actions bot commented Nov 3, 2025

ZenML CLI Performance Comparison (Threshold: 1.0s, Timeout: 60s, Slow: 5s)

❌ Failed Commands on Current Branch (feature/dynamic-pipelines)

  • zenml stack list: Command failed on run 1 (exit code: 1)
  • zenml pipeline list: Command failed on run 1 (exit code: 1)
  • zenml model list: Command failed on run 1 (exit code: 1)

🚨 New Failures Introduced

The following commands fail on your branch but worked on the target branch:

  • zenml stack list
  • zenml pipeline list
  • zenml model list

Performance Comparison

| Command | develop Time (s) | feature/dynamic-pipelines Time (s) | Difference | Status |
|---|---|---|---|---|
| zenml --help | 1.492634 ± 0.016523 | 1.485113 ± 0.004690 | -0.008s | ✓ No significant change |
| zenml model list | Not tested | Failed | N/A | ❌ Broken in current branch |
| zenml pipeline list | Not tested | Failed | N/A | ❌ Broken in current branch |
| zenml stack --help | 1.481887 ± 0.026134 | 1.503180 ± 0.006481 | +0.021s | ✓ No significant change |
| zenml stack list | Not tested | Failed | N/A | ❌ Broken in current branch |
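
As a worked check of the Difference column, the sketch below recomputes the two timed rows. It assumes the difference is the current-branch mean minus the target-branch mean, and that a change counts as significant only when its magnitude reaches the 1.0s threshold from the report header. The `compare` function is hypothetical and is not the comparison script itself.

```python
from typing import Tuple


def compare(target_s: float, current_s: float, threshold_s: float = 1.0) -> Tuple[str, str]:
    """Return the formatted difference and an assumed status label."""
    diff = current_s - target_s
    if abs(diff) < threshold_s:
        status = "no significant change"
    elif diff > 0:
        status = "degraded"
    else:
        status = "improved"
    return f"{diff:+.3f}s", status


if __name__ == "__main__":
    print(compare(1.492634, 1.485113))  # zenml --help
    print(compare(1.481887, 1.503180))  # zenml stack --help
```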

Summary

  • Total commands analyzed: 5
  • Commands compared for timing: 2
  • Commands improved: 0 (0.0% of compared)
  • Commands degraded: 0 (0.0% of compared)
  • Commands unchanged: 2 (100.0% of compared)
  • Failed commands: 3 (NEW FAILURES INTRODUCED)
  • Timed out commands: 0
  • Slow commands: 0

Environment Info

  • Target branch: Linux 6.11.0-1018-azure
  • Current branch: Linux 6.11.0-1018-azure
  • Test timestamp: 2025-11-03T09:14:20Z
  • Timeout: 60 seconds
  • Slow threshold: 5 seconds

@schustmi schustmi force-pushed the feature/dynamic-pipelines branch from d5f90e8 to e902eaf Compare November 3, 2025 09:11
"""
return self._wrapped.result()

def load(self) -> Any:

Perhaps we should keep a cache dictionary for materialized artifacts, so if (for some weird reason) users do multiple load calls (instead of assigning and reusing) we can return the cached results.
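
A minimal sketch of what such a cache could look like, assuming the loader is a zero-argument callable. `CachedLoader` is a hypothetical class, not the future type from this PR, and it is not thread-safe as written.

```python
from typing import Any, Callable


class CachedLoader:
    """Hypothetical wrapper that materializes a value once and reuses it."""

    # Sentinel so that a legitimately loaded `None` is still cached.
    _UNSET = object()

    def __init__(self, loader: Callable[[], Any]) -> None:
        self._loader = loader
        self._value: Any = self._UNSET

    def load(self) -> Any:
        """Call the loader on first use and return the cached value afterwards."""
        if self._value is self._UNSET:
            self._value = self._loader()
        return self._value
```

A sentinel is used instead of `None` so that artifacts whose data is `None` are not reloaded on every call.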

else:
raise ValueError(f"Invalid step run output: {result}")

def __getitem__(self, key: Union[str, int]) -> ArtifactFuture:

Maybe we should provide a custom implementation of __setitem__ to give users better UX and guidance than a generic error like "object does not support item assignment".
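
For illustration, a sketch of a `__setitem__` that fails with a more helpful message. `ReadOnlyOutputs` and the wording of the error are assumptions made for this example, not code from the PR.

```python
from typing import Any, Tuple, Union


class ReadOnlyOutputs:
    """Hypothetical read-only container standing in for a step-outputs future."""

    def __init__(self, keys: Tuple[str, ...], values: Tuple[Any, ...]) -> None:
        self._keys = keys
        self._values = values

    def __getitem__(self, key: str) -> Any:
        return self._values[self._keys.index(key)]

    def __setitem__(self, key: Union[str, int], value: Any) -> None:
        # Always raise, but explain why and point to an alternative.
        raise TypeError(
            f"Cannot assign to output {key!r}: step outputs are read-only. "
            "Pass the new value to a downstream step instead."
        )
```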

"""
if isinstance(key, str):
index = self._output_keys.index(key)
elif isinstance(key, int):

I would recommend adopting one kind of wrapper behavior and sticking to it. If we let users access futures both dict-style and list/tuple-style, then the more magic methods we add to the class, the more confusing it becomes what actually gets executed versus what users expect to be executed.

Plus, in general, accessing by key is a safer operation and would be a good pattern to enforce. The results may change in length and order, so accessing by position is a bit unsafe.
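
A sketch of key-only access, assuming outputs can be represented as a name-to-value mapping. `NamedOutputs` is hypothetical and only shows how positional access could be rejected with a clear error.

```python
from typing import Any, Dict


class NamedOutputs:
    """Hypothetical outputs wrapper that only supports access by output name."""

    def __init__(self, outputs: Dict[str, Any]) -> None:
        self._outputs = dict(outputs)

    def __getitem__(self, key: str) -> Any:
        if not isinstance(key, str):
            # Reject positional access explicitly instead of guessing intent.
            raise TypeError(
                f"Outputs must be accessed by name, got {type(key).__name__}."
            )
        if key not in self._outputs:
            available = ", ".join(sorted(self._outputs))
            raise KeyError(f"Unknown output {key!r}. Available outputs: {available}")
        return self._outputs[key]
```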

index=index,
)

def __iter__(self) -> Any:

Would __contains__ make sense to also implement here? 🤔
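
For context, when a class defines `__iter__` but not `__contains__`, Python's `in` operator falls back to iterating and comparing each element. A sketch of an explicit membership check on output names follows. `OutputKeys` is a hypothetical class, and whether membership should test names or artifacts is an open design question here.

```python
from typing import Iterator, Tuple


class OutputKeys:
    """Hypothetical container exposing output names via iteration and membership."""

    def __init__(self, keys: Tuple[str, ...]) -> None:
        self._keys = keys

    def __iter__(self) -> Iterator[str]:
        return iter(self._keys)

    def __contains__(self, key: object) -> bool:
        # Explicit membership test on the output names.
        return key in self._keys
```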

Labels

enhancement New feature or request internal To filter out internal PRs and issues

3 participants