Skip to content

Commit 1ee4327

Browse files
committed
🎯 feat: update trace module.
1 parent 2ca8df8 commit 1ee4327

File tree

5 files changed

+151
-232
lines changed

5 files changed

+151
-232
lines changed

src/ddeutil/workflow/audits.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060

6161
from .__types import DictData
6262
from .conf import dynamic
63-
from .traces import Trace, get_trace, set_logging
63+
from .traces import Trace, get_trace
6464

6565
logger = logging.getLogger("ddeutil.workflow")
6666

@@ -129,7 +129,6 @@ class BaseAudit(BaseModel, ABC):
129129
"""
130130

131131
type: Literal["base"] = "base"
132-
logging_name: str = "ddeutil.workflow"
133132
extras: DictData = Field(
134133
default_factory=dict,
135134
description="An extras parameter that want to override core config",
@@ -152,9 +151,6 @@ def __model_action(self) -> Self:
152151
"""
153152
if dynamic("enable_write_audit", extras=self.extras):
154153
self.do_before()
155-
156-
# NOTE: Start setting log config in this line with cache.
157-
set_logging(self.logging_name)
158154
return self
159155

160156
@abstractmethod

src/ddeutil/workflow/conf.py

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ def registry_filter(self) -> list[str]:
101101
"""Register Filter that is a list of importable string for the filter
102102
template.
103103
104-
:rtype: list[str]
104+
Returns:
105+
list[str]: A list of module import string.
105106
"""
106107
regis_filter_str: str = env(
107108
"CORE_REGISTRY_FILTER", "ddeutil.workflow.templates"
@@ -130,17 +131,6 @@ def log_tz(self) -> ZoneInfo:
130131
"""
131132
return ZoneInfo(env("LOG_TIMEZONE", "UTC"))
132133

133-
@property
134-
def log_format(self) -> str:
135-
return env(
136-
"LOG_FORMAT",
137-
(
138-
"%(asctime)s.%(msecs)03d (%(process)-5d, "
139-
"%(thread)-5d) [%(levelname)-7s] (%(cut_id)s) %(message)-120s "
140-
"(%(filename)s:%(lineno)s) (%(name)-10s)"
141-
),
142-
)
143-
144134
@property
145135
def audit_conf(self) -> dict[str, Any]:
146136
return json.loads(
@@ -151,10 +141,6 @@ def audit_conf(self) -> dict[str, Any]:
151141
def enable_write_audit(self) -> bool:
152142
return str2bool(env("LOG_AUDIT_ENABLE_WRITE", "false"))
153143

154-
@property
155-
def log_datetime_format(self) -> str:
156-
return env("LOG_DATETIME_FORMAT", "%Y-%m-%d %H:%M:%S")
157-
158144
@property
159145
def stage_default_id(self) -> bool:
160146
return str2bool(env("CORE_STAGE_DEFAULT_ID", "false"))
@@ -441,7 +427,8 @@ def type(self) -> str:
441427
"""Return object of string type which implement on any registry. The
442428
object type.
443429
444-
:rtype: str
430+
Returns:
431+
str: A type that get from config data.
445432
"""
446433
if _typ := self.data.get("type"):
447434
return _typ

src/ddeutil/workflow/job.py

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -865,34 +865,14 @@ def get_outputs(
865865
else:
866866
return output.get("jobs", {}).get(_id, {})
867867

868-
def execute(
868+
def _execute(
869869
self,
870870
params: DictData,
871-
*,
872-
run_id: StrOrNone = None,
871+
run_id: str,
872+
parent_run_id: Optional[str] = None,
873873
event: Optional[Event] = None,
874874
) -> Result:
875-
"""Job execution with passing dynamic parameters from the workflow
876-
execution. It will generate matrix values at the first step and run
877-
multithread on this metrics to the `stages` field of this job.
878-
879-
This method be execution routing for call dynamic execution function
880-
with specific target `runs-on` value.
881-
882-
Args
883-
params: (DictData) A parameter context that also pass from the
884-
workflow execute method.
885-
run_id: (str) An execution running ID.
886-
event: (Event) An Event manager instance that use to cancel this
887-
execution if it forces stopped by parent execution.
888-
889-
Returns
890-
Result: Return Result object that create from execution context.
891-
"""
892-
ts: float = time.monotonic()
893-
parent_run_id, run_id = extract_id(
894-
(self.id or "EMPTY"), run_id=run_id, extras=self.extras
895-
)
875+
""""""
896876
trace: Trace = get_trace(
897877
run_id, parent_run_id=parent_run_id, extras=self.extras
898878
)
@@ -908,7 +888,7 @@ def execute(
908888
params,
909889
run_id=parent_run_id,
910890
event=event,
911-
).make_info({"execution_time": time.monotonic() - ts})
891+
)
912892
elif self.runs_on.type == SELF_HOSTED: # pragma: no cov
913893
pass
914894
elif self.runs_on.type == AZ_BATCH: # pragma: no cov
@@ -919,14 +899,14 @@ def execute(
919899
params,
920900
run_id=parent_run_id,
921901
event=event,
922-
).make_info({"execution_time": time.monotonic() - ts})
902+
)
923903
elif self.runs_on.type == DOCKER: # pragma: no cov
924904
return docker_execution(
925905
self,
926906
params,
927907
run_id=parent_run_id,
928908
event=event,
929-
).make_info({"execution_time": time.monotonic() - ts})
909+
)
930910
elif self.runs_on.type == CONTAINER: # pragma: no cov
931911
from .plugins.providers.container import container_execute
932912

@@ -935,7 +915,7 @@ def execute(
935915
params,
936916
run_id=parent_run_id,
937917
event=event,
938-
).make_info({"execution_time": time.monotonic() - ts})
918+
)
939919
elif self.runs_on.type == AWS_BATCH: # pragma: no cov
940920
from .plugins.providers.aws import aws_batch_execute
941921

@@ -944,7 +924,7 @@ def execute(
944924
params,
945925
run_id=parent_run_id,
946926
event=event,
947-
).make_info({"execution_time": time.monotonic() - ts})
927+
)
948928
elif self.runs_on.type == GCP_BATCH: # pragma: no cov
949929
from .plugins.providers.gcs import gcp_batch_execute
950930

@@ -953,7 +933,7 @@ def execute(
953933
params,
954934
run_id=parent_run_id,
955935
event=event,
956-
).make_info({"execution_time": time.monotonic() - ts})
936+
)
957937

958938
trace.error(
959939
f"[JOB]: Execution not support runs-on: {self.runs_on.type.value!r} "
@@ -970,10 +950,51 @@ def execute(
970950
f"not support yet."
971951
).to_dict(),
972952
},
973-
info={"execution_time": time.monotonic() - ts},
974953
extras=self.extras,
975954
)
976955

956+
def execute(
957+
self,
958+
params: DictData,
959+
*,
960+
run_id: StrOrNone = None,
961+
event: Optional[Event] = None,
962+
) -> Result:
963+
"""Job execution with passing dynamic parameters from the workflow
964+
execution. It will generate matrix values at the first step and run
965+
multiple threads over this matrix for the `stages` field of this job.
966+
967+
This method be execution routing for call dynamic execution function
968+
with specific target `runs-on` value.
969+
970+
Args:
971+
params: (DictData) A parameter context that also pass from the
972+
workflow execute method.
973+
run_id: (str) An execution running ID.
974+
event: (Event) An Event manager instance that is used to cancel this
975+
execution if it is force-stopped by the parent execution.
976+
977+
Returns:
978+
Result: A Result object created from the execution context.
979+
"""
980+
ts: float = time.monotonic()
981+
parent_run_id, run_id = extract_id(
982+
(self.id or "EMPTY"), run_id=run_id, extras=self.extras
983+
)
984+
trace: Trace = get_trace(
985+
run_id, parent_run_id=parent_run_id, extras=self.extras
986+
)
987+
trace.info(f"[JOB]: Handler {self.id or 'EMPTY'}")
988+
result_caught: Result = self._execute(
989+
params,
990+
run_id=run_id,
991+
parent_run_id=parent_run_id,
992+
event=event,
993+
)
994+
return result_caught.make_info(
995+
{"execution_time": time.monotonic() - ts}
996+
)
997+
977998

978999
def mark_errors(context: DictData, error: JobError) -> None:
9791000
"""Make the errors context result with the refs value depends on the nested

0 commit comments

Comments
 (0)