diff --git a/pkg/querier/blocks_finder_bucket_index.go b/pkg/querier/blocks_finder_bucket_index.go index 60f05d722e6..0f0977bf9ce 100644 --- a/pkg/querier/blocks_finder_bucket_index.go +++ b/pkg/querier/blocks_finder_bucket_index.go @@ -8,6 +8,7 @@ import ( "github.com/oklog/ulid/v2" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/objstore" "github.com/cortexproject/cortex/pkg/util/validation" @@ -49,7 +50,7 @@ func NewBucketIndexBlocksFinder(cfg BucketIndexBlocksFinderConfig, bkt objstore. } // GetBlocks implements BlocksFinder. -func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) { +func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, minT, maxT int64, _ []*labels.Matcher) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) { if f.State() != services.Running { return nil, nil, errBucketIndexBlocksFinderNotRunning } diff --git a/pkg/querier/blocks_finder_bucket_index_test.go b/pkg/querier/blocks_finder_bucket_index_test.go index 280939c16cb..99675d4748f 100644 --- a/pkg/querier/blocks_finder_bucket_index_test.go +++ b/pkg/querier/blocks_finder_bucket_index_test.go @@ -125,7 +125,7 @@ func TestBucketIndexBlocksFinder_GetBlocks(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - blocks, deletionMarks, err := finder.GetBlocks(ctx, userID, testData.minT, testData.maxT) + blocks, deletionMarks, err := finder.GetBlocks(ctx, userID, testData.minT, testData.maxT, nil) require.NoError(t, err) require.ElementsMatch(t, testData.expectedBlocks, blocks) require.Equal(t, testData.expectedMarks, deletionMarks) @@ -165,7 +165,7 @@ func BenchmarkBucketIndexBlocksFinder_GetBlocks(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - blocks, marks, err := finder.GetBlocks(ctx, userID, 100, 200) + blocks, marks, err := finder.GetBlocks(ctx, userID, 100, 200, nil) if err != nil || len(blocks) != 11 || len(marks) != 11 { b.Fail() } @@ -181,7 +181,7 @@ func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexDoesNotExist(t *testing.T) bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) finder := prepareBucketIndexBlocksFinder(t, bkt) - blocks, deletionMarks, err := finder.GetBlocks(ctx, userID, 10, 20) + blocks, deletionMarks, err := finder.GetBlocks(ctx, userID, 10, 20, nil) require.NoError(t, err) assert.Empty(t, blocks) assert.Empty(t, deletionMarks) @@ -199,7 +199,7 @@ func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsCorrupted(t *testing.T) // Upload a corrupted bucket index. require.NoError(t, bkt.Upload(ctx, path.Join(userID, bucketindex.IndexCompressedFilename), strings.NewReader("invalid}!"))) - _, _, err := finder.GetBlocks(ctx, userID, 10, 20) + _, _, err := finder.GetBlocks(ctx, userID, 10, 20, nil) require.Equal(t, bucketindex.ErrIndexCorrupted, err) } @@ -219,7 +219,7 @@ func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsTooOld(t *testing.T) { UpdatedAt: time.Now().Add(-2 * time.Hour).Unix(), })) - _, _, err := finder.GetBlocks(ctx, userID, 10, 20) + _, _, err := finder.GetBlocks(ctx, userID, 10, 20, nil) require.Equal(t, errBucketIndexTooOld, err) } @@ -270,10 +270,10 @@ func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsTooOldWithCustomerKeyErr t.Run(name, func(t *testing.T) { bucketindex.WriteSyncStatus(ctx, bkt, userID, tc.ss, log.NewNopLogger()) finder := prepareBucketIndexBlocksFinder(t, bkt) - _, _, err := finder.GetBlocks(ctx, userID, 10, 20) + _, _, err := finder.GetBlocks(ctx, userID, 10, 20, nil) require.Equal(t, tc.err, err) // Doing 2 times to return from the cache - _, _, err = finder.GetBlocks(ctx, userID, 10, 20) + _, _, err = finder.GetBlocks(ctx, userID, 10, 20, nil) require.Equal(t, tc.err, err) }) } @@ -315,7 +315,7 @@ func TestBucketIndexBlocksFinder_GetBlocks_KeyPermissionDenied(t *testing.T) { finder := prepareBucketIndexBlocksFinder(t, bkt) - _, _, err := finder.GetBlocks(context.Background(), userID, 0, 100) + _, _, err := finder.GetBlocks(context.Background(), userID, 0, 100, nil) expected := validation.AccessDeniedError("error") require.IsType(t, expected, err) } diff --git a/pkg/querier/blocks_finder_bucket_scan.go b/pkg/querier/blocks_finder_bucket_scan.go index 949ab5f6350..d047fd1421f 100644 --- a/pkg/querier/blocks_finder_bucket_scan.go +++ b/pkg/querier/blocks_finder_bucket_scan.go @@ -15,6 +15,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/model/labels" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block" @@ -111,7 +112,7 @@ func NewBucketScanBlocksFinder(cfg BucketScanBlocksFinderConfig, usersScanner us // GetBlocks returns known blocks for userID containing samples within the range minT // and maxT (milliseconds, both included). Returned blocks are sorted by MaxTime descending. -func (d *BucketScanBlocksFinder) GetBlocks(_ context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) { +func (d *BucketScanBlocksFinder) GetBlocks(_ context.Context, userID string, minT, maxT int64, _ []*labels.Matcher) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) { // We need to ensure the initial full bucket scan succeeded. if d.State() != services.Running { return nil, nil, errBucketScanBlocksFinderNotRunning diff --git a/pkg/querier/blocks_finder_bucket_scan_test.go b/pkg/querier/blocks_finder_bucket_scan_test.go index 8393e4b12c6..b81f6d7f910 100644 --- a/pkg/querier/blocks_finder_bucket_scan_test.go +++ b/pkg/querier/blocks_finder_bucket_scan_test.go @@ -39,7 +39,7 @@ func TestBucketScanBlocksFinder_InitialScan(t *testing.T) { require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) + blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30, nil) require.NoError(t, err) require.Equal(t, 2, len(blocks)) assert.Equal(t, user1Block2.ULID, blocks[0].ID) @@ -48,7 +48,7 @@ func TestBucketScanBlocksFinder_InitialScan(t *testing.T) { assert.WithinDuration(t, time.Now(), blocks[1].GetUploadedAt(), 5*time.Second) assert.Empty(t, deletionMarks) - blocks, deletionMarks, err = s.GetBlocks(ctx, "user-2", 0, 30) + blocks, deletionMarks, err = s.GetBlocks(ctx, "user-2", 0, 30, nil) require.NoError(t, err) require.Equal(t, 1, len(blocks)) assert.Equal(t, user2Block1.ULID, blocks[0].ID) @@ -110,7 +110,7 @@ func TestBucketScanBlocksFinder_InitialScanFailure(t *testing.T) { require.NoError(t, s.StartAsync(ctx)) require.Error(t, s.AwaitRunning(ctx)) - blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) + blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30, nil) assert.Equal(t, errBucketScanBlocksFinderNotRunning, err) assert.Nil(t, blocks) assert.Nil(t, deletionMarks) @@ -233,7 +233,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsNewUser(t *testing.T) { require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) + blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30, nil) require.NoError(t, err) require.Equal(t, 0, len(blocks)) assert.Empty(t, deletionMarks) @@ -245,7 +245,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsNewUser(t *testing.T) { // Trigger a periodic sync require.NoError(t, s.scan(ctx)) - blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30) + blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30, nil) require.NoError(t, err) require.Equal(t, 2, len(blocks)) assert.Equal(t, block2.ULID, blocks[0].ID) @@ -266,7 +266,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsNewBlock(t *testing.T) { require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) + blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30, nil) require.NoError(t, err) require.Equal(t, 1, len(blocks)) assert.Equal(t, block1.ULID, blocks[0].ID) @@ -278,7 +278,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsNewBlock(t *testing.T) { // Trigger a periodic sync require.NoError(t, s.scan(ctx)) - blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30) + blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30, nil) require.NoError(t, err) require.Equal(t, 2, len(blocks)) assert.Equal(t, block2.ULID, blocks[0].ID) @@ -298,7 +298,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsBlockMarkedForDeletion(t *testi require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) + blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30, nil) require.NoError(t, err) require.Equal(t, 2, len(blocks)) assert.Equal(t, block2.ULID, blocks[0].ID) @@ -310,7 +310,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsBlockMarkedForDeletion(t *testi // Trigger a periodic sync require.NoError(t, s.scan(ctx)) - blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30) + blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30, nil) require.NoError(t, err) require.Equal(t, 2, len(blocks)) assert.Equal(t, block2.ULID, blocks[0].ID) @@ -330,7 +330,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsDeletedBlock(t *testing.T) { require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) + blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30, nil) require.NoError(t, err) require.Equal(t, 2, len(blocks)) assert.Equal(t, block2.ULID, blocks[0].ID) @@ -342,7 +342,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsDeletedBlock(t *testing.T) { // Trigger a periodic sync require.NoError(t, s.scan(ctx)) - blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30) + blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30, nil) require.NoError(t, err) require.Equal(t, 1, len(blocks)) assert.Equal(t, block2.ULID, blocks[0].ID) @@ -359,7 +359,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsDeletedUser(t *testing.T) { require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30) + blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30, nil) require.NoError(t, err) require.Equal(t, 2, len(blocks)) assert.Equal(t, block2.ULID, blocks[0].ID) @@ -371,7 +371,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsDeletedUser(t *testing.T) { // Trigger a periodic sync require.NoError(t, s.scan(ctx)) - blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30) + blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30, nil) require.NoError(t, err) require.Equal(t, 0, len(blocks)) assert.Empty(t, deletionMarks) @@ -387,7 +387,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t require.NoError(t, services.StartAndAwaitRunning(ctx, s)) - blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 40) + blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 40, nil) require.NoError(t, err) require.Equal(t, 2, len(blocks)) assert.Equal(t, block2.ULID, blocks[0].ID) @@ -399,7 +399,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t // Trigger a periodic sync require.NoError(t, s.scan(ctx)) - blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 40) + blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 40, nil) require.NoError(t, err) require.Equal(t, 0, len(blocks)) assert.Empty(t, deletionMarks) @@ -409,7 +409,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t // Trigger a periodic sync require.NoError(t, s.scan(ctx)) - blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 40) + blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 40, nil) require.NoError(t, err) require.Equal(t, 1, len(blocks)) assert.Equal(t, block3.ULID, blocks[0].ID) @@ -506,7 +506,7 @@ func TestBucketScanBlocksFinder_GetBlocks(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - metas, deletionMarks, err := s.GetBlocks(ctx, "user-1", testData.minT, testData.maxT) + metas, deletionMarks, err := s.GetBlocks(ctx, "user-1", testData.minT, testData.maxT, nil) require.NoError(t, err) require.Equal(t, len(testData.expectedMetas), len(metas)) require.Equal(t, testData.expectedMarks, deletionMarks) diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 98b58a83361..41cf8201634 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -86,7 +86,7 @@ type BlocksFinder interface { // GetBlocks returns known blocks for userID containing samples within the range minT // and maxT (milliseconds, both included). Returned blocks are sorted by MaxTime descending. - GetBlocks(ctx context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) + GetBlocks(ctx context.Context, userID string, minT, maxT int64, matchers []*labels.Matcher) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) } // BlocksStoreClient is the interface that should be implemented by any client used @@ -373,7 +373,7 @@ func (q *blocksStoreQuerier) LabelNames(ctx context.Context, hints *storage.Labe return queriedBlocks, nil, retryableError } - if err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, userID, queryFunc); err != nil { + if err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, matchers, userID, queryFunc); err != nil { return nil, nil, err } @@ -416,7 +416,7 @@ func (q *blocksStoreQuerier) LabelValues(ctx context.Context, name string, hints return queriedBlocks, nil, retryableError } - if err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, userID, queryFunc); err != nil { + if err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, matchers, userID, queryFunc); err != nil { return nil, nil, err } @@ -472,7 +472,7 @@ func (q *blocksStoreQuerier) selectSorted(ctx context.Context, sp *storage.Selec return queriedBlocks, nil, retryableError } - if err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, userID, queryFunc); err != nil { + if err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, matchers, userID, queryFunc); err != nil { return storage.ErrSeriesSet(err) } @@ -485,8 +485,8 @@ func (q *blocksStoreQuerier) selectSorted(ctx context.Context, sp *storage.Selec resWarnings) } -func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logger log.Logger, minT, maxT int64, userID string, - queryFunc func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error)) error { +func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logger log.Logger, minT, maxT int64, matchers []*labels.Matcher, + userID string, queryFunc func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error)) error { // If queryStoreAfter is enabled, we do manipulate the query maxt to query samples up until // now - queryStoreAfter, because the most recent time range is covered by ingesters. This // optimization is particularly important for the blocks storage because can be used to skip @@ -508,7 +508,7 @@ func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logg } // Find the list of blocks we need to query given the time range. - knownBlocks, knownDeletionMarks, err := q.finder.GetBlocks(ctx, userID, minT, maxT) + knownBlocks, knownDeletionMarks, err := q.finder.GetBlocks(ctx, userID, minT, maxT, matchers) // if blocks were already discovered, we should use then if b, ok := ExtractBlocksFromContext(ctx); ok { diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 9f890fc3902..4070e0b7383 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -1568,7 +1568,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { reg := prometheus.NewPedanticRegistry() stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses} finder := &blocksFinderMock{} - finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(testData.finderResult, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), testData.finderErr) + finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything).Return(testData.finderResult, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), testData.finderErr) q := &blocksStoreQuerier{ minT: minT, @@ -1664,7 +1664,7 @@ func TestOverrideBlockDiscovery(t *testing.T) { } finder := &blocksFinderMock{} // return block 1 and 2 on finder but only query block 1 - finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{ + finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything).Return(bucketindex.Blocks{ &bucketindex.Block{ID: block1}, &bucketindex.Block{ID: block2}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) @@ -2213,7 +2213,7 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { reg := prometheus.NewPedanticRegistry() stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses} finder := &blocksFinderMock{} - finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(testData.finderResult, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), testData.finderErr) + finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything).Return(testData.finderResult, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), testData.finderErr) q := &blocksStoreQuerier{ minT: minT, @@ -2321,7 +2321,7 @@ func TestBlocksStoreQuerier_SelectSortedShouldHonorQueryStoreAfter(t *testing.T) ctx := user.InjectOrgID(context.Background(), "user-1") finder := &blocksFinderMock{} - finder.On("GetBlocks", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(bucketindex.Blocks(nil), map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), error(nil)) + finder.On("GetBlocks", mock.Anything, "user-1", mock.Anything, mock.Anything, mock.Anything).Return(bucketindex.Blocks(nil), map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), error(nil)) q := &blocksStoreQuerier{ minT: testData.queryMinT, @@ -2385,7 +2385,7 @@ func TestBlocksStoreQuerier_PromQLExecution(t *testing.T) { finder := &blocksFinderMock{ Service: services.NewIdleService(nil, nil), } - finder.On("GetBlocks", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(bucketindex.Blocks{ + finder.On("GetBlocks", mock.Anything, "user-1", mock.Anything, mock.Anything, mock.Anything).Return(bucketindex.Blocks{ &bucketindex.Block{ID: block1}, &bucketindex.Block{ID: block2}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), error(nil)) @@ -2535,8 +2535,8 @@ type blocksFinderMock struct { mock.Mock } -func (m *blocksFinderMock) GetBlocks(ctx context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) { - args := m.Called(ctx, userID, minT, maxT) +func (m *blocksFinderMock) GetBlocks(ctx context.Context, userID string, minT, maxT int64, matchers []*labels.Matcher) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) { + args := m.Called(ctx, userID, minT, maxT, matchers) return args.Get(0).(bucketindex.Blocks), args.Get(1).(map[ulid.ULID]*bucketindex.BlockDeletionMark), args.Error(2) } diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 9d24f58219a..e5bab841604 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -343,7 +343,7 @@ func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name strin span, ctx := opentracing.StartSpanFromContext(ctx, "parquetQuerierWithFallback.LabelValues") defer span.Finish() - remaining, parquet, err := q.getBlocks(ctx, q.minT, q.maxT) + remaining, parquet, err := q.getBlocks(ctx, q.minT, q.maxT, matchers) defer q.incrementOpsMetric("LabelValues", remaining, parquet) if err != nil { return nil, nil, err @@ -396,7 +396,7 @@ func (q *parquetQuerierWithFallback) LabelNames(ctx context.Context, hints *stor span, ctx := opentracing.StartSpanFromContext(ctx, "parquetQuerierWithFallback.LabelNames") defer span.Finish() - remaining, parquet, err := q.getBlocks(ctx, q.minT, q.maxT) + remaining, parquet, err := q.getBlocks(ctx, q.minT, q.maxT, matchers) defer q.incrementOpsMetric("LabelNames", remaining, parquet) if err != nil { return nil, nil, err @@ -475,7 +475,7 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool return storage.EmptySeriesSet() } - remaining, parquet, err := q.getBlocks(ctx, mint, maxt) + remaining, parquet, err := q.getBlocks(ctx, mint, maxt, matchers) defer q.incrementOpsMetric("Select", remaining, parquet) if err != nil { @@ -547,7 +547,7 @@ func (q *parquetQuerierWithFallback) Close() error { return mErr.Err() } -func (q *parquetQuerierWithFallback) getBlocks(ctx context.Context, minT, maxT int64) ([]*bucketindex.Block, []*bucketindex.Block, error) { +func (q *parquetQuerierWithFallback) getBlocks(ctx context.Context, minT, maxT int64, matchers []*labels.Matcher) ([]*bucketindex.Block, []*bucketindex.Block, error) { userID, err := tenant.TenantID(ctx) if err != nil { return nil, nil, err @@ -559,7 +559,7 @@ func (q *parquetQuerierWithFallback) getBlocks(ctx context.Context, minT, maxT i return nil, nil, nil } - blocks, _, err := q.finder.GetBlocks(ctx, userID, minT, maxT) + blocks, _, err := q.finder.GetBlocks(ctx, userID, minT, maxT, matchers) if err != nil { return nil, nil, err } diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 5bb6bed1c83..70baa010b66 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -108,7 +108,7 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { defaultBlockStoreType: parquetBlockStore, } - finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything).Return(bucketindex.Blocks{ + finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything, mock.Anything).Return(bucketindex.Blocks{ &bucketindex.Block{ID: block1}, &bucketindex.Block{ID: block2}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) @@ -169,7 +169,7 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { defaultBlockStoreType: parquetBlockStore, } - finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{ + finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything).Return(bucketindex.Blocks{ &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, &bucketindex.Block{ID: block2}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) @@ -238,7 +238,7 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { defaultBlockStoreType: parquetBlockStore, } - finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything).Return(bucketindex.Blocks{ + finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything, mock.Anything).Return(bucketindex.Blocks{ &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) @@ -312,7 +312,7 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { defaultBlockStoreType: tsdbBlockStore, } - finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{ + finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything).Return(bucketindex.Blocks{ &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) @@ -433,7 +433,7 @@ func TestParquetQueryable_Limits(t *testing.T) { // Create a mocked bucket index blocks finder finder := &blocksFinderMock{} - finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{ + finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything).Return(bucketindex.Blocks{ &bucketindex.Block{ID: blockID, Parquet: &parquet.ConverterMarkMeta{Version: parquet.CurrentVersion}}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) @@ -782,7 +782,7 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) { } // Set up blocks where block1 has parquet metadata but block2 doesn't - finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything).Return(bucketindex.Blocks{ + finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything, mock.Anything).Return(bucketindex.Blocks{ &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet &bucketindex.Block{ID: block2}, // Not available as parquet }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) @@ -841,7 +841,7 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) { } // Set up blocks where both blocks have parquet metadata - finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything).Return(bucketindex.Blocks{ + finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything, mock.Anything).Return(bucketindex.Blocks{ &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil)