
Commit 291e3dd

add NewBatchFuture API (#1426)
Why?
The fanout pattern is common in user workflows. However, it is only possible to fan out fewer than 100 futures concurrently; anything more than that causes the history service to get stuck processing. This BatchFuture caps the number of concurrent futures and thus promotes correct usage.

How did you test it?
Unit tests in the test environment.

Detailed Description
Added the NewBatchFuture API for batch use cases.
Added the BatchFuture interface.

Impact Analysis
Backward Compatibility: Yes
Forward Compatibility: Yes

Testing Plan
Unit Tests: Yes
Persistence Tests: Not related
Integration Tests: No
Compatibility Tests: No, since this is a new API change only

Rollout Plan
What is the rollout plan? Deploy the new version.
Does the order of deployment matter? No.
Is it safe to rollback? Does the order of rollback matter? Yes.
Is there a kill switch to mitigate the impact immediately? No.
1 parent 221cc4e commit 291e3dd
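
To make the intended fanout usage concrete, here is a minimal sketch of a workflow using the new API. It assumes the API is exposed through the public workflow package as workflow.NewBatchFuture with the same signature as the internal implementation shown below; the activity name ProcessActivity is hypothetical.

```go
package sample

import (
	"time"

	"go.uber.org/cadence/workflow"
)

// FanoutWorkflow runs one activity per input but keeps at most
// 10 futures in flight at a time.
func FanoutWorkflow(ctx workflow.Context, inputs []string) ([]string, error) {
	ao := workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute,
		StartToCloseTimeout:    time.Minute,
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	// One factory per piece of work; each factory is invoked lazily
	// by the batch runner, so scheduling is deferred until a
	// concurrency slot is available.
	factories := make([]func(ctx workflow.Context) workflow.Future, len(inputs))
	for i, in := range inputs {
		in := in
		factories[i] = func(ctx workflow.Context) workflow.Future {
			return workflow.ExecuteActivity(ctx, "ProcessActivity", in) // hypothetical activity
		}
	}

	// Cap concurrency at 10 instead of starting all futures at once.
	batch, err := workflow.NewBatchFuture(ctx, 10, factories)
	if err != nil {
		return nil, err
	}

	// Get resizes the slice to len(factories) and fills each element.
	var results []string
	if err := batch.Get(ctx, &results); err != nil {
		return nil, err
	}
	return results, nil
}
```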

File tree

4 files changed: +473 −0 lines changed

internal/batch/batch_future.go

Lines changed: 138 additions & 0 deletions
```go
package batch

import (
	"fmt"
	"reflect"

	"go.uber.org/multierr"

	"go.uber.org/cadence/internal"
)

// BatchFuture is an implementation of the public BatchFuture interface.
type BatchFuture struct {
	futures   []internal.Future
	settables []internal.Settable
	factories []func(ctx internal.Context) internal.Future
	batchSize int

	// state
	wg internal.WaitGroup
}

func NewBatchFuture(ctx internal.Context, batchSize int, factories []func(ctx internal.Context) internal.Future) (*BatchFuture, error) {
	var futures []internal.Future
	var settables []internal.Settable
	for range factories {
		future, settable := internal.NewFuture(ctx)
		futures = append(futures, future)
		settables = append(settables, settable)
	}

	batchFuture := &BatchFuture{
		futures:   futures,
		settables: settables,
		factories: factories,
		batchSize: batchSize,

		wg: internal.NewWaitGroup(ctx),
	}
	batchFuture.start(ctx)
	return batchFuture, nil
}

func (b *BatchFuture) GetFutures() []internal.Future {
	return b.futures
}

func (b *BatchFuture) start(ctx internal.Context) {
	// buffered channel used as a semaphore to limit the number of concurrent futures
	semaphore := internal.NewBufferedChannel(ctx, b.batchSize)
	workChan := internal.NewNamedChannel(ctx, "batch-future-channel")

	b.wg.Add(1)
	internal.GoNamed(ctx, "batch-future-submitter", func(ctx internal.Context) {
		defer b.wg.Done()

		for i := range b.factories {
			semaphore.Send(ctx, nil) // blocks once batchSize futures are in flight
			workChan.Send(ctx, i)
		}
		workChan.Close()
	})

	b.wg.Add(1)
	internal.GoNamed(ctx, "batch-future-processor", func(ctx internal.Context) {
		defer b.wg.Done()

		wgForFutures := internal.NewWaitGroup(ctx)

		var idx int
		for workChan.Receive(ctx, &idx) {
			idx := idx

			wgForFutures.Add(1)
			internal.GoNamed(ctx, fmt.Sprintf("batch-future-processor-one-future-%d", idx), func(ctx internal.Context) {
				defer wgForFutures.Done()

				// fork a future and chain it to the pre-allocated settable so the user can get the result
				f := b.factories[idx](ctx)
				b.settables[idx].Chain(f)

				// error handling is not needed here because the result is chained to the settable
				f.Get(ctx, nil)
				semaphore.Receive(ctx, nil) // release a slot for the next future
			})
		}
		wgForFutures.Wait(ctx)
	})
}

func (b *BatchFuture) IsReady() bool {
	for _, future := range b.futures {
		if !future.IsReady() {
			return false
		}
	}
	return true
}

// Get assigns the results of the futures to valuePtr.
// NOTE: valuePtr must be a pointer to a slice, or nil.
// If valuePtr is a pointer to a slice, the slice is resized to the number of futures and
// each element is assigned via the underlying Future.Get(), so it behaves the same way.
// If valuePtr is nil, no assignment is made.
// If errors occur, values are still set for the successful futures and the errors of the
// failed futures are returned.
func (b *BatchFuture) Get(ctx internal.Context, valuePtr interface{}) error {
	// No assignment if valuePtr is nil
	if valuePtr == nil {
		b.wg.Wait(ctx)
		var errs error
		for i := range b.futures {
			errs = multierr.Append(errs, b.futures[i].Get(ctx, nil))
		}
		return errs
	}

	v := reflect.ValueOf(valuePtr)
	if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Slice {
		return fmt.Errorf("valuePtr must be a pointer to a slice, got %v", v.Kind())
	}

	// resize the slice to the number of futures
	slice := v.Elem()
	if slice.Cap() < len(b.futures) {
		slice.Grow(len(b.futures) - slice.Cap())
	}
	slice.SetLen(len(b.futures))

	// wait for all futures to be ready
	b.wg.Wait(ctx)

	// assign each element of the slice from its corresponding future
	var errs error
	for i := range b.futures {
		e := b.futures[i].Get(ctx, slice.Index(i).Addr().Interface())
		errs = multierr.Append(errs, e)
	}

	return errs
}
```
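
The Get doc comment above describes two call styles. A minimal sketch of both, written as if appended to the same internal/batch file (so it shares that file's imports) and assuming each future produces a string:

```go
// collectResults gathers all values: Get resizes the slice to the number
// of futures and fills each element through the underlying Future.Get();
// successful futures still populate their elements even when others fail,
// and the errors of the failed futures come back merged via multierr.
func collectResults(ctx internal.Context, b *BatchFuture) ([]string, error) {
	var results []string
	err := b.Get(ctx, &results)
	return results, err
}

// waitOnly blocks until every future is done without collecting values:
// passing nil skips assignment but still returns the merged errors.
func waitOnly(ctx internal.Context, b *BatchFuture) error {
	return b.Get(ctx, nil)
}
```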
