From 54cf86af7cb2b22aa95b501387ea050afaa99af5 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Sun, 24 Aug 2025 13:41:37 -0700 Subject: [PATCH 1/2] fix sort series for parquet queryable Signed-off-by: yeya24 --- pkg/querier/parquet_queryable.go | 5 + pkg/querier/parquet_queryable_test.go | 146 ++++++++++++++++++++++++++ 2 files changed, 151 insertions(+) diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index e5bab841604..1d06fffe586 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -491,6 +491,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool if len(parquet) > 0 && len(remaining) > 0 { sortSeries = true } + // Also sort when multiple parquet blocks are being merged. + // We don't need to sort explicitly + if len(parquet) > 1 { + sortSeries = true + } promises := make([]chan storage.SeriesSet, 0, 2) diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 70baa010b66..cf1dc187843 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -596,6 +596,7 @@ func defaultOverrides(t *testing.T, queryVerticalShardSize int) *validation.Over type mockParquetQuerier struct { queriedBlocks []*bucketindex.Block queriedHints *storage.SelectHints + sortSeries bool } func (m *mockParquetQuerier) Select(ctx context.Context, sortSeries bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { @@ -603,6 +604,7 @@ func (m *mockParquetQuerier) Select(ctx context.Context, sortSeries bool, sp *st m.queriedBlocks = append(m.queriedBlocks, blocks...) } m.queriedHints = sp + m.sortSeries = sortSeries return series.NewConcreteSeriesSet(sortSeries, nil) } @@ -622,6 +624,7 @@ func (m *mockParquetQuerier) LabelNames(ctx context.Context, _ *storage.LabelHin func (m *mockParquetQuerier) Reset() { m.queriedBlocks = nil + m.sortSeries = false } func (mockParquetQuerier) Close() error { @@ -868,3 +871,146 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) { }) }) } + +func TestParquetQueryableSelectSortSeries(t *testing.T) { + block1 := ulid.MustNew(1, nil) + block2 := ulid.MustNew(2, nil) + block3 := ulid.MustNew(3, nil) + minT := int64(10) + maxT := util.TimeToMillis(time.Now()) + + matchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric"), + } + ctx := user.InjectOrgID(context.Background(), "user-1") + + t.Run("should sort series when multiple parquet blocks are queried", func(t *testing.T) { + finder := &blocksFinderMock{} + stores := &blocksStoreSetMock{mockedResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{}, + }} + + q := &blocksStoreQuerier{ + minT: minT, + maxT: maxT, + finder: finder, + stores: stores, + consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), + logger: log.NewNopLogger(), + metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()), + limits: &blocksStoreLimitsMock{}, + + storeGatewayConsistencyCheckMaxAttempts: 3, + } + + mParquetQuerier := &mockParquetQuerier{} + pq := &parquetQuerierWithFallback{ + minT: minT, + maxT: maxT, + finder: finder, + blocksStoreQuerier: q, + parquetQuerier: mParquetQuerier, + queryStoreAfter: time.Hour, + metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), + limits: defaultOverrides(t, 0), + logger: log.NewNopLogger(), + defaultBlockStoreType: parquetBlockStore, + } + + // Set up multiple parquet blocks (no TSDB blocks) + finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything, mock.Anything).Return(bucketindex.Blocks{ + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + &bucketindex.Block{ID: block3, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) + + t.Run("select should sort series when multiple parquet blocks", func(t *testing.T) { + mParquetQuerier.Reset() + ss := pq.Select(ctx, false, nil, matchers...) // Pass sortSeries=false to test if it gets overridden + require.NoError(t, ss.Err()) + require.Len(t, mParquetQuerier.queriedBlocks, 3) + + require.True(t, mParquetQuerier.sortSeries, "sortSeries should be true when multiple parquet blocks are merged") + }) + + t.Run("select with single parquet block should not require sorting", func(t *testing.T) { + // Reset the mock and set new expectations + finder.ExpectedCalls = nil + finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything, mock.Anything).Return(bucketindex.Blocks{ + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) + + mParquetQuerier.Reset() + ss := pq.Select(ctx, false, nil, matchers...) + require.NoError(t, ss.Err()) + require.Len(t, mParquetQuerier.queriedBlocks, 1) + // With single block, sortSeries=false should be preserved + }) + }) + + t.Run("should sort series when both parquet and TSDB blocks are queried", func(t *testing.T) { + finder := &blocksFinderMock{} + stores := &blocksStoreSetMock{mockedResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", + mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.FromStrings(labels.MetricName, "fromSg"), []cortexpb.Sample{{Value: 1, TimestampMs: minT}, {Value: 2, TimestampMs: minT + 1}}, nil, nil), + mockHintsResponse(block1, block2), + }, + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(labels.FromMap(map[string]string{labels.MetricName: "fromSg", "fromSg": "fromSg"})), + Warnings: []string{}, + Hints: mockNamesHints(block1, block2), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, labels.FromMap(map[string]string{labels.MetricName: "fromSg", "fromSg": "fromSg"})), + Warnings: []string{}, + Hints: mockValuesHints(block1, block2), + }, + }: {block1, block2}}, + }, + } + + q := &blocksStoreQuerier{ + minT: minT, + maxT: maxT, + finder: finder, + stores: stores, + consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), + logger: log.NewNopLogger(), + metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()), + limits: &blocksStoreLimitsMock{}, + + storeGatewayConsistencyCheckMaxAttempts: 3, + } + + mParquetQuerier := &mockParquetQuerier{} + pq := &parquetQuerierWithFallback{ + minT: minT, + maxT: maxT, + finder: finder, + blocksStoreQuerier: q, + parquetQuerier: mParquetQuerier, + queryStoreAfter: time.Hour, + metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), + limits: defaultOverrides(t, 0), + logger: log.NewNopLogger(), + defaultBlockStoreType: parquetBlockStore, + } + + // Set up mixed blocks (some parquet, some TSDB) + finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything, mock.Anything).Return(bucketindex.Blocks{ + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Parquet block + &bucketindex.Block{ID: block2}, // TSDB block + }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) + + t.Run("select should sort series when mixing parquet and TSDB blocks", func(t *testing.T) { + stores.Reset() + mParquetQuerier.Reset() + ss := pq.Select(ctx, false, nil, matchers...) + require.NoError(t, ss.Err()) + require.Len(t, stores.queriedBlocks, 1) + require.Len(t, mParquetQuerier.queriedBlocks, 1) + }) + }) +} From 3fd4a2c26de1cce5da02dbfa9fef720bf11ad79c Mon Sep 17 00:00:00 2001 From: yeya24 Date: Mon, 25 Aug 2025 11:45:21 -0700 Subject: [PATCH 2/2] update comment Signed-off-by: yeya24 --- pkg/querier/parquet_queryable.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 1d06fffe586..58ba7f673f3 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -492,7 +492,7 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool sortSeries = true } // Also sort when multiple parquet blocks are being merged. - // We don't need to sort explicitly + // We don't need to sort explicitly if only Store Gateway blocks as they are sorted by default. if len(parquet) > 1 { sortSeries = true }