Skip to content

Commit cc51a01

Browse files
authored
Fix parquet queryable shard matcher lifecycle (cortexproject#6993)
* fix parquet queryable shard matcher lifecycle Signed-off-by: yeya24 <[email protected]> * unit test Signed-off-by: yeya24 <[email protected]> --------- Signed-off-by: yeya24 <[email protected]>
1 parent 72d9b92 commit cc51a01

File tree

3 files changed

+44
-29
lines changed

3 files changed

+44
-29
lines changed

pkg/querier/parquet_queryable.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -450,11 +450,10 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
450450
span, ctx := opentracing.StartSpanFromContext(ctx, "parquetQuerierWithFallback.Select")
451451
defer span.Finish()
452452

453-
newMatchers, shardMatcher, err := querysharding.ExtractShardingMatchers(matchers)
453+
newMatchers, shardInfo, err := querysharding.ExtractShardingInfo(matchers)
454454
if err != nil {
455455
return storage.ErrSeriesSet(err)
456456
}
457-
defer shardMatcher.Close()
458457

459458
hints := storage.SelectHints{
460459
Start: q.minT,
@@ -501,8 +500,8 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
501500
span, _ := opentracing.StartSpanFromContext(ctx, "parquetQuerier.Select")
502501
defer span.Finish()
503502
parquetCtx := InjectBlocksIntoContext(ctx, parquet...)
504-
if shardMatcher != nil {
505-
parquetCtx = injectShardMatcherIntoContext(parquetCtx, shardMatcher)
503+
if shardInfo != nil {
504+
parquetCtx = injectShardInfoIntoContext(parquetCtx, shardInfo)
506505
}
507506
p <- q.parquetQuerier.Select(parquetCtx, sortSeries, &hints, newMatchers...)
508507
}()
@@ -604,11 +603,15 @@ func (f *shardMatcherLabelsFilter) Close() {
604603
}
605604

606605
func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHints) (search.MaterializedLabelsFilter, bool) {
607-
shardMatcher, exists := extractShardMatcherFromContext(ctx)
608-
if !exists || !shardMatcher.IsSharded() {
606+
shardInfo, exists := extractShardInfoFromContext(ctx)
607+
if !exists {
609608
return nil, false
610609
}
611-
return &shardMatcherLabelsFilter{shardMatcher: shardMatcher}, true
610+
sm := shardInfo.Matcher(&querysharding.Buffers)
611+
if !sm.IsSharded() {
612+
return nil, false
613+
}
614+
return &shardMatcherLabelsFilter{shardMatcher: sm}, true
612615
}
613616

614617
type cacheInterface[T any] interface {
@@ -698,16 +701,16 @@ func (n noopCache[T]) Set(_ string, _ T) {
698701
}
699702

700703
var (
701-
shardMatcherCtxKey contextKey = 1
704+
shardInfoCtxKey contextKey = 1
702705
)
703706

704-
func injectShardMatcherIntoContext(ctx context.Context, sm *storepb.ShardMatcher) context.Context {
705-
return context.WithValue(ctx, shardMatcherCtxKey, sm)
707+
func injectShardInfoIntoContext(ctx context.Context, si *storepb.ShardInfo) context.Context {
708+
return context.WithValue(ctx, shardInfoCtxKey, si)
706709
}
707710

708-
func extractShardMatcherFromContext(ctx context.Context) (*storepb.ShardMatcher, bool) {
709-
if sm := ctx.Value(shardMatcherCtxKey); sm != nil {
710-
return sm.(*storepb.ShardMatcher), true
711+
func extractShardInfoFromContext(ctx context.Context) (*storepb.ShardInfo, bool) {
712+
if si := ctx.Value(shardInfoCtxKey); si != nil {
713+
return si.(*storepb.ShardInfo), true
711714
}
712715

713716
return nil, false

pkg/querier/parquet_queryable_test.go

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -654,13 +654,7 @@ func TestMaterializedLabelsFilterCallback(t *testing.T) {
654654
Labels: []string{"__name__"},
655655
}
656656

657-
buffers := &sync.Pool{New: func() interface{} {
658-
b := make([]byte, 0, 100)
659-
return &b
660-
}}
661-
shardMatcher := shardInfo.Matcher(buffers)
662-
663-
return injectShardMatcherIntoContext(context.Background(), shardMatcher)
657+
return injectShardInfoIntoContext(context.Background(), shardInfo)
664658
},
665659
expectedFilterReturned: false,
666660
expectedCallbackReturned: false,
@@ -676,13 +670,7 @@ func TestMaterializedLabelsFilterCallback(t *testing.T) {
676670
Labels: []string{"__name__"},
677671
}
678672

679-
buffers := &sync.Pool{New: func() interface{} {
680-
b := make([]byte, 0, 100)
681-
return &b
682-
}}
683-
shardMatcher := shardInfo.Matcher(buffers)
684-
685-
return injectShardMatcherIntoContext(context.Background(), shardMatcher)
673+
return injectShardInfoIntoContext(context.Background(), shardInfo)
686674
},
687675
expectedFilterReturned: true,
688676
expectedCallbackReturned: true,
@@ -715,6 +703,30 @@ func TestMaterializedLabelsFilterCallback(t *testing.T) {
715703
}
716704
}
717705

706+
func TestMaterializedLabelsFilterCallbackConcurrent(t *testing.T) {
707+
wg := sync.WaitGroup{}
708+
wg.Add(10)
709+
si := &storepb.ShardInfo{
710+
ShardIndex: 0,
711+
TotalShards: 2,
712+
By: true,
713+
Labels: []string{"__name__"},
714+
}
715+
for i := 0; i < 10; i++ {
716+
go func() {
717+
defer wg.Done()
718+
ctx := injectShardInfoIntoContext(context.Background(), si)
719+
filter, exists := materializedLabelsFilterCallback(ctx, nil)
720+
require.Equal(t, true, exists)
721+
for j := 0; j < 1000; j++ {
722+
filter.Filter(labels.FromStrings("__name__", "test_metric", "label_1", strconv.Itoa(j)))
723+
}
724+
filter.Close()
725+
}()
726+
}
727+
wg.Wait()
728+
}
729+
718730
func TestParquetQueryableFallbackDisabled(t *testing.T) {
719731
block1 := ulid.MustNew(1, nil)
720732
block2 := ulid.MustNew(2, nil)

pkg/querysharding/util.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const (
1818
)
1919

2020
var (
21-
buffers = sync.Pool{New: func() interface{} {
21+
Buffers = sync.Pool{New: func() interface{} {
2222
b := make([]byte, 0, 100)
2323
return &b
2424
}}
@@ -79,7 +79,7 @@ func ExtractShardingMatchers(matchers []*labels.Matcher) ([]*labels.Matcher, *st
7979
return r, nil, err
8080
}
8181

82-
return r, shardInfo.Matcher(&buffers), nil
82+
return r, shardInfo.Matcher(&Buffers), nil
8383
}
8484

8585
type disableBinaryExpressionAnalyzer struct {

0 commit comments

Comments
 (0)