Skip to content

Commit 70bf95b

Browse files
committed
Refactoring
1 parent 438b003 commit 70bf95b

File tree

12 files changed

+486
-489
lines changed

12 files changed

+486
-489
lines changed

src/zenml/config/compiler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -489,15 +489,15 @@ def _compile_step_invocation(
489489
invocation.step = copy.deepcopy(invocation.step)
490490

491491
step = invocation.step
492-
with step._skip_dynamic_configuration():
492+
with step._suspend_dynamic_configuration():
493493
if step_config:
494494
step._apply_configuration(
495495
step_config, runtime_parameters=invocation.parameters
496496
)
497497

498498
# Apply the dynamic configuration (which happened while executing the
499499
# pipeline function) after all other step-specific configurations.
500-
step._apply_dynamic_configuration()
500+
step._merge_dynamic_configuration()
501501

502502
convert_component_shortcut_settings_keys(
503503
step.configuration.settings, stack=stack

src/zenml/execution/step/utils.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
# Copyright (c) ZenML GmbH 2025. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at:
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12+
# or implied. See the License for the specific language governing
13+
# permissions and limitations under the License.
14+
"""Step execution utilities."""
15+
16+
import time
17+
from typing import (
18+
TYPE_CHECKING,
19+
)
20+
21+
from zenml.config.step_configurations import Step
22+
from zenml.exceptions import RunStoppedException
23+
from zenml.logger import get_logger
24+
from zenml.models import (
25+
PipelineSnapshotResponse,
26+
)
27+
from zenml.models.v2.core.step_run import StepRunResponse
28+
from zenml.orchestrators.step_launcher import StepLauncher
29+
30+
if TYPE_CHECKING:
31+
from zenml.config.step_configurations import Step
32+
33+
34+
logger = get_logger(__name__)
35+
36+
37+
def launch_step(
38+
snapshot: "PipelineSnapshotResponse",
39+
step: "Step",
40+
orchestrator_run_id: str,
41+
retry: bool = False,
42+
) -> StepRunResponse:
43+
"""Launch a step.
44+
45+
Args:
46+
snapshot: The snapshot.
47+
step: The step to run.
48+
orchestrator_run_id: The orchestrator run ID.
49+
retry: Whether to retry the step if it fails.
50+
51+
Returns:
52+
The step run response.
53+
"""
54+
55+
def _launch_without_retry() -> StepRunResponse:
56+
launcher = StepLauncher(
57+
snapshot=snapshot,
58+
step=step,
59+
orchestrator_run_id=orchestrator_run_id,
60+
)
61+
return launcher.launch()
62+
63+
if not retry:
64+
step_run = _launch_without_retry()
65+
else:
66+
retries = 0
67+
retry_config = step.config.retry
68+
max_retries = retry_config.max_retries if retry_config else 0
69+
delay = retry_config.delay if retry_config else 0
70+
backoff = retry_config.backoff if retry_config else 1
71+
72+
while retries <= max_retries:
73+
try:
74+
step_run = _launch_without_retry()
75+
except RunStoppedException:
76+
# Don't retry if the run was stopped
77+
raise
78+
except BaseException:
79+
retries += 1
80+
if retries <= max_retries:
81+
logger.info(
82+
"Sleeping for %d seconds before retrying step `%s`.",
83+
delay,
84+
step.config.name,
85+
)
86+
time.sleep(delay)
87+
delay *= backoff
88+
else:
89+
if max_retries > 0:
90+
logger.error(
91+
"Failed to run step `%s` after %d retries.",
92+
step.config.name,
93+
max_retries,
94+
)
95+
raise
96+
else:
97+
break
98+
99+
return step_run

src/zenml/orchestrators/base_orchestrator.py

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,18 @@ def submit_dynamic_pipeline(
213213
environment: Dict[str, str],
214214
placeholder_run: Optional["PipelineRunResponse"] = None,
215215
) -> Optional[SubmissionResult]:
216-
"""Submits a dynamic pipeline to the orchestrator."""
216+
"""Submits a dynamic pipeline to the orchestrator.
217+
218+
Args:
219+
snapshot: The pipeline snapshot to submit.
220+
stack: The stack the pipeline will run on.
221+
environment: Environment variables to set in the orchestration
222+
environment.
223+
placeholder_run: An optional placeholder run.
224+
225+
Returns:
226+
Optional submission result.
227+
"""
217228
return None
218229

219230
def prepare_or_run_pipeline(
@@ -427,9 +438,11 @@ def run_step(
427438
RunStoppedException: If the run was stopped.
428439
BaseException: If the step failed all retries.
429440
"""
430-
from zenml.pipelines.dynamic.runner import _run_step_sync
441+
from zenml.execution.step.utils import launch_step
442+
443+
assert self._active_snapshot
431444

432-
_run_step_sync(
445+
launch_step(
433446
snapshot=self._active_snapshot,
434447
step=step,
435448
orchestrator_run_id=self.get_orchestrator_run_id(),
@@ -438,23 +451,41 @@ def run_step(
438451

439452
@property
440453
def supports_dynamic_pipelines(self) -> bool:
454+
"""Whether the orchestrator supports dynamic pipelines.
455+
456+
Returns:
457+
Whether the orchestrator supports dynamic pipelines.
458+
"""
441459
return (
442460
getattr(self.submit_dynamic_pipeline, "__func__", None)
443461
is not BaseOrchestrator.submit_dynamic_pipeline
444462
)
445463

446464
@property
447-
def supports_dynamic_out_of_process_steps(self) -> bool:
465+
def can_launch_dynamic_steps(self) -> bool:
466+
"""Whether the orchestrator can launch dynamic steps.
467+
468+
Returns:
469+
Whether the orchestrator can launch dynamic steps.
470+
"""
448471
return (
449-
getattr(self.run_dynamic_out_of_process_step, "__func__", None)
450-
is not BaseOrchestrator.run_dynamic_out_of_process_step
472+
getattr(self.launch_dynamic_step, "__func__", None)
473+
is not BaseOrchestrator.launch_dynamic_step
451474
)
452475

453-
def run_dynamic_out_of_process_step(
476+
def launch_dynamic_step(
454477
self, step_run_info: "StepRunInfo", environment: Dict[str, str]
455478
) -> None:
479+
"""Launch a dynamic step.
480+
481+
Args:
482+
step_run_info: The step run information.
483+
environment: The environment variables to set in the execution
484+
environment.
485+
"""
456486
raise NotImplementedError(
457-
"Running dynamic out of process steps is not implemented for the orchestrator."
487+
"Launching dynamic steps is not implemented for "
488+
f"the {self.__class__.__name__} orchestrator."
458489
)
459490

460491
@staticmethod

src/zenml/orchestrators/step_launcher.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -481,8 +481,7 @@ def _run_step(
481481
# the orchestrator doesn't support it.
482482
logger.warning(
483483
"The %s does not support running dynamic out of "
484-
"process steps. Running step `%s` in current "
485-
"thread instead.",
484+
"process steps. Running step `%s` locally instead.",
486485
self._stack.orchestrator.__class__.__name__,
487486
self._invocation_id,
488487
)
@@ -574,7 +573,7 @@ def _run_step_with_dynamic_orchestrator(
574573
stack=self._stack,
575574
)
576575
)
577-
self._stack.orchestrator.run_dynamic_out_of_process_step(
576+
self._stack.orchestrator.launch_dynamic_step(
578577
step_run_info=step_run_info,
579578
environment=environment,
580579
)

0 commit comments

Comments
 (0)