Skip to content

Commit ef42848

Browse files
authored
Add pick first from parallel branches workflow sample (#80)
* Add pick first from parallel branches workflow sample * update readme
1 parent 40908e4 commit ef42848

File tree

3 files changed

+89
-0
lines changed

3 files changed

+89
-0
lines changed

new_samples/README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,17 @@ cadence --env development \
8787
```
8888
Try to inspect the log message for the output.
8989

90+
### Parallel pick first workflow
91+
This example runs two activities in parallel and output the first successful response. Change the
92+
configuration for `WaitForCancellation` in the activity options will change the behavior of canceling
93+
other unfinished activities. Here is a sample CLI
94+
95+
```bash
96+
cadence --env development \
97+
--domain cadence-samples \
98+
workflow start \
99+
--workflow_type cadence_samples.ParallelBranchPickFirstWorkflow \
100+
--tl cadence-samples-worker \
101+
--et 60 \
102+
--input '{}'
103+
```

new_samples/worker/worker.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ func StartWorker() {
5151
w.RegisterWorkflowWithOptions(workflows.DynamicWorkflow, workflow.RegisterOptions{Name: "cadence_samples.DynamicWorkflow"})
5252
w.RegisterActivityWithOptions(workflows.DynamicGreetingActivity, activity.RegisterOptions{Name: "cadence_samples.DynamicGreetingActivity"})
5353

54+
// ParallelPickFirst workflow registration
55+
w.RegisterWorkflowWithOptions(workflows.ParallelBranchPickFirstWorkflow, workflow.RegisterOptions{Name: "cadence_samples.ParallelBranchPickFirstWorkflow"})
56+
w.RegisterActivityWithOptions(workflows.ParallelActivity, activity.RegisterOptions{Name: "cadence_samples.ParallelActivity"})
57+
5458
err := w.Start()
5559
if err != nil {
5660
panic("Failed to start worker: " + err.Error())
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package workflows
2+
3+
import (
4+
"context"
5+
"go.uber.org/cadence/activity"
6+
"go.uber.org/cadence/workflow"
7+
"time"
8+
)
9+
10+
type parallelBranchInput struct {
11+
Message string `json:"message"`
12+
}
13+
14+
// ParallelBranchPickFirstWorkflow is a sample workflow simulating two parallel activities running
15+
// at the same time and picking the first successful result.
16+
func ParallelBranchPickFirstWorkflow(ctx workflow.Context) (string, error) {
17+
logger := workflow.GetLogger(ctx)
18+
logger.Info("ParallelBranchPickFirstWorkflow started")
19+
20+
selector := workflow.NewSelector(ctx)
21+
var firstResp string
22+
23+
// Use a cancel handler to cancel all rest of other activities.
24+
childCtx, cancelHandler := workflow.WithCancel(ctx)
25+
ao := workflow.ActivityOptions{
26+
ScheduleToStartTimeout: time.Minute,
27+
StartToCloseTimeout: time.Minute,
28+
HeartbeatTimeout: time.Second * 20,
29+
WaitForCancellation: true, // wait for cancellation to complete
30+
}
31+
childCtx = workflow.WithActivityOptions(childCtx, ao)
32+
33+
// Set WaitForCancellation to true to demonstrate the cancellation to the other activities. In real world case,
34+
// you might not care about them and could set WaitForCancellation to false (which is default value).
35+
36+
// Run two activities in parallel
37+
f1 := workflow.ExecuteActivity(childCtx, ParallelActivity, parallelBranchInput{Message: "first activity"}, time.Second*10)
38+
f2 := workflow.ExecuteActivity(childCtx, ParallelActivity, parallelBranchInput{Message: "second activity"}, time.Second*2)
39+
pendingFutures := []workflow.Future{f1, f2}
40+
selector.AddFuture(f1, func(f workflow.Future) {
41+
f.Get(ctx, &firstResp)
42+
}).AddFuture(f2, func(f workflow.Future) {
43+
f.Get(ctx, &firstResp)
44+
})
45+
46+
// wait for any of the future to complete
47+
selector.Select(ctx)
48+
49+
// now if at least one future is complete, cancel all other pending futures
50+
cancelHandler()
51+
52+
// - If you want to wait for pending activities to finish after issuing cancellation
53+
// then wait for the future to complete.
54+
// - if you don't want to wait for completion of pending activities cancellation then you can choose to
55+
// set WaitForCancellation to false through WithWaitForCancellation(false)
56+
for _, f := range pendingFutures {
57+
err := f.Get(ctx, &firstResp)
58+
if err != nil {
59+
return "", err
60+
}
61+
}
62+
63+
logger.Info("ParallelBranchPickFirstWorkflow completed")
64+
return firstResp, nil
65+
}
66+
67+
func ParallelActivity(ctx context.Context, input parallelBranchInput) (string, error) {
68+
logger := activity.GetLogger(ctx)
69+
logger.Info("ParallelActivity started")
70+
return "Hello " + input.Message, nil
71+
}

0 commit comments

Comments
 (0)