Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/querier/blocks_finder_bucket_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/oklog/ulid/v2"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/objstore"

"github.com/cortexproject/cortex/pkg/util/validation"
Expand Down Expand Up @@ -49,7 +50,7 @@ func NewBucketIndexBlocksFinder(cfg BucketIndexBlocksFinderConfig, bkt objstore.
}

// GetBlocks implements BlocksFinder.
func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) {
func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, minT, maxT int64, _ []*labels.Matcher) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) {
if f.State() != services.Running {
return nil, nil, errBucketIndexBlocksFinderNotRunning
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/querier/blocks_finder_bucket_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestBucketIndexBlocksFinder_GetBlocks(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

blocks, deletionMarks, err := finder.GetBlocks(ctx, userID, testData.minT, testData.maxT)
blocks, deletionMarks, err := finder.GetBlocks(ctx, userID, testData.minT, testData.maxT, nil)
require.NoError(t, err)
require.ElementsMatch(t, testData.expectedBlocks, blocks)
require.Equal(t, testData.expectedMarks, deletionMarks)
Expand Down Expand Up @@ -165,7 +165,7 @@ func BenchmarkBucketIndexBlocksFinder_GetBlocks(b *testing.B) {
b.ResetTimer()

for n := 0; n < b.N; n++ {
blocks, marks, err := finder.GetBlocks(ctx, userID, 100, 200)
blocks, marks, err := finder.GetBlocks(ctx, userID, 100, 200, nil)
if err != nil || len(blocks) != 11 || len(marks) != 11 {
b.Fail()
}
Expand All @@ -181,7 +181,7 @@ func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexDoesNotExist(t *testing.T)
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
finder := prepareBucketIndexBlocksFinder(t, bkt)

blocks, deletionMarks, err := finder.GetBlocks(ctx, userID, 10, 20)
blocks, deletionMarks, err := finder.GetBlocks(ctx, userID, 10, 20, nil)
require.NoError(t, err)
assert.Empty(t, blocks)
assert.Empty(t, deletionMarks)
Expand All @@ -199,7 +199,7 @@ func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsCorrupted(t *testing.T)
// Upload a corrupted bucket index.
require.NoError(t, bkt.Upload(ctx, path.Join(userID, bucketindex.IndexCompressedFilename), strings.NewReader("invalid}!")))

_, _, err := finder.GetBlocks(ctx, userID, 10, 20)
_, _, err := finder.GetBlocks(ctx, userID, 10, 20, nil)
require.Equal(t, bucketindex.ErrIndexCorrupted, err)
}

Expand All @@ -219,7 +219,7 @@ func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsTooOld(t *testing.T) {
UpdatedAt: time.Now().Add(-2 * time.Hour).Unix(),
}))

_, _, err := finder.GetBlocks(ctx, userID, 10, 20)
_, _, err := finder.GetBlocks(ctx, userID, 10, 20, nil)
require.Equal(t, errBucketIndexTooOld, err)
}

Expand Down Expand Up @@ -270,10 +270,10 @@ func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsTooOldWithCustomerKeyErr
t.Run(name, func(t *testing.T) {
bucketindex.WriteSyncStatus(ctx, bkt, userID, tc.ss, log.NewNopLogger())
finder := prepareBucketIndexBlocksFinder(t, bkt)
_, _, err := finder.GetBlocks(ctx, userID, 10, 20)
_, _, err := finder.GetBlocks(ctx, userID, 10, 20, nil)
require.Equal(t, tc.err, err)
// Doing 2 times to return from the cache
_, _, err = finder.GetBlocks(ctx, userID, 10, 20)
_, _, err = finder.GetBlocks(ctx, userID, 10, 20, nil)
require.Equal(t, tc.err, err)
})
}
Expand Down Expand Up @@ -315,7 +315,7 @@ func TestBucketIndexBlocksFinder_GetBlocks_KeyPermissionDenied(t *testing.T) {

finder := prepareBucketIndexBlocksFinder(t, bkt)

_, _, err := finder.GetBlocks(context.Background(), userID, 0, 100)
_, _, err := finder.GetBlocks(context.Background(), userID, 0, 100, nil)
expected := validation.AccessDeniedError("error")
require.IsType(t, expected, err)
}
3 changes: 2 additions & 1 deletion pkg/querier/blocks_finder_bucket_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
Expand Down Expand Up @@ -111,7 +112,7 @@ func NewBucketScanBlocksFinder(cfg BucketScanBlocksFinderConfig, usersScanner us

// GetBlocks returns known blocks for userID containing samples within the range minT
// and maxT (milliseconds, both included). Returned blocks are sorted by MaxTime descending.
func (d *BucketScanBlocksFinder) GetBlocks(_ context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) {
func (d *BucketScanBlocksFinder) GetBlocks(_ context.Context, userID string, minT, maxT int64, _ []*labels.Matcher) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) {
// We need to ensure the initial full bucket scan succeeded.
if d.State() != services.Running {
return nil, nil, errBucketScanBlocksFinderNotRunning
Expand Down
34 changes: 17 additions & 17 deletions pkg/querier/blocks_finder_bucket_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestBucketScanBlocksFinder_InitialScan(t *testing.T) {

require.NoError(t, services.StartAndAwaitRunning(ctx, s))

blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30)
blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30, nil)
require.NoError(t, err)
require.Equal(t, 2, len(blocks))
assert.Equal(t, user1Block2.ULID, blocks[0].ID)
Expand All @@ -48,7 +48,7 @@ func TestBucketScanBlocksFinder_InitialScan(t *testing.T) {
assert.WithinDuration(t, time.Now(), blocks[1].GetUploadedAt(), 5*time.Second)
assert.Empty(t, deletionMarks)

blocks, deletionMarks, err = s.GetBlocks(ctx, "user-2", 0, 30)
blocks, deletionMarks, err = s.GetBlocks(ctx, "user-2", 0, 30, nil)
require.NoError(t, err)
require.Equal(t, 1, len(blocks))
assert.Equal(t, user2Block1.ULID, blocks[0].ID)
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestBucketScanBlocksFinder_InitialScanFailure(t *testing.T) {
require.NoError(t, s.StartAsync(ctx))
require.Error(t, s.AwaitRunning(ctx))

blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30)
blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30, nil)
assert.Equal(t, errBucketScanBlocksFinderNotRunning, err)
assert.Nil(t, blocks)
assert.Nil(t, deletionMarks)
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsNewUser(t *testing.T) {

require.NoError(t, services.StartAndAwaitRunning(ctx, s))

blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30)
blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30, nil)
require.NoError(t, err)
require.Equal(t, 0, len(blocks))
assert.Empty(t, deletionMarks)
Expand All @@ -245,7 +245,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsNewUser(t *testing.T) {
// Trigger a periodic sync
require.NoError(t, s.scan(ctx))

blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30)
blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30, nil)
require.NoError(t, err)
require.Equal(t, 2, len(blocks))
assert.Equal(t, block2.ULID, blocks[0].ID)
Expand All @@ -266,7 +266,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsNewBlock(t *testing.T) {

require.NoError(t, services.StartAndAwaitRunning(ctx, s))

blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30)
blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30, nil)
require.NoError(t, err)
require.Equal(t, 1, len(blocks))
assert.Equal(t, block1.ULID, blocks[0].ID)
Expand All @@ -278,7 +278,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsNewBlock(t *testing.T) {
// Trigger a periodic sync
require.NoError(t, s.scan(ctx))

blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30)
blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30, nil)
require.NoError(t, err)
require.Equal(t, 2, len(blocks))
assert.Equal(t, block2.ULID, blocks[0].ID)
Expand All @@ -298,7 +298,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsBlockMarkedForDeletion(t *testi

require.NoError(t, services.StartAndAwaitRunning(ctx, s))

blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30)
blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30, nil)
require.NoError(t, err)
require.Equal(t, 2, len(blocks))
assert.Equal(t, block2.ULID, blocks[0].ID)
Expand All @@ -310,7 +310,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsBlockMarkedForDeletion(t *testi
// Trigger a periodic sync
require.NoError(t, s.scan(ctx))

blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30)
blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30, nil)
require.NoError(t, err)
require.Equal(t, 2, len(blocks))
assert.Equal(t, block2.ULID, blocks[0].ID)
Expand All @@ -330,7 +330,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsDeletedBlock(t *testing.T) {

require.NoError(t, services.StartAndAwaitRunning(ctx, s))

blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30)
blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30, nil)
require.NoError(t, err)
require.Equal(t, 2, len(blocks))
assert.Equal(t, block2.ULID, blocks[0].ID)
Expand All @@ -342,7 +342,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsDeletedBlock(t *testing.T) {
// Trigger a periodic sync
require.NoError(t, s.scan(ctx))

blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30)
blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30, nil)
require.NoError(t, err)
require.Equal(t, 1, len(blocks))
assert.Equal(t, block2.ULID, blocks[0].ID)
Expand All @@ -359,7 +359,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsDeletedUser(t *testing.T) {

require.NoError(t, services.StartAndAwaitRunning(ctx, s))

blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30)
blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 30, nil)
require.NoError(t, err)
require.Equal(t, 2, len(blocks))
assert.Equal(t, block2.ULID, blocks[0].ID)
Expand All @@ -371,7 +371,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsDeletedUser(t *testing.T) {
// Trigger a periodic sync
require.NoError(t, s.scan(ctx))

blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30)
blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 30, nil)
require.NoError(t, err)
require.Equal(t, 0, len(blocks))
assert.Empty(t, deletionMarks)
Expand All @@ -387,7 +387,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t

require.NoError(t, services.StartAndAwaitRunning(ctx, s))

blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 40)
blocks, deletionMarks, err := s.GetBlocks(ctx, "user-1", 0, 40, nil)
require.NoError(t, err)
require.Equal(t, 2, len(blocks))
assert.Equal(t, block2.ULID, blocks[0].ID)
Expand All @@ -399,7 +399,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t
// Trigger a periodic sync
require.NoError(t, s.scan(ctx))

blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 40)
blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 40, nil)
require.NoError(t, err)
require.Equal(t, 0, len(blocks))
assert.Empty(t, deletionMarks)
Expand All @@ -409,7 +409,7 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t
// Trigger a periodic sync
require.NoError(t, s.scan(ctx))

blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 40)
blocks, deletionMarks, err = s.GetBlocks(ctx, "user-1", 0, 40, nil)
require.NoError(t, err)
require.Equal(t, 1, len(blocks))
assert.Equal(t, block3.ULID, blocks[0].ID)
Expand Down Expand Up @@ -506,7 +506,7 @@ func TestBucketScanBlocksFinder_GetBlocks(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

metas, deletionMarks, err := s.GetBlocks(ctx, "user-1", testData.minT, testData.maxT)
metas, deletionMarks, err := s.GetBlocks(ctx, "user-1", testData.minT, testData.maxT, nil)
require.NoError(t, err)
require.Equal(t, len(testData.expectedMetas), len(metas))
require.Equal(t, testData.expectedMarks, deletionMarks)
Expand Down
14 changes: 7 additions & 7 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type BlocksFinder interface {

// GetBlocks returns known blocks for userID containing samples within the range minT
// and maxT (milliseconds, both included). Returned blocks are sorted by MaxTime descending.
GetBlocks(ctx context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error)
GetBlocks(ctx context.Context, userID string, minT, maxT int64, matchers []*labels.Matcher) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error)
}

// BlocksStoreClient is the interface that should be implemented by any client used
Expand Down Expand Up @@ -373,7 +373,7 @@ func (q *blocksStoreQuerier) LabelNames(ctx context.Context, hints *storage.Labe
return queriedBlocks, nil, retryableError
}

if err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, userID, queryFunc); err != nil {
if err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, matchers, userID, queryFunc); err != nil {
return nil, nil, err
}

Expand Down Expand Up @@ -416,7 +416,7 @@ func (q *blocksStoreQuerier) LabelValues(ctx context.Context, name string, hints
return queriedBlocks, nil, retryableError
}

if err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, userID, queryFunc); err != nil {
if err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, matchers, userID, queryFunc); err != nil {
return nil, nil, err
}

Expand Down Expand Up @@ -472,7 +472,7 @@ func (q *blocksStoreQuerier) selectSorted(ctx context.Context, sp *storage.Selec
return queriedBlocks, nil, retryableError
}

if err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, userID, queryFunc); err != nil {
if err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, matchers, userID, queryFunc); err != nil {
return storage.ErrSeriesSet(err)
}

Expand All @@ -485,8 +485,8 @@ func (q *blocksStoreQuerier) selectSorted(ctx context.Context, sp *storage.Selec
resWarnings)
}

func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logger log.Logger, minT, maxT int64, userID string,
queryFunc func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error)) error {
func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logger log.Logger, minT, maxT int64, matchers []*labels.Matcher,
userID string, queryFunc func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error)) error {
// If queryStoreAfter is enabled, we do manipulate the query maxt to query samples up until
// now - queryStoreAfter, because the most recent time range is covered by ingesters. This
// optimization is particularly important for the blocks storage because can be used to skip
Expand All @@ -508,7 +508,7 @@ func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logg
}

// Find the list of blocks we need to query given the time range.
knownBlocks, knownDeletionMarks, err := q.finder.GetBlocks(ctx, userID, minT, maxT)
knownBlocks, knownDeletionMarks, err := q.finder.GetBlocks(ctx, userID, minT, maxT, matchers)

// if blocks were already discovered, we should use then
if b, ok := ExtractBlocksFromContext(ctx); ok {
Expand Down
14 changes: 7 additions & 7 deletions pkg/querier/blocks_store_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses}
finder := &blocksFinderMock{}
finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(testData.finderResult, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), testData.finderErr)
finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything).Return(testData.finderResult, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), testData.finderErr)

q := &blocksStoreQuerier{
minT: minT,
Expand Down Expand Up @@ -1664,7 +1664,7 @@ func TestOverrideBlockDiscovery(t *testing.T) {
}
finder := &blocksFinderMock{}
// return block 1 and 2 on finder but only query block 1
finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything).Return(bucketindex.Blocks{
&bucketindex.Block{ID: block1},
&bucketindex.Block{ID: block2},
}, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil)
Expand Down Expand Up @@ -2213,7 +2213,7 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses}
finder := &blocksFinderMock{}
finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(testData.finderResult, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), testData.finderErr)
finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything).Return(testData.finderResult, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), testData.finderErr)

q := &blocksStoreQuerier{
minT: minT,
Expand Down Expand Up @@ -2321,7 +2321,7 @@ func TestBlocksStoreQuerier_SelectSortedShouldHonorQueryStoreAfter(t *testing.T)

ctx := user.InjectOrgID(context.Background(), "user-1")
finder := &blocksFinderMock{}
finder.On("GetBlocks", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(bucketindex.Blocks(nil), map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), error(nil))
finder.On("GetBlocks", mock.Anything, "user-1", mock.Anything, mock.Anything, mock.Anything).Return(bucketindex.Blocks(nil), map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), error(nil))

q := &blocksStoreQuerier{
minT: testData.queryMinT,
Expand Down Expand Up @@ -2385,7 +2385,7 @@ func TestBlocksStoreQuerier_PromQLExecution(t *testing.T) {
finder := &blocksFinderMock{
Service: services.NewIdleService(nil, nil),
}
finder.On("GetBlocks", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(bucketindex.Blocks{
finder.On("GetBlocks", mock.Anything, "user-1", mock.Anything, mock.Anything, mock.Anything).Return(bucketindex.Blocks{
&bucketindex.Block{ID: block1},
&bucketindex.Block{ID: block2},
}, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), error(nil))
Expand Down Expand Up @@ -2535,8 +2535,8 @@ type blocksFinderMock struct {
mock.Mock
}

func (m *blocksFinderMock) GetBlocks(ctx context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) {
args := m.Called(ctx, userID, minT, maxT)
func (m *blocksFinderMock) GetBlocks(ctx context.Context, userID string, minT, maxT int64, matchers []*labels.Matcher) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) {
args := m.Called(ctx, userID, minT, maxT, matchers)
return args.Get(0).(bucketindex.Blocks), args.Get(1).(map[ulid.ULID]*bucketindex.BlockDeletionMark), args.Error(2)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/querier/parquet_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name strin
span, ctx := opentracing.StartSpanFromContext(ctx, "parquetQuerierWithFallback.LabelValues")
defer span.Finish()

remaining, parquet, err := q.getBlocks(ctx, q.minT, q.maxT)
remaining, parquet, err := q.getBlocks(ctx, q.minT, q.maxT, matchers)
defer q.incrementOpsMetric("LabelValues", remaining, parquet)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -547,7 +547,7 @@ func (q *parquetQuerierWithFallback) Close() error {
return mErr.Err()
}

func (q *parquetQuerierWithFallback) getBlocks(ctx context.Context, minT, maxT int64) ([]*bucketindex.Block, []*bucketindex.Block, error) {
func (q *parquetQuerierWithFallback) getBlocks(ctx context.Context, minT, maxT int64, matchers []*labels.Matcher) ([]*bucketindex.Block, []*bucketindex.Block, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, nil, err
Expand All @@ -559,7 +559,7 @@ func (q *parquetQuerierWithFallback) getBlocks(ctx context.Context, minT, maxT i
return nil, nil, nil
}

blocks, _, err := q.finder.GetBlocks(ctx, userID, minT, maxT)
blocks, _, err := q.finder.GetBlocks(ctx, userID, minT, maxT, matchers)
if err != nil {
return nil, nil, err
}
Expand Down
Loading