Skip to content

Commit 398f241

Browse files
authored
Fix: Handle context correctly (#3504)
selectTree was using the a errgroup context going foward. This caused some errors when store-gateways were used for queries, that required deduplication. This fix also updated all errGroup nested contexts to use gCtx, as an additional measure we could consider using a block to limit the use `gCtx` after `Wait()`.
1 parent b49a128 commit 398f241

File tree

4 files changed

+31
-31
lines changed

4 files changed

+31
-31
lines changed

pkg/querier/http.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,11 +178,11 @@ func (q *QueryHandlers) Render(w http.ResponseWriter, req *http.Request) {
178178
}
179179

180180
var resFlame *connect.Response[querierv1.SelectMergeStacktracesResponse]
181-
g, ctx := errgroup.WithContext(req.Context())
181+
g, gCtx := errgroup.WithContext(req.Context())
182182
selectParamsClone := selectParams.CloneVT()
183183
g.Go(func() error {
184184
var err error
185-
resFlame, err = q.client.SelectMergeStacktraces(ctx, connect.NewRequest(selectParamsClone))
185+
resFlame, err = q.client.SelectMergeStacktraces(gCtx, connect.NewRequest(selectParamsClone))
186186
return err
187187
})
188188

pkg/querier/ingester_querier.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (q *Querier) selectTreeFromIngesters(ctx context.Context, req *querierv1.Se
111111
return nil, connect.NewError(connect.CodeInternal, err)
112112
}
113113
// send the first initial request to all ingesters.
114-
g, gCtx := errgroup.WithContext(ctx)
114+
g, _ := errgroup.WithContext(ctx)
115115
for idx := range responses {
116116
r := responses[idx]
117117
blockHints, err := BlockHints(plan, r.addr)
@@ -137,7 +137,7 @@ func (q *Querier) selectTreeFromIngesters(ctx context.Context, req *querierv1.Se
137137
}
138138

139139
// merge all profiles
140-
return selectMergeTree(gCtx, responses)
140+
return selectMergeTree(ctx, responses)
141141
}
142142

143143
func (q *Querier) selectProfileFromIngesters(ctx context.Context, req *querierv1.SelectMergeProfileRequest, plan blockPlan) (*googlev1.Profile, error) {
@@ -168,7 +168,7 @@ func (q *Querier) selectProfileFromIngesters(ctx context.Context, req *querierv1
168168
return nil, connect.NewError(connect.CodeInternal, err)
169169
}
170170
// send the first initial request to all ingesters.
171-
g, gCtx := errgroup.WithContext(ctx)
171+
g, _ := errgroup.WithContext(ctx)
172172
for idx := range responses {
173173
r := responses[idx]
174174
blockHints, err := BlockHints(plan, r.addr)
@@ -196,7 +196,7 @@ func (q *Querier) selectProfileFromIngesters(ctx context.Context, req *querierv1
196196

197197
// merge all profiles
198198
span.LogFields(otlog.String("msg", "selectMergePprofProfile"))
199-
return selectMergePprofProfile(gCtx, profileType, responses)
199+
return selectMergePprofProfile(ctx, profileType, responses)
200200
}
201201

202202
func (q *Querier) selectSeriesFromIngesters(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest, plan map[string]*blockPlanEntry) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
@@ -321,7 +321,7 @@ func (q *Querier) selectSpanProfileFromIngesters(ctx context.Context, req *queri
321321
return nil, connect.NewError(connect.CodeInternal, err)
322322
}
323323
// send the first initial request to all ingesters.
324-
g, gCtx := errgroup.WithContext(ctx)
324+
g, _ := errgroup.WithContext(ctx)
325325
for idx := range responses {
326326
r := responses[idx]
327327
blockHints, err := BlockHints(plan, r.addr)
@@ -348,7 +348,7 @@ func (q *Querier) selectSpanProfileFromIngesters(ctx context.Context, req *queri
348348
}
349349

350350
// merge all profiles
351-
return selectMergeSpanProfile(gCtx, responses)
351+
return selectMergeSpanProfile(ctx, responses)
352352
}
353353

354354
func (q *Querier) blockSelectFromIngesters(ctx context.Context, req *ingestv1.BlockMetadataRequest) ([]ResponseFromReplica[[]*typesv1.BlockInfo], error) {

pkg/querier/querier.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -237,11 +237,11 @@ func (q *Querier) LabelValues(ctx context.Context, req *connect.Request[typesv1.
237237

238238
var responses []ResponseFromReplica[[]string]
239239
var lock sync.Mutex
240-
group, ctx := errgroup.WithContext(ctx)
240+
group, gCtx := errgroup.WithContext(ctx)
241241

242242
if storeQueries.ingester.shouldQuery {
243243
group.Go(func() error {
244-
ir, err := q.labelValuesFromIngesters(ctx, storeQueries.ingester.LabelValuesRequest(req.Msg))
244+
ir, err := q.labelValuesFromIngesters(gCtx, storeQueries.ingester.LabelValuesRequest(req.Msg))
245245
if err != nil {
246246
return err
247247
}
@@ -255,7 +255,7 @@ func (q *Querier) LabelValues(ctx context.Context, req *connect.Request[typesv1.
255255

256256
if storeQueries.storeGateway.shouldQuery {
257257
group.Go(func() error {
258-
ir, err := q.labelValuesFromStoreGateway(ctx, storeQueries.storeGateway.LabelValuesRequest(req.Msg))
258+
ir, err := q.labelValuesFromStoreGateway(gCtx, storeQueries.storeGateway.LabelValuesRequest(req.Msg))
259259
if err != nil {
260260
return err
261261
}
@@ -307,11 +307,11 @@ func (q *Querier) LabelNames(ctx context.Context, req *connect.Request[typesv1.L
307307

308308
var responses []ResponseFromReplica[[]string]
309309
var lock sync.Mutex
310-
group, ctx := errgroup.WithContext(ctx)
310+
group, gCtx := errgroup.WithContext(ctx)
311311

312312
if storeQueries.ingester.shouldQuery {
313313
group.Go(func() error {
314-
ir, err := q.labelNamesFromIngesters(ctx, storeQueries.ingester.LabelNamesRequest(req.Msg))
314+
ir, err := q.labelNamesFromIngesters(gCtx, storeQueries.ingester.LabelNamesRequest(req.Msg))
315315
if err != nil {
316316
return err
317317
}
@@ -325,7 +325,7 @@ func (q *Querier) LabelNames(ctx context.Context, req *connect.Request[typesv1.L
325325

326326
if storeQueries.storeGateway.shouldQuery {
327327
group.Go(func() error {
328-
ir, err := q.labelNamesFromStoreGateway(ctx, storeQueries.storeGateway.LabelNamesRequest(req.Msg))
328+
ir, err := q.labelNamesFromStoreGateway(gCtx, storeQueries.storeGateway.LabelNamesRequest(req.Msg))
329329
if err != nil {
330330
return err
331331
}
@@ -427,11 +427,11 @@ func (q *Querier) Series(ctx context.Context, req *connect.Request[querierv1.Ser
427427

428428
var responses []ResponseFromReplica[[]*typesv1.Labels]
429429
var lock sync.Mutex
430-
group, ctx := errgroup.WithContext(ctx)
430+
group, gCtx := errgroup.WithContext(ctx)
431431

432432
if storeQueries.ingester.shouldQuery {
433433
group.Go(func() error {
434-
ir, err := q.seriesFromIngesters(ctx, storeQueries.ingester.SeriesRequest(req.Msg))
434+
ir, err := q.seriesFromIngesters(gCtx, storeQueries.ingester.SeriesRequest(req.Msg))
435435
if err != nil {
436436
return err
437437
}
@@ -445,7 +445,7 @@ func (q *Querier) Series(ctx context.Context, req *connect.Request[querierv1.Ser
445445

446446
if storeQueries.storeGateway.shouldQuery {
447447
group.Go(func() error {
448-
ir, err := q.seriesFromStoreGateway(ctx, storeQueries.storeGateway.SeriesRequest(req.Msg))
448+
ir, err := q.seriesFromStoreGateway(gCtx, storeQueries.storeGateway.SeriesRequest(req.Msg))
449449
if err != nil {
450450
return err
451451
}
@@ -690,11 +690,11 @@ func (q *Querier) selectTree(ctx context.Context, req *querierv1.SelectMergeStac
690690
return q.selectTreeFromIngesters(ctx, storeQueries.ingester.MergeStacktracesRequest(req), plan)
691691
}
692692

693-
g, ctx := errgroup.WithContext(ctx)
693+
g, gCtx := errgroup.WithContext(ctx)
694694
var ingesterTree, storegatewayTree *phlaremodel.Tree
695695
g.Go(func() error {
696696
var err error
697-
ingesterTree, err = q.selectTreeFromIngesters(ctx, storeQueries.ingester.MergeStacktracesRequest(req), plan)
697+
ingesterTree, err = q.selectTreeFromIngesters(gCtx, storeQueries.ingester.MergeStacktracesRequest(req), plan)
698698
if err != nil {
699699
return err
700700
}
@@ -893,11 +893,11 @@ func (q *Querier) selectProfile(ctx context.Context, req *querierv1.SelectMergeP
893893
return q.selectProfileFromIngesters(ctx, storeQueries.ingester.MergeProfileRequest(req), plan)
894894
}
895895

896-
g, ctx := errgroup.WithContext(ctx)
896+
g, gCtx := errgroup.WithContext(ctx)
897897
var lock sync.Mutex
898898
var merge pprof.ProfileMerge
899899
g.Go(func() error {
900-
ingesterProfile, err := q.selectProfileFromIngesters(ctx, storeQueries.ingester.MergeProfileRequest(req), plan)
900+
ingesterProfile, err := q.selectProfileFromIngesters(gCtx, storeQueries.ingester.MergeProfileRequest(req), plan)
901901
if err != nil {
902902
return err
903903
}
@@ -906,7 +906,7 @@ func (q *Querier) selectProfile(ctx context.Context, req *querierv1.SelectMergeP
906906
return merge.Merge(ingesterProfile)
907907
})
908908
g.Go(func() error {
909-
storegatewayProfile, err := q.selectProfileFromStoreGateway(ctx, storeQueries.storeGateway.MergeProfileRequest(req), plan)
909+
storegatewayProfile, err := q.selectProfileFromStoreGateway(gCtx, storeQueries.storeGateway.MergeProfileRequest(req), plan)
910910
if err != nil {
911911
return err
912912
}
@@ -1090,19 +1090,19 @@ func (q *Querier) selectSpanProfile(ctx context.Context, req *querierv1.SelectMe
10901090
return q.selectSpanProfileFromIngesters(ctx, storeQueries.ingester.MergeSpanProfileRequest(req), plan)
10911091
}
10921092

1093-
g, ctx := errgroup.WithContext(ctx)
1093+
g, gCtx := errgroup.WithContext(ctx)
10941094
var ingesterTree, storegatewayTree *phlaremodel.Tree
10951095
g.Go(func() error {
10961096
var err error
1097-
ingesterTree, err = q.selectSpanProfileFromIngesters(ctx, storeQueries.ingester.MergeSpanProfileRequest(req), plan)
1097+
ingesterTree, err = q.selectSpanProfileFromIngesters(gCtx, storeQueries.ingester.MergeSpanProfileRequest(req), plan)
10981098
if err != nil {
10991099
return err
11001100
}
11011101
return nil
11021102
})
11031103
g.Go(func() error {
11041104
var err error
1105-
storegatewayTree, err = q.selectSpanProfileFromStoreGateway(ctx, storeQueries.storeGateway.MergeSpanProfileRequest(req), plan)
1105+
storegatewayTree, err = q.selectSpanProfileFromStoreGateway(gCtx, storeQueries.storeGateway.MergeSpanProfileRequest(req), plan)
11061106
if err != nil {
11071107
return err
11081108
}

pkg/querier/store_gateway_querier.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ func (q *Querier) selectTreeFromStoreGateway(ctx context.Context, req *querierv1
206206
return nil, connect.NewError(connect.CodeInternal, err)
207207
}
208208
// send the first initial request to all ingesters.
209-
g, gCtx := errgroup.WithContext(ctx)
209+
g, _ := errgroup.WithContext(ctx)
210210
for _, r := range responses {
211211
r := r
212212
blockHints, err := BlockHints(plan, r.addr)
@@ -231,7 +231,7 @@ func (q *Querier) selectTreeFromStoreGateway(ctx context.Context, req *querierv1
231231
}
232232

233233
// merge all profiles
234-
return selectMergeTree(gCtx, responses)
234+
return selectMergeTree(ctx, responses)
235235
}
236236

237237
func (q *Querier) selectProfileFromStoreGateway(ctx context.Context, req *querierv1.SelectMergeProfileRequest, plan map[string]*blockPlanEntry) (*googlev1.Profile, error) {
@@ -266,7 +266,7 @@ func (q *Querier) selectProfileFromStoreGateway(ctx context.Context, req *querie
266266
return nil, connect.NewError(connect.CodeInternal, err)
267267
}
268268
// send the first initial request to all ingesters.
269-
g, gCtx := errgroup.WithContext(ctx)
269+
g, _ := errgroup.WithContext(ctx)
270270
for _, r := range responses {
271271
r := r
272272
blockHints, err := BlockHints(plan, r.addr)
@@ -292,7 +292,7 @@ func (q *Querier) selectProfileFromStoreGateway(ctx context.Context, req *querie
292292
}
293293

294294
// merge all profiles
295-
return selectMergePprofProfile(gCtx, profileType, responses)
295+
return selectMergePprofProfile(ctx, profileType, responses)
296296
}
297297

298298
func (q *Querier) selectSeriesFromStoreGateway(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest, plan map[string]*blockPlanEntry) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
@@ -434,7 +434,7 @@ func (q *Querier) selectSpanProfileFromStoreGateway(ctx context.Context, req *qu
434434
return nil, connect.NewError(connect.CodeInternal, err)
435435
}
436436
// send the first initial request to all ingesters.
437-
g, gCtx := errgroup.WithContext(ctx)
437+
g, _ := errgroup.WithContext(ctx)
438438
for _, r := range responses {
439439
r := r
440440
blockHints, err := BlockHints(plan, r.addr)
@@ -460,7 +460,7 @@ func (q *Querier) selectSpanProfileFromStoreGateway(ctx context.Context, req *qu
460460
}
461461

462462
// merge all profiles
463-
return selectMergeSpanProfile(gCtx, responses)
463+
return selectMergeSpanProfile(ctx, responses)
464464
}
465465

466466
func (q *Querier) blockSelectFromStoreGateway(ctx context.Context, req *ingestv1.BlockMetadataRequest) ([]ResponseFromReplica[[]*typesv1.BlockInfo], error) {

0 commit comments

Comments
 (0)