Skip to content

Commit 56eff53

Browse files
Merge branch 'master' into bugfix/adding-blocks-sample
2 parents 114d0cd + 6667d89 commit 56eff53

File tree

5 files changed

+169
-21
lines changed

5 files changed

+169
-21
lines changed

Makefile

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ PROGS = helloworld \
3838
sleep \
3939
dataconverter \
4040
autoscaling-monitoring \
41+
batch \
4142

4243
TEST_ARG ?= -race -v -timeout 5m
4344
BUILD := ./build
@@ -76,7 +77,7 @@ TEST_DIRS=./cmd/samples/cron \
7677
./cmd/samples/recipes/dataconverter \
7778
./cmd/samples/recovery \
7879
./cmd/samples/pso \
79-
80+
./cmd/samples/batch \
8081

8182
cancelactivity:
8283
go build -o bin/cancelactivity cmd/samples/recipes/cancelactivity/*.go
@@ -192,6 +193,21 @@ dataconverter:
192193
autoscaling-monitoring:
193194
go build -o bin/autoscaling-monitoring cmd/samples/advanced/autoscaling-monitoring/*.go
194195

196+
batch:
197+
go build -o bin/batch cmd/samples/batch/*.go
198+
199+
test: bins
200+
@rm -f test
201+
@rm -f test.log
202+
@echo $(TEST_DIRS)
203+
@for dir in $(TEST_DIRS); do \
204+
go test -coverprofile=$@ "$$dir" | tee -a test.log; \
205+
done;
206+
207+
clean:
208+
rm -rf bin
209+
rm -Rf $(BUILD)
210+
195211
bins: helloworld \
196212
blocks \
197213
versioning \
@@ -226,15 +242,4 @@ bins: helloworld \
226242
sleep \
227243
dataconverter \
228244
autoscaling-monitoring \
229-
230-
test: bins
231-
@rm -f test
232-
@rm -f test.log
233-
@echo $(TEST_DIRS)
234-
@for dir in $(TEST_DIRS); do \
235-
go test -coverprofile=$@ "$$dir" | tee -a test.log; \
236-
done;
237-
238-
clean:
239-
rm -rf bin
240-
rm -Rf $(BUILD)
245+
batch \

cmd/samples/batch/README.md

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Batch Processing Sample
2+
3+
This sample demonstrates how to process large batches of activities with controlled concurrency using Cadence's `workflow.NewBatchFuture` functionality, while respecting the 1024 pending activities limit per workflow.
4+
5+
## The problem it solves
6+
7+
**The Problem**: When processing large datasets (thousands of records, files, or API calls), you face a dilemma:
8+
- **Sequential processing**: Too slow, poor user experience
9+
- **Unlimited concurrency**: Overwhelms databases, APIs, or downstream services
10+
- **Manual concurrency control**: Complex error handling and resource management
11+
- **Cadence limits**: Max 1024 pending activities per workflow
12+
13+
**Real-world scenarios**:
14+
- Processing 10,000 user records for a migration
15+
- Sending emails to 50,000 subscribers
16+
- Generating reports for 1,000 customers
17+
- Processing files in a data pipeline
18+
19+
### The Solution
20+
`workflow.NewBatchFuture` provides a robust solution:
21+
22+
**Controlled Concurrency**: Process items in parallel while respecting system limits
23+
**Automatic Error Handling**: Failed activities don't crash the entire batch
24+
**Resource Efficiency**: Optimal throughput without overwhelming downstream services
25+
**Built-in Observability**: Monitoring, retries, and failure tracking
26+
**Workflow Integration**: Seamless integration with Cadence's workflow engine
27+
28+
This eliminates the need to build custom concurrency control, error handling, and monitoring systems.
29+
30+
## Sample behavior
31+
32+
- Creates a configurable number of activities (default: 10)
33+
- Executes them with controlled concurrency (default: 3)
34+
- Simulates work with random delays (900-999ms per activity)
35+
- Handles cancellation gracefully
36+
37+
## Technical considerations
38+
39+
- **Cadence limit**: Maximum 1024 pending activities per workflow
40+
- **Resource management**: Controlled concurrency prevents system overload
41+
- **Error handling**: Failed activities don't crash the entire batch
42+
43+
## How to run
44+
45+
1. Build the sample:
46+
```bash
47+
make batch
48+
```
49+
50+
2. Start Worker:
51+
```bash
52+
./bin/batch -m worker
53+
```
54+
55+
3. Start Workflow:
56+
```bash
57+
./bin/batch -m trigger
58+
```

cmd/samples/batch/main.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"time"
6+
7+
"github.com/pborman/uuid"
8+
"go.uber.org/cadence/client"
9+
"go.uber.org/cadence/worker"
10+
11+
"github.com/uber-common/cadence-samples/cmd/samples/common"
12+
)
13+
14+
// This needs to be done as part of a bootstrap step when the process starts.
15+
// The workers are supposed to be long running.
16+
func startWorkers(h *common.SampleHelper) {
17+
// Configure worker options.
18+
workerOptions := worker.Options{
19+
MetricsScope: h.WorkerMetricScope,
20+
Logger: h.Logger,
21+
FeatureFlags: client.FeatureFlags{
22+
WorkflowExecutionAlreadyCompletedErrorEnabled: true,
23+
},
24+
}
25+
h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions)
26+
}
27+
28+
func startWorkflow(h *common.SampleHelper) {
29+
workflowOptions := client.StartWorkflowOptions{
30+
ID: "batch_" + uuid.New(),
31+
TaskList: ApplicationName,
32+
ExecutionStartToCloseTimeout: time.Minute * 5,
33+
DecisionTaskStartToCloseTimeout: time.Minute * 5,
34+
}
35+
36+
// Default batch configuration
37+
input := BatchWorkflowInput{
38+
Concurrency: 3,
39+
TotalSize: 10,
40+
}
41+
42+
h.StartWorkflow(workflowOptions, batchWorkflowName, input)
43+
}
44+
45+
func registerWorkflowAndActivity(
46+
h *common.SampleHelper,
47+
) {
48+
h.RegisterWorkflowWithAlias(BatchWorkflow, batchWorkflowName)
49+
h.RegisterActivity(BatchActivity)
50+
}
51+
52+
func main() {
53+
var mode string
54+
flag.StringVar(&mode, "m", "trigger", "Mode is worker or trigger.")
55+
flag.Parse()
56+
57+
var h common.SampleHelper
58+
h.SetupServiceConfig()
59+
60+
switch mode {
61+
case "worker":
62+
registerWorkflowAndActivity(&h)
63+
startWorkers(&h)
64+
65+
// The workers are supposed to be long running process that should not exit.
66+
// Use select{} to block indefinitely for samples, you can quit by CMD+C.
67+
select {}
68+
case "trigger":
69+
startWorkflow(&h)
70+
}
71+
}

cmd/samples/batch/workflow.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package batch
1+
package main
22

33
import (
44
"context"
@@ -7,40 +7,50 @@ import (
77
"time"
88

99
"go.uber.org/cadence/workflow"
10-
"go.uber.org/cadence/x"
1110
)
1211

12+
// ApplicationName is the task list for this sample
13+
const ApplicationName = "batchGroup"
14+
15+
const batchWorkflowName = "batchWorkflow"
16+
1317
type BatchWorkflowInput struct {
1418
Concurrency int
1519
TotalSize int
1620
}
1721

1822
func BatchWorkflow(ctx workflow.Context, input BatchWorkflowInput) error {
23+
// Create activity factories for each task (not yet executed)
1924
factories := make([]func(workflow.Context) workflow.Future, input.TotalSize)
2025
for taskID := 0; taskID < input.TotalSize; taskID++ {
21-
taskID := taskID
26+
taskID := taskID // Capture loop variable for closure
2227
factories[taskID] = func(ctx workflow.Context) workflow.Future {
28+
// Configure activity timeouts
2329
aCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
24-
ScheduleToStartTimeout: time.Second * 10,
25-
StartToCloseTimeout: time.Second * 10,
30+
ScheduleToStartTimeout: time.Minute * 1,
31+
StartToCloseTimeout: time.Minute * 1,
2632
})
2733
return workflow.ExecuteActivity(aCtx, BatchActivity, taskID)
2834
}
2935
}
3036

31-
batch, err := x.NewBatchFuture(ctx, input.Concurrency, factories)
37+
// Execute all activities with controlled concurrency
38+
batch, err := workflow.NewBatchFuture(ctx, input.Concurrency, factories)
3239
if err != nil {
3340
return fmt.Errorf("failed to create batch future: %w", err)
3441
}
3542

43+
// Wait for all activities to complete
3644
return batch.Get(ctx, nil)
3745
}
3846

3947
func BatchActivity(ctx context.Context, taskID int) error {
4048
select {
4149
case <-ctx.Done():
50+
// Return error if workflow/activity is cancelled
4251
return fmt.Errorf("batch activity %d failed: %w", taskID, ctx.Err())
4352
case <-time.After(time.Duration(rand.Int63n(100))*time.Millisecond + 900*time.Millisecond):
53+
// Wait for random duration (900-999ms) to simulate work, then return success
4454
return nil
4555
}
4656
}

cmd/samples/batch/workflow_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package batch
1+
package main
22

33
import (
44
"testing"
@@ -8,17 +8,21 @@ import (
88
)
99

1010
func Test_BatchWorkflow(t *testing.T) {
11+
// Create test environment for workflow testing
1112
testSuite := &testsuite.WorkflowTestSuite{}
1213
env := testSuite.NewTestWorkflowEnvironment()
1314

15+
// Register the workflow and activity functions
1416
env.RegisterWorkflow(BatchWorkflow)
1517
env.RegisterActivity(BatchActivity)
1618

19+
// Execute workflow with 3 concurrent workers processing 10 tasks
1720
env.ExecuteWorkflow(BatchWorkflow, BatchWorkflowInput{
18-
Concurrency: 2,
21+
Concurrency: 3,
1922
TotalSize: 10,
2023
})
2124

25+
// Assert workflow completed successfully without errors
2226
assert.True(t, env.IsWorkflowCompleted())
2327
assert.Nil(t, env.GetWorkflowError())
2428
}

0 commit comments

Comments
 (0)