---
title: "Introducing Batch Future with Concurrency Control"
description: "We're excited to announce Batch Future, a new feature in the Cadence Go client that provides controlled concurrency for bulk operations, so you can process work in parallel without overwhelming downstream services."
date: 2025-09-25
authors: kevinb
tags:
  - announcement
  - release
---

Are you struggling with uncontrolled concurrency when trying to process thousands of activities or child workflows? Do you find yourself hitting rate limits or overwhelming downstream services when running bulk operations? We've got great news for you!

Today, we're thrilled to announce **Batch Future**, a powerful new feature in the Cadence Go client that provides controlled concurrency for bulk operations. You can now process multiple activities in parallel while maintaining precise control over how many run simultaneously.

<!-- truncate -->
| 16 | + |
| 17 | +## The Problem: Uncontrolled Concurrency |
| 18 | + |
| 19 | +Traditionally, when you need to process multiple items in a Cadence workflow, you'd write something like this: |
| 20 | + |
| 21 | +```go |
| 22 | +func ProcessUsers(ctx workflow.Context, userIDs []string) error { |
| 23 | + var futures []workflow.Future |
| 24 | + for _, userID := range userIDs { |
| 25 | + future := workflow.ExecuteActivity(ctx, UpdateUserActivity, userID) |
| 26 | + futures = append(futures, future) |
| 27 | + } |
| 28 | + |
| 29 | + // Wait for all activities to complete |
| 30 | + for _, future := range futures { |
| 31 | + if err := future.Get(ctx, nil); err != nil { |
| 32 | + return err |
| 33 | + } |
| 34 | + } |
| 35 | + return nil |
| 36 | +} |
| 37 | +``` |
| 38 | + |
This approach works, but it has **uncontrolled concurrency**:
- All activities start simultaneously, potentially overwhelming downstream services
- No way to limit concurrent executions
- Difficult to manage resource usage
- Can cause rate limiting or timeouts
- Can create hot shards in the Cadence server's task processing

## The Solution: Batch Future

With Batch Future, you can process users with **controlled concurrency**:

```go
func ProcessUsersBatch(ctx workflow.Context, userIDs []string, concurrency int) error {
    // Create activity factories for each user
    factories := make([]func(workflow.Context) workflow.Future, len(userIDs))
    for i, userID := range userIDs {
        userID := userID // Capture loop variable for closure
        factories[i] = func(ctx workflow.Context) workflow.Future {
            return workflow.ExecuteActivity(ctx, UpdateUserActivity, userID)
        }
    }

    // Execute with controlled concurrency
    batch, err := workflow.NewBatchFuture(ctx, concurrency, factories)
    if err != nil {
        return fmt.Errorf("failed to create batch future: %w", err)
    }

    // Wait for all activities to complete
    return batch.Get(ctx, nil)
}
```

## Key Benefits: Controlled Concurrency

Batch Future provides several important advantages:

- **Controlled Concurrency**: Limit simultaneous executions to prevent overwhelming downstream services
- **Resource Management**: Better control over memory and CPU usage
- **Rate Limiting Protection**: Avoid hitting API rate limits by controlling execution speed
- **Graceful Cancellation**: All activities can be cancelled together if needed (see the sketch after this list)
- **Simplified Error Handling**: Single point of failure handling for the entire batch

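Cancellation flows through the standard workflow context. Here's a minimal sketch, assuming `NewBatchFuture` respects a cancellable context like other workflow APIs do (the signal name and concurrency value are illustrative):

```go
func ProcessWithCancellation(ctx workflow.Context, userIDs []string) error {
    // Derive a cancellable context; cancelling it propagates to the batch.
    childCtx, cancel := workflow.WithCancel(ctx)

    factories := make([]func(workflow.Context) workflow.Future, len(userIDs))
    for i, userID := range userIDs {
        userID := userID // Capture loop variable for closure
        factories[i] = func(ctx workflow.Context) workflow.Future {
            return workflow.ExecuteActivity(ctx, UpdateUserActivity, userID)
        }
    }

    batch, err := workflow.NewBatchFuture(childCtx, 5, factories)
    if err != nil {
        return err
    }

    // Cancel the whole batch if an external "cancel-batch" signal arrives.
    workflow.Go(ctx, func(ctx workflow.Context) {
        workflow.GetSignalChannel(ctx, "cancel-batch").Receive(ctx, nil)
        cancel()
    })

    return batch.Get(ctx, nil)
}
```
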
## Real-World Use Cases

Batch Future is perfect for scenarios like:

### 1. Multi-Service Data Synchronization
```go
func SyncProductData(ctx workflow.Context, products []Product) error {
    // Sync to multiple services with different concurrency limits
    inventoryBatch := createBatch(ctx, products, 5, SyncToInventoryActivity)
    searchBatch := createBatch(ctx, products, 3, SyncToSearchActivity)
    analyticsBatch := createBatch(ctx, products, 2, SyncToAnalyticsActivity)

    // Wait for all sync operations to complete
    if err := inventoryBatch.Get(ctx, nil); err != nil {
        return fmt.Errorf("inventory sync failed: %w", err)
    }
    if err := searchBatch.Get(ctx, nil); err != nil {
        return fmt.Errorf("search sync failed: %w", err)
    }
    return analyticsBatch.Get(ctx, nil)
}

func createBatch(ctx workflow.Context, items []Product, concurrency int, activityFunc interface{}) workflow.Future {
    factories := make([]func(workflow.Context) workflow.Future, len(items))
    for i, item := range items {
        item := item // Capture loop variable for closure
        factories[i] = func(ctx workflow.Context) workflow.Future {
            return workflow.ExecuteActivity(ctx, activityFunc, item)
        }
    }
    batch, _ := workflow.NewBatchFuture(ctx, concurrency, factories) // error ignored for brevity
    return batch
}
```

### 2. Progressive Data Processing with Different Priorities
```go
func ProcessDataWithPriorities(ctx workflow.Context, data []DataItem) error {
    // High priority items get more concurrency
    highPriority := filterByPriority(data, "high")
    lowPriority := filterByPriority(data, "low")

    // Process high priority items first with high concurrency
    highBatch, _ := workflow.NewBatchFuture(ctx, 10, createFactories(highPriority, ProcessHighPriorityActivity))

    // Wait for high priority to complete, then process low priority with lower concurrency
    if err := highBatch.Get(ctx, nil); err != nil {
        return err
    }

    lowBatch, _ := workflow.NewBatchFuture(ctx, 3, createFactories(lowPriority, ProcessLowPriorityActivity))
    return lowBatch.Get(ctx, nil)
}
```

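The snippet above calls a `createFactories` helper that isn't shown. A minimal version consistent with those call sites might look like this (`DataItem` is the type from the example):

```go
func createFactories(items []DataItem, activityFunc interface{}) []func(workflow.Context) workflow.Future {
    factories := make([]func(workflow.Context) workflow.Future, len(items))
    for i, item := range items {
        item := item // Capture loop variable for closure
        factories[i] = func(ctx workflow.Context) workflow.Future {
            return workflow.ExecuteActivity(ctx, activityFunc, item)
        }
    }
    return factories
}
```
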
### 3. Conditional Batch Processing with Retry Logic
```go
func ProcessOrdersWithRetry(ctx workflow.Context, orders []Order) error {
    // First attempt with normal concurrency
    factories := make([]func(workflow.Context) workflow.Future, len(orders))
    for i, order := range orders {
        order := order
        factories[i] = func(ctx workflow.Context) workflow.Future {
            return workflow.ExecuteActivity(ctx, ProcessOrderActivity, order)
        }
    }

    batch, _ := workflow.NewBatchFuture(ctx, 5, factories)
    if err := batch.Get(ctx, nil); err != nil {
        // If batch fails, retry failed orders individually with higher concurrency
        return retryFailedOrders(ctx, orders, 10)
    }
    return nil
}
```

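The `retryFailedOrders` helper above is left as an exercise. Note that `batch.Get` returns an aggregated error rather than a per-order result, so a simple version has to re-submit the whole set and rely on the activity being idempotent. One hypothetical shape:

```go
// Hypothetical retry helper: because the batch error doesn't identify
// which orders failed, this re-runs every order and relies on
// ProcessOrderActivity being idempotent (already-processed orders no-op).
func retryFailedOrders(ctx workflow.Context, orders []Order, concurrency int) error {
    factories := make([]func(workflow.Context) workflow.Future, len(orders))
    for i, order := range orders {
        order := order
        factories[i] = func(ctx workflow.Context) workflow.Future {
            return workflow.ExecuteActivity(ctx, ProcessOrderActivity, order)
        }
    }
    batch, err := workflow.NewBatchFuture(ctx, concurrency, factories)
    if err != nil {
        return err
    }
    return batch.Get(ctx, nil)
}
```
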
## How It Works Under the Hood

Batch Future leverages Cadence's existing activity infrastructure with controlled concurrency (a simplified sketch follows the list):

1. **Future Factories**: Creates lazy-evaluated future creation functions that aren't scheduled until needed
2. **Concurrency Control**: Limits the number of pending futures
3. **Queue Management**: Maintains a queue of to-be-scheduled futures and starts new ones as others complete
4. **Future Management**: Returns a single future that completes when all futures finish
5. **Error Propagation**: If any future fails, its error is collected into a multi-error wrapper, so you can decide whether to cancel the remaining work or fail open

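The actual client implementation is deterministic workflow code, but the scheduling idea is the classic bounded-worker pattern. In plain Go (not workflow code, just an analogy) it looks roughly like this:

```go
// A semaphore channel keeps at most `concurrency` tasks in flight;
// each finished task frees a slot for the next queued one.
func runBounded(tasks []func() error, concurrency int) error {
    sem := make(chan struct{}, concurrency)
    errs := make(chan error, len(tasks))
    for _, task := range tasks {
        task := task
        sem <- struct{}{} // blocks while `concurrency` tasks are running
        go func() {
            defer func() { <-sem }()
            errs <- task()
        }()
    }
    // Drain every result; report the first failure.
    var firstErr error
    for range tasks {
        if err := <-errs; err != nil && firstErr == nil {
            firstErr = err
        }
    }
    return firstErr
}
```
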
## Getting Started

Ready to supercharge your workflows? Here's how to get started:

### 1. Update Your Go Client
Make sure you're using the latest version of the Cadence Go client:

```bash
go get go.uber.org/cadence@latest
```

### 2. Try the Sample
Check out our [Batch Future sample](https://github.com/cadence-workflow/cadence-samples/tree/master/cmd/samples/batch) to see it in action.

### 3. Migrate Your Workflows (With Caution)

**This is not a simple code change**. Migrating to Batch Future requires workflow versioning and careful production planning.

#### The Challenge
Batch Future changes your workflow's execution pattern from individual activities to controlled batching. This creates non-deterministic changes that will break existing running workflows without proper versioning.

#### Migration Approaches

**Option A: Versioned Migration (Recommended for Production)**
- Use [workflow.GetVersion()](https://cadenceworkflow.io/docs/go-client/workflow-versioning) to support both old and new patterns, as sketched below
- Deploy code that handles both execution patterns
- Gradually transition new workflows to use Batch Future
- Clean up old code after all workflows complete

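Here's roughly what Option A looks like for the `ProcessUsers` example from earlier (the change ID and concurrency value are illustrative):

```go
func ProcessUsers(ctx workflow.Context, userIDs []string) error {
    v := workflow.GetVersion(ctx, "batch-future-migration", workflow.DefaultVersion, 1)
    if v == workflow.DefaultVersion {
        // Old pattern: replayed histories from in-flight workflows take this path.
        var futures []workflow.Future
        for _, userID := range userIDs {
            futures = append(futures, workflow.ExecuteActivity(ctx, UpdateUserActivity, userID))
        }
        for _, f := range futures {
            if err := f.Get(ctx, nil); err != nil {
                return err
            }
        }
        return nil
    }

    // New pattern: fresh executions get controlled concurrency.
    factories := make([]func(workflow.Context) workflow.Future, len(userIDs))
    for i, userID := range userIDs {
        userID := userID
        factories[i] = func(ctx workflow.Context) workflow.Future {
            return workflow.ExecuteActivity(ctx, UpdateUserActivity, userID)
        }
    }
    batch, err := workflow.NewBatchFuture(ctx, 5, factories)
    if err != nil {
        return err
    }
    return batch.Get(ctx, nil)
}
```
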
**Option B: New Workflow Type (Simpler but Requires Coordination)**
- Create a new workflow type specifically for Batch Future
- Update callers to use the new workflow type
- Deprecate the old workflow type after migration

**Option C: Workflow Replacement (Not Gradual)**
- Terminate existing workflows (if acceptable)
- Deploy new code with Batch Future
- Start new workflows with the new pattern

#### Testing Strategy
Before deploying, use [Workflow Shadowing](https://cadenceworkflow.io/docs/go-client/workflow-replay-shadowing) to replay production workflow histories against your new code. This catches compatibility issues before they reach production.

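Shadowing has its own worker setup (see the linked docs); the simpler replay variant looks roughly like this, assuming you've exported a workflow history to JSON:

```go
import (
    "testing"

    "go.uber.org/cadence/worker"
    "go.uber.org/zap"
)

func TestReplayProcessUsers(t *testing.T) {
    logger, _ := zap.NewDevelopment()

    replayer := worker.NewWorkflowReplayer()
    replayer.RegisterWorkflow(ProcessUsers)

    // history.json is a workflow history exported from production
    // (see the Cadence CLI docs for exporting histories).
    if err := replayer.ReplayWorkflowHistoryFromJSONFile(logger, "history.json"); err != nil {
        t.Fatal(err)
    }
}
```
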
#### Key Considerations
- **Timeline**: Plan for weeks, not days
- **Coordination**: Requires careful coordination between teams
- **Monitoring**: Essential during transition period
- **Rollback**: Always have a rollback plan ready
- **Testing**: Extensive testing in staging environment required

#### When NOT to Migrate
- If you have long-running workflows (weeks/months)
- If you can't coordinate a proper versioning strategy
- If the performance benefits don't justify the migration complexity

## Best Practices

- **Choose Appropriate Concurrency**: Start with 3-5 concurrent activities and adjust based on downstream service capacity
- **Activity Factories**: Always capture loop variables in closures so each factory gets its own value rather than the last one in the loop
- **Error Handling**: Implement proper error handling for individual activity failures
- **Resource Management**: Consider memory usage for large batches
- **Monitoring**: Use heartbeats for long-running activities within the batch (a sketch follows this list)

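For that last point, here's a minimal sketch of a heartbeating activity. The chunking is illustrative, and a `HeartbeatTimeout` must be set in the `ActivityOptions` for the server to act on missed heartbeats:

```go
import (
    "context"

    "go.uber.org/cadence/activity"
)

// A long-running activity that reports progress so the server can
// detect stalls via the configured HeartbeatTimeout.
func UpdateUserActivity(ctx context.Context, userID string) error {
    const chunks = 10
    for step := 0; step < chunks; step++ {
        // ... do one chunk of work for userID ...

        // Record progress; the details are attached to the heartbeat.
        activity.RecordHeartbeat(ctx, step)
    }
    return nil
}
```
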
## Try It Today!

Batch Future is available now in the latest Cadence Go client. We can't wait to see how you use it to optimize your workflows!

Have questions or feedback? Join our [Slack community](http://t.uber.com/cadence-slack) or open an issue on [GitHub](https://github.com/cadence-workflow/cadence-go-client).

Happy coding, and here's to faster, more efficient workflows!