15
15
method.
16
16
"""
17
17
18
+ import copy
18
19
import os
19
20
from http import HTTPStatus
20
21
from multiprocessing import Lock
49
50
f"{ _PICKLE_DIRECTORY } /running-workflow-step.pickle"
50
51
)
51
52
_INSTANCE_PICKLE_FILE : str = f"{ _PICKLE_DIRECTORY } /instance.pickle"
53
+ _MOCK_STEP_OUTPUT_FILE : str = f"{ _PICKLE_DIRECTORY } /mock-output.pickle"
52
54
53
55
54
56
class UnitTestWorkflowAPIAdapter (WorkflowAPIAdapter ):
@@ -73,12 +75,13 @@ def __init__(self):
73
75
_RUNNING_WORKFLOW_PICKLE_FILE ,
74
76
_RUNNING_WORKFLOW_STEP_PICKLE_FILE ,
75
77
_INSTANCE_PICKLE_FILE ,
78
+ _MOCK_STEP_OUTPUT_FILE ,
76
79
]:
77
80
with open (file , "wb" ) as pickle_file :
78
81
Pickler (pickle_file ).dump ({})
79
82
UnitTestWorkflowAPIAdapter .lock .release ()
80
83
81
- def get_workflow (self , * , workflow_id : str ) -> dict [str , Any ]:
84
+ def get_workflow (self , * , workflow_id : str ) -> tuple [ dict [str , Any ], int ]:
82
85
UnitTestWorkflowAPIAdapter .lock .acquire ()
83
86
with open (_WORKFLOW_PICKLE_FILE , "rb" ) as pickle_file :
84
87
workflow = Unpickler (pickle_file ).load ()
@@ -138,7 +141,7 @@ def create_running_workflow_step(
138
141
step : str ,
139
142
replica : int = 0 ,
140
143
prior_running_workflow_step_id : str | None = None ,
141
- ) -> dict [str , Any ]:
144
+ ) -> tuple [ dict [str , Any ], int ]:
142
145
if replica :
143
146
assert replica > 0
144
147
@@ -172,7 +175,7 @@ def create_running_workflow_step(
172
175
173
176
def get_running_workflow_step (
174
177
self , * , running_workflow_step_id : str
175
- ) -> dict [str , Any ]:
178
+ ) -> tuple [ dict [str , Any ], int ]:
176
179
UnitTestWorkflowAPIAdapter .lock .acquire ()
177
180
with open (_RUNNING_WORKFLOW_STEP_PICKLE_FILE , "rb" ) as pickle_file :
178
181
running_workflow_step = Unpickler (pickle_file ).load ()
@@ -188,7 +191,7 @@ def get_running_workflow_step(
188
191
189
192
def get_running_workflow_step_by_name (
190
193
self , * , name : str , running_workflow_id : str , replica : int = 0
191
- ) -> dict [str , Any ]:
194
+ ) -> tuple [ dict [str , Any ], int ]:
192
195
if replica :
193
196
assert replica > 0
194
197
UnitTestWorkflowAPIAdapter .lock .acquire ()
@@ -293,7 +296,9 @@ def get_instance(self, *, instance_id: str) -> dict[str, Any]:
293
296
response = {} if instance_id not in instances else instances [instance_id ]
294
297
return response , 0
295
298
296
- def get_job (self , * , collection : str , job : str , version : str ) -> dict [str , Any ]:
299
+ def get_job (
300
+ self , * , collection : str , job : str , version : str
301
+ ) -> tuple [dict [str , Any ], int ]:
297
302
assert collection == _JOB_DEFINITIONS ["collection" ]
298
303
assert job in _JOB_DEFINITIONS ["jobs" ]
299
304
assert version
@@ -391,14 +396,61 @@ def get_running_workflow_steps(self, *, running_workflow_id: str) -> dict[str, A
391
396
return {"count" : len (steps ), "running_workflow_steps" : steps }
392
397
393
398
def get_running_workflow_step_output_values_for_output (
394
- self , * , running_workflow_step_id : str , output : str
399
+ self , * , running_workflow_step_id : str , output_variable : str
395
400
) -> tuple [dict [str , Any ], int ]:
396
- del running_workflow_step_id
397
- del output
398
- return {"outputs" : []}, HTTPStatus .OK
401
+ """We use the 'mock' data to return output values, otherwise
402
+ we return an empty list. And we need to get the step in order to get its name.
403
+ """
404
+ # The RunningWorkflowStep must exist...
405
+ step , _ = self .get_running_workflow_step (
406
+ running_workflow_step_id = running_workflow_step_id
407
+ )
408
+ assert step
409
+ step_name : str = step ["name" ]
410
+ # Now we can inspect the 'mock' data...
411
+ UnitTestWorkflowAPIAdapter .lock .acquire ()
412
+ with open (_MOCK_STEP_OUTPUT_FILE , "rb" ) as pickle_file :
413
+ mock_output = Unpickler (pickle_file ).load ()
414
+ UnitTestWorkflowAPIAdapter .lock .release ()
415
+
416
+ if step_name not in mock_output :
417
+ return {"output" : []}, 0
418
+ # The record's output variable must match (there's only one record per step atm)
419
+ assert mock_output [step_name ]["output_variable" ] == output_variable
420
+ # Now return what was provided to the mock method...
421
+ response = {"output" : copy .copy (mock_output [step_name ]["output" ])}
422
+ return response , 0
399
423
400
424
def realise_outputs (
401
425
self , * , running_workflow_step_id : str
402
426
) -> tuple [dict [str , Any ], int ]:
403
427
del running_workflow_step_id
404
428
return {}, HTTPStatus .OK
429
+
430
+ # Custom (test) methods
431
+ # Methods not declared in the ABC
432
+
433
+ def mock_get_running_workflow_step_output_values_for_output (
434
+ self , * , step_name : str , output_variable : str , output : list [str ]
435
+ ) -> None :
436
+ """Sets the output response for a step.
437
+ Limitation is that there can only be one record for each step name
438
+ so, for now, the output_variable is superfluous and only used
439
+ to check the output variable name matches."""
440
+ assert isinstance (step_name , str )
441
+ assert isinstance (output_variable , str )
442
+ assert isinstance (output , list )
443
+
444
+ UnitTestWorkflowAPIAdapter .lock .acquire ()
445
+ with open (_MOCK_STEP_OUTPUT_FILE , "rb" ) as pickle_file :
446
+ mock_output = Unpickler (pickle_file ).load ()
447
+
448
+ record = {
449
+ "output_variable" : output_variable ,
450
+ "output" : output ,
451
+ }
452
+ mock_output [step_name ] = record
453
+
454
+ with open (_MOCK_STEP_OUTPUT_FILE , "wb" ) as pickle_file :
455
+ Pickler (pickle_file ).dump (mock_output )
456
+ UnitTestWorkflowAPIAdapter .lock .release ()
0 commit comments