
Conversation


@schustmi schustmi commented Oct 21, 2025

Describe changes

This PR implements dynamic pipelines for the local and Kubernetes orchestrators.

Example

```python
from zenml import step, pipeline

@step
def generate_int() -> int:
  return 3

@step
def do_something(index: int) -> None:
  ...

@pipeline(dynamic=True)
def dynamic_pipeline() -> None:
  count = generate_int()
  # `count` is an artifact; load its data
  count_data = count.load()

  for idx in range(count_data):
    # This runs sequentially, just as regular Python code would.
    # See the features below for an example of how to run steps
    # in parallel.
    do_something(idx)

if __name__ == "__main__":
  dynamic_pipeline()
```

Features

  • Dynamic configuration for steps:

```python
@pipeline(dynamic=True)
def dynamic_pipeline():
  some_step.with_options(enable_cache=False)()
```
  • By default, all steps run inside the orchestration environment. By specifying in_process=False for a step, the orchestrator launches a separate container in which the step is executed (only supported by the Kubernetes orchestrator):

```python
@step(in_process=False)
def some_step() -> None:
  ...
```
  • Execute multiple steps in parallel using step.submit(...). This executes the step (or launches a new container for it) in a new thread:

```python
@step
def some_step(arg: int) -> int:
  ...

@pipeline(dynamic=True)
def dynamic_pipeline():
  future = some_step.submit(arg=1)
  artifact = future.result()  # wait and get artifact response(s)
  data = future.load()  # wait and load artifact data
  downstream_step(future)  # pass the output to another step

  # Run multiple steps in parallel
  for idx in range(3):
    some_step.submit(arg=idx)
```
  • Specify config templates for steps using depends_on:

```yaml
# config.yaml
steps:
  some_step:
    parameters:
      arg: 3
```

```python
# run.py
@step
def some_step(arg: int) -> None:
  ...

@pipeline(dynamic=True, depends_on=[some_step])
def dynamic_pipeline():
  some_step()

if __name__ == "__main__":
  dynamic_pipeline.with_options(config_path="config.yaml")()
```

Limitations/Known issues

  • Our logging storage isn't thread-safe yet, which means logs from steps running in parallel get interleaved.
  • When running multiple steps concurrently, a failure in one step does not stop the other steps; they continue executing until finished.
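The second limitation mirrors plain thread-pool semantics: a failing task does not cancel its siblings. A minimal stdlib sketch (illustrative, not the ZenML API) of how a caller can still collect both results and failures when tasks run concurrently:

```python
import concurrent.futures as cf


def run_step(i: int) -> int:
    # Simulated step: the step with index 1 fails, the others succeed.
    if i == 1:
        raise RuntimeError("step failed")
    return i * 10


with cf.ThreadPoolExecutor() as pool:
    futures = [pool.submit(run_step, i) for i in range(3)]

results, errors = [], []
for future in futures:
    try:
        results.append(future.result())
    except Exception as exc:  # one failure does not cancel the siblings
        errors.append(exc)
```

After the loop, `results` holds the outputs of the successful tasks in submission order and `errors` holds the single raised exception.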

Pre-requisites

Please ensure you have done the following:

  • I have read the CONTRIBUTING.md document.
  • I have added tests to cover my changes.
  • I have based my new branch on develop and the open PR is targeting develop. If your branch wasn't based on develop, read the contribution guide on rebasing a branch to develop.
  • IMPORTANT: I made sure that my changes are reflected properly in the following resources:
    • ZenML Docs
    • Dashboard: Needs to be communicated to the frontend team.
    • Templates: Might need adjustments (that are not reflected in the template tests) in case of non-breaking changes and deprecations.
    • Projects: Depending on the version dependencies, different projects might get affected.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Other (add details above)

@github-actions bot added the labels internal (To filter out internal PRs and issues) and enhancement (New feature or request) on Oct 21, 2025
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch from 4c02511 to c6153aa Compare October 21, 2025 02:02
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch 3 times, most recently from 08b3069 to 250e5d3 Compare October 23, 2025 02:42
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch from 250e5d3 to e217f58 Compare October 23, 2025 08:49
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch from 575843d to 226b65f Compare October 27, 2025 05:23
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch 5 times, most recently from 597b9ce to a83c54c Compare October 28, 2025 02:25
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch 2 times, most recently from e82964c to 093ee9a Compare October 28, 2025 07:06
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch from 093ee9a to b0b6162 Compare October 28, 2025 07:43
@schustmi schustmi force-pushed the feature/dynamic-pipelines branch from cffc807 to ec7bae4 Compare October 28, 2025 08:24
@schustmi schustmi marked this pull request as ready for review November 3, 2025 08:40

github-actions bot commented Nov 3, 2025

ZenML CLI Performance Comparison (Threshold: 1.0s, Timeout: 60s, Slow: 5s)

❌ Failed Commands on Current Branch (feature/dynamic-pipelines)

  • zenml stack list: Command failed on run 1 (exit code: 1)
  • zenml pipeline list: Command failed on run 1 (exit code: 1)
  • zenml model list: Command failed on run 1 (exit code: 1)

🚨 New Failures Introduced

The following commands fail on your branch but worked on the target branch:

  • zenml stack list
  • zenml pipeline list
  • zenml model list

Performance Comparison

| Command | develop Time (s) | feature/dynamic-pipelines Time (s) | Difference | Status |
| --- | --- | --- | --- | --- |
| zenml --help | 1.492634 ± 0.016523 | 1.485113 ± 0.004690 | -0.008s | ✓ No significant change |
| zenml model list | Not tested | Failed | N/A | ❌ Broken in current branch |
| zenml pipeline list | Not tested | Failed | N/A | ❌ Broken in current branch |
| zenml stack --help | 1.481887 ± 0.026134 | 1.503180 ± 0.006481 | +0.021s | ✓ No significant change |
| zenml stack list | Not tested | Failed | N/A | ❌ Broken in current branch |

Summary

  • Total commands analyzed: 5
  • Commands compared for timing: 2
  • Commands improved: 0 (0.0% of compared)
  • Commands degraded: 0 (0.0% of compared)
  • Commands unchanged: 2 (100.0% of compared)
  • Failed commands: 3 (NEW FAILURES INTRODUCED)
  • Timed out commands: 0
  • Slow commands: 0

Environment Info

  • Target branch: Linux 6.11.0-1018-azure
  • Current branch: Linux 6.11.0-1018-azure
  • Test timestamp: 2025-11-03T09:14:20Z
  • Timeout: 60 seconds
  • Slow threshold: 5 seconds

@schustmi schustmi force-pushed the feature/dynamic-pipelines branch from d5f90e8 to e902eaf Compare November 3, 2025 09:11
"""
return self._wrapped.result()

def load(self) -> Any:
Reviewer:

Perhaps we should keep a cache dictionary for materialized artifacts, so if (for some weird reason) users do multiple load calls (instead of assigning and reusing) we can return the cached results.

@schustmi (Author):

I like that idea. I'll also add a boolean flag to the load method to prevent this, so users can at least manually disable it for huge artifacts.
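A minimal sketch of the cached `load` discussed above. The class name, the `_materialize` helper, and the `use_cache` flag are all illustrative assumptions, not the actual ZenML implementation:

```python
from typing import Any

_UNSET = object()  # sentinel so a cached `None` is distinguishable


class ArtifactFuture:
    """Illustrative stand-in for the future returned by `step.submit`."""

    def __init__(self) -> None:
        self._cached: Any = _UNSET
        self.materialize_calls = 0  # instrumentation for this example only

    def _materialize(self) -> Any:
        # Placeholder for fetching and deserializing the artifact data.
        self.materialize_calls += 1
        return "artifact-data"

    def load(self, use_cache: bool = True) -> Any:
        # Repeated `load` calls return the cached value; `use_cache=False`
        # opts out, e.g. for huge artifacts that shouldn't stay in memory.
        if use_cache and self._cached is not _UNSET:
            return self._cached
        data = self._materialize()
        if use_cache:
            self._cached = data
        return data
```

With this, `future.load(); future.load()` materializes once, while `future.load(use_cache=False)` always re-materializes.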

```python
else:
    raise ValueError(f"Invalid step run output: {result}")

def __getitem__(self, key: Union[str, int]) -> ArtifactFuture:
```
Reviewer:

Maybe we should provide a custom __setitem__ implementation as better UX/guidance for users than the default error: `object does not support item assignment`.
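A minimal sketch of the friendlier `__setitem__` guidance suggested here (the class name and message wording are illustrative assumptions):

```python
class ArtifactFuture:
    """Sketch: replace the generic item-assignment error with guidance."""

    def __setitem__(self, key: object, value: object) -> None:
        # Step outputs are immutable; point users at the supported access
        # patterns instead of Python's default TypeError message.
        raise TypeError(
            "Step outputs are read-only and cannot be reassigned on an "
            "ArtifactFuture. Read outputs via `future[key]` or unpack "
            "them instead."
        )
```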

"""
if isinstance(key, str):
index = self._output_keys.index(key)
elif isinstance(key, int):
Reviewer:

I would recommend adopting one kind of wrapper behavior and sticking to it. If we let users access futures both dict-style and list/tuple-style, then the more we extend the class's magic methods, the more confusing it becomes what actually gets executed versus what the user expects to be executed.

Plus, accessing by key is generally a safer operation and would be a good pattern to enforce: the results may change in length or order, and accessing by position is a bit unsafe.

Reviewer:

For instance, I can access dict-style (by specifying a key), but I may expect the __iter__ function to return the keys, not the values. For better consistency I would say we have 2 options:

  1. No magic functions - We provide public methods with clean documentation. Users can manipulate futures based on the available functions and their signatures. No confusion; users stick to following docstrings.
  2. We implement magic functions but assume the implementation wraps one kind of hidden data structure. Users can manipulate it as a dict (preferable imo) or as a list/tuple.

@schustmi (Author) commented Nov 4, 2025:

Makes sense, I agree we should only support one way. My vote goes for tuple-like behaviour though, as the main purpose of my implementation was to support the following: when calling any Python function with multiple outputs, there are two cases:

```python
def f():
  return 1, "str"

# Return value is a tuple that can be accessed with `int` keys
tuple_result = f()
int_result = tuple_result[0]

# Automatic unpacking
int_result, str_result = f()
```

I think the latter is most common, and it's the use-case we should support to make this feel as Pythonic as possible (it also mirrors how you would call sync steps, in which case the return value is not a future but a tuple of artifacts).

Tuples are also immutable, which holds true in our case as well: you can't add outputs to the future result of a step run.

We can then add some helper methods like get_output(key: str), as you suggested, to allow fetching specific outputs by key.

Reviewer:

I see, yes, tuple should do it as well. As long as we are consistent I think we are ok.
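A minimal sketch of the tuple-like behaviour plus the `get_output` helper agreed on above (class name and internals are illustrative assumptions, not the actual implementation):

```python
from typing import Any, Dict, Iterator, Tuple


class StepRunFuture:
    """Illustrative tuple-like wrapper around a step's named outputs."""

    def __init__(self, outputs: Dict[str, Any]) -> None:
        self._output_keys: Tuple[str, ...] = tuple(outputs)
        self._outputs: Tuple[Any, ...] = tuple(outputs.values())

    def __getitem__(self, index: int) -> Any:
        # Positional access only, mirroring how a multi-output step
        # returns a plain tuple when called synchronously.
        return self._outputs[index]

    def __iter__(self) -> Iterator[Any]:
        # Enables `a, b = future`, matching tuple unpacking semantics.
        return iter(self._outputs)

    def get_output(self, key: str) -> Any:
        # Explicit helper for access by output name.
        return self._outputs[self._output_keys.index(key)]
```

Unpacking then works like a regular tuple (`count, name = future`), while named access goes through the documented `get_output` helper.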

```python
    index=index,
)

def __iter__(self) -> Any:
```
Reviewer:

Would __contains__ make sense to also implement here? 🤔

@schustmi (Author) commented Nov 4, 2025:

Oh yes, I thought I already did; that definitely makes sense!
Actually, if we do implement it as a tuple-like data structure, __contains__ would check values and might not make much sense. What do you think?

Reviewer:

Yes, it would make more sense in a dict-like scenario!

```python
    Yields:
        None.
    """
    with env_utils.temporary_environment(
```
@Json-Andriopoulos commented Nov 3, 2025:

Not in the context of this PR, but we can do a better job here thread-safety-wise. In general, os.environ should be treated as an immutable value; changing its values may unintentionally affect other execution paths (for instance, code running in other threads).

I think masking the environment behind a custom object/class and making it accessible with context vars resolves the issue (context vars are thread-local; the wrapper object loads and freezes state and exposes all the operations we want with relative safety).

@schustmi (Author):

Yes, you're right! This is not a problem in this case, but when running multiple steps in parallel we also set different environment variables, which is problematic. At least in this case I can switch to using a context var instead.

Reviewer:

We can work on something here, but that's a different story of course. ContextVars should work perfectly, maybe in combination with a centralized BaseSettings object. I'll create the story and we can discuss implementation.
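A minimal sketch of the context-var-based environment overlay floated in this thread. All names here are illustrative assumptions; ZenML's actual `env_utils.temporary_environment` mutates `os.environ` instead:

```python
import contextvars
import os
from contextlib import contextmanager
from typing import Dict, Iterator

# Thread/task-local overlay on top of os.environ; the process environment
# itself is never mutated, so other threads are unaffected.
_env_overrides: contextvars.ContextVar[Dict[str, str]] = contextvars.ContextVar(
    "env_overrides", default={}
)


def get_env(key: str, default: str = "") -> str:
    # Overrides win over the real process environment.
    overrides = _env_overrides.get()
    if key in overrides:
        return overrides[key]
    return os.environ.get(key, default)


@contextmanager
def temporary_environment(values: Dict[str, str]) -> Iterator[None]:
    # Merge a fresh dict rather than mutating the current one, so the
    # reset token cleanly restores the previous state (nesting works too).
    token = _env_overrides.set({**_env_overrides.get(), **values})
    try:
        yield
    finally:
        _env_overrides.reset(token)
```

Callers read configuration through `get_env` instead of `os.environ`, so each thread or task sees only its own overrides.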

```python
    Whether to prevent pipeline execution.
    """
    return handle_bool_env_var(
        ENV_ZENML_PREVENT_PIPELINE_EXECUTION, default=False
```
Reviewer:

It is hard to track all these environment variable references. Maybe a BaseSettings object to organize them in a single place would be a good idea. We also wouldn't need this function; pydantic would validate the boolean value for us in one go.

@schustmi (Author):

Yep, agreed; in general that would be a nice thing. We even have some classes that do this for a subset of env variables (ServerConfiguration and GlobalConfiguration), but not for all of them. This function also treats values like "yes" as True; I'm not sure how pydantic would handle that natively, but we could implement it for sure.

Reviewer:

I think pydantic captures multiple values as well; I can cross-compare. I'll open a new story for this, just wanted to get your opinion :)
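A minimal stdlib sketch of the centralized-settings idea (dataclass-based here instead of pydantic BaseSettings, to stay dependency-free; all names are assumptions). For reference, pydantic's own bool coercion does accept strings like "yes", "y", "on", "true", and "1":

```python
import os
from dataclasses import dataclass, field

# Values treated as truthy, mirroring `handle_bool_env_var` semantics.
_TRUE_VALUES = {"1", "y", "yes", "true", "on"}


def _bool_env(name: str, default: bool = False) -> bool:
    # Missing variable falls back to the default; anything else is
    # compared case-insensitively against the truthy set.
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in _TRUE_VALUES


@dataclass(frozen=True)
class ZenMLSettings:
    """Hypothetical central registry for environment-driven flags."""

    prevent_pipeline_execution: bool = field(
        default_factory=lambda: _bool_env("ZENML_PREVENT_PIPELINE_EXECUTION")
    )
```

Each flag lives in one place with its parsing rule, instead of scattered `handle_bool_env_var` call sites.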

@schustmi schustmi force-pushed the feature/dynamic-pipelines branch from 5303878 to b7ddcd3 Compare November 4, 2025 08:33

Labels

enhancement (New feature or request), internal (To filter out internal PRs and issues)


3 participants