@@ -8,71 +8,160 @@ import (
88 "go.uber.org/cadence/internal"
99)
1010
11- type Executor interface {
12- AddFuture (factory func (ctx internal.Context ) internal.Future , valuePtr interface {}) error
13- Execute (ctx internal.Context ) error
11+ // type Executor interface {
12+ // AddFuture(factory func(ctx internal.Context) internal.Future, valuePtr interface{}) error
13+ // Execute(ctx internal.Context) error
14+ // }
15+
16+ // type batchExecutor struct {
17+ // isRunning bool
18+ // factories []func(ctx internal.Context) internal.Future
19+ // valuePtrs []interface{}
20+
21+ // batchSize int
22+ // }
23+
24+ // type futureWithResult struct {
25+ // future internal.Future
26+ // valuePtr interface{}
27+ // }
28+
29+ // func (e *batchExecutor) AddFuture(factory func(ctx internal.Context) internal.Future, valuePtr interface{}) error {
30+ // if e.isRunning {
31+ // return errors.New("executor is already running")
32+ // }
33+ // e.factories = append(e.factories, factory)
34+ // e.valuePtrs = append(e.valuePtrs, valuePtr)
35+ // return nil
36+ // }
37+
38+ // func (e *batchExecutor) Execute(ctx internal.Context) error {
39+ // if e.isRunning {
40+ // return errors.New("executor is already running")
41+ // }
42+ // e.isRunning = true
43+
44+ // futuresToProcess := internal.NewNamedChannel(ctx, "batch-executor-features-to-process")
45+ // wg := internal.NewWaitGroup(ctx)
46+
47+ // buffered := internal.NewBufferedChannel(ctx, e.batchSize)
48+ // var errs error
49+ // wg.Add(1)
50+
51+ // // processor of features asynchronously
52+ // internal.GoNamed(ctx, "batch-executor-processor-loop", func(ctx internal.Context) {
53+ // defer wg.Done()
54+ // for {
55+ // var futureWithResult futureWithResult
56+ // ok := futuresToProcess.Receive(ctx, &futureWithResult)
57+ // if !ok {
58+ // break
59+ // }
60+ // wg.Add(1)
61+ // internal.GoNamed(ctx, "batch-executor-feature-processor", func(ctx internal.Context) {
62+ // defer wg.Done()
63+ // err := futureWithResult.future.Get(ctx, futureWithResult.valuePtr)
64+ // errs = multierr.Append(errs, err)
65+ // buffered.Receive(ctx, nil)
66+ // })
67+ // }
68+ // })
69+
70+ // // submit all futures within concurrency limit, wait to schedule until it's ready
71+ // for i := range e.factories {
72+ // buffered.Send(ctx, nil)
73+ // futuresToProcess.Send(ctx, futureWithResult{
74+ // future: e.factories[i](ctx),
75+ // valuePtr: e.valuePtrs[i],
76+ // })
77+ // }
78+
79+ // // close the channel to signal the task result collector that no more tasks are coming
80+ // futuresToProcess.Close()
81+
82+ // wg.Wait(ctx)
83+
84+ // return errs
85+ // }
86+
87+ // func NewBatchExecutor(ctx internal.Context, batchSize int) Executor {
88+ // return &batchExecutor{
89+ // batchSize: batchSize,
90+ // }
91+ // }
92+
93+ type futureWithIndex struct {
94+ future internal.Future
95+ index int
1496}
1597
16- type batchExecutor struct {
17- isRunning bool
18- factories [] func (ctx internal.Context ) internal. Future
19- valuePtrs [] interface { }
98+ type BatchFuture interface {
99+ GetFutures () []internal. Future
100+ Get (ctx internal.Context , valuePtr ... interface {}) error
101+ }
20102
21- batchSize int
103+ type batchFutureImpl struct {
104+ futures []internal.Future
105+ settables []internal.Settable
106+ factories []func (ctx internal.Context ) internal.Future
22107}
23108
24- type futureWithResult struct {
25- future internal.Future
26- valuePtr interface {}
109+ func (b * batchFutureImpl ) GetFutures () []internal.Future {
110+ return b .futures
27111}
28112
29- func (e * batchExecutor ) AddFuture (factory func (ctx internal.Context ) internal.Future , valuePtr interface {}) error {
30- if e .isRunning {
31- return errors .New ("executor is already running" )
32- }
33- e .factories = append (e .factories , factory )
34- e .valuePtrs = append (e .valuePtrs , valuePtr )
35- return nil
113+
114+ func NewBatchFuture (ctx internal.Context , batchSize int , factories []func (ctx internal.Context ) internal.Future ) (BatchFuture , error ) {
115+
36116}
37117
38- func (e * batchExecutor ) Execute (ctx internal.Context ) error {
39- if e .isRunning {
40- return errors .New ("executor is already running" )
41- }
42- e .isRunning = true
43118
44- futuresToProcess := internal .NewNamedChannel (ctx , "futures" )
119+ func ()(ctx internal.Context , batchSize int , factories []func (ctx internal.Context ) internal.Future ) (BatchFuture , error ) {
120+
121+ futuresToReturn := make ([]internal.Future , len (factories ))
122+ settablesToReturn := make ([]internal.Settable , len (factories ))
123+
124+ futuresToProcess := internal .NewNamedChannel (ctx , "batch-executor-features-to-process" )
45125 wg := internal .NewWaitGroup (ctx )
46126
47- buffered := internal .NewBufferedChannel (ctx , e . batchSize )
127+ buffered := internal .NewBufferedChannel (ctx , batchSize )
48128 var errs error
49129 wg .Add (1 )
50130
51- // future processor
52- internal .Go (ctx , func (ctx internal.Context ) {
131+ // processor of features asynchronously
132+ internal .GoNamed (ctx , "batch-executor-processor-loop" , func (ctx internal.Context ) {
53133 defer wg .Done ()
54134 for {
55- var futureWithResult futureWithResult
56- ok := futuresToProcess .Receive (ctx , & futureWithResult )
135+ var f futureWithIndex
136+ ok := futuresToProcess .Receive (ctx , & f )
57137 if ! ok {
58138 break
59139 }
60140 wg .Add (1 )
61- internal .Go (ctx , func (ctx internal.Context ) {
141+ internal .GoNamed (ctx , "batch-executor-feature-processor" , func (ctx internal.Context ) {
62142 defer wg .Done ()
63- err := futureWithResult .future .Get (ctx , futureWithResult .valuePtr )
143+
144+ // fork a future and chain it to the processed future for user to get the result
145+ future , settable := internal .NewFuture (ctx )
146+ settable .Chain (f .future )
147+
148+ // complete the future to process and chain it to the future to return
149+ // this way user can decide when to get the result
150+ err := f .future .Get (ctx , nil )
64151 errs = multierr .Append (errs , err )
152+ futuresToReturn [f .index ] = future
153+ settablesToReturn [f .index ] = settable
65154 buffered .Receive (ctx , nil )
66155 })
67156 }
68157 })
69158
70- // submit all futures
71- for i := range e . factories {
159+ // submit all futures within concurrency limit, wait to schedule until it's ready
160+ for i := range factories {
72161 buffered .Send (ctx , nil )
73- futuresToProcess .Send (ctx , futureWithResult {
74- future : e. factories [i ](ctx ),
75- valuePtr : e . valuePtrs [ i ] ,
162+ futuresToProcess .Send (ctx , futureWithIndex {
163+ future : factories [i ](ctx ),
164+ index : i ,
76165 })
77166 }
78167
@@ -83,9 +172,3 @@ func (e *batchExecutor) Execute(ctx internal.Context) error {
83172
84173 return errs
85174}
86-
87- func NewBatchExecutor (ctx internal.Context , batchSize int ) Executor {
88- return & batchExecutor {
89- batchSize : batchSize ,
90- }
91- }
0 commit comments