Skip to content

Commit 596d06a

Browse files
authored
Merge branch 'master' into bufferone-idl
2 parents 61ad3e2 + 291e3dd commit 596d06a

22 files changed

+1429
-883
lines changed

internal/batch/batch_future.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package batch
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
7+
"go.uber.org/multierr"
8+
9+
"go.uber.org/cadence/internal"
10+
)
11+
12+
// BatchFuture is an implementation of public BatchFuture interface.
13+
type BatchFuture struct {
14+
futures []internal.Future
15+
settables []internal.Settable
16+
factories []func(ctx internal.Context) internal.Future
17+
batchSize int
18+
19+
// state
20+
wg internal.WaitGroup
21+
}
22+
23+
func NewBatchFuture(ctx internal.Context, batchSize int, factories []func(ctx internal.Context) internal.Future) (*BatchFuture, error) {
24+
var futures []internal.Future
25+
var settables []internal.Settable
26+
for range factories {
27+
future, settable := internal.NewFuture(ctx)
28+
futures = append(futures, future)
29+
settables = append(settables, settable)
30+
}
31+
32+
batchFuture := &BatchFuture{
33+
futures: futures,
34+
settables: settables,
35+
factories: factories,
36+
batchSize: batchSize,
37+
38+
wg: internal.NewWaitGroup(ctx),
39+
}
40+
batchFuture.start(ctx)
41+
return batchFuture, nil
42+
}
43+
44+
func (b *BatchFuture) GetFutures() []internal.Future {
45+
return b.futures
46+
}
47+
48+
func (b *BatchFuture) start(ctx internal.Context) {
49+
50+
semaphore := internal.NewBufferedChannel(ctx, b.batchSize) // buffered workChan to limit the number of concurrent futures
51+
workChan := internal.NewNamedChannel(ctx, "batch-future-channel")
52+
b.wg.Add(1)
53+
internal.GoNamed(ctx, "batch-future-submitter", func(ctx internal.Context) {
54+
defer b.wg.Done()
55+
56+
for i := range b.factories {
57+
semaphore.Send(ctx, nil)
58+
workChan.Send(ctx, i)
59+
}
60+
workChan.Close()
61+
})
62+
63+
b.wg.Add(1)
64+
internal.GoNamed(ctx, "batch-future-processor", func(ctx internal.Context) {
65+
defer b.wg.Done()
66+
67+
wgForFutures := internal.NewWaitGroup(ctx)
68+
69+
var idx int
70+
for workChan.Receive(ctx, &idx) {
71+
idx := idx
72+
73+
wgForFutures.Add(1)
74+
internal.GoNamed(ctx, fmt.Sprintf("batch-future-processor-one-future-%d", idx), func(ctx internal.Context) {
75+
defer wgForFutures.Done()
76+
77+
// fork a future and chain it to the processed future for user to get the result
78+
f := b.factories[idx](ctx)
79+
b.settables[idx].Chain(f)
80+
81+
// error handling is not needed here because the result is chained to the settable
82+
f.Get(ctx, nil)
83+
semaphore.Receive(ctx, nil)
84+
})
85+
}
86+
wgForFutures.Wait(ctx)
87+
})
88+
}
89+
90+
func (b *BatchFuture) IsReady() bool {
91+
for _, future := range b.futures {
92+
if !future.IsReady() {
93+
return false
94+
}
95+
}
96+
return true
97+
}
98+
99+
// Get assigns the result of the futures to the valuePtr.
100+
// NOTE: valuePtr must be a pointer to a slice, or nil.
101+
// If valuePtr is a pointer to a slice, the slice will be resized to the length of the futures. Each element of the slice will be assigned with the underlying Future.Get() and thus behaves the same way.
102+
// If valuePtr is nil, no assignment will be made.
103+
// If error occurs, values will be set on successful futures and the errors of failed futures will be returned.
104+
func (b *BatchFuture) Get(ctx internal.Context, valuePtr interface{}) error {
105+
// No assignment if valuePtr is nil
106+
if valuePtr == nil {
107+
b.wg.Wait(ctx)
108+
var errs error
109+
for i := range b.futures {
110+
errs = multierr.Append(errs, b.futures[i].Get(ctx, nil))
111+
}
112+
return errs
113+
}
114+
115+
v := reflect.ValueOf(valuePtr)
116+
if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Slice {
117+
return fmt.Errorf("valuePtr must be a pointer to a slice, got %v", v.Kind())
118+
}
119+
120+
// resize the slice to the length of the futures
121+
slice := v.Elem()
122+
if slice.Cap() < len(b.futures) {
123+
slice.Grow(len(b.futures) - slice.Cap())
124+
}
125+
slice.SetLen(len(b.futures))
126+
127+
// wait for all futures to be ready
128+
b.wg.Wait(ctx)
129+
130+
// loop through all elements of valuePtr
131+
var errs error
132+
for i := range b.futures {
133+
e := b.futures[i].Get(ctx, slice.Index(i).Addr().Interface())
134+
errs = multierr.Append(errs, e)
135+
}
136+
137+
return errs
138+
}

0 commit comments

Comments
 (0)