-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodels.py
More file actions
126 lines (102 loc) · 3.38 KB
/
models.py
File metadata and controls
126 lines (102 loc) · 3.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import datetime
from enum import StrEnum
from pydantic import model_validator
from job_executor.adapter.fs.models.datastore_versions import (
DatastoreVersion,
)
from job_executor.common.models import CamelModel
class MaintenanceStatus(CamelModel):
paused: bool
msg: str
timestamp: str
class JobStatus(StrEnum):
QUEUED = "queued"
INITIATED = "initiated"
DECRYPTING = "decrypting"
VALIDATING = "validating"
TRANSFORMING = "transforming"
PSEUDONYMIZING = "pseudonymizing"
ENRICHING = "enriching"
CONVERTING = "converting"
PARTITIONING = "partitioning"
BUILT = "built"
IMPORTING = "importing"
COMPLETED = "completed"
FAILED = "failed"
class Operation(StrEnum):
BUMP = "BUMP"
ADD = "ADD"
CHANGE = "CHANGE"
PATCH_METADATA = "PATCH_METADATA"
SET_STATUS = "SET_STATUS"
DELETE_DRAFT = "DELETE_DRAFT"
REMOVE = "REMOVE"
ROLLBACK_REMOVE = "ROLLBACK_REMOVE"
DELETE_ARCHIVE = "DELETE_ARCHIVE"
GENERATE_RSA_KEYS = "GENERATE_RSA_KEYS"
class ReleaseStatus(StrEnum):
DRAFT = "DRAFT"
PENDING_RELEASE = "PENDING_RELEASE"
PENDING_DELETE = "PENDING_DELETE"
class UserInfo(CamelModel, extra="forbid"):
user_id: str
first_name: str
last_name: str
class JobParameters(CamelModel, use_enum_values=True):
operation: Operation
target: str
bump_manifesto: DatastoreVersion | None = None
description: str | None = None
release_status: ReleaseStatus | None = None
bump_from_version: str | None = None
bump_to_version: str | None = None
@model_validator(mode="after")
def validate_job_type(self) -> "JobParameters":
operation: Operation = self.operation
if operation == Operation.BUMP and (
self.bump_manifesto is None
or self.description is None
or self.bump_from_version is None
or self.bump_to_version is None
or self.target != "DATASTORE"
):
raise ValueError("No supplied bump manifesto for BUMP operation")
elif operation == Operation.REMOVE and self.description is None:
raise ValueError("Missing parameters for REMOVE operation")
elif operation == Operation.SET_STATUS and self.release_status is None:
raise ValueError("Missing parameters for SET STATUS operation")
else:
return self
class Log(CamelModel, extra="forbid"):
at: datetime.datetime
message: str
class Job(CamelModel, use_enum_values=True):
job_id: str
datastore_rdn: str
status: JobStatus
parameters: JobParameters
log: list[Log] | None = []
created_at: str
created_by: UserInfo
class JobQueryResult:
queued_worker_jobs: list[Job]
built_jobs: list[Job]
queued_manager_jobs: list[Job]
def __init__(
self,
queued_worker_jobs: list[Job] = [],
built_jobs: list[Job] = [],
queued_manager_jobs: list[Job] = [],
) -> None:
self.queued_worker_jobs = queued_worker_jobs
self.built_jobs = built_jobs
self.queued_manager_jobs = queued_manager_jobs
@property
def available_jobs_count(self) -> int:
return (
len(self.queued_worker_jobs)
+ len(self.built_jobs)
+ len(self.queued_manager_jobs)
)
def queued_manager_and_built_jobs(self) -> list[Job]:
return self.queued_manager_jobs + self.built_jobs