Skip to content

Commit a909347

Browse files
committed
added batching to aggregateResponses
1 parent f621126 commit a909347

File tree

2 files changed

+92
-26
lines changed

2 files changed

+92
-26
lines changed

internal/routing/aggregator.go

Lines changed: 82 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ type ResponseAggregator interface {
3434
}
3535

3636
type AggregatorResErr struct {
37-
result interface{}
38-
err error
37+
Result interface{}
38+
Err error
3939
}
4040

4141
// NewResponseAggregator creates an aggregator based on the response policy.
@@ -104,15 +104,21 @@ func (a *AllSucceededAggregator) Add(result interface{}, err error) error {
104104

105105
func (a *AllSucceededAggregator) BatchAdd(results map[string]interface{}) error {
106106
for _, res := range results {
107-
_ = a.Add(res, nil)
107+
err := a.Add(res, nil)
108+
if err != nil {
109+
return err
110+
}
108111
}
109112

110113
return nil
111114
}
112115

113116
func (a *AllSucceededAggregator) BatchWithErrs(values []AggregatorResErr) error {
114117
for _, val := range values {
115-
_ = a.Add(val.result, val.err)
118+
err := a.Add(val.Result, val.Err)
119+
if err != nil {
120+
return err
121+
}
116122
}
117123

118124
return nil
@@ -154,7 +160,10 @@ func (a *OneSucceededAggregator) Add(result interface{}, err error) error {
154160

155161
func (a *OneSucceededAggregator) BatchAdd(results map[string]interface{}) error {
156162
for _, res := range results {
157-
_ = a.Add(res, nil)
163+
err := a.Add(res, nil)
164+
if err != nil {
165+
return err
166+
}
158167
}
159168

160169
return nil
@@ -166,7 +175,10 @@ func (a *OneSucceededAggregator) AddWithKey(key string, result interface{}, err
166175

167176
func (a *OneSucceededAggregator) BatchWithErrs(values []AggregatorResErr) error {
168177
for _, val := range values {
169-
_ = a.Add(val.result, val.err)
178+
err := a.Add(val.Result, val.Err)
179+
if err != nil {
180+
return err
181+
}
170182
}
171183

172184
return nil
@@ -205,7 +217,10 @@ func (a *AggSumAggregator) Add(result interface{}, err error) error {
205217

206218
func (a *AggSumAggregator) BatchAdd(results map[string]interface{}) error {
207219
for _, res := range results {
208-
_ = a.Add(res, nil)
220+
err := a.Add(res, nil)
221+
if err != nil {
222+
return err
223+
}
209224
}
210225

211226
return nil
@@ -217,7 +232,10 @@ func (a *AggSumAggregator) AddWithKey(key string, result interface{}, err error)
217232

218233
func (a *AggSumAggregator) BatchWithErrs(values []AggregatorResErr) error {
219234
for _, val := range values {
220-
_ = a.Add(val.result, val.err)
235+
err := a.Add(val.Result, val.Err)
236+
if err != nil {
237+
return err
238+
}
221239
}
222240

223241
return nil
@@ -257,7 +275,10 @@ func (a *AggMinAggregator) Add(result interface{}, err error) error {
257275

258276
func (a *AggMinAggregator) BatchAdd(results map[string]interface{}) error {
259277
for _, res := range results {
260-
_ = a.Add(res, nil)
278+
err := a.Add(res, nil)
279+
if err != nil {
280+
return err
281+
}
261282
}
262283

263284
return nil
@@ -269,7 +290,10 @@ func (a *AggMinAggregator) AddWithKey(key string, result interface{}, err error)
269290

270291
func (a *AggMinAggregator) BatchWithErrs(values []AggregatorResErr) error {
271292
for _, val := range values {
272-
_ = a.Add(val.result, val.err)
293+
err := a.Add(val.Result, val.Err)
294+
if err != nil {
295+
return err
296+
}
273297
}
274298

275299
return nil
@@ -313,7 +337,10 @@ func (a *AggMaxAggregator) Add(result interface{}, err error) error {
313337

314338
func (a *AggMaxAggregator) BatchAdd(results map[string]interface{}) error {
315339
for _, res := range results {
316-
_ = a.Add(res, nil)
340+
err := a.Add(res, nil)
341+
if err != nil {
342+
return err
343+
}
317344
}
318345

319346
return nil
@@ -325,7 +352,10 @@ func (a *AggMaxAggregator) AddWithKey(key string, result interface{}, err error)
325352

326353
func (a *AggMaxAggregator) BatchWithErrs(values []AggregatorResErr) error {
327354
for _, val := range values {
328-
_ = a.Add(val.result, val.err)
355+
err := a.Add(val.Result, val.Err)
356+
if err != nil {
357+
return err
358+
}
329359
}
330360

331361
return nil
@@ -376,7 +406,10 @@ func (a *AggLogicalAndAggregator) Add(result interface{}, err error) error {
376406

377407
func (a *AggLogicalAndAggregator) BatchAdd(results map[string]interface{}) error {
378408
for _, res := range results {
379-
_ = a.Add(res, nil)
409+
err := a.Add(res, nil)
410+
if err != nil {
411+
return err
412+
}
380413
}
381414

382415
return nil
@@ -388,7 +421,10 @@ func (a *AggLogicalAndAggregator) AddWithKey(key string, result interface{}, err
388421

389422
func (a *AggLogicalAndAggregator) BatchWithErrs(values []AggregatorResErr) error {
390423
for _, val := range values {
391-
_ = a.Add(val.result, val.err)
424+
err := a.Add(val.Result, val.Err)
425+
if err != nil {
426+
return err
427+
}
392428
}
393429

394430
return nil
@@ -438,7 +474,10 @@ func (a *AggLogicalOrAggregator) Add(result interface{}, err error) error {
438474

439475
func (a *AggLogicalOrAggregator) BatchAdd(results map[string]interface{}) error {
440476
for _, res := range results {
441-
_ = a.Add(res, nil)
477+
err := a.Add(res, nil)
478+
if err != nil {
479+
return err
480+
}
442481
}
443482

444483
return nil
@@ -450,7 +489,10 @@ func (a *AggLogicalOrAggregator) AddWithKey(key string, result interface{}, err
450489

451490
func (a *AggLogicalOrAggregator) BatchWithErrs(values []AggregatorResErr) error {
452491
for _, val := range values {
453-
_ = a.Add(val.result, val.err)
492+
err := a.Add(val.Result, val.Err)
493+
if err != nil {
494+
return err
495+
}
454496
}
455497

456498
return nil
@@ -532,7 +574,10 @@ func (a *DefaultKeylessAggregator) Add(result interface{}, err error) error {
532574

533575
func (a *DefaultKeylessAggregator) BatchAdd(results map[string]interface{}) error {
534576
for _, res := range results {
535-
_ = a.add(res, nil)
577+
err := a.Add(res, nil)
578+
if err != nil {
579+
return err
580+
}
536581
}
537582

538583
return nil
@@ -547,7 +592,10 @@ func (a *DefaultKeylessAggregator) BatchWithErrs(values []AggregatorResErr) erro
547592
defer a.mu.Unlock()
548593

549594
for _, val := range values {
550-
_ = a.add(val.result, val.err)
595+
err := a.Add(val.Result, val.Err)
596+
if err != nil {
597+
return err
598+
}
551599
}
552600

553601
return nil
@@ -602,7 +650,10 @@ func (a *DefaultKeyedAggregator) BatchAdd(results map[string]interface{}) error
602650
defer a.mu.Unlock()
603651

604652
for _, res := range results {
605-
_ = a.add(res, nil)
653+
err := a.Add(res, nil)
654+
if err != nil {
655+
return err
656+
}
606657
}
607658

608659
return nil
@@ -649,7 +700,10 @@ func (a *DefaultKeyedAggregator) BatchWithErrs(values []AggregatorResErr) error
649700
defer a.mu.Unlock()
650701

651702
for _, val := range values {
652-
_ = a.add(val.result, val.err)
703+
err := a.Add(val.Result, val.Err)
704+
if err != nil {
705+
return err
706+
}
653707
}
654708

655709
return nil
@@ -708,7 +762,10 @@ func (a *SpecialAggregator) BatchAdd(results map[string]interface{}) error {
708762
defer a.mu.Unlock()
709763

710764
for _, res := range results {
711-
_ = a.add(res, nil)
765+
err := a.Add(res, nil)
766+
if err != nil {
767+
return err
768+
}
712769
}
713770

714771
return nil
@@ -723,7 +780,10 @@ func (a *SpecialAggregator) BatchWithErrs(values []AggregatorResErr) error {
723780
defer a.mu.Unlock()
724781

725782
for _, val := range values {
726-
_ = a.add(val.result, val.err)
783+
err := a.Add(val.Result, val.Err)
784+
if err != nil {
785+
return err
786+
}
727787
}
728788

729789
return nil

osscluster_router.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -427,13 +427,19 @@ func (c *ClusterClient) aggregateResponses(cmd Cmder, cmds []Cmder, policy *rout
427427

428428
aggregator := c.createAggregator(policy, cmd, false)
429429

430+
batchWithErrs := []routing.AggregatorResErr{}
430431
// Add all results to aggregator
431432
for _, shardCmd := range cmds {
432433
value := ExtractCommandValue(shardCmd)
433-
//TODO: Rewrite as batch
434-
if err := aggregator.Add(value, shardCmd.Err()); err != nil {
435-
return err
436-
}
434+
batchWithErrs = append(batchWithErrs, routing.AggregatorResErr{
435+
Result: value,
436+
Err: shardCmd.Err(),
437+
})
438+
}
439+
440+
err := aggregator.BatchWithErrs(batchWithErrs)
441+
if err != nil {
442+
return err
437443
}
438444

439445
return c.finishAggregation(cmd, aggregator)

0 commit comments

Comments
 (0)