Skip to content

Commit 801bf30

Browse files
authored
Add cancel sample (#82)
This workflow provides an example on how to cancel an workflow and execute subsequent activities upon a cancelled context.
1 parent ef42848 commit 801bf30

File tree

3 files changed

+114
-0
lines changed

3 files changed

+114
-0
lines changed

new_samples/README.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,27 @@ cadence --env development \
101101
--et 60 \
102102
--input '{}'
103103
```
104+
105+
### Cancel workflow
106+
This workflow includes multiple activities and each will respond to the workflow `cancel` action differently.
107+
To start the workflow first, simply run
108+
109+
```bash
110+
cadence --env development \
111+
--domain cadence-samples \
112+
workflow start \
113+
--workflow_type cadence_samples.CancelWorkflow \
114+
--tl cadence-samples-worker \
115+
--et 60 \
116+
--input '{}'
117+
```
118+
119+
You should have a workflow ID output in the terminal. Then to cancel the workflow, run
120+
```bash
121+
cadence --env development \
122+
--domain cadence-samples \
123+
workflow cancel \
124+
--workflow_id <YOUR_WORKFLOW_ID>
125+
```
126+
127+
Inspect the behavior of the activities in the console.

new_samples/worker/worker.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ func StartWorker() {
5555
w.RegisterWorkflowWithOptions(workflows.ParallelBranchPickFirstWorkflow, workflow.RegisterOptions{Name: "cadence_samples.ParallelBranchPickFirstWorkflow"})
5656
w.RegisterActivityWithOptions(workflows.ParallelActivity, activity.RegisterOptions{Name: "cadence_samples.ParallelActivity"})
5757

58+
// Cancel workflow registration
59+
w.RegisterWorkflowWithOptions(workflows.CancelWorkflow, workflow.RegisterOptions{Name: "cadence_samples.CancelWorkflow"})
60+
w.RegisterActivityWithOptions(workflows.ActivityToBeSkipped, activity.RegisterOptions{Name: "cadence_samples.ActivityToBeSkipped"})
61+
w.RegisterActivityWithOptions(workflows.ActivityToBeCanceled, activity.RegisterOptions{Name: "cadence_samples.ActivityToBeCanceled"})
62+
w.RegisterActivityWithOptions(workflows.CleanupActivity, activity.RegisterOptions{Name: "cadence_samples.CleanupActivity"})
63+
5864
err := w.Start()
5965
if err != nil {
6066
panic("Failed to start worker: " + err.Error())
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package workflows
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"go.uber.org/cadence"
7+
"go.uber.org/cadence/activity"
8+
"go.uber.org/cadence/workflow"
9+
"go.uber.org/zap"
10+
"time"
11+
)
12+
13+
func CancelWorkflow(ctx workflow.Context) (retError error) {
14+
ao := workflow.ActivityOptions{
15+
ScheduleToStartTimeout: time.Minute,
16+
StartToCloseTimeout: time.Minute * 30,
17+
HeartbeatTimeout: time.Second * 5,
18+
WaitForCancellation: true,
19+
}
20+
ctx = workflow.WithActivityOptions(ctx, ao)
21+
logger := workflow.GetLogger(ctx)
22+
logger.Info("cancel workflow started")
23+
24+
defer func() {
25+
if cadence.IsCanceledError(retError) {
26+
// When workflow is canceled, it has to get a new disconnected context to execute any activities
27+
newCtx, _ := workflow.NewDisconnectedContext(ctx)
28+
err := workflow.ExecuteActivity(newCtx, CleanupActivity).Get(ctx, nil)
29+
if err != nil {
30+
logger.Error("Cleanup activity failed", zap.Error(err))
31+
retError = err
32+
return
33+
}
34+
retError = nil
35+
logger.Info("Workflow completed.")
36+
}
37+
}()
38+
39+
var result string
40+
err := workflow.ExecuteActivity(ctx, ActivityToBeCanceled).Get(ctx, &result)
41+
if err != nil && !cadence.IsCanceledError(err) {
42+
logger.Error("Error from activityToBeCanceled", zap.Error(err))
43+
return err
44+
}
45+
logger.Info(fmt.Sprintf("activityToBeCanceled returns %v, %v", result, err))
46+
47+
// Execute activity using a canceled ctx,
48+
// activity won't be scheduled and a cancelled error will be returned
49+
err = workflow.ExecuteActivity(ctx, ActivityToBeSkipped).Get(ctx, nil)
50+
if err != nil && !cadence.IsCanceledError(err) {
51+
logger.Error("Error from activityToBeSkipped", zap.Error(err))
52+
}
53+
54+
return err
55+
}
56+
57+
func ActivityToBeCanceled(ctx context.Context) (string, error) {
58+
logger := activity.GetLogger(ctx)
59+
logger.Info("activity started, to cancel workflow, use CLI: 'cadence --do samples-domain wf cancel -w <WorkflowID>' to cancel")
60+
for {
61+
select {
62+
case <-time.After(1 * time.Second):
63+
logger.Info("heart beating...")
64+
activity.RecordHeartbeat(ctx, "")
65+
case <-ctx.Done():
66+
logger.Info("context is cancelled")
67+
// returned canceled error here so that in workflow history we can see ActivityTaskCanceled event
68+
// or if not cancelled, return timeout error
69+
return "I am canceled by Done", ctx.Err()
70+
}
71+
}
72+
}
73+
74+
func CleanupActivity(ctx context.Context) error {
75+
logger := activity.GetLogger(ctx)
76+
logger.Info("cleanupActivity started")
77+
return nil
78+
}
79+
80+
func ActivityToBeSkipped(ctx context.Context) error {
81+
logger := activity.GetLogger(ctx)
82+
logger.Info("this activity will be skipped due to cancellation")
83+
return nil
84+
}

0 commit comments

Comments
 (0)