Skip to content

Commit c6afa49

Browse files
Mohmnankatiyar
andauthored
add support for running multiple pipelines via --pipelines (#5296)
* add support for running multiple pipelines via --pipelines Signed-off-by: Mohmn <naqashmohmin1@gmail.com> Signed-off-by: Mohmn <naqashmohmin1@gmail.com> * Fix tests and release notes Signed-off-by: Ankita Katiyar <ankitakatiyar2401@gmail.com> --------- Signed-off-by: Mohmn <naqashmohmin1@gmail.com> Signed-off-by: Ankita Katiyar <110245118+ankatiyar@users.noreply.github.com> Signed-off-by: Ankita Katiyar <ankitakatiyar2401@gmail.com> Co-authored-by: Ankita Katiyar <110245118+ankatiyar@users.noreply.github.com> Co-authored-by: Ankita Katiyar <ankitakatiyar2401@gmail.com>
1 parent 81ed297 commit c6afa49

File tree

5 files changed

+137
-31
lines changed

5 files changed

+137
-31
lines changed

RELEASE.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
## Major features and improvements
33
* Added `@experimental` decorator to mark unstable or early-stage public APIs.
44
* Added the new `support-agent-langgraph` starter. This starter contains pipelines that leverage LangGraph for agentic workflows and Langfuse or Opik for prompt management and tracing.
5+
* Added support for running multiple pipelines in a single Kedro session run via the `--pipelines` CLI option and `pipeline_names` argument in `KedroSession.run()` method.
56

67
## Experimental features
78
* Added experimental `llm_context_node` and `LLMContextNode` for assembling LLMs, prompts, and tools into a runtime `LLMContext` within Kedro pipelines.
@@ -14,8 +15,9 @@
1415
## Documentation changes
1516
* Added beginner-friendly notes on `uvx` installation.
1617

17-
1818
## Community contributions
19+
Many thanks to the following Kedroids for contributing PRs to this release:
20+
* [Mohmn](https://github.com/Mohmn)
1921

2022
# Release 1.1.1
2123
## Bug fixes and other changes

kedro/framework/cli/project.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import click
1111

1212
from kedro.framework.cli.utils import (
13+
KedroCliError,
1314
_check_module_importable,
1415
_config_file_callback,
1516
_split_load_versions,
@@ -53,6 +54,9 @@
5354
override the loaded ones."""
5455
PIPELINE_ARG_HELP = """Name of the registered pipeline to run.
5556
If not set, the '__default__' pipeline is run."""
57+
PIPELINES_ARG_HELP = """Comma-separated names of registered pipelines to run.
58+
Example: --pipelines data_engineering,feature_engineering
59+
If not set, the '__default__' pipeline is run."""
5660
NAMESPACES_ARG_HELP = """Run only node namespaces with specified names."""
5761
PARAMS_ARG_HELP = """Specify extra parameters that you want to pass
5862
to the context initialiser. Items must be separated by comma, keys - by colon or equals sign,
@@ -200,6 +204,9 @@ def package(metadata: ProjectMetadata) -> None:
200204
callback=_split_load_versions,
201205
)
202206
@click.option("--pipeline", "-p", type=str, default=None, help=PIPELINE_ARG_HELP)
207+
@click.option(
208+
"--pipelines", type=str, default="", help=PIPELINES_ARG_HELP, callback=split_string
209+
)
203210
@click.option(
204211
"--namespaces",
205212
"-ns",
@@ -244,6 +251,7 @@ def run( # noqa: PLR0913
244251
to_outputs: str,
245252
load_versions: dict[str, str] | None,
246253
pipeline: str,
254+
pipelines: list[str],
247255
config: str,
248256
conf_source: str,
249257
params: dict[str, Any],
@@ -252,6 +260,13 @@ def run( # noqa: PLR0913
252260
) -> dict[str, Any]:
253261
"""Run the pipeline."""
254262

263+
if pipeline and pipelines:
264+
raise KedroCliError(
265+
"Options '--pipeline' and '--pipelines' cannot be used together"
266+
)
267+
268+
pipelines_to_run = list(set(pipelines)) if pipelines else None
269+
255270
runner_obj = load_obj(runner or "SequentialRunner", "kedro.runner")
256271
tuple_tags = tuple(tags)
257272
tuple_node_names = tuple(node_names)
@@ -269,6 +284,7 @@ def run( # noqa: PLR0913
269284
to_outputs=to_outputs,
270285
load_versions=load_versions,
271286
pipeline_name=pipeline,
287+
pipeline_names=pipelines_to_run,
272288
namespaces=namespaces,
273289
only_missing_outputs=only_missing_outputs,
274290
)

kedro/framework/session/session.py

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
)
2727
from kedro.io.core import generate_timestamp
2828
from kedro.io.data_catalog import SharedMemoryDataCatalog
29+
from kedro.pipeline.pipeline import Pipeline
2930
from kedro.runner import AbstractRunner, ParallelRunner, SequentialRunner
3031
from kedro.utils import find_kedro_project
3132

@@ -281,6 +282,7 @@ def __exit__(self, exc_type: Any, exc_value: Any, tb_: Any) -> None:
281282
def run( # noqa: PLR0913
282283
self,
283284
pipeline_name: str | None = None,
285+
pipeline_names: list[str] | None = None,
284286
tags: Iterable[str] | None = None,
285287
runner: AbstractRunner | None = None,
286288
node_names: Iterable[str] | None = None,
@@ -296,6 +298,7 @@ def run( # noqa: PLR0913
296298
297299
Args:
298300
pipeline_name: Name of the pipeline that is being run.
301+
pipeline_names: Name of the pipelines that is being run.
299302
tags: An optional list of node tags which should be used to
300303
filter the nodes of the ``Pipeline``. If specified, only the nodes
301304
containing *any* of these tags will be run.
@@ -330,6 +333,12 @@ def run( # noqa: PLR0913
330333
# Report project name
331334
project_name = self._package_name or self._project_path.name
332335
self._logger.info("Kedro project %s", project_name)
336+
if pipeline_name:
337+
self._logger.warning(
338+
"`pipeline_name` is deprecated and will be removed in a future release. "
339+
"Please use `pipeline_names` instead."
340+
)
341+
pipeline_names = [pipeline_name]
333342

334343
if self._run_called:
335344
raise KedroSessionError(
@@ -343,18 +352,19 @@ def run( # noqa: PLR0913
343352
runtime_params = self.store.get("runtime_params") or {}
344353
context = self.load_context()
345354

346-
name = pipeline_name or "__default__"
347-
348-
try:
349-
pipeline = pipelines[name]
350-
except KeyError as exc:
351-
raise ValueError(
352-
f"Failed to find the pipeline named '{name}'. "
353-
f"It needs to be generated and returned "
354-
f"by the 'register_pipelines' function."
355-
) from exc
356-
357-
filtered_pipeline = pipeline.filter(
355+
names = pipeline_names or ["__default__"]
356+
combined_pipelines = Pipeline([])
357+
for name in names:
358+
try:
359+
combined_pipelines += pipelines[name]
360+
except KeyError as exc:
361+
raise ValueError(
362+
f"Failed to find the pipeline named '{name}'. "
363+
f"It needs to be generated and returned "
364+
f"by the 'register_pipelines' function."
365+
) from exc
366+
367+
filtered_pipeline = combined_pipelines.filter(
358368
tags=tags,
359369
from_nodes=from_nodes,
360370
to_nodes=to_nodes,
@@ -377,7 +387,7 @@ def run( # noqa: PLR0913
377387
"to_outputs": to_outputs,
378388
"load_versions": load_versions,
379389
"runtime_params": runtime_params,
380-
"pipeline_name": pipeline_name,
390+
"pipeline_names": pipeline_names,
381391
"namespaces": namespaces,
382392
"runner": getattr(runner, "__name__", str(runner)),
383393
"only_missing_outputs": only_missing_outputs,

tests/framework/cli/test_cli.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,6 +577,7 @@ def test_run_successfully(
577577
to_outputs=[],
578578
load_versions={},
579579
pipeline_name=None,
580+
pipeline_names=None,
580581
namespaces=[],
581582
only_missing_outputs=False,
582583
)
@@ -618,6 +619,7 @@ def test_run_specific_nodes(
618619
to_outputs=[],
619620
load_versions={},
620621
pipeline_name=None,
622+
pipeline_names=None,
621623
namespaces=[],
622624
only_missing_outputs=False,
623625
)
@@ -659,6 +661,7 @@ def test_run_with_tags(
659661
to_outputs=[],
660662
load_versions={},
661663
pipeline_name=None,
664+
pipeline_names=None,
662665
namespaces=[],
663666
only_missing_outputs=False,
664667
)
@@ -691,6 +694,7 @@ def test_run_with_pipeline_filters(
691694
to_outputs=[],
692695
load_versions={},
693696
pipeline_name=None,
697+
pipeline_names=None,
694698
namespaces=["fake_namespace"],
695699
only_missing_outputs=False,
696700
)
@@ -716,6 +720,7 @@ def test_run_successfully_parallel(
716720
to_outputs=[],
717721
load_versions={},
718722
pipeline_name=None,
723+
pipeline_names=None,
719724
namespaces=[],
720725
only_missing_outputs=False,
721726
)
@@ -757,10 +762,65 @@ def test_run_with_config(
757762
to_outputs=[],
758763
load_versions={},
759764
pipeline_name="pipeline1",
765+
pipeline_names=None,
760766
namespaces=[],
761767
only_missing_outputs=False,
762768
)
763769

770+
def test_run_multiple_pipelines(
771+
self, fake_project_cli, fake_metadata, fake_session
772+
):
773+
result = CliRunner().invoke(
774+
fake_project_cli,
775+
["run", "--pipelines", "pipe1,pipe2"],
776+
obj=fake_metadata,
777+
)
778+
779+
assert not result.exit_code
780+
assert fake_session.run.call_count == 1
781+
assert fake_session.run.call_args.kwargs["pipeline_name"] is None
782+
783+
pipelines = fake_session.run.call_args.kwargs["pipeline_names"]
784+
assert "pipe1" in pipelines
785+
assert "pipe2" in pipelines
786+
787+
def test_run_multiple_pipelines_with_duplicate_name(
788+
self, fake_project_cli, fake_metadata, fake_session
789+
):
790+
result = CliRunner().invoke(
791+
fake_project_cli,
792+
["run", "--pipelines", "pipe1,pipe1"],
793+
obj=fake_metadata,
794+
)
795+
796+
assert not result.exit_code
797+
assert fake_session.run.call_count == 1
798+
assert fake_session.run.call_args.kwargs["pipeline_name"] is None
799+
assert fake_session.run.call_args.kwargs["pipeline_names"] == ["pipe1"]
800+
801+
def test_pipeline_and_pipelines_mutually_exclusive(
802+
self, fake_project_cli, fake_metadata
803+
):
804+
result = CliRunner().invoke(
805+
fake_project_cli,
806+
["run", "--pipeline", "pipe1", "--pipelines", "pipe2"],
807+
obj=fake_metadata,
808+
)
809+
810+
assert result.exit_code != 0
811+
assert "cannot be used together" in result.output.lower()
812+
813+
def test_pipeline_name_deprecation_warning(
814+
self, fake_project_cli, fake_metadata, caplog
815+
):
816+
CliRunner().invoke(
817+
fake_project_cli,
818+
["run", "--pipeline", "pipe1"],
819+
obj=fake_metadata,
820+
)
821+
822+
assert "deprecated" in caplog.text.lower()
823+
764824
@mark.parametrize("config_flag", ["--config", "-c"])
765825
def test_run_with_invalid_config(
766826
self,
@@ -823,6 +883,7 @@ def test_run_with_params_in_config(
823883
to_outputs=[],
824884
load_versions={},
825885
pipeline_name="pipeline1",
886+
pipeline_names=None,
826887
namespaces=[],
827888
only_missing_outputs=False,
828889
)
@@ -938,6 +999,7 @@ def test_split_load_versions(
938999
to_outputs=[],
9391000
load_versions=lv_dict,
9401001
pipeline_name=None,
1002+
pipeline_names=None,
9411003
namespaces=[],
9421004
only_missing_outputs=False,
9431005
)
@@ -1001,6 +1063,7 @@ def test_safe_split_option_arguments(
10011063
to_outputs=[],
10021064
load_versions={},
10031065
pipeline_name=None,
1066+
pipeline_names=None,
10041067
namespaces=[],
10051068
only_missing_outputs=False,
10061069
)

0 commit comments

Comments
 (0)