Skip to content

Commit 6667d89

Browse files
authored
Enhance batch future sample (#106)
Added main.go entry point. Added batch to the Makefile. Upgraded Go client. Created sample README.
1 parent f7320eb commit 6667d89

File tree

7 files changed

+174
-25
lines changed

7 files changed

+174
-25
lines changed

Makefile

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ PROGS = helloworld \
3737
sleep \
3838
dataconverter \
3939
autoscaling-monitoring \
40+
batch \
4041

4142
TEST_ARG ?= -race -v -timeout 5m
4243
BUILD := ./build
@@ -74,7 +75,7 @@ TEST_DIRS=./cmd/samples/cron \
7475
./cmd/samples/recipes/dataconverter \
7576
./cmd/samples/recovery \
7677
./cmd/samples/pso \
77-
78+
./cmd/samples/batch \
7879

7980
cancelactivity:
8081
go build -o bin/cancelactivity cmd/samples/recipes/cancelactivity/*.go
@@ -187,6 +188,21 @@ dataconverter:
187188
autoscaling-monitoring:
188189
go build -o bin/autoscaling-monitoring cmd/samples/advanced/autoscaling-monitoring/*.go
189190

191+
batch:
192+
go build -o bin/batch cmd/samples/batch/*.go
193+
194+
test: bins
195+
@rm -f test
196+
@rm -f test.log
197+
@echo $(TEST_DIRS)
198+
@for dir in $(TEST_DIRS); do \
199+
go test -coverprofile=$@ "$$dir" | tee -a test.log; \
200+
done;
201+
202+
clean:
203+
rm -rf bin
204+
rm -Rf $(BUILD)
205+
190206
bins: helloworld \
191207
versioning \
192208
delaystart \
@@ -220,15 +236,4 @@ bins: helloworld \
220236
sleep \
221237
dataconverter \
222238
autoscaling-monitoring \
223-
224-
test: bins
225-
@rm -f test
226-
@rm -f test.log
227-
@echo $(TEST_DIRS)
228-
@for dir in $(TEST_DIRS); do \
229-
go test -coverprofile=$@ "$$dir" | tee -a test.log; \
230-
done;
231-
232-
clean:
233-
rm -rf bin
234-
rm -Rf $(BUILD)
239+
batch \

cmd/samples/batch/README.md

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Batch Processing Sample
2+
3+
This sample demonstrates how to process large batches of activities with controlled concurrency using Cadence's `workflow.NewBatchFuture` functionality, while respecting the 1024 pending activities limit per workflow.
4+
5+
## The problem it solves
6+
7+
**The Problem**: When processing large datasets (thousands of records, files, or API calls), you face a dilemma:
8+
- **Sequential processing**: Too slow, poor user experience
9+
- **Unlimited concurrency**: Overwhelms databases, APIs, or downstream services
10+
- **Manual concurrency control**: Complex error handling and resource management
11+
- **Cadence limits**: Max 1024 pending activities per workflow
12+
13+
**Real-world scenarios**:
14+
- Processing 10,000 user records for a migration
15+
- Sending emails to 50,000 subscribers
16+
- Generating reports for 1,000 customers
17+
- Processing files in a data pipeline
18+
19+
### The Solution
20+
`workflow.NewBatchFuture` provides a robust solution:
21+
22+
- **Controlled Concurrency**: Process items in parallel while respecting system limits
23+
- **Automatic Error Handling**: Failed activities don't crash the entire batch
24+
- **Resource Efficiency**: Optimal throughput without overwhelming downstream services
25+
- **Built-in Observability**: Monitoring, retries, and failure tracking
26+
- **Workflow Integration**: Seamless integration with Cadence's workflow engine
27+
28+
This eliminates the need to build custom concurrency control, error handling, and monitoring systems.
29+
30+
## Sample behavior
31+
32+
- Creates a configurable number of activities (default: 10)
33+
- Executes them with controlled concurrency (default: 3)
34+
- Simulates work with random delays (900-999ms per activity)
35+
- Handles cancellation gracefully
36+
37+
## Technical considerations
38+
39+
- **Cadence limit**: Maximum 1024 pending activities per workflow
40+
- **Resource management**: Controlled concurrency prevents system overload
41+
- **Error handling**: Failed activities don't crash the entire batch
42+
43+
## How to run
44+
45+
1. Build the sample:
46+
```bash
47+
make batch
48+
```
49+
50+
2. Start Worker:
51+
```bash
52+
./bin/batch -m worker
53+
```
54+
55+
3. Start Workflow:
56+
```bash
57+
./bin/batch -m trigger
58+
```

cmd/samples/batch/main.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"time"
6+
7+
"github.com/pborman/uuid"
8+
"go.uber.org/cadence/client"
9+
"go.uber.org/cadence/worker"
10+
11+
"github.com/uber-common/cadence-samples/cmd/samples/common"
12+
)
13+
14+
// This needs to be done as part of a bootstrap step when the process starts.
15+
// The workers are supposed to be long running.
16+
func startWorkers(h *common.SampleHelper) {
17+
// Configure worker options.
18+
workerOptions := worker.Options{
19+
MetricsScope: h.WorkerMetricScope,
20+
Logger: h.Logger,
21+
FeatureFlags: client.FeatureFlags{
22+
WorkflowExecutionAlreadyCompletedErrorEnabled: true,
23+
},
24+
}
25+
h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions)
26+
}
27+
28+
func startWorkflow(h *common.SampleHelper) {
29+
workflowOptions := client.StartWorkflowOptions{
30+
ID: "batch_" + uuid.New(),
31+
TaskList: ApplicationName,
32+
ExecutionStartToCloseTimeout: time.Minute * 5,
33+
DecisionTaskStartToCloseTimeout: time.Minute * 5,
34+
}
35+
36+
// Default batch configuration
37+
input := BatchWorkflowInput{
38+
Concurrency: 3,
39+
TotalSize: 10,
40+
}
41+
42+
h.StartWorkflow(workflowOptions, batchWorkflowName, input)
43+
}
44+
45+
func registerWorkflowAndActivity(
46+
h *common.SampleHelper,
47+
) {
48+
h.RegisterWorkflowWithAlias(BatchWorkflow, batchWorkflowName)
49+
h.RegisterActivity(BatchActivity)
50+
}
51+
52+
func main() {
53+
var mode string
54+
flag.StringVar(&mode, "m", "trigger", "Mode is worker or trigger.")
55+
flag.Parse()
56+
57+
var h common.SampleHelper
58+
h.SetupServiceConfig()
59+
60+
switch mode {
61+
case "worker":
62+
registerWorkflowAndActivity(&h)
63+
startWorkers(&h)
64+
65+
// The workers are supposed to be long-running processes that should not exit.
66+
// Use select{} to block indefinitely for samples; you can quit with Ctrl+C.
67+
select {}
68+
case "trigger":
69+
startWorkflow(&h)
70+
}
71+
}

cmd/samples/batch/workflow.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package batch
1+
package main
22

33
import (
44
"context"
@@ -7,40 +7,50 @@ import (
77
"time"
88

99
"go.uber.org/cadence/workflow"
10-
"go.uber.org/cadence/x"
1110
)
1211

12+
// ApplicationName is the task list for this sample
13+
const ApplicationName = "batchGroup"
14+
15+
const batchWorkflowName = "batchWorkflow"
16+
1317
type BatchWorkflowInput struct {
1418
Concurrency int
1519
TotalSize int
1620
}
1721

1822
func BatchWorkflow(ctx workflow.Context, input BatchWorkflowInput) error {
23+
// Create activity factories for each task (not yet executed)
1924
factories := make([]func(workflow.Context) workflow.Future, input.TotalSize)
2025
for taskID := 0; taskID < input.TotalSize; taskID++ {
21-
taskID := taskID
26+
taskID := taskID // Capture loop variable for closure
2227
factories[taskID] = func(ctx workflow.Context) workflow.Future {
28+
// Configure activity timeouts
2329
aCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
24-
ScheduleToStartTimeout: time.Second * 10,
25-
StartToCloseTimeout: time.Second * 10,
30+
ScheduleToStartTimeout: time.Minute * 1,
31+
StartToCloseTimeout: time.Minute * 1,
2632
})
2733
return workflow.ExecuteActivity(aCtx, BatchActivity, taskID)
2834
}
2935
}
3036

31-
batch, err := x.NewBatchFuture(ctx, input.Concurrency, factories)
37+
// Execute all activities with controlled concurrency
38+
batch, err := workflow.NewBatchFuture(ctx, input.Concurrency, factories)
3239
if err != nil {
3340
return fmt.Errorf("failed to create batch future: %w", err)
3441
}
3542

43+
// Wait for all activities to complete
3644
return batch.Get(ctx, nil)
3745
}
3846

3947
func BatchActivity(ctx context.Context, taskID int) error {
4048
select {
4149
case <-ctx.Done():
50+
// Return error if workflow/activity is cancelled
4251
return fmt.Errorf("batch activity %d failed: %w", taskID, ctx.Err())
4352
case <-time.After(time.Duration(rand.Int63n(100))*time.Millisecond + 900*time.Millisecond):
53+
// Wait for random duration (900-999ms) to simulate work, then return success
4454
return nil
4555
}
4656
}

cmd/samples/batch/workflow_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package batch
1+
package main
22

33
import (
44
"testing"
@@ -8,17 +8,21 @@ import (
88
)
99

1010
func Test_BatchWorkflow(t *testing.T) {
11+
// Create test environment for workflow testing
1112
testSuite := &testsuite.WorkflowTestSuite{}
1213
env := testSuite.NewTestWorkflowEnvironment()
1314

15+
// Register the workflow and activity functions
1416
env.RegisterWorkflow(BatchWorkflow)
1517
env.RegisterActivity(BatchActivity)
1618

19+
// Execute workflow with 10 tasks and at most 3 activities running concurrently
1720
env.ExecuteWorkflow(BatchWorkflow, BatchWorkflowInput{
18-
Concurrency: 2,
21+
Concurrency: 3,
1922
TotalSize: 10,
2023
})
2124

25+
// Assert workflow completed successfully without errors
2226
assert.True(t, env.IsWorkflowCompleted())
2327
assert.Nil(t, env.GetWorkflowError())
2428
}

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@ require (
1414
github.com/uber-go/tally v3.4.3+incompatible
1515
github.com/uber/cadence-idl v0.0.0-20250616185004-cc6f52f87bc6
1616
github.com/uber/jaeger-client-go v2.30.0+incompatible
17-
go.uber.org/cadence v1.3.1-rc.8
17+
go.uber.org/cadence v1.3.1-rc.10
1818
go.uber.org/yarpc v1.60.0
1919
go.uber.org/zap v1.23.0
2020
gopkg.in/yaml.v2 v2.4.0
21+
gopkg.in/yaml.v3 v3.0.1
2122
)
2223

2324
require (
@@ -70,6 +71,5 @@ require (
7071
google.golang.org/genproto v0.0.0-20200212174721-66ed5ce911ce // indirect
7172
google.golang.org/grpc v1.28.0 // indirect
7273
google.golang.org/protobuf v1.31.0 // indirect
73-
gopkg.in/yaml.v3 v3.0.1 // indirect
7474
honnef.co/go/tools v0.3.2 // indirect
7575
)

go.sum

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
9494
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
9595
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
9696
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
97+
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
9798
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
9899
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
99100
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -239,8 +240,8 @@ go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
239240
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
240241
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
241242
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
242-
go.uber.org/cadence v1.3.1-rc.8 h1:9m1h7KZsPqJsNW87iVd3Mgi85PojCPyaUeIdzTjVu3Y=
243-
go.uber.org/cadence v1.3.1-rc.8/go.mod h1:/Lv4o2eahjro+LKjXmuYm20wg5+84t+h2pu0xm7gUfM=
243+
go.uber.org/cadence v1.3.1-rc.10 h1:2ZQba0IPCTcjYtjVaPM1cgnu9ZFvnw2BTC2xrEO25mU=
244+
go.uber.org/cadence v1.3.1-rc.10/go.mod h1:Yf2WaRFj6TtqrUbGRWxLU/8Vou3WPn0M2VIdfEIlEjE=
244245
go.uber.org/dig v1.8.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
245246
go.uber.org/dig v1.10.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
246247
go.uber.org/dig v1.17.0 h1:5Chju+tUvcC+N7N6EV08BJz41UZuO3BmHcN4A287ZLI=

0 commit comments

Comments
 (0)