Skip to content

Commit 4174dd8

Browse files
committed
🎯 feat: update retry strategy on job execution.
1 parent 634f7a2 commit 4174dd8

File tree

3 files changed

+175
-67
lines changed

3 files changed

+175
-67
lines changed

src/ddeutil/workflow/job.py

Lines changed: 146 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@
5656
from pydantic.functional_validators import field_validator, model_validator
5757
from typing_extensions import Self
5858

59+
from . import JobSkipError
5960
from .__types import DictData, DictStr, Matrix, StrOrNone
61+
from .conf import pass_env
6062
from .errors import JobCancelError, JobError, to_dict
6163
from .result import (
6264
CANCEL,
@@ -452,41 +454,22 @@ class Job(BaseModel):
452454
execution, dependency management, conditional execution, and multienvironment
453455
deployment.
454456
455-
Jobs are the primary execution units within workflows, providing:
456-
- Stage lifecycle management
457-
- Execution environment abstraction
458-
- Matrix strategy support for parallel execution
459-
- Dependency resolution via job needs
460-
- Output coordination between stages
461-
462-
Attributes:
463-
id (str, optional): Unique job identifier within workflow
464-
desc (str, optional): Job description in Markdown format
465-
runs_on (RunsOnModel): Execution environment configuration
466-
condition (str, optional): Conditional execution expression
467-
stages (list[Stage]): Ordered list of stages to execute
468-
trigger_rule (Rule): Rule for handling job dependencies
469-
needs (list[str]): List of prerequisite job IDs
470-
strategy (Strategy): Matrix strategy for parameterized execution
471-
extras (dict): Additional configuration parameters
472-
473457
Example:
474-
```python
475-
job = Job(
476-
id="data-processing",
477-
desc="Process daily data files",
478-
runs_on=OnLocal(),
479-
stages=[
480-
EmptyStage(name="Start", echo="Processing started"),
481-
PyStage(name="Process", run="process_data()"),
482-
EmptyStage(name="Complete", echo="Processing finished")
483-
],
484-
strategy=Strategy(
485-
matrix={'env': ['dev', 'prod']},
486-
max_parallel=2
487-
)
488-
)
489-
```
458+
>>> from ddeutil.workflow.stages import EmptyStage, PyStage
459+
>>> job = Job(
460+
... id="data-processing",
461+
... desc="Process daily data files",
462+
... runs_on=OnLocal(),
463+
... stages=[
464+
... EmptyStage(name="Start", echo="Processing started"),
465+
... PyStage(name="Process", run="process_data()"),
466+
... EmptyStage(name="Complete", echo="Processing finished")
467+
... ],
468+
... strategy=Strategy(
469+
... matrix={'env': ['dev', 'prod']},
470+
... max_parallel=2
471+
... )
472+
... )
490473
"""
491474

492475
id: StrOrNone = Field(
@@ -514,6 +497,15 @@ class Job(BaseModel):
514497
default_factory=list,
515498
description="A list of Stage model of this job.",
516499
)
500+
retry: int = Field(
501+
default=0,
502+
ge=0,
503+
lt=20,
504+
description=(
505+
"A retry number if job route execution got the error exclude skip "
506+
"and cancel exception class."
507+
),
508+
)
517509
trigger_rule: Rule = Field(
518510
default=Rule.ALL_SUCCESS,
519511
validate_default=True,
@@ -751,7 +743,7 @@ def is_skipped(self, params: DictData) -> bool:
751743
# should use the `re` module to validate eval-string before
752744
# running.
753745
rs: bool = eval(
754-
param2template(self.condition, params, extras=self.extras),
746+
self.pass_template(self.condition, params),
755747
globals() | params,
756748
{},
757749
)
@@ -802,8 +794,9 @@ def set_outputs(
802794
extract from the result context if it exists. If it does not found, it
803795
will not set on the received context.
804796
805-
:raise JobError: If the job's ID does not set and the setting
806-
default job ID flag does not set.
797+
Raises:
798+
JobError: If the job's ID does not set and the setting default job
799+
ID flag does not set.
807800
808801
Args:
809802
output: (DictData) A result data context that want to extract
@@ -854,8 +847,9 @@ def get_outputs(
854847
"""Get the outputs from jobs data. It will get this job ID or passing
855848
custom ID from the job outputs mapping.
856849
857-
:param output: (DictData) A job outputs data that want to extract
858-
:param job_id: (StrOrNone) A job ID if the `id` field does not set.
850+
Args:
851+
output (DictData): A job outputs data that want to extract
852+
job_id (StrOrNone): A job ID if the `id` field does not set.
859853
860854
:rtype: DictData
861855
"""
@@ -865,14 +859,28 @@ def get_outputs(
865859
else:
866860
return output.get("jobs", {}).get(_id, {})
867861

868-
def _execute(
862+
def pass_template(self, value: Any, params: DictData) -> Any:
    """Render a value through the template engine and then inject
    environment variables into the rendered result.

    Args:
        value (Any): Any value that may contain template syntax.
        params (DictData): A parameter data that want to use in this
            execution.

    Returns:
        Any: A templated value.
    """
    # NOTE: Render the template first, then resolve environment variables
    #   on the rendered output.
    rendered: Any = param2template(value, params, extras=self.extras)
    return pass_env(rendered)
875+
876+
def process(
869877
self,
870878
params: DictData,
871879
run_id: str,
872880
parent_run_id: Optional[str] = None,
873881
event: Optional[Event] = None,
874882
) -> Result:
875-
""""""
883+
"""Process job method."""
876884
trace: Trace = get_trace(
877885
run_id, parent_run_id=parent_run_id, extras=self.extras
878886
)
@@ -953,6 +961,77 @@ def _execute(
953961
extras=self.extras,
954962
)
955963

964+
def _execute(
    self,
    params: DictData,
    run_id: str,
    parent_run_id: Optional[str] = None,
    event: Optional[Event] = None,
) -> Result:
    """Wrapped the route execute method before returning to handler
    execution, adding retry support for transient failures.

    The job is attempted once, then retried up to ``self.retry`` more
    times with exponential backoff (``1.2 ** attempt`` seconds). Skip
    and cancel exceptions are never retried and propagate immediately.

    Args:
        params: A parameter data that want to use in this execution.
        run_id: A running ID of this job execution.
        parent_run_id: A parent running ID. (Default is None)
        event: An Event manager instance that use to cancel this
            execution. (Default is None)

    Returns:
        Result: The wrapped execution result.

    Raises:
        JobCancelError: If the execution was canceled; never retried.
        JobSkipError: If the execution was skipped; never retried.
        Exception: The last error raised by `process` after all
            retry attempts are exhausted.
    """
    trace: Trace = get_trace(
        run_id, parent_run_id=parent_run_id, extras=self.extras
    )
    # NOTE: Total attempts = 1 initial execution + `self.retry` retries.
    attempt: int = 0
    while True:
        try:
            return self.process(
                params,
                run_id,
                parent_run_id=parent_run_id,
                event=event,
            )
        except (JobCancelError, JobSkipError):
            # NOTE: Skip/cancel are control-flow signals, not failures;
            #   re-raise without consuming a retry attempt.
            trace.debug("[JOB]: process raise skip or cancel error.")
            raise
        # FIX: The previous revision used `except exception as e:` where
        #   `exception` held an exception *instance*, which raises
        #   `TypeError` at runtime and made every retry attempt crash.
        except Exception as e:
            attempt += 1
            if attempt > self.retry:
                trace.error(
                    f"[JOB]: Reach the maximum of retry number: "
                    f"{self.retry}."
                )
                raise
            trace.warning(
                f"[JOB]: Retry count: {attempt} ... "
                f"( {e.__class__.__name__} )"
            )
            # NOTE: Exponential backoff before the next attempt.
            time.sleep(1.2**attempt)
1034+
9561035
def execute(
9571036
self,
9581037
params: DictData,
@@ -984,24 +1063,31 @@ def execute(
9841063
trace: Trace = get_trace(
9851064
run_id, parent_run_id=parent_run_id, extras=self.extras
9861065
)
987-
trace.info(f"[JOB]: Handler {self.id or 'EMPTY'}")
988-
result_caught: Result = self._execute(
989-
params,
990-
run_id=run_id,
991-
parent_run_id=parent_run_id,
992-
event=event,
993-
)
994-
return result_caught.make_info(
995-
{"execution_time": time.monotonic() - ts}
996-
)
1066+
try:
1067+
trace.info(
1068+
f"[JOB]: Handler {self.runs_on.type.name}: "
1069+
f"{(self.id or 'EMPTY')!r}."
1070+
)
1071+
result_caught: Result = self._execute(
1072+
params,
1073+
run_id=run_id,
1074+
parent_run_id=parent_run_id,
1075+
event=event,
1076+
)
1077+
return result_caught.make_info(
1078+
{"execution_time": time.monotonic() - ts}
1079+
)
1080+
finally:
1081+
trace.debug("[JOB]: End Handler job execution.")
9971082

9981083

9991084
def mark_errors(context: DictData, error: JobError) -> None:
10001085
"""Make the errors context result with the refs value depends on the nested
10011086
execute func.
10021087
1003-
:param context: (DictData) A context data.
1004-
:param error: (JobError) A stage exception object.
1088+
Args:
1089+
context (DictData): A context data.
1090+
error (JobError): A stage exception object.
10051091
"""
10061092
if "errors" in context:
10071093
context["errors"][error.refs] = error.to_dict()
@@ -1010,6 +1096,9 @@ def mark_errors(context: DictData, error: JobError) -> None:
10101096

10111097

10121098
def pop_stages(context: DictData) -> DictData:
    """Remove the `stages` key from the context data and return its
    filtered value. An empty dict is used when the key does not exist.
    """
    stages: DictData = context.pop("stages", {})
    return filter_func(stages)
10141103

10151104

@@ -1045,9 +1134,10 @@ def local_execute_strategy(
10451134
:param event: (Event) An Event manager instance that use to cancel this
10461135
execution if it forces stopped by parent execution.
10471136
1048-
:raise JobError: If event was set.
1049-
:raise JobError: If stage execution raise any error as `StageError`.
1050-
:raise JobError: If the result from execution has `FAILED` status.
1137+
Raises:
1138+
JobError: If event was set.
1139+
JobError: If stage execution raise any error as `StageError`.
1140+
JobError: If the result from execution has `FAILED` status.
10511141
10521142
:rtype: tuple[Status, DictData]
10531143
"""
@@ -1423,10 +1513,6 @@ def self_hosted_execute(
14231513
)
14241514

14251515

1426-
# Azure Batch execution is now handled by the Azure Batch provider
1427-
# See src/ddeutil/workflow/plugins/providers/az.py for implementation
1428-
1429-
14301516
def docker_execution(
14311517
job: Job,
14321518
params: DictData,

src/ddeutil/workflow/plugins/providers/az.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,9 @@
8989
from azure.core.exceptions import AzureError
9090
from azure.storage.blob import BlobServiceClient
9191

92-
AZURE_AVAILABLE = True
92+
AZURE_AVAILABLE: bool = True
9393
except ImportError:
94-
AZURE_AVAILABLE = False
94+
AZURE_AVAILABLE: bool = False
9595

9696
from pydantic import BaseModel, Field
9797

0 commit comments

Comments
 (0)