diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index e5bab841604..aaeeb99c5e6 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -450,11 +450,10 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool span, ctx := opentracing.StartSpanFromContext(ctx, "parquetQuerierWithFallback.Select") defer span.Finish() - newMatchers, shardMatcher, err := querysharding.ExtractShardingMatchers(matchers) + newMatchers, shardInfo, err := querysharding.ExtractShardingInfo(matchers) if err != nil { return storage.ErrSeriesSet(err) } - defer shardMatcher.Close() hints := storage.SelectHints{ Start: q.minT, @@ -501,8 +500,8 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool span, _ := opentracing.StartSpanFromContext(ctx, "parquetQuerier.Select") defer span.Finish() parquetCtx := InjectBlocksIntoContext(ctx, parquet...) - if shardMatcher != nil { - parquetCtx = injectShardMatcherIntoContext(parquetCtx, shardMatcher) + if shardInfo != nil { + parquetCtx = injectShardInfoIntoContext(parquetCtx, shardInfo) } p <- q.parquetQuerier.Select(parquetCtx, sortSeries, &hints, newMatchers...) }() @@ -604,11 +603,15 @@ func (f *shardMatcherLabelsFilter) Close() { } func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHints) (search.MaterializedLabelsFilter, bool) { - shardMatcher, exists := extractShardMatcherFromContext(ctx) - if !exists || !shardMatcher.IsSharded() { + shardInfo, exists := extractShardInfoFromContext(ctx) + if !exists { return nil, false } - return &shardMatcherLabelsFilter{shardMatcher: shardMatcher}, true + sm := shardInfo.Matcher(&querysharding.Buffers) + if !sm.IsSharded() { + return nil, false + } + return &shardMatcherLabelsFilter{shardMatcher: sm}, true } type cacheInterface[T any] interface { @@ -698,16 +701,16 @@ func (n noopCache[T]) Set(_ string, _ T) { } var ( - shardMatcherCtxKey contextKey = 1 + shardInfoCtxKey contextKey = 1 ) -func injectShardMatcherIntoContext(ctx context.Context, sm *storepb.ShardMatcher) context.Context { - return context.WithValue(ctx, shardMatcherCtxKey, sm) +func injectShardInfoIntoContext(ctx context.Context, si *storepb.ShardInfo) context.Context { + return context.WithValue(ctx, shardInfoCtxKey, si) } -func extractShardMatcherFromContext(ctx context.Context) (*storepb.ShardMatcher, bool) { - if sm := ctx.Value(shardMatcherCtxKey); sm != nil { - return sm.(*storepb.ShardMatcher), true +func extractShardInfoFromContext(ctx context.Context) (*storepb.ShardInfo, bool) { + if si := ctx.Value(shardInfoCtxKey); si != nil { + return si.(*storepb.ShardInfo), true } return nil, false diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 70baa010b66..3bf11097337 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -654,13 +654,7 @@ func TestMaterializedLabelsFilterCallback(t *testing.T) { Labels: []string{"__name__"}, } - buffers := &sync.Pool{New: func() interface{} { - b := make([]byte, 0, 100) - return &b - }} - shardMatcher := shardInfo.Matcher(buffers) - - return injectShardMatcherIntoContext(context.Background(), shardMatcher) + return injectShardInfoIntoContext(context.Background(), shardInfo) }, expectedFilterReturned: false, expectedCallbackReturned: false, @@ -676,13 +670,7 @@ func TestMaterializedLabelsFilterCallback(t *testing.T) { Labels: []string{"__name__"}, } - buffers := &sync.Pool{New: func() interface{} { - b := make([]byte, 0, 100) - return &b - }} - shardMatcher := shardInfo.Matcher(buffers) - - return injectShardMatcherIntoContext(context.Background(), shardMatcher) + return injectShardInfoIntoContext(context.Background(), shardInfo) }, expectedFilterReturned: true, expectedCallbackReturned: true, @@ -715,6 +703,30 @@ func TestMaterializedLabelsFilterCallback(t *testing.T) { } } +func TestMaterializedLabelsFilterCallbackConcurrent(t *testing.T) { + wg := sync.WaitGroup{} + wg.Add(10) + si := &storepb.ShardInfo{ + ShardIndex: 0, + TotalShards: 2, + By: true, + Labels: []string{"__name__"}, + } + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + ctx := injectShardInfoIntoContext(context.Background(), si) + filter, exists := materializedLabelsFilterCallback(ctx, nil) + require.Equal(t, true, exists) + for j := 0; j < 1000; j++ { + filter.Filter(labels.FromStrings("__name__", "test_metric", "label_1", strconv.Itoa(j))) + } + filter.Close() + }() + } + wg.Wait() +} + func TestParquetQueryableFallbackDisabled(t *testing.T) { block1 := ulid.MustNew(1, nil) block2 := ulid.MustNew(2, nil) diff --git a/pkg/querysharding/util.go b/pkg/querysharding/util.go index eafc3a71b4f..20d56a53d5c 100644 --- a/pkg/querysharding/util.go +++ b/pkg/querysharding/util.go @@ -18,7 +18,7 @@ const ( ) var ( - buffers = sync.Pool{New: func() interface{} { + Buffers = sync.Pool{New: func() interface{} { b := make([]byte, 0, 100) return &b }} @@ -79,7 +79,7 @@ func ExtractShardingMatchers(matchers []*labels.Matcher) ([]*labels.Matcher, *st return r, nil, err } - return r, shardInfo.Matcher(&buffers), nil + return r, shardInfo.Matcher(&Buffers), nil } type disableBinaryExpressionAnalyzer struct {