Skip to content

Commit d20600c

Browse files
authored
Caching Parquet shards (#6775)
* Caching Parquet shards Signed-off-by: alanprot <[email protected]> * lint + comments Signed-off-by: alanprot <[email protected]> --------- Signed-off-by: alanprot <[email protected]>
1 parent 7a83210 commit d20600c

File tree

2 files changed

+119
-14
lines changed

2 files changed

+119
-14
lines changed

pkg/querier/parquet_queryable.go

Lines changed: 116 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package querier
22

33
import (
44
"context"
5+
"fmt"
56
"time"
67

78
"github.com/go-kit/log"
89
"github.com/go-kit/log/level"
10+
lru "github.com/hashicorp/golang-lru/v2"
911
"github.com/parquet-go/parquet-go"
1012
"github.com/pkg/errors"
1113
"github.com/prometheus-community/parquet-common/schema"
@@ -87,6 +89,11 @@ func NewParquetQueryable(
8789
return nil, err
8890
}
8991

92+
cache, err := newCache[*parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, newCacheMetrics(reg))
93+
if err != nil {
94+
return nil, err
95+
}
96+
9097
cDecoder := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool())
9198

9299
parquetQueryable, err := search.NewParquetQueryable(cDecoder, func(ctx context.Context, mint, maxt int64) ([]*parquet_storage.ParquetShard, error) {
@@ -106,20 +113,30 @@ func NewParquetQueryable(
106113

107114
for i, block := range blocks {
108115
errGroup.Go(func() error {
109-
// we always only have 1 shard - shard 0
110-
shard, err := parquet_storage.OpenParquetShard(ctx,
111-
userBkt,
112-
block.ID.String(),
113-
0,
114-
parquet_storage.WithFileOptions(
115-
parquet.SkipMagicBytes(true),
116-
parquet.ReadBufferSize(100*1024),
117-
parquet.SkipBloomFilters(true),
118-
),
119-
parquet_storage.WithOptimisticReader(true),
120-
)
116+
cacheKey := fmt.Sprintf("%v-%v", userID, block.ID)
117+
shard := cache.Get(cacheKey)
118+
if shard == nil {
119+
// we always only have 1 shard - shard 0
120+
// Use context.Background() here as the file can be cached and live after the request ends.
121+
shard, err = parquet_storage.OpenParquetShard(context.Background(),
122+
userBkt,
123+
block.ID.String(),
124+
0,
125+
parquet_storage.WithFileOptions(
126+
parquet.SkipMagicBytes(true),
127+
parquet.ReadBufferSize(100*1024),
128+
parquet.SkipBloomFilters(true),
129+
),
130+
parquet_storage.WithOptimisticReader(true),
131+
)
132+
if err != nil {
133+
return err
134+
}
135+
cache.Set(cacheKey, shard)
136+
}
137+
121138
shards[i] = shard
122-
return err
139+
return nil
123140
})
124141
}
125142

@@ -401,3 +418,89 @@ func (q *parquetQuerierWithFallback) getBlocks(ctx context.Context, minT, maxT i
401418

402419
return remaining, parquetBlocks, nil
403420
}
421+
422+
type cacheInterface[T any] interface {
423+
Get(path string) T
424+
Set(path string, reader T)
425+
}
426+
427+
type cacheMetrics struct {
428+
hits *prometheus.CounterVec
429+
misses *prometheus.CounterVec
430+
evictions *prometheus.CounterVec
431+
size *prometheus.GaugeVec
432+
}
433+
434+
func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics {
435+
return &cacheMetrics{
436+
hits: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
437+
Name: "cortex_parquet_queryable_cache_hits_total",
438+
Help: "Total number of parquet cache hits",
439+
}, []string{"name"}),
440+
misses: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
441+
Name: "cortex_parquet_queryable_cache_misses_total",
442+
Help: "Total number of parquet cache misses",
443+
}, []string{"name"}),
444+
evictions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
445+
Name: "cortex_parquet_queryable_cache_evictions_total",
446+
Help: "Total number of parquet cache evictions",
447+
}, []string{"name"}),
448+
size: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
449+
Name: "cortex_parquet_queryable_cache_item_count",
450+
Help: "Current number of cached parquet items",
451+
}, []string{"name"}),
452+
}
453+
}
454+
455+
type Cache[T any] struct {
456+
cache *lru.Cache[string, T]
457+
name string
458+
metrics *cacheMetrics
459+
}
460+
461+
func newCache[T any](name string, size int, metrics *cacheMetrics) (cacheInterface[T], error) {
462+
if size <= 0 {
463+
return &noopCache[T]{}, nil
464+
}
465+
cache, err := lru.NewWithEvict(size, func(key string, value T) {
466+
metrics.evictions.WithLabelValues(name).Inc()
467+
metrics.size.WithLabelValues(name).Dec()
468+
})
469+
if err != nil {
470+
return nil, err
471+
}
472+
473+
return &Cache[T]{
474+
cache: cache,
475+
name: name,
476+
metrics: metrics,
477+
}, nil
478+
}
479+
480+
func (c *Cache[T]) Get(path string) (r T) {
481+
if reader, ok := c.cache.Get(path); ok {
482+
c.metrics.hits.WithLabelValues(c.name).Inc()
483+
return reader
484+
}
485+
c.metrics.misses.WithLabelValues(c.name).Inc()
486+
return
487+
}
488+
489+
func (c *Cache[T]) Set(path string, reader T) {
490+
if !c.cache.Contains(path) {
491+
c.metrics.size.WithLabelValues(c.name).Inc()
492+
}
493+
c.metrics.misses.WithLabelValues(c.name).Inc()
494+
c.cache.Add(path, reader)
495+
}
496+
497+
type noopCache[T any] struct {
498+
}
499+
500+
func (n noopCache[T]) Get(_ string) (r T) {
501+
return
502+
}
503+
504+
func (n noopCache[T]) Set(_ string, _ T) {
505+
506+
}

pkg/querier/querier.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ type Config struct {
9292
EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"`
9393

9494
// Query Parquet files if available
95-
EnableParquetQueryable bool `yaml:"enable_parquet_queryable" doc:"hidden"`
95+
EnableParquetQueryable bool `yaml:"enable_parquet_queryable" doc:"hidden"`
96+
ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size" doc:"hidden"`
9697
}
9798

9899
var (
@@ -139,6 +140,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
139140
f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.")
140141
f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.")
141142
f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.")
143+
f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] [Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.")
142144
}
143145

144146
// Validate the config

0 commit comments

Comments
 (0)