From e442ce45e646433429133c2b88af312ddffc3d07 Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Mon, 12 May 2025 13:35:40 -0700 Subject: [PATCH 1/2] Add sample of batch executions --- cmd/samples/batch/workflow.go | 66 ++++++++++++++++++++++++++++++ cmd/samples/batch/workflow_test.go | 24 +++++++++++ 2 files changed, 90 insertions(+) create mode 100644 cmd/samples/batch/workflow.go create mode 100644 cmd/samples/batch/workflow_test.go diff --git a/cmd/samples/batch/workflow.go b/cmd/samples/batch/workflow.go new file mode 100644 index 00000000..24726f4f --- /dev/null +++ b/cmd/samples/batch/workflow.go @@ -0,0 +1,66 @@ +package batch + +import ( + "context" + "fmt" + "math/rand" + "time" + + "go.uber.org/cadence/workflow" + "go.uber.org/multierr" +) + +type BatchWorkflowInput struct { + Concurrency int + TotalSize int +} + +func BatchWorkflow(ctx workflow.Context, input BatchWorkflowInput) error { + wg := workflow.NewWaitGroup(ctx) + + buffered := workflow.NewBufferedChannel(ctx, input.Concurrency) + futures := workflow.NewNamedChannel(ctx, "futures") + + var errs error + wg.Add(1) + // task result collector + workflow.Go(ctx, func(ctx workflow.Context) { + defer wg.Done() + for { + var future workflow.Future + ok := futures.Receive(ctx, &future) + if !ok { + break + } + err := future.Get(ctx, nil) + errs = multierr.Append(errs, err) + buffered.Receive(ctx, nil) + } + }) + + // submit all tasks + for taskID := 0; taskID < input.TotalSize; taskID++ { + taskID := taskID + buffered.Send(ctx, nil) + + aCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Second * 10, + StartToCloseTimeout: time.Second * 10, + }) + futures.Send(ctx, workflow.ExecuteActivity(aCtx, BatchActivity, taskID)) + } + futures.Close() + + wg.Wait(ctx) + + return errs +} + +func BatchActivity(ctx context.Context, taskID int) error { + select { + case <-ctx.Done(): + return fmt.Errorf("batch activity %d failed: %w", taskID, ctx.Err()) + case <-time.After(time.Duration(rand.Int63n(100))*time.Millisecond + 900*time.Millisecond): + return nil + } +} diff --git a/cmd/samples/batch/workflow_test.go b/cmd/samples/batch/workflow_test.go new file mode 100644 index 00000000..d560839c --- /dev/null +++ b/cmd/samples/batch/workflow_test.go @@ -0,0 +1,24 @@ +package batch + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/cadence/testsuite" +) + +func Test_BatchWorkflow(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + env.RegisterWorkflow(BatchWorkflow) + env.RegisterActivity(BatchActivity) + + env.ExecuteWorkflow(BatchWorkflow, BatchWorkflowInput{ + Concurrency: 2, + TotalSize: 10, + }) + + assert.True(t, env.IsWorkflowCompleted()) + assert.Nil(t, env.GetWorkflowError()) +} From f49114ab1c2fe4c63fe8d6b7b7c29946de8691dd Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Mon, 12 May 2025 14:03:57 -0700 Subject: [PATCH 2/2] add comment --- cmd/samples/batch/workflow.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/samples/batch/workflow.go b/cmd/samples/batch/workflow.go index 24726f4f..ab8ba91f 100644 --- a/cmd/samples/batch/workflow.go +++ b/cmd/samples/batch/workflow.go @@ -49,6 +49,7 @@ func BatchWorkflow(ctx workflow.Context, input BatchWorkflowInput) error { }) futures.Send(ctx, workflow.ExecuteActivity(aCtx, BatchActivity, taskID)) } + // close the channel to signal the task result collector that no more tasks are coming futures.Close() wg.Wait(ctx)