Skip to content

Commit 451a2f6

Browse files
committed
🎨 format: change path type of audit model.
1 parent c3cefb2 commit 451a2f6

File tree

6 files changed

+104
-91
lines changed

6 files changed

+104
-91
lines changed

docs/examples/conf/02-ml-model-training.yml

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
name: "ml-model-training"
22
type: "Workflow"
3-
description: "Machine learning model training pipeline with hyperparameter optimization and model evaluation"
4-
3+
description: |
4+
Machine learning model training pipeline with hyperparameter optimization
5+
and model evaluation
6+
on:
7+
schedule:
8+
- cronjob: "0 0 * * 0" # Weekly on Sunday at midnight
9+
timezone: "UTC"
510
params:
611
dataset_path:
712
type: str
@@ -10,7 +15,10 @@ params:
1015

1116
model_type:
1217
type: choice
13-
options: ["random_forest", "xgboost", "neural_network"]
18+
options:
19+
- "random_forest"
20+
- "xgboost"
21+
- "neural_network"
1422
desc: "Type of model to train"
1523

1624
experiment_name:
@@ -370,8 +378,3 @@ jobs:
370378
channel: "#ml-deployments"
371379
message: "Model ${{ params.experiment_name }} v1 deployed successfully"
372380
color: "good"
373-
374-
on:
375-
schedule:
376-
- cronjob: "0 0 * * 0" # Weekly on Sunday at midnight
377-
timezone: "UTC"

src/ddeutil/workflow/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,12 @@
5050
from .__cron import CronRunner
5151
from .__types import DictData, DictStr, Matrix, Re, TupleStr
5252
from .audits import (
53-
EVENT,
53+
DRYRUN,
5454
FORCE,
5555
NORMAL,
5656
RERUN,
5757
Audit,
58-
FileAudit,
58+
LocalFileAudit,
5959
get_audit,
6060
)
6161
from .conf import (

src/ddeutil/workflow/audits.py

Lines changed: 64 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -74,31 +74,38 @@ class ReleaseType(str, Enum):
7474
Attributes:
7575
NORMAL: Standard workflow release execution
7676
RERUN: Re-execution of previously failed workflow
77-
EVENT: Event-triggered workflow execution
77+
DRYRUN: Dry-run workflow execution
7878
FORCE: Forced execution bypassing normal conditions
7979
"""
8080

8181
NORMAL = "normal"
8282
RERUN = "rerun"
83-
EVENT = "event"
8483
FORCE = "force"
84+
DRYRUN = "dryrun"
8585

8686

8787
NORMAL = ReleaseType.NORMAL
8888
RERUN = ReleaseType.RERUN
89-
EVENT = ReleaseType.EVENT
89+
DRYRUN = ReleaseType.DRYRUN
9090
FORCE = ReleaseType.FORCE
9191

9292

9393
class AuditData(BaseModel):
94-
"""Audit Data model."""
94+
"""Audit Data model that is used as the core data for any Audit model that
95+
manages logging at the target pointer system or service like a file-system,
96+
SQLite database, etc.
97+
"""
9598

9699
model_config = ConfigDict(use_enum_values=True)
97100

98101
name: str = Field(description="A workflow name.")
99102
release: datetime = Field(description="A release datetime.")
100103
type: ReleaseType = Field(
101-
default=NORMAL, description="A running type before logging."
104+
default=NORMAL,
105+
description=(
106+
"An execution type that should be value in ('normal', 'rerun', "
107+
"'force', 'dryrun')."
108+
),
102109
)
103110
context: DictData = Field(
104111
default_factory=dict,
@@ -121,18 +128,17 @@ class BaseAudit(BaseModel, ABC):
121128
for logging subclasses like file, sqlite, etc.
122129
"""
123130

124-
type: str
131+
type: Literal["base"] = "base"
132+
logging_name: str = "ddeutil.workflow"
125133
extras: DictData = Field(
126134
default_factory=dict,
127135
description="An extras parameter that want to override core config",
128136
)
129137

130138
@field_validator("extras", mode="before")
131-
def validate_extras(cls, v: Any) -> DictData:
139+
def __prepare_extras(cls, v: Any) -> Any:
132140
"""Normalize the extras field, converting ``None`` to an empty dictionary."""
133-
if v is None:
134-
return {}
135-
return v
141+
return {} if v is None else v
136142

137143
@model_validator(mode="after")
138144
def __model_action(self) -> Self:
@@ -148,7 +154,7 @@ def __model_action(self) -> Self:
148154
self.do_before()
149155

150156
# NOTE: Start setting log config in this line with cache.
151-
set_logging("ddeutil.workflow")
157+
set_logging(self.logging_name)
152158
return self
153159

154160
@abstractmethod
@@ -248,27 +254,33 @@ def save(
248254
raise NotImplementedError("Audit should implement `save` method.")
249255

250256

251-
class FileAudit(BaseAudit):
257+
class LocalFileAudit(BaseAudit):
252258
"""File Audit Pydantic Model for saving log data from workflow execution.
253259
254260
This class inherits from BaseAudit and implements file-based storage
255261
for audit logs. It saves workflow execution results to JSON files
256262
in a structured directory hierarchy.
257263
258264
Attributes:
259-
filename_fmt: Class variable defining the filename format for audit files.
265+
file_fmt: Class variable defining the filename format for audit log.
266+
file_release_fmt: Class variable defining the filename format for audit
267+
release log.
260268
"""
261269

262-
filename_fmt: ClassVar[str] = (
263-
"workflow={name}/release={release:%Y%m%d%H%M%S}"
264-
)
270+
file_fmt: ClassVar[str] = "workflow={name}"
271+
file_release_fmt: ClassVar[str] = "release={release:%Y%m%d%H%M%S}"
265272

266273
type: Literal["file"] = "file"
267-
path: str = Field(
268-
default="./audits",
274+
path: Path = Field(
275+
default=Path("./audits"),
269276
description="A file path that use to manage audit logs.",
270277
)
271278

279+
@field_validator("path", mode="before", json_schema_input_type=str)
280+
def __prepare_path(cls, data: Any) -> Any:
281+
"""Convert a path passed as a string into a ``Path`` instance."""
282+
return Path(data) if isinstance(data, str) else data
283+
272284
def do_before(self) -> None:
273285
"""Create directory of release before saving log file.
274286
@@ -278,7 +290,10 @@ def do_before(self) -> None:
278290
Path(self.path).mkdir(parents=True, exist_ok=True)
279291

280292
def find_audits(
281-
self, name: str, *, extras: Optional[DictData] = None
293+
self,
294+
name: str,
295+
*,
296+
extras: Optional[DictData] = None,
282297
) -> Iterator[AuditData]:
283298
"""Generate audit data found from logs path for a specific workflow name.
284299
@@ -292,7 +307,7 @@ def find_audits(
292307
Raises:
293308
FileNotFoundError: If the workflow directory does not exist.
294309
"""
295-
pointer: Path = Path(self.path) / f"workflow={name}"
310+
pointer: Path = self.path / self.file_fmt.format(name=name)
296311
if not pointer.exists():
297312
raise FileNotFoundError(f"Pointer: {pointer.absolute()}.")
298313

@@ -325,7 +340,7 @@ def find_audit_with_release(
325340
ValueError: If no releases found when release is None.
326341
"""
327342
if release is None:
328-
pointer: Path = Path(self.path) / f"workflow={name}"
343+
pointer: Path = self.path / self.file_fmt.format(name=name)
329344
if not pointer.exists():
330345
raise FileNotFoundError(f"Pointer: {pointer.absolute()}.")
331346

@@ -382,8 +397,10 @@ def pointer(self, data: AuditData) -> Path:
382397
Returns:
383398
Path: The directory path for the current workflow and release.
384399
"""
385-
return Path(self.path) / self.filename_fmt.format(
386-
name=data.name, release=data.release
400+
return (
401+
self.path
402+
/ self.file_fmt.format(**data.model_dump(by_alias=True))
403+
/ self.file_release_fmt.format(**data.model_dump(by_alias=True))
387404
)
388405

389406
def save(self, data: Any, excluded: Optional[list[str]] = None) -> Self:
@@ -459,19 +476,19 @@ def cleanup(self, max_age_days: int = 180) -> int: # pragma: no cov
459476
return cleaned_count
460477

461478

462-
class SQLiteAudit(BaseAudit): # pragma: no cov
479+
class LocalSQLiteAudit(BaseAudit): # pragma: no cov
463480
"""SQLite Audit model for database-based audit storage.
464481
465482
This class inherits from BaseAudit and implements SQLite database storage
466483
for audit logs with compression support.
467484
468485
Attributes:
469486
table_name: Class variable defining the database table name.
470-
schemas: Class variable defining the database schema.
487+
ddl: Class variable defining the database schema.
471488
"""
472489

473490
table_name: ClassVar[str] = "audits"
474-
schemas: ClassVar[
491+
ddl: ClassVar[
475492
str
476493
] = """
477494
CREATE TABLE IF NOT EXISTS audits (
@@ -489,22 +506,21 @@ class SQLiteAudit(BaseAudit): # pragma: no cov
489506
"""
490507

491508
type: Literal["sqlite"] = "sqlite"
492-
path: str
509+
path: Path = Field(
510+
default=Path("./audits.db"),
511+
description="A SQLite filepath.",
512+
)
493513

494-
def _ensure_table_exists(self) -> None:
514+
def do_before(self) -> None:
495515
"""Ensure the audit table exists in the database."""
496-
audit_url = dynamic("audit_url", extras=self.extras)
497-
if audit_url is None or not audit_url.path:
516+
if self.path.is_dir():
498517
raise ValueError(
499-
"SQLite audit_url must specify a database file path"
518+
"SQLite path must specify a database file path not dir."
500519
)
501520

502-
audit_url_parse: ParseResult = urlparse(audit_url)
503-
db_path = Path(audit_url_parse.path)
504-
db_path.parent.mkdir(parents=True, exist_ok=True)
505-
506-
with sqlite3.connect(db_path) as conn:
507-
conn.execute(self.schemas)
521+
self.path.parent.mkdir(parents=True, exist_ok=True)
522+
with sqlite3.connect(self.path) as conn:
523+
conn.execute(self.ddl)
508524
conn.commit()
509525

510526
def is_pointed(
@@ -771,24 +787,31 @@ def cleanup(self, max_age_days: int = 180) -> int:
771787
return cursor.rowcount
772788

773789

790+
class PostgresAudit(BaseAudit, ABC): ... # pragma: no cov
791+
792+
774793
Audit = Annotated[
775794
Union[
776-
FileAudit,
777-
SQLiteAudit,
795+
LocalFileAudit,
796+
LocalSQLiteAudit,
778797
],
779798
Field(discriminator="type"),
780799
]
781800

782801

783-
def get_audit(extras: Optional[DictData] = None) -> Audit: # pragma: no cov
802+
def get_audit(
803+
audit_conf: Optional[DictData] = None,
804+
extras: Optional[DictData] = None,
805+
) -> Audit: # pragma: no cov
784806
"""Get an audit model dynamically based on the config audit path value.
785807
786808
Args:
809+
audit_conf (DictData): Optional audit configuration that is used instead of the core ``audit_conf`` config value when provided.
787810
extras: Optional extra parameters to override the core config.
788811
789812
Returns:
790813
Audit: The appropriate audit model class based on configuration.
791814
"""
792-
audit_conf = dynamic("audit_conf", extras=extras)
815+
audit_conf = dynamic("audit_conf", f=audit_conf, extras=extras)
793816
model = TypeAdapter(Audit).validate_python(audit_conf | {"extras": extras})
794817
return model

src/ddeutil/workflow/traces.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ class Metadata(BaseModel): # pragma: no cov
239239
default=None, description="Environment (dev, staging, prod)."
240240
)
241241

242-
# System context
242+
# NOTE: System context
243243
hostname: Optional[str] = Field(
244244
default=None, description="Hostname where workflow is running."
245245
)
@@ -253,7 +253,7 @@ class Metadata(BaseModel): # pragma: no cov
253253
default=None, description="Workflow package version."
254254
)
255255

256-
# Custom metadata
256+
# NOTE: Custom metadata
257257
tags: Optional[list[str]] = Field(
258258
default_factory=list, description="Custom tags for categorization."
259259
)
@@ -320,6 +320,8 @@ def make(
320320
import socket
321321
import sys
322322

323+
from .__about__ import __version__
324+
323325
frame: Optional[FrameType] = currentframe()
324326
if frame is None:
325327
raise ValueError("Cannot get current frame")
@@ -384,7 +386,7 @@ def make(
384386
hostname=hostname,
385387
ip_address=ip_address,
386388
python_version=python_version,
387-
package_version=extras_data.get("package_version"),
389+
package_version=__version__,
388390
# NOTE: Custom metadata
389391
tags=extras_data.get("tags", []),
390392
metadata=extras_data.get("metadata", {}),
@@ -2046,7 +2048,7 @@ def get_trace(
20462048
Args:
20472049
run_id (str): A running ID.
20482050
parent_run_id (str | None, default None): A parent running ID.
2049-
handlers:
2051+
handlers (list):
20502052
extras: An extra parameter that want to override the core
20512053
config values.
20522054
auto_pre_process (bool, default False)
@@ -2057,7 +2059,7 @@ def get_trace(
20572059
handlers: list[DictData] = dynamic(
20582060
"trace_handlers", f=handlers, extras=extras
20592061
)
2060-
trace = Trace.model_validate(
2062+
trace: Trace = Trace.model_validate(
20612063
{
20622064
"run_id": run_id,
20632065
"parent_run_id": parent_run_id,

src/ddeutil/workflow/utils.py

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,6 @@
88
This module provides essential utility functions used throughout the workflow
99
system for ID generation, datetime handling, string processing, template
1010
operations, and other common tasks.
11-
12-
Functions:
13-
to_train: Convert camel case strings to train case format
14-
prepare_newline: Format messages with multiple newlines
15-
replace_sec: Replace seconds and microseconds in datetime objects
16-
clear_tz: Clear timezone info from datetime objects
17-
get_dt_now: Get current datetime with timezone
18-
get_d_now: Get current date
19-
get_diff_sec: Calculate time difference in seconds
20-
reach_next_minute: Check if datetime reaches next minute
21-
wait_until_next_minute: Wait until next minute
22-
delay: Add random delay to execution
23-
gen_id: Generate unique identifiers for workflow components
24-
default_gen_id: Generate default running ID
25-
make_exec: Make files executable
26-
filter_func: Filter function objects from data structures
27-
cross_product: Generate cross product of matrix values
28-
cut_id: Cut running ID to specified length
29-
dump_all: Serialize nested BaseModel objects to dictionaries
30-
obj_name: Get object name or class name
3111
"""
3212
from __future__ import annotations
3313

@@ -327,6 +307,8 @@ def cut_id(run_id: str, *, num: int = 8) -> str:
327307
Example:
328308
>>> cut_id(run_id='20240101081330000000T1354680202')
329309
'202401010813680202'
310+
>>> cut_id(run_id='20240101081330000000T1354680202', num=8)
311+
'54680202'
330312
331313
Args:
332314
run_id: A running ID to cut.

0 commit comments

Comments
 (0)