Skip to content

Commit bc722c1

Browse files
authored
Add a config to allow disabling fallback in Parquet Queryable (cortexproject#6920)
* add configuration to disallow parquet fallback to store gateway
  Signed-off-by: yeya24 <[email protected]>
* make parquet consistency check error as a function
  Signed-off-by: yeya24 <[email protected]>
* changelog
  Signed-off-by: yeya24 <[email protected]>
* disable parquet fallback
  Signed-off-by: yeya24 <[email protected]>
---------
Signed-off-by: yeya24 <[email protected]>
1 parent e277b85 commit bc722c1

File tree

4 files changed

+191
-1
lines changed

4 files changed

+191
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
* [ENHANCEMENT] Querier: Support query limits in parquet queryable. #6870
6161
* [ENHANCEMENT] Ring: Add zone label to ring_members metric. #6900
6262
* [ENHANCEMENT] Ingester: Add new metric `cortex_ingester_push_errors_total` to track reasons for ingester request failures. #6901
63+
* [ENHANCEMENT] Parquet Storage: Allow Parquet Queryable to disable fallback to Store Gateway. #6920
6364
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
6465
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
6566
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576

pkg/querier/parquet_queryable.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package querier
33
import (
44
"context"
55
"fmt"
6+
"strings"
67
"time"
78

89
"github.com/go-kit/log"
@@ -50,7 +51,9 @@ const (
5051
parquetBlockStore blockStoreType = "parquet"
5152
)
5253

53-
var validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore}
54+
// validBlockStoreTypes enumerates the known block store types
// (tsdbBlockStore and parquetBlockStore).
var (
55+
validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore}
56+
)
5457

5558
// AddBlockStoreTypeToContext checks HTTP header and set block store key to context if
5659
// relevant header is set.
@@ -91,6 +94,7 @@ func newParquetQueryableFallbackMetrics(reg prometheus.Registerer) *parquetQuery
9194
type parquetQueryableWithFallback struct {
9295
services.Service
9396

97+
fallbackDisabled bool
9498
queryStoreAfter time.Duration
9599
parquetQueryable storage.Queryable
96100
blockStorageQueryable *BlocksStoreQueryable
@@ -255,6 +259,7 @@ func NewParquetQueryable(
255259
limits: limits,
256260
logger: logger,
257261
defaultBlockStoreType: blockStoreType(config.ParquetQueryableDefaultBlockStore),
262+
fallbackDisabled: config.ParquetQueryableFallbackDisabled,
258263
}
259264

260265
p.Service = services.NewBasicService(p.starting, p.running, p.stopping)
@@ -307,6 +312,7 @@ func (p *parquetQueryableWithFallback) Querier(mint, maxt int64) (storage.Querie
307312
limits: p.limits,
308313
logger: p.logger,
309314
defaultBlockStoreType: p.defaultBlockStoreType,
315+
fallbackDisabled: p.fallbackDisabled,
310316
}, nil
311317
}
312318

@@ -329,6 +335,8 @@ type parquetQuerierWithFallback struct {
329335
logger log.Logger
330336

331337
defaultBlockStoreType blockStoreType
338+
339+
fallbackDisabled bool
332340
}
333341

334342
func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
@@ -351,6 +359,10 @@ func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name strin
351359
rAnnotations annotations.Annotations
352360
)
353361

362+
if len(remaining) > 0 && q.fallbackDisabled {
363+
return nil, nil, parquetConsistencyCheckError(remaining)
364+
}
365+
354366
if len(parquet) > 0 {
355367
res, ann, qErr := q.parquetQuerier.LabelValues(InjectBlocksIntoContext(ctx, parquet...), name, hints, matchers...)
356368
if qErr != nil {
@@ -401,6 +413,10 @@ func (q *parquetQuerierWithFallback) LabelNames(ctx context.Context, hints *stor
401413
rAnnotations annotations.Annotations
402414
)
403415

416+
if len(remaining) > 0 && q.fallbackDisabled {
417+
return nil, nil, parquetConsistencyCheckError(remaining)
418+
}
419+
404420
if len(parquet) > 0 {
405421
res, ann, qErr := q.parquetQuerier.LabelNames(InjectBlocksIntoContext(ctx, parquet...), hints, matchers...)
406422
if qErr != nil {
@@ -466,6 +482,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
466482
return storage.ErrSeriesSet(err)
467483
}
468484

485+
if len(remaining) > 0 && q.fallbackDisabled {
486+
err = parquetConsistencyCheckError(remaining)
487+
return storage.ErrSeriesSet(err)
488+
}
489+
469490
// Lets sort the series to merge
470491
if len(parquet) > 0 && len(remaining) > 0 {
471492
sortSeries = true
@@ -691,3 +712,15 @@ func extractShardMatcherFromContext(ctx context.Context) (*storepb.ShardMatcher,
691712

692713
return nil, false
693714
}
715+
716+
// parquetConsistencyCheckError builds the error reported when some blocks are
// not available as parquet files. The message ends with the ULIDs of the given
// blocks, separated by single spaces, in the order they appear in blocks.
func parquetConsistencyCheckError(blocks []*bucketindex.Block) error {
717+
return fmt.Errorf("consistency check failed because some blocks were not available as parquet files: %s", strings.Join(convertBlockULIDToString(blocks), " "))
718+
}
719+
720+
// convertBlockULIDToString returns the string form of each block's ID. The
// result has the same length and order as blocks. Every element of blocks is
// dereferenced, so none of them may be nil.
func convertBlockULIDToString(blocks []*bucketindex.Block) []string {
721+
res := make([]string, len(blocks))
722+
for idx, b := range blocks {
723+
res[idx] = b.ID.String()
724+
}
725+
return res
726+
}

pkg/querier/parquet_queryable_test.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,3 +716,157 @@ func TestMaterializedLabelsFilterCallback(t *testing.T) {
716716
})
717717
}
718718
}
719+
720+
// TestParquetQueryableFallbackDisabled exercises parquetQuerierWithFallback
// with fallbackDisabled set to true. It covers two scenarios:
//   - one block has no parquet metadata: Select, LabelNames and LabelValues
//     must each return the parquet consistency check error naming that block;
//   - every block has parquet metadata: the same three calls must succeed and
//     the parquet querier mock must record both blocks as queried.
func TestParquetQueryableFallbackDisabled(t *testing.T) {
721+
block1 := ulid.MustNew(1, nil)
722+
block2 := ulid.MustNew(2, nil)
723+
minT := int64(10)
724+
maxT := util.TimeToMillis(time.Now())
725+
726+
// createStore builds a fresh store set mock with a single store gateway
// client whose series, label names and label values responses are all
// tagged "fromSg" and cover block1 and block2.
createStore := func() *blocksStoreSetMock {
727+
return &blocksStoreSetMock{mockedResponses: []interface{}{
728+
map[BlocksStoreClient][]ulid.ULID{
729+
&storeGatewayClientMock{remoteAddr: "1.1.1.1",
730+
mockedSeriesResponses: []*storepb.SeriesResponse{
731+
mockSeriesResponse(labels.Labels{{Name: labels.MetricName, Value: "fromSg"}}, []cortexpb.Sample{{Value: 1, TimestampMs: minT}, {Value: 2, TimestampMs: minT + 1}}, nil, nil),
732+
mockHintsResponse(block1, block2),
733+
},
734+
mockedLabelNamesResponse: &storepb.LabelNamesResponse{
735+
Names: namesFromSeries(labels.FromMap(map[string]string{labels.MetricName: "fromSg", "fromSg": "fromSg"})),
736+
Warnings: []string{},
737+
Hints: mockNamesHints(block1, block2),
738+
},
739+
mockedLabelValuesResponse: &storepb.LabelValuesResponse{
740+
Values: valuesFromSeries(labels.MetricName, labels.FromMap(map[string]string{labels.MetricName: "fromSg", "fromSg": "fromSg"})),
741+
Warnings: []string{},
742+
Hints: mockValuesHints(block1, block2),
743+
},
744+
}: {block1, block2}},
745+
},
746+
}
747+
}
748+
749+
matchers := []*labels.Matcher{
750+
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "fromSg"),
751+
}
752+
ctx := user.InjectOrgID(context.Background(), "user-1")
753+
754+
t.Run("should return consistency check errors when fallback disabled and some blocks not available as parquet", func(t *testing.T) {
755+
finder := &blocksFinderMock{}
756+
stores := createStore()
757+
758+
q := &blocksStoreQuerier{
759+
minT: minT,
760+
maxT: maxT,
761+
finder: finder,
762+
stores: stores,
763+
consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil),
764+
logger: log.NewNopLogger(),
765+
metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()),
766+
limits: &blocksStoreLimitsMock{},
767+
768+
storeGatewayConsistencyCheckMaxAttempts: 3,
769+
}
770+
771+
mParquetQuerier := &mockParquetQuerier{}
772+
pq := &parquetQuerierWithFallback{
773+
minT: minT,
774+
maxT: maxT,
775+
finder: finder,
776+
blocksStoreQuerier: q,
777+
parquetQuerier: mParquetQuerier,
778+
queryStoreAfter: time.Hour,
779+
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
780+
limits: defaultOverrides(t, 0),
781+
logger: log.NewNopLogger(),
782+
defaultBlockStoreType: parquetBlockStore,
783+
fallbackDisabled: true, // Disable fallback
784+
}
785+
786+
// Set up blocks where block1 has parquet metadata but block2 doesn't
787+
finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything).Return(bucketindex.Blocks{
788+
&bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet
789+
&bucketindex.Block{ID: block2}, // Not available as parquet
790+
}, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil)
791+
792+
// Only block2 lacks parquet metadata, so it is the only ULID expected in
// the error message.
expectedError := fmt.Sprintf("consistency check failed because some blocks were not available as parquet files: %s", block2.String())
793+
794+
t.Run("select should return consistency check error", func(t *testing.T) {
795+
ss := pq.Select(ctx, true, nil, matchers...)
796+
require.Error(t, ss.Err())
797+
require.Contains(t, ss.Err().Error(), expectedError)
798+
})
799+
800+
t.Run("labelNames should return consistency check error", func(t *testing.T) {
801+
_, _, err := pq.LabelNames(ctx, nil, matchers...)
802+
require.Error(t, err)
803+
require.Contains(t, err.Error(), expectedError)
804+
})
805+
806+
t.Run("labelValues should return consistency check error", func(t *testing.T) {
807+
_, _, err := pq.LabelValues(ctx, labels.MetricName, nil, matchers...)
808+
require.Error(t, err)
809+
require.Contains(t, err.Error(), expectedError)
810+
})
811+
})
812+
813+
t.Run("should work normally when all blocks are available as parquet and fallback disabled", func(t *testing.T) {
814+
finder := &blocksFinderMock{}
815+
stores := createStore()
816+
817+
q := &blocksStoreQuerier{
818+
minT: minT,
819+
maxT: maxT,
820+
finder: finder,
821+
stores: stores,
822+
consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil),
823+
logger: log.NewNopLogger(),
824+
metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()),
825+
limits: &blocksStoreLimitsMock{},
826+
827+
storeGatewayConsistencyCheckMaxAttempts: 3,
828+
}
829+
830+
mParquetQuerier := &mockParquetQuerier{}
831+
pq := &parquetQuerierWithFallback{
832+
minT: minT,
833+
maxT: maxT,
834+
finder: finder,
835+
blocksStoreQuerier: q,
836+
parquetQuerier: mParquetQuerier,
837+
queryStoreAfter: time.Hour,
838+
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
839+
limits: defaultOverrides(t, 0),
840+
logger: log.NewNopLogger(),
841+
defaultBlockStoreType: parquetBlockStore,
842+
fallbackDisabled: true, // Disable fallback
843+
}
844+
845+
// Set up blocks where both blocks have parquet metadata
846+
finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything).Return(bucketindex.Blocks{
847+
&bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet
848+
&bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet
849+
}, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil)
850+
851+
t.Run("select should work without error", func(t *testing.T) {
852+
mParquetQuerier.Reset()
853+
ss := pq.Select(ctx, true, nil, matchers...)
854+
require.NoError(t, ss.Err())
855+
require.Len(t, mParquetQuerier.queriedBlocks, 2)
856+
})
857+
858+
t.Run("labelNames should work without error", func(t *testing.T) {
859+
mParquetQuerier.Reset()
860+
_, _, err := pq.LabelNames(ctx, nil, matchers...)
861+
require.NoError(t, err)
862+
require.Len(t, mParquetQuerier.queriedBlocks, 2)
863+
})
864+
865+
t.Run("labelValues should work without error", func(t *testing.T) {
866+
mParquetQuerier.Reset()
867+
_, _, err := pq.LabelValues(ctx, labels.MetricName, nil, matchers...)
868+
require.NoError(t, err)
869+
require.Len(t, mParquetQuerier.queriedBlocks, 2)
870+
})
871+
})
872+
}

pkg/querier/querier.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ type Config struct {
9595
EnableParquetQueryable bool `yaml:"enable_parquet_queryable" doc:"hidden"`
9696
ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size" doc:"hidden"`
9797
ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store" doc:"hidden"`
98+
ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled" doc:"hidden"`
9899
}
99100

100101
var (
@@ -145,6 +146,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
145146
f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.")
146147
f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] [Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.")
147148
f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.")
149+
f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.")
148150
}
149151

150152
// Validate the config

0 commit comments

Comments
 (0)