Skip to content

Commit e442ce4

Browse files
committed
Add sample of batch executions
1 parent 84120a4 commit e442ce4

File tree

2 files changed

+90
-0
lines changed

2 files changed

+90
-0
lines changed

cmd/samples/batch/workflow.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package batch
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math/rand"
7+
"time"
8+
9+
"go.uber.org/cadence/workflow"
10+
"go.uber.org/multierr"
11+
)
12+
13+
type BatchWorkflowInput struct {
14+
Concurrency int
15+
TotalSize int
16+
}
17+
18+
func BatchWorkflow(ctx workflow.Context, input BatchWorkflowInput) error {
19+
wg := workflow.NewWaitGroup(ctx)
20+
21+
buffered := workflow.NewBufferedChannel(ctx, input.Concurrency)
22+
futures := workflow.NewNamedChannel(ctx, "futures")
23+
24+
var errs error
25+
wg.Add(1)
26+
// task result collector
27+
workflow.Go(ctx, func(ctx workflow.Context) {
28+
defer wg.Done()
29+
for {
30+
var future workflow.Future
31+
ok := futures.Receive(ctx, &future)
32+
if !ok {
33+
break
34+
}
35+
err := future.Get(ctx, nil)
36+
errs = multierr.Append(errs, err)
37+
buffered.Receive(ctx, nil)
38+
}
39+
})
40+
41+
// submit all tasks
42+
for taskID := 0; taskID < input.TotalSize; taskID++ {
43+
taskID := taskID
44+
buffered.Send(ctx, nil)
45+
46+
aCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
47+
ScheduleToStartTimeout: time.Second * 10,
48+
StartToCloseTimeout: time.Second * 10,
49+
})
50+
futures.Send(ctx, workflow.ExecuteActivity(aCtx, BatchActivity, taskID))
51+
}
52+
futures.Close()
53+
54+
wg.Wait(ctx)
55+
56+
return errs
57+
}
58+
59+
func BatchActivity(ctx context.Context, taskID int) error {
60+
select {
61+
case <-ctx.Done():
62+
return fmt.Errorf("batch activity %d failed: %w", taskID, ctx.Err())
63+
case <-time.After(time.Duration(rand.Int63n(100))*time.Millisecond + 900*time.Millisecond):
64+
return nil
65+
}
66+
}

cmd/samples/batch/workflow_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package batch
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"go.uber.org/cadence/testsuite"
8+
)
9+
10+
func Test_BatchWorkflow(t *testing.T) {
11+
testSuite := &testsuite.WorkflowTestSuite{}
12+
env := testSuite.NewTestWorkflowEnvironment()
13+
14+
env.RegisterWorkflow(BatchWorkflow)
15+
env.RegisterActivity(BatchActivity)
16+
17+
env.ExecuteWorkflow(BatchWorkflow, BatchWorkflowInput{
18+
Concurrency: 2,
19+
TotalSize: 10,
20+
})
21+
22+
assert.True(t, env.IsWorkflowCompleted())
23+
assert.Nil(t, env.GetWorkflowError())
24+
}

0 commit comments

Comments
 (0)