Skip to content

Commit c297164

Browse files
committed
🎯 feat: add dryrun mode to stage exec.
1 parent 0590582 commit c297164

File tree

4 files changed

+103
-16
lines changed

4 files changed

+103
-16
lines changed

src/ddeutil/workflow/stages.py

Lines changed: 68 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -610,12 +610,31 @@ def dryrun(
610610
*,
611611
parent_run_id: Optional[str] = None,
612612
event: Optional[Event] = None,
613-
) -> Optional[Result]: # pragma: no cov
613+
) -> Optional[Result]:
614614
"""Pre-process method that will use to run with dry-run mode, and it
615615
should be used before process method.
616+
617+
Args:
618+
params (DictData): A parameter data that want to use in this
619+
execution.
620+
run_id (str): A running stage ID.
621+
context (DictData): A context data.
622+
parent_run_id: A parent running ID. (Default is None)
623+
event: An event manager that use to track parent process
624+
was not force stopped.
625+
626+
Returns:
627+
Result: The execution result with status and context data.
616628
"""
629+
trace: Trace = get_trace(
630+
run_id, parent_run_id=parent_run_id, extras=self.extras
631+
)
632+
trace.debug("[STAGE]: Start Dryrun ...")
633+
return self.to_empty().process(
634+
params, run_id, context, parent_run_id=parent_run_id, event=event
635+
)
617636

618-
def to_empty(self, sleep: int = 0.35) -> EmptyStage: # pragma: no cov
637+
def to_empty(self, sleep: int = 0.35) -> EmptyStage:
619638
"""Convert the current Stage model to the EmptyStage model for dry-run
620639
mode if the `action_stage` class attribute has set.
621640
@@ -627,14 +646,14 @@ def to_empty(self, sleep: int = 0.35) -> EmptyStage: # pragma: no cov
627646
message.
628647
"""
629648
if isinstance(self, EmptyStage):
630-
return self
649+
return self.model_copy(update={"sleep": sleep})
631650
return EmptyStage.model_validate(
632651
{
633652
"name": self.name,
634653
"id": self.id,
635654
"desc": self.desc,
636655
"if": self.condition,
637-
"echo": f"Convert from {self.__class__.__name__}",
656+
"echo": f"Convert from {self.__class__.__name__} to EmptyStage",
638657
"sleep": sleep,
639658
}
640659
)
@@ -852,18 +871,20 @@ def _execute(
852871
trace: Trace = get_trace(
853872
run_id, parent_run_id=parent_run_id, extras=self.extras
854873
)
855-
model: Union[Self, EmptyStage] = (
856-
self.to_empty()
874+
# NOTE: First execution for not pass to retry step if it passes.
875+
try:
857876
if (
858877
self.extras.get("__sys_release_dryrun_mode", False)
859878
and self.action_stage
860-
)
861-
else self
862-
)
863-
864-
# NOTE: First execution for not pass to retry step if it passes.
865-
try:
866-
return model.process(
879+
):
880+
return self.dryrun(
881+
params | {"retry": current_retry},
882+
run_id=run_id,
883+
context=context,
884+
parent_run_id=parent_run_id,
885+
event=event,
886+
)
887+
return self.process(
867888
params | {"retry": current_retry},
868889
run_id=run_id,
869890
context=context,
@@ -889,7 +910,18 @@ def _execute(
889910
status=WAIT,
890911
updated={"retry": current_retry},
891912
)
892-
return model.process(
913+
if (
914+
self.extras.get("__sys_release_dryrun_mode", False)
915+
and self.action_stage
916+
):
917+
return self.dryrun(
918+
params | {"retry": current_retry},
919+
run_id=run_id,
920+
context=context,
921+
parent_run_id=parent_run_id,
922+
event=event,
923+
)
924+
return self.process(
893925
params | {"retry": current_retry},
894926
run_id=run_id,
895927
context=context,
@@ -953,6 +985,17 @@ async def _axecute(
953985

954986
# NOTE: First execution for not pass to retry step if it passes.
955987
try:
988+
if (
989+
self.extras.get("__sys_release_dryrun_mode", False)
990+
and self.action_stage
991+
):
992+
return self.dryrun(
993+
params | {"retry": current_retry},
994+
run_id=run_id,
995+
context=context,
996+
parent_run_id=parent_run_id,
997+
event=event,
998+
)
956999
return await model.async_process(
9571000
params | {"retry": current_retry},
9581001
run_id=run_id,
@@ -979,6 +1022,17 @@ async def _axecute(
9791022
status=WAIT,
9801023
updated={"retry": current_retry},
9811024
)
1025+
if (
1026+
self.extras.get("__sys_release_dryrun_mode", False)
1027+
and self.action_stage
1028+
):
1029+
return self.dryrun(
1030+
params | {"retry": current_retry},
1031+
run_id=run_id,
1032+
context=context,
1033+
parent_run_id=parent_run_id,
1034+
event=event,
1035+
)
9821036
return await model.async_process(
9831037
params | {"retry": current_retry},
9841038
run_id=run_id,

tests/stages/test_stage_call.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,19 @@ def test_call_stage_exec_necessary_args():
8080
}
8181

8282

83+
def test_call_stage():
84+
stage: Stage = CallStage.model_validate(
85+
obj={
86+
"name": "Extract & Load Local System",
87+
"id": "second-job",
88+
"uses": "tasks/simple-task@demo",
89+
"with": {"source": "src", "sink": "sink"},
90+
}
91+
)
92+
empty_stage = stage.to_empty()
93+
assert empty_stage.sleep == 0.35
94+
95+
8396
def test_call_stage_exec():
8497
stage: Stage = CallStage.model_validate(
8598
obj={

tests/stages/test_stage_empty.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ def test_empty_stage():
3333
)
3434
assert stage.desc == "This is a test stage\n\tnewline"
3535

36+
empty_stage = stage.to_empty(sleep=10)
37+
assert empty_stage.sleep == 10
38+
3639

3740
def test_empty_stage_execute():
3841
stage: EmptyStage = EmptyStage(name="Empty Stage", echo="hello world")

tests/test_workflow_release.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,25 @@ def test_workflow_release_dryrun():
257257
}
258258
)
259259
rs: Result = workflow.release(
260-
release=datetime.now(),
260+
release=datetime(2024, 10, 1),
261261
params={},
262262
release_type=DRYRUN,
263263
)
264-
print(rs)
264+
assert rs.status == SUCCESS
265+
assert rs.context == {
266+
"status": "SUCCESS",
267+
"params": {},
268+
"release": {
269+
"type": DRYRUN,
270+
"logical_date": datetime(2024, 10, 1, tzinfo=ZoneInfo(key="UTC")),
271+
},
272+
"jobs": {
273+
"first-job": {
274+
"status": SUCCESS,
275+
"stages": {
276+
"7782830343": {"outputs": {}, "status": SUCCESS},
277+
"second-stage": {"outputs": {}, "status": SUCCESS},
278+
},
279+
},
280+
},
281+
}

0 commit comments

Comments
 (0)