18
18
import os
19
19
from multiprocessing import Lock
20
20
from pickle import Pickler , Unpickler
21
- from typing import Any , Dict , List , Optional
21
+ from typing import Any
22
22
23
23
import yaml
24
24
29
29
os .path .dirname (__file__ ), "job-definitions" , "job-definitions.yaml"
30
30
)
31
31
with open (_JOB_DEFINITION_FILE , "r" , encoding = "utf8" ) as jd_file :
32
- _JOB_DEFINITIONS : Dict [str , Any ] = yaml .load (jd_file , Loader = yaml .FullLoader )
32
+ _JOB_DEFINITIONS : dict [str , Any ] = yaml .load (jd_file , Loader = yaml .FullLoader )
33
33
assert _JOB_DEFINITIONS
34
34
35
35
# Table UUID formats
@@ -80,7 +80,7 @@ def __init__(self):
80
80
Pickler (pickle_file ).dump ({})
81
81
UnitTestWorkflowAPIAdapter .lock .release ()
82
82
83
- def create_workflow (self , * , workflow_definition : Dict [str , Any ]) -> str :
83
+ def create_workflow (self , * , workflow_definition : dict [str , Any ]) -> dict [ str , Any ] :
84
84
UnitTestWorkflowAPIAdapter .lock .acquire ()
85
85
with open (_WORKFLOW_PICKLE_FILE , "rb" ) as pickle_file :
86
86
workflow = Unpickler (pickle_file ).load ()
@@ -95,34 +95,22 @@ def create_workflow(self, *, workflow_definition: Dict[str, Any]) -> str:
95
95
96
96
return {"id" : workflow_definition_id }
97
97
98
- def get_workflow (self , * , workflow_id : str ) -> Dict [str , Any ]:
98
+ def get_workflow (self , * , workflow_id : str ) -> dict [str , Any ]:
99
99
UnitTestWorkflowAPIAdapter .lock .acquire ()
100
100
with open (_WORKFLOW_PICKLE_FILE , "rb" ) as pickle_file :
101
101
workflow = Unpickler (pickle_file ).load ()
102
102
UnitTestWorkflowAPIAdapter .lock .release ()
103
103
104
104
return {"workflow" : workflow [workflow_id ]} if workflow_id in workflow else {}
105
105
106
- def get_workflow_by_name (self , * , name : str , version : str ) -> Dict [str , Any ]:
107
- UnitTestWorkflowAPIAdapter .lock .acquire ()
108
- with open (_WORKFLOW_PICKLE_FILE , "rb" ) as pickle_file :
109
- workflow = Unpickler (pickle_file ).load ()
110
- UnitTestWorkflowAPIAdapter .lock .release ()
111
-
112
- item = {}
113
- for wfid , value in workflow .items ():
114
- if value ["name" ] == name :
115
- item = {"id" : wfid , "workflow" : value }
116
- return item
117
-
118
106
def create_running_workflow (
119
107
self ,
120
108
* ,
121
109
user_id : str ,
122
110
workflow_id : str ,
123
111
project_id : str ,
124
- variables : Dict [str , Any ],
125
- ) -> str :
112
+ variables : dict [str , Any ],
113
+ ) -> dict [ str , Any ] :
126
114
assert user_id
127
115
assert isinstance (variables , dict )
128
116
@@ -149,13 +137,23 @@ def create_running_workflow(
149
137
150
138
return {"id" : running_workflow_id }
151
139
140
+ def get_running_workflow (self , * , running_workflow_id : str ) -> dict [str , Any ]:
141
+ UnitTestWorkflowAPIAdapter .lock .acquire ()
142
+ with open (_RUNNING_WORKFLOW_PICKLE_FILE , "rb" ) as pickle_file :
143
+ running_workflow = Unpickler (pickle_file ).load ()
144
+ UnitTestWorkflowAPIAdapter .lock .release ()
145
+
146
+ if running_workflow_id not in running_workflow :
147
+ return {}
148
+ return {"running_workflow" : running_workflow [running_workflow_id ]}
149
+
152
150
def set_running_workflow_done (
153
151
self ,
154
152
* ,
155
153
running_workflow_id : str ,
156
154
success : bool ,
157
- error : Optional [ int ] = None ,
158
- error_msg : Optional [ str ] = None ,
155
+ error : int | None = None ,
156
+ error_msg : str | None = None ,
159
157
) -> None :
160
158
UnitTestWorkflowAPIAdapter .lock .acquire ()
161
159
with open (_RUNNING_WORKFLOW_PICKLE_FILE , "rb" ) as pickle_file :
@@ -171,19 +169,9 @@ def set_running_workflow_done(
171
169
Pickler (pickle_file ).dump (running_workflow )
172
170
UnitTestWorkflowAPIAdapter .lock .release ()
173
171
174
- def get_running_workflow (self , * , running_workflow_id : str ) -> Dict [str , Any ]:
175
- UnitTestWorkflowAPIAdapter .lock .acquire ()
176
- with open (_RUNNING_WORKFLOW_PICKLE_FILE , "rb" ) as pickle_file :
177
- running_workflow = Unpickler (pickle_file ).load ()
178
- UnitTestWorkflowAPIAdapter .lock .release ()
179
-
180
- if running_workflow_id not in running_workflow :
181
- return {}
182
- return {"running_workflow" : running_workflow [running_workflow_id ]}
183
-
184
172
def create_running_workflow_step (
185
173
self , * , running_workflow_id : str , step : str
186
- ) -> str :
174
+ ) -> dict [ str , Any ] :
187
175
UnitTestWorkflowAPIAdapter .lock .acquire ()
188
176
with open (_RUNNING_WORKFLOW_STEP_PICKLE_FILE , "rb" ) as pickle_file :
189
177
running_workflow_step = Unpickler (pickle_file ).load ()
@@ -208,7 +196,7 @@ def create_running_workflow_step(
208
196
209
197
def get_running_workflow_step (
210
198
self , * , running_workflow_step_id : str
211
- ) -> Dict [str , Any ]:
199
+ ) -> dict [str , Any ]:
212
200
UnitTestWorkflowAPIAdapter .lock .acquire ()
213
201
with open (_RUNNING_WORKFLOW_STEP_PICKLE_FILE , "rb" ) as pickle_file :
214
202
running_workflow_step = Unpickler (pickle_file ).load ()
@@ -225,8 +213,8 @@ def set_running_workflow_step_done(
225
213
* ,
226
214
running_workflow_step_id : str ,
227
215
success : bool ,
228
- error : Optional [ int ] = None ,
229
- error_msg : Optional [ str ] = None ,
216
+ error : int | None = None ,
217
+ error_msg : str | None = None ,
230
218
) -> None :
231
219
UnitTestWorkflowAPIAdapter .lock .acquire ()
232
220
with open (_RUNNING_WORKFLOW_STEP_PICKLE_FILE , "rb" ) as pickle_file :
@@ -242,9 +230,7 @@ def set_running_workflow_step_done(
242
230
Pickler (pickle_file ).dump (running_workflow_step )
243
231
UnitTestWorkflowAPIAdapter .lock .release ()
244
232
245
- def get_running_workflow_steps (
246
- self , * , running_workflow_id : str
247
- ) -> List [Dict [str , Any ]]:
233
+ def get_running_workflow_steps (self , * , running_workflow_id : str ) -> dict [str , Any ]:
248
234
UnitTestWorkflowAPIAdapter .lock .acquire ()
249
235
with open (_RUNNING_WORKFLOW_STEP_PICKLE_FILE , "rb" ) as pickle_file :
250
236
running_workflow_step = Unpickler (pickle_file ).load ()
@@ -257,7 +243,7 @@ def get_running_workflow_steps(
257
243
steps .append (item )
258
244
return {"count" : len (steps ), "running_workflow_steps" : steps }
259
245
260
- def create_instance (self , * , running_workflow_step_id : str ) -> Dict [str , Any ]:
246
+ def create_instance (self , * , running_workflow_step_id : str ) -> dict [str , Any ]:
261
247
UnitTestWorkflowAPIAdapter .lock .acquire ()
262
248
with open (_INSTANCE_PICKLE_FILE , "rb" ) as pickle_file :
263
249
instances = Unpickler (pickle_file ).load ()
@@ -275,15 +261,15 @@ def create_instance(self, *, running_workflow_step_id: str) -> Dict[str, Any]:
275
261
276
262
return {"id" : instance_id }
277
263
278
- def get_instance (self , * , instance_id : str ) -> Dict [str , Any ]:
264
+ def get_instance (self , * , instance_id : str ) -> dict [str , Any ]:
279
265
UnitTestWorkflowAPIAdapter .lock .acquire ()
280
266
with open (_INSTANCE_PICKLE_FILE , "rb" ) as pickle_file :
281
267
instances = Unpickler (pickle_file ).load ()
282
268
UnitTestWorkflowAPIAdapter .lock .release ()
283
269
284
270
return {} if instance_id not in instances else instances [instance_id ]
285
271
286
- def create_task (self , * , instance_id : str ) -> Dict [str , Any ]:
272
+ def create_task (self , * , instance_id : str ) -> dict [str , Any ]:
287
273
UnitTestWorkflowAPIAdapter .lock .acquire ()
288
274
with open (_TASK_PICKLE_FILE , "rb" ) as pickle_file :
289
275
tasks = Unpickler (pickle_file ).load ()
@@ -302,17 +288,15 @@ def create_task(self, *, instance_id: str) -> Dict[str, Any]:
302
288
303
289
return {"id" : task_id }
304
290
305
- def get_task (self , * , task_id : str ) -> Dict [str , Any ]:
291
+ def get_task (self , * , task_id : str ) -> dict [str , Any ]:
306
292
UnitTestWorkflowAPIAdapter .lock .acquire ()
307
293
with open (_TASK_PICKLE_FILE , "rb" ) as pickle_file :
308
294
tasks = Unpickler (pickle_file ).load ()
309
295
UnitTestWorkflowAPIAdapter .lock .release ()
310
296
311
297
return {} if task_id not in tasks else tasks [task_id ]
312
298
313
- def get_job (
314
- self , * , collection : str , job : str , version : str
315
- ) -> Optional [Dict [str , Any ]]:
299
+ def get_job (self , * , collection : str , job : str , version : str ) -> dict [str , Any ]:
316
300
assert collection == _JOB_DEFINITIONS ["collection" ]
317
301
assert job in _JOB_DEFINITIONS ["jobs" ]
318
302
assert version
0 commit comments