Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 14 additions & 35 deletions cmd/samples/batch/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"go.uber.org/cadence/workflow"
"go.uber.org/multierr"
"go.uber.org/cadence/x"
)

type BatchWorkflowInput struct {
Expand All @@ -16,45 +16,24 @@ type BatchWorkflowInput struct {
}

func BatchWorkflow(ctx workflow.Context, input BatchWorkflowInput) error {
wg := workflow.NewWaitGroup(ctx)

buffered := workflow.NewBufferedChannel(ctx, input.Concurrency)
futures := workflow.NewNamedChannel(ctx, "futures")

var errs error
wg.Add(1)
// task result collector
workflow.Go(ctx, func(ctx workflow.Context) {
defer wg.Done()
for {
var future workflow.Future
ok := futures.Receive(ctx, &future)
if !ok {
break
}
err := future.Get(ctx, nil)
errs = multierr.Append(errs, err)
buffered.Receive(ctx, nil)
}
})

// submit all tasks
factories := make([]func(workflow.Context) workflow.Future, input.TotalSize)
for taskID := 0; taskID < input.TotalSize; taskID++ {
taskID := taskID
buffered.Send(ctx, nil)

aCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
ScheduleToStartTimeout: time.Second * 10,
StartToCloseTimeout: time.Second * 10,
})
futures.Send(ctx, workflow.ExecuteActivity(aCtx, BatchActivity, taskID))
factories[taskID] = func(ctx workflow.Context) workflow.Future {
aCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
ScheduleToStartTimeout: time.Second * 10,
StartToCloseTimeout: time.Second * 10,
})
return workflow.ExecuteActivity(aCtx, BatchActivity, taskID)
}
}
// close the channel to signal the task result collector that no more tasks are coming
futures.Close()

wg.Wait(ctx)
batch, err := x.NewBatchFuture(ctx, input.Concurrency, factories)
if err != nil {
return fmt.Errorf("failed to create batch future: %w", err)
}

return errs
return batch.Get(ctx, nil)
}

func BatchActivity(ctx context.Context, taskID int) error {
Expand Down
28 changes: 10 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,43 +1,41 @@
module github.com/uber-common/cadence-samples

go 1.18
go 1.21

toolchain go1.24.2

require (
github.com/google/uuid v1.3.0
github.com/m3db/prometheus_client_golang v0.8.1
github.com/opentracing/opentracing-go v1.2.0
github.com/pborman/uuid v1.2.1
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.9.0
github.com/uber-go/tally v3.4.3+incompatible
github.com/uber/cadence-idl v0.0.0-20230905165949-03586319b849
github.com/uber/cadence-idl v0.0.0-20241126065313-57bd6876d48f
github.com/uber/jaeger-client-go v2.30.0+incompatible
go.uber.org/cadence v1.2.9
go.uber.org/cadence v1.2.10-rc.13
go.uber.org/multierr v1.6.0
go.uber.org/yarpc v1.60.0
go.uber.org/zap v1.23.0
gopkg.in/yaml.v2 v2.4.0
)

require (
github.com/BurntSushi/toml v0.4.1 // indirect
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/cristalhq/jwt/v3 v3.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/fatih/structtag v1.2.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/gogo/googleapis v1.3.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gogo/status v1.1.0 // indirect
github.com/golang-jwt/jwt/v5 v5.2.0 // indirect
github.com/golang/mock v1.5.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/jessevdk/go-flags v1.4.0 // indirect
github.com/kisielk/errcheck v1.5.0 // indirect
github.com/jonboulle/clockwork v0.4.0 // indirect
github.com/m3db/prometheus_client_model v0.1.0 // indirect
github.com/m3db/prometheus_common v0.1.0 // indirect
github.com/m3db/prometheus_procfs v0.8.1 // indirect
Expand All @@ -49,19 +47,14 @@ require (
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/twmb/murmur3 v1.1.6 // indirect
github.com/uber-go/mapdecode v1.0.0 // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/uber/tchannel-go v1.32.1 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/dig v1.17.0 // indirect
go.uber.org/fx v1.13.1 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/net/metrics v1.3.0 // indirect
go.uber.org/thriftrw v1.29.2 // indirect
golang.org/x/exp/typeparams v0.0.0-20220218215828-6cf2b201936e // indirect
Expand All @@ -73,7 +66,6 @@ require (
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.0.0-20170927054726-6dc17368e09b // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20200212174721-66ed5ce911ce // indirect
google.golang.org/grpc v1.28.0 // indirect
Expand Down
Loading