Skip to content

Commit 9a5b2ae

Browse files
committed
Added examples for error scenarios in multi-app workflow
Signed-off-by: Albert Callarisa <[email protected]>
1 parent 822f241 commit 9a5b2ae

File tree

4 files changed

+131
-14
lines changed

4 files changed

+131
-14
lines changed

examples/workflow/README.md

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,11 +291,10 @@ expected_stdout_lines:
291291
- '== APP == app1 - received workflow result'
292292
- '== APP == app1 - returning workflow result'
293293
background: true
294-
sleep: 5
294+
sleep: 20
295295
-->
296296

297297
```sh
298-
pip install ./ext/dapr-ext-workflow
299298
dapr run --app-id wfexample3 --dapr-grpc-port 50003 python3 cross-app3.py &
300299
dapr run --app-id wfexample2 --dapr-grpc-port 50002 python3 cross-app2.py &
301300
dapr run --app-id wfexample1 --dapr-grpc-port 50001 python3 cross-app1.py
@@ -310,3 +309,89 @@ app2 - triggering app3 activity
310309
...
311310
```
312311
among others. This shows that the workflow calls are working as expected.
312+
313+
314+
#### Error handling on activity calls
315+
316+
This example demonstrates how the error handling works on activity calls across apps.
317+
318+
Error handling on activity calls across apps works as normal workflow activity calls.
319+
320+
In this example we run `app3` in failing mode, which makes the activity call return error constantly. The activity call from `app2` will fail after the retry policy is exhausted.
321+
322+
<!-- STEP
323+
name: Run apps
324+
expected_stdout_lines:
325+
- '== APP == app1 - triggering app1 workflow'
326+
- '== APP == app1 - received workflow call'
327+
- '== APP == app1 - triggering app2 workflow'
328+
- '== APP == app2 - received workflow call'
329+
- '== APP == app2 - triggering app3 activity'
330+
- '== APP == app3 - received activity call'
331+
- '== APP == app3 - raising error in activity due to error mode being enabled'
332+
- '== APP == app2 - received activity error from app3'
333+
- '== APP == app2 - returning workflow result'
334+
- '== APP == app1 - received workflow result'
335+
- '== APP == app1 - returning workflow result'
336+
sleep: 20
337+
-->
338+
339+
```sh
340+
export ERROR_ACTIVITY_MODE=true
341+
dapr run --app-id wfexample3 --dapr-grpc-port 50013 python3 cross-app3.py &
342+
dapr run --app-id wfexample2 --dapr-grpc-port 50012 python3 cross-app2.py &
343+
dapr run --app-id wfexample1 --dapr-grpc-port 50011 python3 cross-app1.py
344+
```
345+
<!-- END_STEP -->
346+
347+
348+
When you run the apps with the `ERROR_ACTIVITY_MODE` environment variable set, you will see output like this:
349+
```
350+
...
351+
app3 - received activity call
352+
app3 - raising error in activity due to error mode being enabled
353+
app2 - received activity error from app3
354+
...
355+
```
356+
among others. This shows that the activity calls are failing as expected, and they are being handled as expected too.
357+
358+
359+
#### Error handling on workflow calls
360+
361+
This example demonstrates how the error handling works on workflow calls across apps.
362+
363+
Error handling on workflow calls across apps works as normal workflow calls.
364+
365+
In this example we run `app2` in failing mode, which makes the workflow call return error constantly. The workflow call from `app1` will fail after the retry policy is exhausted.
366+
367+
<!-- STEP
368+
name: Run apps
369+
expected_stdout_lines:
370+
- '== APP == app1 - triggering app1 workflow'
371+
- '== APP == app1 - received workflow call'
372+
- '== APP == app1 - triggering app2 workflow'
373+
- '== APP == app2 - received workflow call'
374+
- '== APP == app2 - raising error in workflow due to error mode being enabled'
375+
- '== APP == app1 - received workflow error from app2'
376+
- '== APP == app1 - returning workflow result'
377+
sleep: 20
378+
-->
379+
380+
```sh
381+
export ERROR_WORKFLOW_MODE=true
382+
dapr run --app-id wfexample3 --dapr-grpc-port 50023 python3 cross-app3.py &
383+
dapr run --app-id wfexample2 --dapr-grpc-port 50022 python3 cross-app2.py &
384+
dapr run --app-id wfexample1 --dapr-grpc-port 50021 python3 cross-app1.py
385+
```
386+
<!-- END_STEP -->
387+
388+
When you run the apps with the `ERROR_WORKFLOW_MODE` environment variable set, you will see output like this:
389+
```
390+
...
391+
app2 - received workflow call
392+
app2 - raising error in workflow due to error mode being enabled
393+
app1 - received workflow error from app2
394+
...
395+
```
396+
among others. This shows that the workflow calls are failing as expected, and they are being handled as expected too.
397+

examples/workflow/cross-app1.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
# See the License for the specific language governing permissions and
1111
# limitations under the License.
1212

13+
from datetime import timedelta
14+
15+
from durabletask.task import TaskFailedError
1316
import dapr.ext.workflow as wf
1417
import time
1518

@@ -21,14 +24,23 @@ def app1_workflow(ctx: wf.DaprWorkflowContext):
2124
print(f'app1 - received workflow call', flush=True)
2225
print(f'app1 - triggering app2 workflow', flush=True)
2326

24-
yield ctx.call_child_workflow(
25-
workflow='app2_workflow',
26-
input=None,
27-
app_id='wfexample2',
28-
)
29-
print(f'app1 - received workflow result', flush=True)
30-
print(f'app1 - returning workflow result', flush=True)
27+
try:
28+
retry_policy = wf.RetryPolicy(
29+
max_number_of_attempts=2,
30+
first_retry_interval=timedelta(milliseconds=100),
31+
max_retry_interval=timedelta(seconds=3),
32+
)
33+
yield ctx.call_child_workflow(
34+
workflow='app2_workflow',
35+
input=None,
36+
app_id='wfexample2',
37+
retry_policy=retry_policy,
38+
)
39+
print(f'app1 - received workflow result', flush=True)
40+
except TaskFailedError as e:
41+
print(f'app1 - received workflow error from app2', flush=True)
3142

43+
print(f'app1 - returning workflow result', flush=True)
3244
return 1
3345

3446

@@ -41,6 +53,6 @@ def app1_workflow(ctx: wf.DaprWorkflowContext):
4153
instance_id = wf_client.schedule_new_workflow(workflow=app1_workflow)
4254

4355
# Wait for the workflow to complete
44-
time.sleep(5)
56+
time.sleep(7)
4557

4658
wfr.shutdown()

examples/workflow/cross-app2.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010
# See the License for the specific language governing permissions and
1111
# limitations under the License.
1212

13+
from datetime import timedelta
14+
import os
15+
16+
from durabletask.task import TaskFailedError
1317
import dapr.ext.workflow as wf
1418
import time
1519

@@ -19,11 +23,24 @@
1923
@wfr.workflow
2024
def app2_workflow(ctx: wf.DaprWorkflowContext):
2125
print(f'app2 - received workflow call', flush=True)
26+
if os.getenv('ERROR_WORKFLOW_MODE', 'false') == 'true':
27+
print(f'app2 - raising error in workflow due to error mode being enabled', flush=True)
28+
raise ValueError('Error in workflow due to error mode being enabled')
2229
print(f'app2 - triggering app3 activity', flush=True)
23-
yield ctx.call_activity('app3_activity', input=None, app_id='wfexample3')
24-
print(f'app2 - received activity result', flush=True)
25-
print(f'app2 - returning workflow result', flush=True)
30+
try:
31+
retry_policy = wf.RetryPolicy(
32+
max_number_of_attempts=2,
33+
first_retry_interval=timedelta(milliseconds=100),
34+
max_retry_interval=timedelta(seconds=3),
35+
)
36+
result = yield ctx.call_activity(
37+
'app3_activity', input=None, app_id='wfexample3', retry_policy=retry_policy
38+
)
39+
print(f'app2 - received activity result', flush=True)
40+
except TaskFailedError as e:
41+
print(f'app2 - received activity error from app3', flush=True)
2642

43+
print(f'app2 - returning workflow result', flush=True)
2744
return 2
2845

2946

examples/workflow/cross-app3.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1010
# See the License for the specific language governing permissions and
1111
# limitations under the License.
12-
12+
import os
1313
import dapr.ext.workflow as wf
1414
import time
1515

@@ -19,6 +19,9 @@
1919
@wfr.activity
2020
def app3_activity(ctx: wf.DaprWorkflowContext) -> int:
2121
print(f'app3 - received activity call', flush=True)
22+
if os.getenv('ERROR_ACTIVITY_MODE', 'false') == 'true':
23+
print(f'app3 - raising error in activity due to error mode being enabled', flush=True)
24+
raise ValueError('Error in activity due to error mode being enabled')
2225
print(f'app3 - returning activity result', flush=True)
2326
return 3
2427

0 commit comments

Comments
 (0)