@@ -3,6 +3,7 @@ package main
33import (
44 "context"
55 "fmt"
6+ "math/rand"
67 "time"
78
89 "go.uber.org/cadence/workflow"
@@ -20,31 +21,36 @@ type BatchWorkflowInput struct {
2021}
2122
2223func BatchWorkflow (ctx workflow.Context , input BatchWorkflowInput ) error {
24+ // Create activity factories for each task (not yet executed)
2325 factories := make ([]func (workflow.Context ) workflow.Future , input .TotalSize )
2426 for taskID := 0 ; taskID < input .TotalSize ; taskID ++ {
25- taskID := taskID
27+ taskID := taskID // Capture loop variable for closure
2628 factories [taskID ] = func (ctx workflow.Context ) workflow.Future {
29+ // Configure activity timeouts
2730 aCtx := workflow .WithActivityOptions (ctx , workflow.ActivityOptions {
28- ScheduleToStartTimeout : time .Minute * 10 ,
29- StartToCloseTimeout : time .Minute * 10 ,
31+ ScheduleToStartTimeout : time .Minute * 1 ,
32+ StartToCloseTimeout : time .Minute * 1 ,
3033 })
3134 return workflow .ExecuteActivity (aCtx , BatchActivity , taskID )
3235 }
3336 }
3437
38+ // Execute all activities with controlled concurrency
3539 batch , err := x .NewBatchFuture (ctx , input .Concurrency , factories )
3640 if err != nil {
3741 return fmt .Errorf ("failed to create batch future: %w" , err )
3842 }
3943
44+ // Wait for all activities to complete
4045 return batch .Get (ctx , nil )
4146}
4247
4348func BatchActivity (ctx context.Context , taskID int ) error {
4449 select {
4550 case <- ctx .Done ():
51+ // Return error if workflow/activity is cancelled
4652 return fmt .Errorf ("batch activity %d failed: %w" , taskID , ctx .Err ())
47- case <- time .After (time .Duration (10000 ) * time .Millisecond ):
53+ case <- time .After (time .Duration (rand . Int63n ( 100 )) * time . Millisecond + 900 * time .Millisecond ):
4854 return nil
4955 }
5056}
0 commit comments