Skip to content

Commit cfc5174

Browse files
committed
added preemptive return to the aggregators
1 parent 7d3ffbf commit cfc5174

File tree

1 file changed

+119
-32
lines changed

1 file changed

+119
-32
lines changed

internal/routing/aggregator.go

Lines changed: 119 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -108,17 +108,25 @@ func (a *AllSucceededAggregator) BatchAdd(results map[string]AggregatorResErr) e
108108
if err != nil {
109109
return err
110110
}
111+
112+
if res.Err != nil {
113+
return nil
114+
}
111115
}
112116

113117
return nil
114118
}
115119

116-
func (a *AllSucceededAggregator) BatchSlice(values []AggregatorResErr) error {
117-
for _, val := range values {
118-
err := a.Add(val.Result, val.Err)
120+
func (a *AllSucceededAggregator) BatchSlice(result []AggregatorResErr) error {
121+
for _, res := range result {
122+
err := a.Add(res.Result, res.Err)
119123
if err != nil {
120124
return err
121125
}
126+
127+
if res.Err != nil {
128+
return nil
129+
}
122130
}
123131

124132
return nil
@@ -164,6 +172,10 @@ func (a *OneSucceededAggregator) BatchAdd(results map[string]AggregatorResErr) e
164172
if err != nil {
165173
return err
166174
}
175+
176+
if res.Err == nil {
177+
return nil
178+
}
167179
}
168180

169181
return nil
@@ -173,12 +185,16 @@ func (a *OneSucceededAggregator) AddWithKey(key string, result interface{}, err
173185
return a.Add(result, err)
174186
}
175187

176-
func (a *OneSucceededAggregator) BatchSlice(values []AggregatorResErr) error {
177-
for _, val := range values {
178-
err := a.Add(val.Result, val.Err)
188+
func (a *OneSucceededAggregator) BatchSlice(result []AggregatorResErr) error {
189+
for _, res := range result {
190+
err := a.Add(res.Result, res.Err)
179191
if err != nil {
180192
return err
181193
}
194+
195+
if res.Err == nil {
196+
return nil
197+
}
182198
}
183199

184200
return nil
@@ -221,6 +237,10 @@ func (a *AggSumAggregator) BatchAdd(results map[string]AggregatorResErr) error {
221237
if err != nil {
222238
return err
223239
}
240+
241+
if res.Err != nil {
242+
return nil
243+
}
224244
}
225245

226246
return nil
@@ -230,12 +250,16 @@ func (a *AggSumAggregator) AddWithKey(key string, result interface{}, err error)
230250
return a.Add(result, err)
231251
}
232252

233-
func (a *AggSumAggregator) BatchSlice(values []AggregatorResErr) error {
234-
for _, val := range values {
235-
err := a.Add(val.Result, val.Err)
253+
func (a *AggSumAggregator) BatchSlice(result []AggregatorResErr) error {
254+
for _, res := range result {
255+
err := a.Add(res.Result, res.Err)
236256
if err != nil {
237257
return err
238258
}
259+
260+
if res.Err != nil {
261+
return nil
262+
}
239263
}
240264

241265
return nil
@@ -279,6 +303,10 @@ func (a *AggMinAggregator) BatchAdd(results map[string]AggregatorResErr) error {
279303
if err != nil {
280304
return err
281305
}
306+
307+
if res.Err != nil {
308+
return nil
309+
}
282310
}
283311

284312
return nil
@@ -288,12 +316,16 @@ func (a *AggMinAggregator) AddWithKey(key string, result interface{}, err error)
288316
return a.Add(result, err)
289317
}
290318

291-
func (a *AggMinAggregator) BatchSlice(values []AggregatorResErr) error {
292-
for _, val := range values {
293-
err := a.Add(val.Result, val.Err)
319+
func (a *AggMinAggregator) BatchSlice(result []AggregatorResErr) error {
320+
for _, res := range result {
321+
err := a.Add(res.Result, res.Err)
294322
if err != nil {
295323
return err
296324
}
325+
326+
if res.Err != nil {
327+
return nil
328+
}
297329
}
298330

299331
return nil
@@ -341,6 +373,10 @@ func (a *AggMaxAggregator) BatchAdd(results map[string]AggregatorResErr) error {
341373
if err != nil {
342374
return err
343375
}
376+
377+
if res.Err != nil {
378+
return nil
379+
}
344380
}
345381

346382
return nil
@@ -350,12 +386,16 @@ func (a *AggMaxAggregator) AddWithKey(key string, result interface{}, err error)
350386
return a.Add(result, err)
351387
}
352388

353-
func (a *AggMaxAggregator) BatchSlice(values []AggregatorResErr) error {
354-
for _, val := range values {
355-
err := a.Add(val.Result, val.Err)
389+
func (a *AggMaxAggregator) BatchSlice(result []AggregatorResErr) error {
390+
for _, res := range result {
391+
err := a.Add(res.Result, res.Err)
356392
if err != nil {
357393
return err
358394
}
395+
396+
if res.Err != nil {
397+
return nil
398+
}
359399
}
360400

361401
return nil
@@ -410,6 +450,10 @@ func (a *AggLogicalAndAggregator) BatchAdd(results map[string]AggregatorResErr)
410450
if err != nil {
411451
return err
412452
}
453+
454+
if res.Err != nil {
455+
return nil
456+
}
413457
}
414458

415459
return nil
@@ -419,12 +463,16 @@ func (a *AggLogicalAndAggregator) AddWithKey(key string, result interface{}, err
419463
return a.Add(result, err)
420464
}
421465

422-
func (a *AggLogicalAndAggregator) BatchSlice(values []AggregatorResErr) error {
423-
for _, val := range values {
424-
err := a.Add(val.Result, val.Err)
466+
func (a *AggLogicalAndAggregator) BatchSlice(result []AggregatorResErr) error {
467+
for _, res := range result {
468+
err := a.Add(res.Result, res.Err)
425469
if err != nil {
426470
return err
427471
}
472+
473+
if res.Err != nil {
474+
return nil
475+
}
428476
}
429477

430478
return nil
@@ -478,6 +526,10 @@ func (a *AggLogicalOrAggregator) BatchAdd(results map[string]AggregatorResErr) e
478526
if err != nil {
479527
return err
480528
}
529+
530+
if res.Err != nil {
531+
return nil
532+
}
481533
}
482534

483535
return nil
@@ -487,12 +539,16 @@ func (a *AggLogicalOrAggregator) AddWithKey(key string, result interface{}, err
487539
return a.Add(result, err)
488540
}
489541

490-
func (a *AggLogicalOrAggregator) BatchSlice(values []AggregatorResErr) error {
491-
for _, val := range values {
492-
err := a.Add(val.Result, val.Err)
542+
func (a *AggLogicalOrAggregator) BatchSlice(result []AggregatorResErr) error {
543+
for _, res := range result {
544+
err := a.Add(res.Result, res.Err)
493545
if err != nil {
494546
return err
495547
}
548+
549+
if res.Err != nil {
550+
return nil
551+
}
496552
}
497553

498554
return nil
@@ -581,6 +637,10 @@ func (a *DefaultKeylessAggregator) BatchAdd(results map[string]AggregatorResErr)
581637
if err != nil {
582638
return err
583639
}
640+
641+
if res.Err != nil {
642+
return nil
643+
}
584644
}
585645

586646
return nil
@@ -590,15 +650,19 @@ func (a *DefaultKeylessAggregator) AddWithKey(key string, result interface{}, er
590650
return a.Add(result, err)
591651
}
592652

593-
func (a *DefaultKeylessAggregator) BatchSlice(values []AggregatorResErr) error {
653+
func (a *DefaultKeylessAggregator) BatchSlice(result []AggregatorResErr) error {
594654
a.mu.Lock()
595655
defer a.mu.Unlock()
596656

597-
for _, val := range values {
598-
err := a.add(val.Result, val.Err)
657+
for _, res := range result {
658+
err := a.add(res.Result, res.Err)
599659
if err != nil {
600660
return err
601661
}
662+
663+
if res.Err != nil {
664+
return nil
665+
}
602666
}
603667

604668
return nil
@@ -657,6 +721,10 @@ func (a *DefaultKeyedAggregator) BatchAdd(results map[string]AggregatorResErr) e
657721
if err != nil {
658722
return err
659723
}
724+
725+
if res.Err != nil {
726+
return nil
727+
}
660728
}
661729

662730
return nil
@@ -685,8 +753,15 @@ func (a *DefaultKeyedAggregator) BatchAddWithKeyOrder(results map[string]Aggrega
685753
defer a.mu.Unlock()
686754

687755
a.keyOrder = keyOrder
688-
for key, val := range results {
689-
_ = a.addWithKey(key, val.Result, val.Err)
756+
for key, res := range results {
757+
err := a.addWithKey(key, res.Result, res.Err)
758+
if err != nil {
759+
return nil
760+
}
761+
762+
if res.Err != nil {
763+
return nil
764+
}
690765
}
691766

692767
return nil
@@ -698,15 +773,19 @@ func (a *DefaultKeyedAggregator) SetKeyOrder(keyOrder []string) {
698773
a.keyOrder = keyOrder
699774
}
700775

701-
func (a *DefaultKeyedAggregator) BatchSlice(values []AggregatorResErr) error {
776+
func (a *DefaultKeyedAggregator) BatchSlice(result []AggregatorResErr) error {
702777
a.mu.Lock()
703778
defer a.mu.Unlock()
704779

705-
for _, val := range values {
706-
err := a.add(val.Result, val.Err)
780+
for _, res := range result {
781+
err := a.add(res.Result, res.Err)
707782
if err != nil {
708783
return err
709784
}
785+
786+
if res.Err != nil {
787+
return nil
788+
}
710789
}
711790

712791
return nil
@@ -769,6 +848,10 @@ func (a *SpecialAggregator) BatchAdd(results map[string]AggregatorResErr) error
769848
if err != nil {
770849
return err
771850
}
851+
852+
if res.Err != nil {
853+
return nil
854+
}
772855
}
773856

774857
return nil
@@ -778,15 +861,19 @@ func (a *SpecialAggregator) AddWithKey(key string, result interface{}, err error
778861
return a.Add(result, err)
779862
}
780863

781-
func (a *SpecialAggregator) BatchSlice(values []AggregatorResErr) error {
864+
func (a *SpecialAggregator) BatchSlice(result []AggregatorResErr) error {
782865
a.mu.Lock()
783866
defer a.mu.Unlock()
784867

785-
for _, val := range values {
786-
err := a.add(val.Result, val.Err)
868+
for _, res := range result {
869+
err := a.add(res.Result, res.Err)
787870
if err != nil {
788871
return err
789872
}
873+
874+
if res.Err != nil {
875+
return nil
876+
}
790877
}
791878

792879
return nil

0 commit comments

Comments
 (0)