
Commit f509f08

allow override parquet queryable block store via header (#6792)
* allow runtime override parquet queryable block store to query via HTTP header

Signed-off-by: yeya24 <[email protected]>

* lint

Signed-off-by: yeya24 <[email protected]>

---------

Signed-off-by: yeya24 <[email protected]>
1 parent 844fa55 commit f509f08

File tree

  pkg/api/queryapi/query_api.go
  pkg/querier/parquet_queryable.go
  pkg/querier/parquet_queryable_test.go
  pkg/querier/querier.go
  pkg/querier/querier_test.go

5 files changed: +208 -44 lines changed

pkg/api/queryapi/query_api.go

Lines changed: 3 additions & 0 deletions
@@ -18,6 +18,7 @@ import (
 	"github.com/weaveworks/common/httpgrpc"
 
 	"github.com/cortexproject/cortex/pkg/engine"
+	"github.com/cortexproject/cortex/pkg/querier"
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/api"
 )
@@ -98,6 +99,7 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
 	}
 
 	ctx = engine.AddEngineTypeToContext(ctx, r)
+	ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
 	qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
 	if err != nil {
 		return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
@@ -153,6 +155,7 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
 	}
 
 	ctx = engine.AddEngineTypeToContext(ctx, r)
+	ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
 	qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
 	if err != nil {
 		return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
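
Both handlers simply copy the header value into the request context, so the override is fully request-scoped. As a rough client-side sketch (not part of this commit), a caller could force the parquet block store for a single query like this; the querier address, HTTP prefix, and tenant ID are assumptions made for the example:

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Hypothetical querier endpoint; adjust host and HTTP prefix for your deployment.
	endpoint := "http://localhost:8080/prometheus/api/v1/query?" + url.Values{"query": {"up"}}.Encode()

	req, err := http.NewRequest(http.MethodGet, endpoint, nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("X-Scope-OrgID", "user-1")             // tenant ID, assuming a multi-tenant setup
	req.Header.Set("X-Cortex-BlockStore-Type", "parquet") // header introduced by this commit; "tsdb" is also accepted

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}

Values other than "tsdb" or "parquet" (or a missing header) are ignored by AddBlockStoreTypeToContext, so the querier's configured default still applies.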

pkg/querier/parquet_queryable.go

Lines changed: 50 additions & 10 deletions
@@ -33,6 +33,39 @@ import (
 	"github.com/cortexproject/cortex/pkg/util/validation"
 )
 
+type blockStorageType struct{}
+
+var blockStorageKey = blockStorageType{}
+
+const BlockStoreTypeHeader = "X-Cortex-BlockStore-Type"
+
+type blockStoreType string
+
+const (
+	tsdbBlockStore    blockStoreType = "tsdb"
+	parquetBlockStore blockStoreType = "parquet"
+)
+
+var validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore}
+
+// AddBlockStoreTypeToContext checks HTTP header and set block store key to context if
+// relevant header is set.
+func AddBlockStoreTypeToContext(ctx context.Context, storeType string) context.Context {
+	ng := blockStoreType(storeType)
+	switch ng {
+	case tsdbBlockStore, parquetBlockStore:
+		return context.WithValue(ctx, blockStorageKey, ng)
+	}
+	return ctx
+}
+
+func getBlockStoreType(ctx context.Context, defaultBlockStoreType blockStoreType) blockStoreType {
+	if ng, ok := ctx.Value(blockStorageKey).(blockStoreType); ok {
+		return ng
+	}
+	return defaultBlockStoreType
+}
+
 type parquetQueryableFallbackMetrics struct {
 	blocksQueriedTotal *prometheus.CounterVec
 	selectCount        *prometheus.CounterVec
@@ -69,6 +102,8 @@ type parquetQueryableWithFallback struct {
 
 	limits *validation.Overrides
 	logger log.Logger
+
+	defaultBlockStoreType blockStoreType
 }
 
 func NewParquetQueryable(
@@ -153,6 +188,7 @@ func NewParquetQueryable(
 		metrics:         newParquetQueryableFallbackMetrics(reg),
 		limits:          limits,
 		logger:          logger,
+		defaultBlockStoreType: blockStoreType(config.ParquetQueryableDefaultBlockStore),
 	}
 
 	p.Service = services.NewBasicService(p.starting, p.running, p.stopping)
@@ -195,15 +231,16 @@ func (p *parquetQueryableWithFallback) Querier(mint, maxt int64) (storage.Querie
 	}
 
 	return &parquetQuerierWithFallback{
-		minT:               mint,
-		maxT:               maxt,
-		parquetQuerier:     pq,
-		queryStoreAfter:    p.queryStoreAfter,
-		blocksStoreQuerier: bsq,
-		finder:             p.finder,
-		metrics:            p.metrics,
-		limits:             p.limits,
-		logger:             p.logger,
+		minT:                  mint,
+		maxT:                  maxt,
+		parquetQuerier:        pq,
+		queryStoreAfter:       p.queryStoreAfter,
+		blocksStoreQuerier:    bsq,
+		finder:                p.finder,
+		metrics:               p.metrics,
+		limits:                p.limits,
+		logger:                p.logger,
+		defaultBlockStoreType: p.defaultBlockStoreType,
 	}, nil
 }
 
@@ -224,6 +261,8 @@ type parquetQuerierWithFallback struct {
 
 	limits *validation.Overrides
 	logger log.Logger
+
+	defaultBlockStoreType blockStoreType
 }
 
 func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
@@ -394,10 +433,11 @@ func (q *parquetQuerierWithFallback) getBlocks(ctx context.Context, minT, maxT i
 		return nil, nil, err
 	}
 
+	useParquet := getBlockStoreType(ctx, q.defaultBlockStoreType) == parquetBlockStore
 	parquetBlocks := make([]*bucketindex.Block, 0, len(blocks))
 	remaining := make([]*bucketindex.Block, 0, len(blocks))
 	for _, b := range blocks {
-		if b.Parquet != nil {
+		if useParquet && b.Parquet != nil {
 			parquetBlocks = append(parquetBlocks, b)
 			continue
 		}
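
The precedence is easiest to see in isolation. A minimal sketch of the helpers above, assuming it lives in the same package (the function name is illustrative and the snippet needs the context and fmt imports):

func exampleBlockStoreOverride() {
	// A recognized header value is stored in the request context and wins over
	// the configured default when getBlockStoreType resolves it.
	ctx := AddBlockStoreTypeToContext(context.Background(), "parquet")
	fmt.Println(getBlockStoreType(ctx, tsdbBlockStore)) // prints "parquet"

	// An unrecognized value leaves the context untouched, so the default applies.
	ctx = AddBlockStoreTypeToContext(context.Background(), "bogus")
	fmt.Println(getBlockStoreType(ctx, tsdbBlockStore)) // prints "tsdb"
}

In getBlocks the resolved type only matters when it is parquet: blocks that carry a converter mark go to the parquet querier, while everything else (or every block, when the resolved type is tsdb) stays on the store gateway fallback path.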

pkg/querier/parquet_queryable_test.go

Lines changed: 136 additions & 32 deletions
@@ -76,14 +76,15 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {
 
 		mParquetQuerier := &mockParquetQuerier{}
 		pq := &parquetQuerierWithFallback{
-			minT:               minT,
-			maxT:               maxT,
-			finder:             finder,
-			blocksStoreQuerier: q,
-			parquetQuerier:     mParquetQuerier,
-			metrics:            newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
-			limits:             defaultOverrides(t, 4),
-			logger:             log.NewNopLogger(),
+			minT:                  minT,
+			maxT:                  maxT,
+			finder:                finder,
+			blocksStoreQuerier:    q,
+			parquetQuerier:        mParquetQuerier,
+			metrics:               newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
+			limits:                defaultOverrides(t, 4),
+			logger:                log.NewNopLogger(),
+			defaultBlockStoreType: parquetBlockStore,
 		}
 
 		finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
@@ -118,14 +119,15 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {
 
 		mParquetQuerier := &mockParquetQuerier{}
 		pq := &parquetQuerierWithFallback{
-			minT:               minT,
-			maxT:               maxT,
-			finder:             finder,
-			blocksStoreQuerier: q,
-			parquetQuerier:     mParquetQuerier,
-			metrics:            newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
-			limits:             defaultOverrides(t, 0),
-			logger:             log.NewNopLogger(),
+			minT:                  minT,
+			maxT:                  maxT,
+			finder:                finder,
+			blocksStoreQuerier:    q,
+			parquetQuerier:        mParquetQuerier,
+			metrics:               newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
+			limits:                defaultOverrides(t, 0),
+			logger:                log.NewNopLogger(),
+			defaultBlockStoreType: parquetBlockStore,
 		}
 
 		finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
@@ -178,14 +180,15 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {
 
 		mParquetQuerier := &mockParquetQuerier{}
 		pq := &parquetQuerierWithFallback{
-			minT:               minT,
-			maxT:               maxT,
-			finder:             finder,
-			blocksStoreQuerier: q,
-			parquetQuerier:     mParquetQuerier,
-			metrics:            newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
-			limits:             defaultOverrides(t, 0),
-			logger:             log.NewNopLogger(),
+			minT:                  minT,
+			maxT:                  maxT,
+			finder:                finder,
+			blocksStoreQuerier:    q,
+			parquetQuerier:        mParquetQuerier,
+			metrics:               newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
+			limits:                defaultOverrides(t, 0),
+			logger:                log.NewNopLogger(),
+			defaultBlockStoreType: parquetBlockStore,
 		}
 
 		finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
@@ -244,14 +247,15 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {
 
 		mParquetQuerier := &mockParquetQuerier{}
 		pq := &parquetQuerierWithFallback{
-			minT:               minT,
-			maxT:               maxT,
-			finder:             finder,
-			blocksStoreQuerier: q,
-			parquetQuerier:     mParquetQuerier,
-			metrics:            newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
-			limits:             defaultOverrides(t, 0),
-			logger:             log.NewNopLogger(),
+			minT:                  minT,
+			maxT:                  maxT,
+			finder:                finder,
+			blocksStoreQuerier:    q,
+			parquetQuerier:        mParquetQuerier,
+			metrics:               newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
+			limits:                defaultOverrides(t, 0),
+			logger:                log.NewNopLogger(),
+			defaultBlockStoreType: parquetBlockStore,
 		}
 
 		finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
@@ -291,6 +295,106 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {
 		})
 	})
 
+	t.Run("Default query TSDB block store even if parquet blocks available. Override with ctx", func(t *testing.T) {
+		finder := &blocksFinderMock{}
+		stores := createStore()
+
+		q := &blocksStoreQuerier{
+			minT:        minT,
+			maxT:        maxT,
+			finder:      finder,
+			stores:      stores,
+			consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil),
+			logger:      log.NewNopLogger(),
+			metrics:     newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()),
+			limits:      &blocksStoreLimitsMock{},
+
+			storeGatewayConsistencyCheckMaxAttempts: 3,
+		}
+
+		mParquetQuerier := &mockParquetQuerier{}
+		pq := &parquetQuerierWithFallback{
+			minT:                  minT,
+			maxT:                  maxT,
+			finder:                finder,
+			blocksStoreQuerier:    q,
+			parquetQuerier:        mParquetQuerier,
+			metrics:               newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
+			limits:                defaultOverrides(t, 0),
+			logger:                log.NewNopLogger(),
+			defaultBlockStoreType: tsdbBlockStore,
+		}
+
+		finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
+			&bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}},
+			&bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}},
+		}, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil)
+
+		t.Run("select", func(t *testing.T) {
+			stores.Reset()
+			mParquetQuerier.Reset()
+			ss := pq.Select(ctx, true, nil, matchers...)
+			require.NoError(t, ss.Err())
+			require.Len(t, stores.queriedBlocks, 2)
+			require.Len(t, mParquetQuerier.queriedBlocks, 0)
+		})
+
+		t.Run("select with ctx key override to parquet", func(t *testing.T) {
+			stores.Reset()
+			mParquetQuerier.Reset()
+			newCtx := AddBlockStoreTypeToContext(ctx, string(parquetBlockStore))
+			ss := pq.Select(newCtx, true, nil, matchers...)
+			require.NoError(t, ss.Err())
+			require.Len(t, stores.queriedBlocks, 0)
+			require.Len(t, mParquetQuerier.queriedBlocks, 2)
+		})
+
+		t.Run("labelNames", func(t *testing.T) {
+			stores.Reset()
+			mParquetQuerier.Reset()
+			r, _, err := pq.LabelNames(ctx, nil, matchers...)
+			require.NoError(t, err)
+			require.Len(t, stores.queriedBlocks, 2)
+			require.Len(t, mParquetQuerier.queriedBlocks, 0)
+			require.Contains(t, r, "fromSg")
+			require.NotContains(t, r, "fromParquet")
+		})
+
+		t.Run("labelNames with ctx key override to parquet", func(t *testing.T) {
+			stores.Reset()
+			mParquetQuerier.Reset()
+			newCtx := AddBlockStoreTypeToContext(ctx, string(parquetBlockStore))
+			r, _, err := pq.LabelNames(newCtx, nil, matchers...)
+			require.NoError(t, err)
+			require.Len(t, stores.queriedBlocks, 0)
+			require.Len(t, mParquetQuerier.queriedBlocks, 2)
+			require.NotContains(t, r, "fromSg")
+			require.Contains(t, r, "fromParquet")
+		})
+
+		t.Run("labelValues", func(t *testing.T) {
+			stores.Reset()
+			mParquetQuerier.Reset()
+			r, _, err := pq.LabelValues(ctx, labels.MetricName, nil, matchers...)
+			require.NoError(t, err)
+			require.Len(t, stores.queriedBlocks, 2)
+			require.Len(t, mParquetQuerier.queriedBlocks, 0)
+			require.Contains(t, r, "fromSg")
+			require.NotContains(t, r, "fromParquet")
+		})
+
+		t.Run("labelValues with ctx key override to parquet", func(t *testing.T) {
+			stores.Reset()
+			mParquetQuerier.Reset()
+			newCtx := AddBlockStoreTypeToContext(ctx, string(parquetBlockStore))
+			r, _, err := pq.LabelValues(newCtx, labels.MetricName, nil, matchers...)
+			require.NoError(t, err)
+			require.Len(t, stores.queriedBlocks, 0)
+			require.Len(t, mParquetQuerier.queriedBlocks, 2)
+			require.NotContains(t, r, "fromSg")
+			require.Contains(t, r, "fromParquet")
+		})
+	})
 }
 
 func defaultOverrides(t *testing.T, queryVerticalShardSize int) *validation.Overrides {
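
The new subtest mirrors the existing ones but flips the default to tsdb and checks that only the context override routes queries to the parquet path. To exercise just this test locally, the standard tooling should be enough, e.g. go test ./pkg/querier/ -run TestParquetQueryableFallbackLogic -v (the package path is taken from the file paths in this commit).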

pkg/querier/querier.go

Lines changed: 12 additions & 2 deletions
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"flag"
 	"fmt"
+	"slices"
 	"strings"
 	"sync"
 	"time"
@@ -93,8 +94,9 @@ type Config struct {
 	EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"`
 
 	// Query Parquet files if available
-	EnableParquetQueryable         bool `yaml:"enable_parquet_queryable" doc:"hidden"`
-	ParquetQueryableShardCacheSize int  `yaml:"parquet_queryable_shard_cache_size" doc:"hidden"`
+	EnableParquetQueryable            bool   `yaml:"enable_parquet_queryable" doc:"hidden"`
+	ParquetQueryableShardCacheSize    int    `yaml:"parquet_queryable_shard_cache_size" doc:"hidden"`
+	ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store" doc:"hidden"`
 }
 
 var (
@@ -104,6 +106,7 @@ var (
 	errUnsupportedResponseCompression  = errors.New("unsupported response compression. Supported compression 'gzip' and '' (disable compression)")
 	errInvalidConsistencyCheckAttempts = errors.New("store gateway consistency check max attempts should be greater or equal than 1")
 	errInvalidIngesterQueryMaxAttempts = errors.New("ingester query max attempts should be greater or equal than 1")
+	errInvalidParquetQueryableDefaultBlockStore = errors.New("unsupported parquet queryable default block store. Supported options are tsdb and parquet")
 )
 
 // RegisterFlags adds the flags required to config this to the given FlagSet.
@@ -142,6 +145,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.")
 	f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.")
 	f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] [Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.")
+	f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.")
 }
 
 // Validate the config
@@ -171,6 +175,12 @@ func (cfg *Config) Validate() error {
 		return errInvalidIngesterQueryMaxAttempts
 	}
 
+	if cfg.EnableParquetQueryable {
+		if !slices.Contains(validBlockStoreTypes, blockStoreType(cfg.ParquetQueryableDefaultBlockStore)) {
+			return errInvalidParquetQueryableDefaultBlockStore
+		}
+	}
+
 	return nil
 }
pkg/querier/querier_test.go

Lines changed: 7 additions & 0 deletions
@@ -1677,6 +1677,13 @@ func TestConfig_Validate(t *testing.T) {
 			},
 			expected: errShuffleShardingLookbackLessThanQueryStoreAfter,
 		},
+		"should fail if invalid parquet queryable default block store": {
+			setup: func(cfg *Config) {
+				cfg.EnableParquetQueryable = true
+				cfg.ParquetQueryableDefaultBlockStore = "none"
+			},
+			expected: errInvalidParquetQueryableDefaultBlockStore,
+		},
 	}
 
 	for testName, testData := range tests {
