Skip to content

Commit 3b8c47a

Browse files
committed
🎯 feat: update trace module.
1 parent 2ca8df8 commit 3b8c47a

File tree

2 files changed

+105
-202
lines changed

2 files changed

+105
-202
lines changed

src/ddeutil/workflow/job.py

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -865,34 +865,14 @@ def get_outputs(
865865
else:
866866
return output.get("jobs", {}).get(_id, {})
867867

868-
def execute(
868+
def _execute(
869869
self,
870870
params: DictData,
871-
*,
872-
run_id: StrOrNone = None,
871+
run_id: str,
872+
parent_run_id: Optional[str] = None,
873873
event: Optional[Event] = None,
874874
) -> Result:
875-
"""Job execution with passing dynamic parameters from the workflow
876-
execution. It will generate matrix values at the first step and run
877-
multithread on this metrics to the `stages` field of this job.
878-
879-
This method be execution routing for call dynamic execution function
880-
with specific target `runs-on` value.
881-
882-
Args
883-
params: (DictData) A parameter context that also pass from the
884-
workflow execute method.
885-
run_id: (str) An execution running ID.
886-
event: (Event) An Event manager instance that use to cancel this
887-
execution if it forces stopped by parent execution.
888-
889-
Returns
890-
Result: Return Result object that create from execution context.
891-
"""
892-
ts: float = time.monotonic()
893-
parent_run_id, run_id = extract_id(
894-
(self.id or "EMPTY"), run_id=run_id, extras=self.extras
895-
)
875+
""""""
896876
trace: Trace = get_trace(
897877
run_id, parent_run_id=parent_run_id, extras=self.extras
898878
)
@@ -908,7 +888,7 @@ def execute(
908888
params,
909889
run_id=parent_run_id,
910890
event=event,
911-
).make_info({"execution_time": time.monotonic() - ts})
891+
)
912892
elif self.runs_on.type == SELF_HOSTED: # pragma: no cov
913893
pass
914894
elif self.runs_on.type == AZ_BATCH: # pragma: no cov
@@ -919,14 +899,14 @@ def execute(
919899
params,
920900
run_id=parent_run_id,
921901
event=event,
922-
).make_info({"execution_time": time.monotonic() - ts})
902+
)
923903
elif self.runs_on.type == DOCKER: # pragma: no cov
924904
return docker_execution(
925905
self,
926906
params,
927907
run_id=parent_run_id,
928908
event=event,
929-
).make_info({"execution_time": time.monotonic() - ts})
909+
)
930910
elif self.runs_on.type == CONTAINER: # pragma: no cov
931911
from .plugins.providers.container import container_execute
932912

@@ -935,7 +915,7 @@ def execute(
935915
params,
936916
run_id=parent_run_id,
937917
event=event,
938-
).make_info({"execution_time": time.monotonic() - ts})
918+
)
939919
elif self.runs_on.type == AWS_BATCH: # pragma: no cov
940920
from .plugins.providers.aws import aws_batch_execute
941921

@@ -944,7 +924,7 @@ def execute(
944924
params,
945925
run_id=parent_run_id,
946926
event=event,
947-
).make_info({"execution_time": time.monotonic() - ts})
927+
)
948928
elif self.runs_on.type == GCP_BATCH: # pragma: no cov
949929
from .plugins.providers.gcs import gcp_batch_execute
950930

@@ -953,7 +933,7 @@ def execute(
953933
params,
954934
run_id=parent_run_id,
955935
event=event,
956-
).make_info({"execution_time": time.monotonic() - ts})
936+
)
957937

958938
trace.error(
959939
f"[JOB]: Execution not support runs-on: {self.runs_on.type.value!r} "
@@ -970,10 +950,51 @@ def execute(
970950
f"not support yet."
971951
).to_dict(),
972952
},
973-
info={"execution_time": time.monotonic() - ts},
974953
extras=self.extras,
975954
)
976955

956+
def execute(
957+
self,
958+
params: DictData,
959+
*,
960+
run_id: StrOrNone = None,
961+
event: Optional[Event] = None,
962+
) -> Result:
963+
"""Job execution with passing dynamic parameters from the workflow
964+
execution. It will generate matrix values at the first step and run
965+
multithread on this metrics to the `stages` field of this job.
966+
967+
This method be execution routing for call dynamic execution function
968+
with specific target `runs-on` value.
969+
970+
Args
971+
params: (DictData) A parameter context that also pass from the
972+
workflow execute method.
973+
run_id: (str) An execution running ID.
974+
event: (Event) An Event manager instance that use to cancel this
975+
execution if it forces stopped by parent execution.
976+
977+
Returns
978+
Result: Return Result object that create from execution context.
979+
"""
980+
ts: float = time.monotonic()
981+
parent_run_id, run_id = extract_id(
982+
(self.id or "EMPTY"), run_id=run_id, extras=self.extras
983+
)
984+
trace: Trace = get_trace(
985+
run_id, parent_run_id=parent_run_id, extras=self.extras
986+
)
987+
trace.info(f"[JOB]: Handler {self.id or 'EMPTY'}")
988+
result_caught: Result = self._execute(
989+
params,
990+
run_id=run_id,
991+
parent_run_id=parent_run_id,
992+
event=event,
993+
)
994+
return result_caught.make_info(
995+
{"execution_time": time.monotonic() - ts}
996+
)
997+
977998

978999
def mark_errors(context: DictData, error: JobError) -> None:
9791000
"""Make the errors context result with the refs value depends on the nested

0 commit comments

Comments
 (0)