Skip to content

Commit a3248e4

Browse files
committed
feat(websearch): concurrent multi-backend dispatch with errgroup
Search and news engines now run concurrently using errgroup instead of sequentially. Results are sorted by engine priority after collection to maintain deterministic ranking order.
1 parent f8e89b4 commit a3248e4

File tree

2 files changed

+104
-31
lines changed

2 files changed

+104
-31
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ require (
2121
go.mau.fi/whatsmeow v0.0.0-20260227112304-c9652e4448a2
2222
golang.org/x/net v0.50.0
2323
golang.org/x/oauth2 v0.35.0
24+
golang.org/x/sync v0.19.0
2425
golang.org/x/time v0.14.0
2526
google.golang.org/api v0.269.0
2627
google.golang.org/protobuf v1.36.11
@@ -141,7 +142,6 @@ require (
141142
go.uber.org/goleak v1.3.0 // indirect
142143
golang.org/x/crypto v0.48.0 // indirect
143144
golang.org/x/exp v0.0.0-20260212183809-81e46e3db34a // indirect
144-
golang.org/x/sync v0.19.0 // indirect
145145
golang.org/x/sys v0.41.0 // indirect
146146
golang.org/x/text v0.34.0 // indirect
147147
google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 // indirect

source/websearch/search.go

Lines changed: 103 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ import (
77
"log/slog"
88
"sort"
99
"strings"
10+
"sync"
1011
"time"
12+
13+
"golang.org/x/sync/errgroup"
1114
)
1215

1316
const (
@@ -70,29 +73,55 @@ func (w *WebSearch) searchWithEngines(ctx context.Context, query string, opts Se
7073
})
7174

7275
start := time.Now()
73-
var allResults []Result
74-
var backends []string
75-
var lastErr error
7676

77+
type engineResult struct {
78+
name string
79+
results []Result
80+
}
81+
82+
var (
83+
mu sync.Mutex
84+
collected []engineResult
85+
lastErr error
86+
)
87+
88+
g, gctx := errgroup.WithContext(ctx)
7789
for _, eng := range engines {
7890
if !w.health.IsHealthy(eng.Name()) {
7991
slog.Info("skipping unhealthy backend", "engine", eng.Name())
8092
continue
8193
}
82-
results, err := eng.Search(ctx, query, opts)
83-
if err != nil {
84-
lastErr = err
85-
w.health.RecordFailure(eng.Name())
86-
slog.Warn("search engine failed", "engine", eng.Name(), "error", err)
87-
continue
88-
}
89-
w.health.RecordSuccess(eng.Name())
90-
allResults = append(allResults, results...)
91-
backends = append(backends, eng.Name())
94+
g.Go(func() error {
95+
results, err := eng.Search(gctx, query, opts)
96+
mu.Lock()
97+
defer mu.Unlock()
98+
if err != nil {
99+
lastErr = err
100+
w.health.RecordFailure(eng.Name())
101+
slog.Warn("search engine failed", "engine", eng.Name(), "error", err)
102+
return nil // don't cancel other goroutines
103+
}
104+
w.health.RecordSuccess(eng.Name())
105+
collected = append(collected, engineResult{name: eng.Name(), results: results})
106+
return nil
107+
})
108+
}
109+
g.Wait()
110+
111+
if len(collected) == 0 && lastErr != nil {
112+
return nil, fmt.Errorf("all backends failed: %w", lastErr)
92113
}
93114

94-
if len(allResults) == 0 && lastErr != nil {
95-
return nil, fmt.Errorf("all backends failed: %w", lastErr)
115+
// Maintain priority order for deterministic ranking.
116+
sort.Slice(collected, func(i, j int) bool {
117+
return priorityOf(collected[i].name, engines) > priorityOf(collected[j].name, engines)
118+
})
119+
120+
var allResults []Result
121+
var backends []string
122+
for _, c := range collected {
123+
allResults = append(allResults, c.results...)
124+
backends = append(backends, c.name)
96125
}
97126

98127
allResults = rankResults(allResults, query)
@@ -215,29 +244,55 @@ func (w *WebSearch) newsWithEngines(ctx context.Context, query string, opts Sear
215244
})
216245

217246
start := time.Now()
218-
var allResults []Result
219-
var backends []string
220-
var lastErr error
221247

248+
type newsResult struct {
249+
name string
250+
results []Result
251+
}
252+
253+
var (
254+
mu sync.Mutex
255+
collected []newsResult
256+
lastErr error
257+
)
258+
259+
g, gctx := errgroup.WithContext(ctx)
222260
for _, eng := range engines {
223261
if !w.health.IsHealthy(eng.Name()) {
224262
slog.Info("skipping unhealthy news backend", "engine", eng.Name())
225263
continue
226264
}
227-
results, err := eng.News(ctx, query, opts)
228-
if err != nil {
229-
lastErr = err
230-
w.health.RecordFailure(eng.Name())
231-
slog.Warn("news engine failed", "engine", eng.Name(), "error", err)
232-
continue
233-
}
234-
w.health.RecordSuccess(eng.Name())
235-
allResults = append(allResults, results...)
236-
backends = append(backends, eng.Name())
265+
g.Go(func() error {
266+
results, err := eng.News(gctx, query, opts)
267+
mu.Lock()
268+
defer mu.Unlock()
269+
if err != nil {
270+
lastErr = err
271+
w.health.RecordFailure(eng.Name())
272+
slog.Warn("news engine failed", "engine", eng.Name(), "error", err)
273+
return nil
274+
}
275+
w.health.RecordSuccess(eng.Name())
276+
collected = append(collected, newsResult{name: eng.Name(), results: results})
277+
return nil
278+
})
279+
}
280+
g.Wait()
281+
282+
if len(collected) == 0 && lastErr != nil {
283+
return nil, fmt.Errorf("all news backends failed: %w", lastErr)
237284
}
238285

239-
if len(allResults) == 0 && lastErr != nil {
240-
return nil, fmt.Errorf("all news backends failed: %w", lastErr)
286+
// Maintain priority order for deterministic ranking.
287+
sort.Slice(collected, func(i, j int) bool {
288+
return newsEngPriority(collected[i].name, engines) > newsEngPriority(collected[j].name, engines)
289+
})
290+
291+
var allResults []Result
292+
var backends []string
293+
for _, c := range collected {
294+
allResults = append(allResults, c.results...)
295+
backends = append(backends, c.name)
241296
}
242297

243298
allResults = rankResults(allResults, query)
@@ -276,6 +331,24 @@ func buildNewsEngines(client HTTPDoer, backend string, configured []string) []Ne
276331
}
277332
}
278333

334+
func priorityOf(name string, engines []Engine) int {
335+
for _, e := range engines {
336+
if e.Name() == name {
337+
return e.Priority()
338+
}
339+
}
340+
return 0
341+
}
342+
343+
func newsEngPriority(name string, engines []NewsEngine) int {
344+
for _, e := range engines {
345+
if e.Name() == name {
346+
return e.Priority()
347+
}
348+
}
349+
return 0
350+
}
351+
279352
func filterNewsEngines(engines []NewsEngine, allowed []string) []NewsEngine {
280353
if len(allowed) == 0 {
281354
return engines

0 commit comments

Comments
 (0)