diff --git a/Makefile b/Makefile index df5a517d..b3a5e0d2 100644 --- a/Makefile +++ b/Makefile @@ -37,6 +37,7 @@ PROGS = helloworld \ sleep \ dataconverter \ autoscaling-monitoring \ + batch \ TEST_ARG ?= -race -v -timeout 5m BUILD := ./build @@ -74,7 +75,7 @@ TEST_DIRS=./cmd/samples/cron \ ./cmd/samples/recipes/dataconverter \ ./cmd/samples/recovery \ ./cmd/samples/pso \ - + ./cmd/samples/batch \ cancelactivity: go build -o bin/cancelactivity cmd/samples/recipes/cancelactivity/*.go @@ -187,6 +188,21 @@ dataconverter: autoscaling-monitoring: go build -o bin/autoscaling-monitoring cmd/samples/advanced/autoscaling-monitoring/*.go +batch: + go build -o bin/batch cmd/samples/batch/*.go + +test: bins + @rm -f test + @rm -f test.log + @echo $(TEST_DIRS) + @for dir in $(TEST_DIRS); do \ + go test -coverprofile=$@ "$$dir" | tee -a test.log; \ + done; + +clean: + rm -rf bin + rm -Rf $(BUILD) + bins: helloworld \ versioning \ delaystart \ @@ -220,15 +236,4 @@ bins: helloworld \ sleep \ dataconverter \ autoscaling-monitoring \ - -test: bins - @rm -f test - @rm -f test.log - @echo $(TEST_DIRS) - @for dir in $(TEST_DIRS); do \ - go test -coverprofile=$@ "$$dir" | tee -a test.log; \ - done; - -clean: - rm -rf bin - rm -Rf $(BUILD) + batch \ diff --git a/cmd/samples/batch/README.md b/cmd/samples/batch/README.md new file mode 100644 index 00000000..bad129b2 --- /dev/null +++ b/cmd/samples/batch/README.md @@ -0,0 +1,58 @@ +# Batch Processing Sample + +This sample demonstrates how to process large batches of activities with controlled concurrency using Cadence's `workflow.NewBatchFuture` functionality, while respecting the 1024 pending activities limit per workflow. 
+ +## The problem it solves + +**The Problem**: When processing large datasets (thousands of records, files, or API calls), you face several competing constraints: +- **Sequential processing**: Too slow, poor user experience +- **Unlimited concurrency**: Overwhelms databases, APIs, or downstream services +- **Manual concurrency control**: Complex error handling and resource management +- **Cadence limits**: Max 1024 pending activities per workflow + +**Real-world scenarios**: +- Processing 10,000 user records for a migration +- Sending emails to 50,000 subscribers +- Generating reports for 1,000 customers +- Processing files in a data pipeline + +### The solution +`workflow.NewBatchFuture` provides a robust solution: + +- **Controlled Concurrency**: Process items in parallel while respecting system limits +- **Automatic Error Handling**: Failed activities don't crash the entire batch +- **Resource Efficiency**: Optimal throughput without overwhelming downstream services +- **Built-in Observability**: Monitoring, retries, and failure tracking +- **Workflow Integration**: Seamless integration with Cadence's workflow engine + +This eliminates the need to build custom concurrency control, error handling, and monitoring systems. + +## Sample behavior + +- Creates a configurable number of activities (default: 10) +- Executes them with controlled concurrency (default: 3) +- Simulates work with random delays (900-999ms per activity) +- Handles cancellation gracefully + +## Technical considerations + +- **Cadence limit**: Maximum 1024 pending activities per workflow +- **Resource management**: Controlled concurrency prevents system overload +- **Error handling**: Failed activities don't crash the entire batch + +## How to run + +1. Build the sample: +```bash +make batch +``` + +2. Start Worker: +```bash +./bin/batch -m worker +``` + +3. 
Start Workflow: +```bash +./bin/batch -m trigger +``` diff --git a/cmd/samples/batch/main.go b/cmd/samples/batch/main.go new file mode 100644 index 00000000..96b0a3f4 --- /dev/null +++ b/cmd/samples/batch/main.go @@ -0,0 +1,71 @@ +package main + +import ( + "flag" + "time" + + "github.com/pborman/uuid" + "go.uber.org/cadence/client" + "go.uber.org/cadence/worker" + + "github.com/uber-common/cadence-samples/cmd/samples/common" +) + +// This needs to be done as part of a bootstrap step when the process starts. +// The workers are supposed to be long running. +func startWorkers(h *common.SampleHelper) { + // Configure worker options. + workerOptions := worker.Options{ + MetricsScope: h.WorkerMetricScope, + Logger: h.Logger, + FeatureFlags: client.FeatureFlags{ + WorkflowExecutionAlreadyCompletedErrorEnabled: true, + }, + } + h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions) +} + +func startWorkflow(h *common.SampleHelper) { + workflowOptions := client.StartWorkflowOptions{ + ID: "batch_" + uuid.New(), + TaskList: ApplicationName, + ExecutionStartToCloseTimeout: time.Minute * 5, + DecisionTaskStartToCloseTimeout: time.Minute * 5, + } + + // Default batch configuration + input := BatchWorkflowInput{ + Concurrency: 3, + TotalSize: 10, + } + + h.StartWorkflow(workflowOptions, batchWorkflowName, input) +} + +func registerWorkflowAndActivity( + h *common.SampleHelper, +) { + h.RegisterWorkflowWithAlias(BatchWorkflow, batchWorkflowName) + h.RegisterActivity(BatchActivity) +} + +func main() { + var mode string + flag.StringVar(&mode, "m", "trigger", "Mode is worker or trigger.") + flag.Parse() + + var h common.SampleHelper + h.SetupServiceConfig() + + switch mode { + case "worker": + registerWorkflowAndActivity(&h) + startWorkers(&h) + + // The workers are supposed to be long running process that should not exit. + // Use select{} to block indefinitely for samples, you can quit by CMD+C. 
+ select {} + case "trigger": + startWorkflow(&h) + } +} diff --git a/cmd/samples/batch/workflow.go b/cmd/samples/batch/workflow.go index 3a1fe88f..56d474ac 100644 --- a/cmd/samples/batch/workflow.go +++ b/cmd/samples/batch/workflow.go @@ -1,4 +1,4 @@ -package batch +package main import ( "context" @@ -7,40 +7,50 @@ import ( "time" "go.uber.org/cadence/workflow" - "go.uber.org/cadence/x" ) +// ApplicationName is the task list for this sample +const ApplicationName = "batchGroup" + +const batchWorkflowName = "batchWorkflow" + type BatchWorkflowInput struct { Concurrency int TotalSize int } func BatchWorkflow(ctx workflow.Context, input BatchWorkflowInput) error { + // Create activity factories for each task (not yet executed) factories := make([]func(workflow.Context) workflow.Future, input.TotalSize) for taskID := 0; taskID < input.TotalSize; taskID++ { - taskID := taskID + taskID := taskID // Capture loop variable for closure factories[taskID] = func(ctx workflow.Context) workflow.Future { + // Configure activity timeouts aCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - ScheduleToStartTimeout: time.Second * 10, - StartToCloseTimeout: time.Second * 10, + ScheduleToStartTimeout: time.Minute * 1, + StartToCloseTimeout: time.Minute * 1, }) return workflow.ExecuteActivity(aCtx, BatchActivity, taskID) } } - batch, err := x.NewBatchFuture(ctx, input.Concurrency, factories) + // Execute all activities with controlled concurrency + batch, err := workflow.NewBatchFuture(ctx, input.Concurrency, factories) if err != nil { return fmt.Errorf("failed to create batch future: %w", err) } + // Wait for all activities to complete return batch.Get(ctx, nil) } func BatchActivity(ctx context.Context, taskID int) error { select { case <-ctx.Done(): + // Return error if workflow/activity is cancelled return fmt.Errorf("batch activity %d failed: %w", taskID, ctx.Err()) case <-time.After(time.Duration(rand.Int63n(100))*time.Millisecond + 900*time.Millisecond): + // 
Wait for random duration (900-999ms) to simulate work, then return success return nil } } diff --git a/cmd/samples/batch/workflow_test.go b/cmd/samples/batch/workflow_test.go index d560839c..440c51f3 100644 --- a/cmd/samples/batch/workflow_test.go +++ b/cmd/samples/batch/workflow_test.go @@ -1,4 +1,4 @@ -package batch +package main import ( "testing" @@ -8,17 +8,21 @@ import ( ) func Test_BatchWorkflow(t *testing.T) { + // Create test environment for workflow testing testSuite := &testsuite.WorkflowTestSuite{} env := testSuite.NewTestWorkflowEnvironment() + // Register the workflow and activity functions env.RegisterWorkflow(BatchWorkflow) env.RegisterActivity(BatchActivity) + // Execute workflow with 3 concurrent workers processing 10 tasks env.ExecuteWorkflow(BatchWorkflow, BatchWorkflowInput{ - Concurrency: 2, + Concurrency: 3, TotalSize: 10, }) + // Assert workflow completed successfully without errors assert.True(t, env.IsWorkflowCompleted()) assert.Nil(t, env.GetWorkflowError()) } diff --git a/go.mod b/go.mod index 6a19054b..ab5089b8 100644 --- a/go.mod +++ b/go.mod @@ -14,10 +14,11 @@ require ( github.com/uber-go/tally v3.4.3+incompatible github.com/uber/cadence-idl v0.0.0-20250616185004-cc6f52f87bc6 github.com/uber/jaeger-client-go v2.30.0+incompatible - go.uber.org/cadence v1.3.1-rc.8 + go.uber.org/cadence v1.3.1-rc.10 go.uber.org/yarpc v1.60.0 go.uber.org/zap v1.23.0 gopkg.in/yaml.v2 v2.4.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -70,6 +71,5 @@ require ( google.golang.org/genproto v0.0.0-20200212174721-66ed5ce911ce // indirect google.golang.org/grpc v1.28.0 // indirect google.golang.org/protobuf v1.31.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect honnef.co/go/tools v0.3.2 // indirect ) diff --git a/go.sum b/go.sum index 13e7a7e6..c4a182f3 100644 --- a/go.sum +++ b/go.sum @@ -94,6 +94,7 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -239,8 +240,8 @@ go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/cadence v1.3.1-rc.8 h1:9m1h7KZsPqJsNW87iVd3Mgi85PojCPyaUeIdzTjVu3Y= -go.uber.org/cadence v1.3.1-rc.8/go.mod h1:/Lv4o2eahjro+LKjXmuYm20wg5+84t+h2pu0xm7gUfM= +go.uber.org/cadence v1.3.1-rc.10 h1:2ZQba0IPCTcjYtjVaPM1cgnu9ZFvnw2BTC2xrEO25mU= +go.uber.org/cadence v1.3.1-rc.10/go.mod h1:Yf2WaRFj6TtqrUbGRWxLU/8Vou3WPn0M2VIdfEIlEjE= go.uber.org/dig v1.8.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw= go.uber.org/dig v1.10.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw= go.uber.org/dig v1.17.0 h1:5Chju+tUvcC+N7N6EV08BJz41UZuO3BmHcN4A287ZLI=