Skip to content

Commit 39a3f74

Browse files
authored
upgrade client version and update batch example (#95)
Update batch example with use NewBatchFuture API upgrade client version
1 parent 46ab0b1 commit 39a3f74

File tree

3 files changed

+47
-116
lines changed

3 files changed

+47
-116
lines changed

cmd/samples/batch/workflow.go

Lines changed: 14 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"time"
88

99
"go.uber.org/cadence/workflow"
10-
"go.uber.org/multierr"
10+
"go.uber.org/cadence/x"
1111
)
1212

1313
type BatchWorkflowInput struct {
@@ -16,45 +16,24 @@ type BatchWorkflowInput struct {
1616
}
1717

1818
func BatchWorkflow(ctx workflow.Context, input BatchWorkflowInput) error {
19-
wg := workflow.NewWaitGroup(ctx)
20-
21-
buffered := workflow.NewBufferedChannel(ctx, input.Concurrency)
22-
futures := workflow.NewNamedChannel(ctx, "futures")
23-
24-
var errs error
25-
wg.Add(1)
26-
// task result collector
27-
workflow.Go(ctx, func(ctx workflow.Context) {
28-
defer wg.Done()
29-
for {
30-
var future workflow.Future
31-
ok := futures.Receive(ctx, &future)
32-
if !ok {
33-
break
34-
}
35-
err := future.Get(ctx, nil)
36-
errs = multierr.Append(errs, err)
37-
buffered.Receive(ctx, nil)
38-
}
39-
})
40-
41-
// submit all tasks
19+
factories := make([]func(workflow.Context) workflow.Future, input.TotalSize)
4220
for taskID := 0; taskID < input.TotalSize; taskID++ {
4321
taskID := taskID
44-
buffered.Send(ctx, nil)
45-
46-
aCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
47-
ScheduleToStartTimeout: time.Second * 10,
48-
StartToCloseTimeout: time.Second * 10,
49-
})
50-
futures.Send(ctx, workflow.ExecuteActivity(aCtx, BatchActivity, taskID))
22+
factories[taskID] = func(ctx workflow.Context) workflow.Future {
23+
aCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
24+
ScheduleToStartTimeout: time.Second * 10,
25+
StartToCloseTimeout: time.Second * 10,
26+
})
27+
return workflow.ExecuteActivity(aCtx, BatchActivity, taskID)
28+
}
5129
}
52-
// close the channel to signal the task result collector that no more tasks are coming
53-
futures.Close()
5430

55-
wg.Wait(ctx)
31+
batch, err := x.NewBatchFuture(ctx, input.Concurrency, factories)
32+
if err != nil {
33+
return fmt.Errorf("failed to create batch future: %w", err)
34+
}
5635

57-
return errs
36+
return batch.Get(ctx, nil)
5837
}
5938

6039
func BatchActivity(ctx context.Context, taskID int) error {

go.mod

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,41 @@
11
module github.com/uber-common/cadence-samples
22

3-
go 1.18
3+
go 1.21
4+
5+
toolchain go1.24.2
46

57
require (
68
github.com/google/uuid v1.3.0
79
github.com/m3db/prometheus_client_golang v0.8.1
810
github.com/opentracing/opentracing-go v1.2.0
911
github.com/pborman/uuid v1.2.1
1012
github.com/pkg/errors v0.9.1
11-
github.com/stretchr/testify v1.8.1
13+
github.com/stretchr/testify v1.9.0
1214
github.com/uber-go/tally v3.4.3+incompatible
13-
github.com/uber/cadence-idl v0.0.0-20230905165949-03586319b849
15+
github.com/uber/cadence-idl v0.0.0-20241126065313-57bd6876d48f
1416
github.com/uber/jaeger-client-go v2.30.0+incompatible
15-
go.uber.org/cadence v1.2.9
17+
go.uber.org/cadence v1.2.10-rc.13
18+
go.uber.org/multierr v1.6.0
1619
go.uber.org/yarpc v1.60.0
1720
go.uber.org/zap v1.23.0
1821
gopkg.in/yaml.v2 v2.4.0
1922
)
2023

2124
require (
2225
github.com/BurntSushi/toml v0.4.1 // indirect
23-
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 // indirect
2426
github.com/apache/thrift v0.16.0 // indirect
2527
github.com/benbjohnson/clock v1.3.0 // indirect
2628
github.com/beorn7/perks v1.0.1 // indirect
2729
github.com/cespare/xxhash/v2 v2.1.1 // indirect
28-
github.com/cristalhq/jwt/v3 v3.1.0 // indirect
2930
github.com/davecgh/go-spew v1.1.1 // indirect
3031
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
31-
github.com/fatih/structtag v1.2.0 // indirect
32-
github.com/go-ole/go-ole v1.2.6 // indirect
3332
github.com/gogo/googleapis v1.3.2 // indirect
3433
github.com/gogo/protobuf v1.3.2 // indirect
3534
github.com/gogo/status v1.1.0 // indirect
3635
github.com/golang-jwt/jwt/v5 v5.2.0 // indirect
3736
github.com/golang/mock v1.5.0 // indirect
3837
github.com/golang/protobuf v1.5.3 // indirect
39-
github.com/jessevdk/go-flags v1.4.0 // indirect
40-
github.com/kisielk/errcheck v1.5.0 // indirect
38+
github.com/jonboulle/clockwork v0.4.0 // indirect
4139
github.com/m3db/prometheus_client_model v0.1.0 // indirect
4240
github.com/m3db/prometheus_common v0.1.0 // indirect
4341
github.com/m3db/prometheus_procfs v0.8.1 // indirect
@@ -49,19 +47,14 @@ require (
4947
github.com/prometheus/common v0.26.0 // indirect
5048
github.com/prometheus/procfs v0.6.0 // indirect
5149
github.com/robfig/cron v1.2.0 // indirect
52-
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
53-
github.com/stretchr/objx v0.5.0 // indirect
54-
github.com/tklauser/go-sysconf v0.3.11 // indirect
55-
github.com/tklauser/numcpus v0.6.0 // indirect
50+
github.com/stretchr/objx v0.5.2 // indirect
5651
github.com/twmb/murmur3 v1.1.6 // indirect
5752
github.com/uber-go/mapdecode v1.0.0 // indirect
5853
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
5954
github.com/uber/tchannel-go v1.32.1 // indirect
60-
github.com/yusufpapurcu/wmi v1.2.3 // indirect
61-
go.uber.org/atomic v1.9.0 // indirect
55+
go.uber.org/atomic v1.11.0 // indirect
6256
go.uber.org/dig v1.17.0 // indirect
6357
go.uber.org/fx v1.13.1 // indirect
64-
go.uber.org/multierr v1.6.0 // indirect
6558
go.uber.org/net/metrics v1.3.0 // indirect
6659
go.uber.org/thriftrw v1.29.2 // indirect
6760
golang.org/x/exp/typeparams v0.0.0-20220218215828-6cf2b201936e // indirect
@@ -73,7 +66,6 @@ require (
7366
golang.org/x/text v0.14.0 // indirect
7467
golang.org/x/time v0.0.0-20170927054726-6dc17368e09b // indirect
7568
golang.org/x/tools v0.6.0 // indirect
76-
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
7769
google.golang.org/appengine v1.6.7 // indirect
7870
google.golang.org/genproto v0.0.0-20200212174721-66ed5ce911ce // indirect
7971
google.golang.org/grpc v1.28.0 // indirect

0 commit comments

Comments
 (0)