@@ -29,29 +29,31 @@ def basic_engine():
29
29
api_adapter = api_adapter , instance_launcher = instance_launcher
30
30
)
31
31
message_queue .set_receiver (workflow_engine .handle_message )
32
- return [
32
+ print ("Starting message queue..." )
33
+ message_queue .start ()
34
+
35
+ yield [
33
36
api_adapter ,
34
37
message_queue ,
35
38
message_dispatcher ,
36
39
workflow_engine ,
37
40
]
38
41
42
+ print ("Stopping message queue..." )
43
+ message_queue .stop ()
44
+ message_queue .join ()
45
+ print ("Stopped" )
39
46
40
- def test_workflow_engine_with_two_step_nop (basic_engine ):
41
- # Arrange
42
- da , mq , md , _ = basic_engine
43
47
44
- # Act
45
- # To test the WorkflowEngine we need to:
46
- # 1. Start the message queue
47
- # 2. Load and create a Workflow Definition
48
- # 3. Create a Running Workflow record
49
- # 4. Send a Workflow START message
48
+ def start_workflow (md , da , workflow_file_name ) -> str :
49
+ """A convenience function to handle all the 'START' logic for a workflow."""
50
+
51
+ # To start a workflow we need to:
52
+ # 1. Load and create a Workflow Definition
53
+ # 2. Create a Running Workflow record
54
+ # 3. Send a Workflow START message
50
55
#
51
- # 1. (Start the message queue)
52
- mq .start ()
53
- # 2. (Load/create the workflow definition to be tested)
54
- workflow_file_name = "example-two-step-nop"
56
+ # 1.
55
57
workflow_path = os .path .join (
56
58
os .path .dirname (__file__ ), "workflow-definitions" , f"{ workflow_file_name } .yaml"
57
59
)
@@ -61,22 +63,30 @@ def test_workflow_engine_with_two_step_nop(basic_engine):
61
63
wfid = da .create_workflow (workflow_definition = wf_definition )
62
64
assert wfid
63
65
print (f"Created workflow definition { wfid } " )
64
- # 3. (Create a running workflow record)
66
+ # 2.
65
67
response = da .create_running_workflow (workflow_definition_id = wfid )
66
68
r_wfid = response ["id" ]
67
69
assert r_wfid
68
70
print (f"Created running workflow { r_wfid } " )
69
- # 4. (Send the Workflow START message)
71
+ # 3.
70
72
msg = WorkflowMessage ()
71
73
msg .timestamp = f"{ datetime .now (timezone .utc ).isoformat ()} Z"
72
74
msg .action = "START"
73
75
msg .running_workflow = r_wfid
74
76
md .send (msg )
75
77
print ("Sent START message" )
76
78
77
- # Assert
78
- # Wait until the workflow is done (successfully)
79
- # But don't wait for ever!
79
+ return r_wfid
80
+
81
+
82
+ def wait_for_workflow (da , mq , r_wfid , expect_success = True ):
83
+ """A convenience function to wait for and check a workflow execution
84
+ (by inspecting the anticipated DB/API records)."""
85
+
86
+ # We wait for the workflow to complete by polling the API and checking
87
+ # the running workflow's 'done' status. The user can specify whether
88
+ # the workflow is expected to succeed or fail. Any further checks
89
+ # are the responsibility of the caller.
80
90
attempts = 0
81
91
done = False
82
92
r_wf = None
@@ -91,18 +101,42 @@ def test_workflow_engine_with_two_step_nop(basic_engine):
91
101
if attempts > 10 :
92
102
break
93
103
time .sleep (0.5 )
94
- # Stop the message queue
95
- print ("Stopping message queue..." )
96
- mq .stop ()
97
- mq .join ()
98
- print ("Stopped" )
99
104
assert r_wf
100
105
assert r_wf ["done" ]
101
- assert r_wf ["success" ]
102
- # Now check there are the right number of RunningWorkflowStep Records
106
+ assert r_wf ["success" ] == expect_success
107
+
108
+
109
+ def test_workflow_engine_with_two_step_nop (basic_engine ):
110
+ # Arrange
111
+ da , mq , md , _ = basic_engine
112
+
113
+ # Act
114
+ r_wfid = start_workflow (md , da , "example-two-step-nop" )
115
+
116
+ # Assert
117
+ wait_for_workflow (da , mq , r_wfid )
118
+ # Additional, detailed checks...
119
+ # Check there are the right number of RunningWorkflowStep Records
103
120
# (and they're all set to success/done)
104
121
response = da .get_running_workflow_steps (running_workflow_id = r_wfid )
105
122
assert response ["count" ] == 2
106
123
for step in response ["running_workflow_steps" ]:
107
124
assert step ["running_workflow_step" ]["done" ]
108
125
assert step ["running_workflow_step" ]["success" ]
126
+
127
+
128
+ def test_workflow_engine_with_nop_fail (basic_engine ):
129
+ # Arrange
130
+ da , mq , md , _ = basic_engine
131
+
132
+ # Act
133
+ r_wfid = start_workflow (md , da , "example-nop-fail" )
134
+
135
+ # Assert
136
+ wait_for_workflow (da , mq , r_wfid , expect_success = False )
137
+ # Additional, detailed checks...
138
+ # Check we only haver one step, and it failed
139
+ response = da .get_running_workflow_steps (running_workflow_id = r_wfid )
140
+ assert response ["count" ] == 1
141
+ assert response ["running_workflow_steps" ][0 ]["running_workflow_step" ]["done" ]
142
+ assert not response ["running_workflow_steps" ][0 ]["running_workflow_step" ]["success" ]
0 commit comments