From 9c6d985b348b74284d08e063df47f3043521cb76 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 8 Dec 2025 18:44:20 +0900 Subject: [PATCH 1/2] Refactor: move parquet shard cache to parquet util pkg Signed-off-by: SungJin1212 --- docs/blocks-storage/querier.md | 8 +- docs/configuration/config-file-reference.md | 8 +- pkg/querier/parquet_queryable.go | 160 +----------------- pkg/querier/parquet_queryable_test.go | 116 +------------ pkg/querier/querier.go | 13 +- pkg/util/parquetutil/cache.go | 176 ++++++++++++++++++++ pkg/util/parquetutil/cache_test.go | 130 +++++++++++++++ 7 files changed, 329 insertions(+), 282 deletions(-) create mode 100644 pkg/util/parquetutil/cache.go create mode 100644 pkg/util/parquetutil/cache_test.go diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 732324fec1f..614713f0196 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -294,6 +294,10 @@ querier: # CLI flag: -querier.parquet-queryable-shard-cache-size [parquet_queryable_shard_cache_size: | default = 512] + # [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. + # CLI flag: -querier.parquet-queryable-shard-cache-ttl + [parquet_queryable_shard_cache_ttl: | default = 24h] + # [Experimental] Parquet queryable's default block store to query. Valid # options are tsdb and parquet. If it is set to tsdb, parquet queryable always # fallback to store gateway. @@ -307,10 +311,6 @@ querier: # queryable. # CLI flag: -querier.parquet-queryable-fallback-disabled [parquet_queryable_fallback_disabled: | default = false] - - # [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. - # CLI flag: -querier.parquet-queryable-shard-cache-ttl - [parquet_queryable_shard_cache_ttl: | default = 24h] ``` ### `blocks_storage_config` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 58d1c89ebb9..4109092cd6c 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4812,6 +4812,10 @@ thanos_engine: # CLI flag: -querier.parquet-queryable-shard-cache-size [parquet_queryable_shard_cache_size: | default = 512] +# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. +# CLI flag: -querier.parquet-queryable-shard-cache-ttl +[parquet_queryable_shard_cache_ttl: | default = 24h] + # [Experimental] Parquet queryable's default block store to query. Valid options # are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback # to store gateway. @@ -4824,10 +4828,6 @@ thanos_engine: # need to make sure Parquet files are created before it is queryable. # CLI flag: -querier.parquet-queryable-fallback-disabled [parquet_queryable_fallback_disabled: | default = false] - -# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. -# CLI flag: -querier.parquet-queryable-shard-cache-ttl -[parquet_queryable_shard_cache_ttl: | default = 24h] ``` ### `query_frontend_config` diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 40be5ff8997..77f578225cf 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -7,7 +7,6 @@ import ( "time" "github.com/go-kit/log" - lru "github.com/hashicorp/golang-lru/v2" "github.com/opentracing/opentracing-go" "github.com/parquet-go/parquet-go" "github.com/pkg/errors" @@ -21,6 +20,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/annotations" + "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" "golang.org/x/sync/errgroup" @@ -33,6 +33,7 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/multierror" + "github.com/cortexproject/cortex/pkg/util/parquetutil" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/users" "github.com/cortexproject/cortex/pkg/util/validation" @@ -51,8 +52,6 @@ const ( parquetBlockStore blockStoreType = "parquet" ) -const defaultMaintenanceInterval = time.Minute - var ( validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore} ) @@ -99,7 +98,7 @@ type parquetQueryableWithFallback struct { fallbackDisabled bool queryStoreAfter time.Duration parquetQueryable storage.Queryable - cache cacheInterface[parquet_storage.ParquetShard] + cache parquetutil.CacheInterface[parquet_storage.ParquetShard] blockStorageQueryable *BlocksStoreQueryable finder BlocksFinder @@ -135,7 +134,7 @@ func NewParquetQueryable( return nil, err } - cache, err := newCache[parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, config.ParquetQueryableShardCacheTTL, defaultMaintenanceInterval, newCacheMetrics(reg)) + cache, err := parquetutil.NewCache[parquet_storage.ParquetShard](&config.ParquetShardCache, "parquet-shards", extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg)) if err != nil { return nil, err } @@ -623,157 +622,6 @@ func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHint return &shardMatcherLabelsFilter{shardMatcher: sm}, true } -type cacheInterface[T any] interface { - Get(path string) T - Set(path string, reader T) - Close() -} - -type cacheMetrics struct { - hits *prometheus.CounterVec - misses *prometheus.CounterVec - evictions *prometheus.CounterVec - size *prometheus.GaugeVec -} - -func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics { - return &cacheMetrics{ - hits: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_parquet_queryable_cache_hits_total", - Help: "Total number of parquet cache hits", - }, []string{"name"}), - misses: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_parquet_queryable_cache_misses_total", - Help: "Total number of parquet cache misses", - }, []string{"name"}), - evictions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_parquet_queryable_cache_evictions_total", - Help: "Total number of parquet cache evictions", - }, []string{"name"}), - size: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "cortex_parquet_queryable_cache_item_count", - Help: "Current number of cached parquet items", - }, []string{"name"}), - } -} - -type cacheEntry[T any] struct { - value T - expiresAt time.Time -} - -type Cache[T any] struct { - cache *lru.Cache[string, *cacheEntry[T]] - name string - metrics *cacheMetrics - ttl time.Duration - stopCh chan struct{} -} - -func newCache[T any](name string, size int, ttl, maintenanceInterval time.Duration, metrics *cacheMetrics) (cacheInterface[T], error) { - if size <= 0 { - return &noopCache[T]{}, nil - } - cache, err := lru.NewWithEvict(size, func(key string, value *cacheEntry[T]) { - metrics.evictions.WithLabelValues(name).Inc() - metrics.size.WithLabelValues(name).Dec() - }) - if err != nil { - return nil, err - } - - c := &Cache[T]{ - cache: cache, - name: name, - metrics: metrics, - ttl: ttl, - stopCh: make(chan struct{}), - } - - if ttl > 0 { - go c.maintenanceLoop(maintenanceInterval) - } - - return c, nil -} - -func (c *Cache[T]) maintenanceLoop(interval time.Duration) { - ticker := time.NewTicker(interval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - now := time.Now() - keys := c.cache.Keys() - for _, key := range keys { - if entry, ok := c.cache.Peek(key); ok { - // we use a Peek() because the Get() change LRU order. - if !entry.expiresAt.IsZero() && now.After(entry.expiresAt) { - c.cache.Remove(key) - } - } - } - case <-c.stopCh: - return - } - } -} - -func (c *Cache[T]) Get(path string) (r T) { - if entry, ok := c.cache.Get(path); ok { - isExpired := !entry.expiresAt.IsZero() && time.Now().After(entry.expiresAt) - - if isExpired { - c.cache.Remove(path) - c.metrics.misses.WithLabelValues(c.name).Inc() - return - } - - c.metrics.hits.WithLabelValues(c.name).Inc() - return entry.value - } - c.metrics.misses.WithLabelValues(c.name).Inc() - return -} - -func (c *Cache[T]) Set(path string, reader T) { - if !c.cache.Contains(path) { - c.metrics.size.WithLabelValues(c.name).Inc() - } - - var expiresAt time.Time - if c.ttl > 0 { - expiresAt = time.Now().Add(c.ttl) - } - - entry := &cacheEntry[T]{ - value: reader, - expiresAt: expiresAt, - } - - c.cache.Add(path, entry) -} - -func (c *Cache[T]) Close() { - close(c.stopCh) -} - -type noopCache[T any] struct { -} - -func (n noopCache[T]) Get(_ string) (r T) { - return -} - -func (n noopCache[T]) Set(_ string, _ T) { - -} - -func (n noopCache[T]) Close() { - -} - var ( shardInfoCtxKey contextKey = 1 ) diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 250b1831442..82bc29f80a0 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -1,7 +1,6 @@ package querier import ( - "bytes" "context" "fmt" "math/rand" @@ -15,7 +14,6 @@ import ( "github.com/oklog/ulid/v2" "github.com/prometheus-community/parquet-common/convert" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" @@ -41,6 +39,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/parquetutil" "github.com/cortexproject/cortex/pkg/util/services" cortex_testutil "github.com/cortexproject/cortex/pkg/util/testutil" "github.com/cortexproject/cortex/pkg/util/validation" @@ -402,8 +401,10 @@ func TestParquetQueryable_Limits(t *testing.T) { QueryStoreAfter: 0, StoreGatewayQueryStatsEnabled: false, StoreGatewayConsistencyCheckMaxAttempts: 3, - ParquetQueryableShardCacheSize: 100, - ParquetQueryableDefaultBlockStore: "parquet", + ParquetShardCache: parquetutil.CacheConfig{ + ParquetQueryableShardCacheSize: 100, + }, + ParquetQueryableDefaultBlockStore: "parquet", } storageCfg := cortex_tsdb.BlocksStorageConfig{ @@ -883,110 +884,3 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) { }) }) } - -func Test_Cache_LRUEviction(t *testing.T) { - reg := prometheus.NewRegistry() - metrics := newCacheMetrics(reg) - cache, err := newCache[string]("test", 2, 0, time.Minute, metrics) - require.NoError(t, err) - defer cache.Close() - - cache.Set("key1", "value1") - cache.Set("key2", "value2") - - _ = cache.Get("key1") // hit - // "key2" deleted by LRU eviction - cache.Set("key3", "value3") - - val1 := cache.Get("key1") // hit - require.Equal(t, "value1", val1) - val3 := cache.Get("key3") // hit - require.Equal(t, "value3", val3) - val2 := cache.Get("key2") // miss - require.Equal(t, "", val2) - - require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions - # TYPE cortex_parquet_queryable_cache_evictions_total counter - cortex_parquet_queryable_cache_evictions_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits - # TYPE cortex_parquet_queryable_cache_hits_total counter - cortex_parquet_queryable_cache_hits_total{name="test"} 3 - # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items - # TYPE cortex_parquet_queryable_cache_item_count gauge - cortex_parquet_queryable_cache_item_count{name="test"} 2 - # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses - # TYPE cortex_parquet_queryable_cache_misses_total counter - cortex_parquet_queryable_cache_misses_total{name="test"} 1 - `))) -} - -func Test_Cache_TTLEvictionByGet(t *testing.T) { - reg := prometheus.NewRegistry() - metrics := newCacheMetrics(reg) - - cache, err := newCache[string]("test", 10, 100*time.Millisecond, time.Minute, metrics) - require.NoError(t, err) - defer cache.Close() - - cache.Set("key1", "value1") - - val := cache.Get("key1") - require.Equal(t, "value1", val) - - // sleep longer than TTL - time.Sleep(150 * time.Millisecond) - - val = cache.Get("key1") - require.Equal(t, "", val) - - require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions - # TYPE cortex_parquet_queryable_cache_evictions_total counter - cortex_parquet_queryable_cache_evictions_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits - # TYPE cortex_parquet_queryable_cache_hits_total counter - cortex_parquet_queryable_cache_hits_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items - # TYPE cortex_parquet_queryable_cache_item_count gauge - cortex_parquet_queryable_cache_item_count{name="test"} 0 - # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses - # TYPE cortex_parquet_queryable_cache_misses_total counter - cortex_parquet_queryable_cache_misses_total{name="test"} 1 - `))) -} - -func Test_Cache_TTLEvictionByLoop(t *testing.T) { - reg := prometheus.NewRegistry() - metrics := newCacheMetrics(reg) - - cache, err := newCache[string]("test", 10, 100*time.Millisecond, 100*time.Millisecond, metrics) - require.NoError(t, err) - defer cache.Close() - - cache.Set("key1", "value1") - - val := cache.Get("key1") - require.Equal(t, "value1", val) - - // sleep longer than TTL - time.Sleep(150 * time.Millisecond) - - if c, ok := cache.(*Cache[string]); ok { - // should delete by maintenance loop - _, ok := c.cache.Peek("key1") - require.False(t, ok) - } - - require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions - # TYPE cortex_parquet_queryable_cache_evictions_total counter - cortex_parquet_queryable_cache_evictions_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits - # TYPE cortex_parquet_queryable_cache_hits_total counter - cortex_parquet_queryable_cache_hits_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items - # TYPE cortex_parquet_queryable_cache_item_count gauge - cortex_parquet_queryable_cache_item_count{name="test"} 0 - `))) -} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index a6912ea024a..636300d64df 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -32,6 +32,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/parquetutil" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/users" "github.com/cortexproject/cortex/pkg/util/validation" @@ -92,11 +93,10 @@ type Config struct { EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"` // Query Parquet files if available - EnableParquetQueryable bool `yaml:"enable_parquet_queryable"` - ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` - ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` - ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` - ParquetQueryableShardCacheTTL time.Duration `yaml:"parquet_queryable_shard_cache_ttl"` + EnableParquetQueryable bool `yaml:"enable_parquet_queryable"` + ParquetShardCache parquetutil.CacheConfig `yaml:",inline"` + ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` + ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"` } @@ -147,8 +147,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.") f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.") f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.") - f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.") - f.DurationVar(&cfg.ParquetQueryableShardCacheTTL, "querier.parquet-queryable-shard-cache-ttl", 24*time.Hour, "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.") + cfg.ParquetShardCache.RegisterFlagsWithPrefix("querier.", f) f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "[Experimental] Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.") f.BoolVar(&cfg.DistributedExecEnabled, "querier.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.") f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.") diff --git a/pkg/util/parquetutil/cache.go b/pkg/util/parquetutil/cache.go new file mode 100644 index 00000000000..d9a4fc67ecb --- /dev/null +++ b/pkg/util/parquetutil/cache.go @@ -0,0 +1,176 @@ +package parquetutil + +import ( + "flag" + "time" + + lru "github.com/hashicorp/golang-lru/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + defaultMaintenanceInterval = time.Minute +) + +type CacheInterface[T any] interface { + Get(path string) T + Set(path string, reader T) + Close() +} + +type cacheMetrics struct { + hits *prometheus.CounterVec + misses *prometheus.CounterVec + evictions *prometheus.CounterVec + size *prometheus.GaugeVec +} + +func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics { + return &cacheMetrics{ + hits: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_queryable_cache_hits_total", + Help: "Total number of parquet cache hits", + }, []string{"name"}), + misses: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_queryable_cache_misses_total", + Help: "Total number of parquet cache misses", + }, []string{"name"}), + evictions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_queryable_cache_evictions_total", + Help: "Total number of parquet cache evictions", + }, []string{"name"}), + size: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_parquet_queryable_cache_item_count", + Help: "Current number of cached parquet items", + }, []string{"name"}), + } +} + +type cacheEntry[T any] struct { + value T + expiresAt time.Time +} + +type Cache[T any] struct { + cache *lru.Cache[string, *cacheEntry[T]] + name string + metrics *cacheMetrics + ttl time.Duration + stopCh chan struct{} +} + +type CacheConfig struct { + ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` + ParquetQueryableShardCacheTTL time.Duration `yaml:"parquet_queryable_shard_cache_ttl"` + MaintenanceInterval time.Duration `yaml:"-"` +} + +func (cfg *CacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.IntVar(&cfg.ParquetQueryableShardCacheSize, prefix+"parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.") + f.DurationVar(&cfg.ParquetQueryableShardCacheTTL, prefix+"parquet-queryable-shard-cache-ttl", 24*time.Hour, "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.") + cfg.MaintenanceInterval = defaultMaintenanceInterval +} + +func NewCache[T any](cfg *CacheConfig, name string, reg prometheus.Registerer) (CacheInterface[T], error) { + if cfg.ParquetQueryableShardCacheSize <= 0 { + return &noopCache[T]{}, nil + } + metrics := newCacheMetrics(reg) + cache, err := lru.NewWithEvict(cfg.ParquetQueryableShardCacheSize, func(key string, value *cacheEntry[T]) { + metrics.evictions.WithLabelValues(name).Inc() + metrics.size.WithLabelValues(name).Dec() + }) + if err != nil { + return nil, err + } + + c := &Cache[T]{ + cache: cache, + name: name, + metrics: metrics, + ttl: cfg.ParquetQueryableShardCacheTTL, + stopCh: make(chan struct{}), + } + + if cfg.ParquetQueryableShardCacheTTL > 0 { + go c.maintenanceLoop(cfg.MaintenanceInterval) + } + + return c, nil +} + +func (c *Cache[T]) maintenanceLoop(interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + now := time.Now() + keys := c.cache.Keys() + for _, key := range keys { + if entry, ok := c.cache.Peek(key); ok { + // we use a Peek() because the Get() change LRU order. + if !entry.expiresAt.IsZero() && now.After(entry.expiresAt) { + c.cache.Remove(key) + } + } + } + case <-c.stopCh: + return + } + } +} + +func (c *Cache[T]) Get(path string) (r T) { + if entry, ok := c.cache.Get(path); ok { + isExpired := !entry.expiresAt.IsZero() && time.Now().After(entry.expiresAt) + + if isExpired { + c.cache.Remove(path) + c.metrics.misses.WithLabelValues(c.name).Inc() + return + } + + c.metrics.hits.WithLabelValues(c.name).Inc() + return entry.value + } + c.metrics.misses.WithLabelValues(c.name).Inc() + return +} + +func (c *Cache[T]) Set(path string, reader T) { + if !c.cache.Contains(path) { + c.metrics.size.WithLabelValues(c.name).Inc() + } + + var expiresAt time.Time + if c.ttl > 0 { + expiresAt = time.Now().Add(c.ttl) + } + + entry := &cacheEntry[T]{ + value: reader, + expiresAt: expiresAt, + } + + c.cache.Add(path, entry) +} + +func (c *Cache[T]) Close() { + close(c.stopCh) +} + +type noopCache[T any] struct { +} + +func (n noopCache[T]) Get(_ string) (r T) { + return +} + +func (n noopCache[T]) Set(_ string, _ T) { + +} + +func (n noopCache[T]) Close() {} diff --git a/pkg/util/parquetutil/cache_test.go b/pkg/util/parquetutil/cache_test.go new file mode 100644 index 00000000000..c9ad1f48ec5 --- /dev/null +++ b/pkg/util/parquetutil/cache_test.go @@ -0,0 +1,130 @@ +package parquetutil + +import ( + "bytes" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +func Test_Cache_LRUEviction(t *testing.T) { + reg := prometheus.NewRegistry() + cfg := &CacheConfig{ + ParquetQueryableShardCacheSize: 2, + ParquetQueryableShardCacheTTL: 0, + MaintenanceInterval: time.Minute, + } + cache, err := NewCache[string](cfg, "test", reg) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + cache.Set("key2", "value2") + + _ = cache.Get("key1") // hit + // "key2" deleted by LRU eviction + cache.Set("key3", "value3") + + val1 := cache.Get("key1") // hit + require.Equal(t, "value1", val1) + val3 := cache.Get("key3") // hit + require.Equal(t, "value3", val3) + val2 := cache.Get("key2") // miss + require.Equal(t, "", val2) + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_queryable_cache_evictions_total counter + cortex_parquet_queryable_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_queryable_cache_hits_total counter + cortex_parquet_queryable_cache_hits_total{name="test"} 3 + # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_queryable_cache_item_count gauge + cortex_parquet_queryable_cache_item_count{name="test"} 2 + # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses + # TYPE cortex_parquet_queryable_cache_misses_total counter + cortex_parquet_queryable_cache_misses_total{name="test"} 1 + `))) +} + +func Test_Cache_TTLEvictionByGet(t *testing.T) { + reg := prometheus.NewRegistry() + cfg := &CacheConfig{ + ParquetQueryableShardCacheSize: 10, + ParquetQueryableShardCacheTTL: 100 * time.Millisecond, + MaintenanceInterval: time.Minute, + } + + cache, err := NewCache[string](cfg, "test", reg) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + + val := cache.Get("key1") + require.Equal(t, "value1", val) + + // sleep longer than TTL + time.Sleep(150 * time.Millisecond) + + val = cache.Get("key1") + require.Equal(t, "", val) + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_queryable_cache_evictions_total counter + cortex_parquet_queryable_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_queryable_cache_hits_total counter + cortex_parquet_queryable_cache_hits_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_queryable_cache_item_count gauge + cortex_parquet_queryable_cache_item_count{name="test"} 0 + # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses + # TYPE cortex_parquet_queryable_cache_misses_total counter + cortex_parquet_queryable_cache_misses_total{name="test"} 1 + `))) +} + +func Test_Cache_TTLEvictionByLoop(t *testing.T) { + reg := prometheus.NewRegistry() + cfg := &CacheConfig{ + ParquetQueryableShardCacheSize: 10, + ParquetQueryableShardCacheTTL: 100 * time.Millisecond, + MaintenanceInterval: 100 * time.Millisecond, + } + + cache, err := NewCache[string](cfg, "test", reg) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + + val := cache.Get("key1") + require.Equal(t, "value1", val) + + // sleep longer than TTL + time.Sleep(150 * time.Millisecond) + + if c, ok := cache.(*Cache[string]); ok { + // should delete by maintenance loop + _, ok := c.cache.Peek("key1") + require.False(t, ok) + } + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_queryable_cache_evictions_total counter + cortex_parquet_queryable_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_queryable_cache_hits_total counter + cortex_parquet_queryable_cache_hits_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_queryable_cache_item_count gauge + cortex_parquet_queryable_cache_item_count{name="test"} 0 + `))) +} From ea253e3a8fb6957dc630c85016e582227128d130 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Thu, 11 Dec 2025 15:27:16 +0900 Subject: [PATCH 2/2] delete queryable Signed-off-by: SungJin1212 --- docs/blocks-storage/querier.md | 15 ++-- docs/configuration/config-file-reference.md | 13 ++-- pkg/querier/parquet_queryable_test.go | 2 +- pkg/util/parquetutil/cache.go | 26 +++---- pkg/util/parquetutil/cache_test.go | 84 ++++++++++----------- schemas/cortex-config-schema.json | 12 +-- 6 files changed, 75 insertions(+), 77 deletions(-) diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 614713f0196..a6e0a29e67f 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -289,14 +289,13 @@ querier: # CLI flag: -querier.enable-parquet-queryable [enable_parquet_queryable: | default = false] - # [Experimental] Maximum size of the Parquet queryable shard cache. 0 to - # disable. - # CLI flag: -querier.parquet-queryable-shard-cache-size - [parquet_queryable_shard_cache_size: | default = 512] - - # [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. - # CLI flag: -querier.parquet-queryable-shard-cache-ttl - [parquet_queryable_shard_cache_ttl: | default = 24h] + # [Experimental] Maximum size of the Parquet shard cache. 0 to disable. + # CLI flag: -querier.parquet-shard-cache-size + [parquet_shard_cache_size: | default = 512] + + # [Experimental] TTL of the Parquet shard cache. 0 to no TTL. + # CLI flag: -querier.parquet-shard-cache-ttl + [parquet_shard_cache_ttl: | default = 24h] # [Experimental] Parquet queryable's default block store to query. Valid # options are tsdb and parquet. If it is set to tsdb, parquet queryable always diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 4109092cd6c..75257b54541 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4807,14 +4807,13 @@ thanos_engine: # CLI flag: -querier.enable-parquet-queryable [enable_parquet_queryable: | default = false] -# [Experimental] Maximum size of the Parquet queryable shard cache. 0 to -# disable. -# CLI flag: -querier.parquet-queryable-shard-cache-size -[parquet_queryable_shard_cache_size: | default = 512] +# [Experimental] Maximum size of the Parquet shard cache. 0 to disable. +# CLI flag: -querier.parquet-shard-cache-size +[parquet_shard_cache_size: | default = 512] -# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. -# CLI flag: -querier.parquet-queryable-shard-cache-ttl -[parquet_queryable_shard_cache_ttl: | default = 24h] +# [Experimental] TTL of the Parquet shard cache. 0 to no TTL. +# CLI flag: -querier.parquet-shard-cache-ttl +[parquet_shard_cache_ttl: | default = 24h] # [Experimental] Parquet queryable's default block store to query. Valid options # are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 82bc29f80a0..70c2a5cdacd 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -402,7 +402,7 @@ func TestParquetQueryable_Limits(t *testing.T) { StoreGatewayQueryStatsEnabled: false, StoreGatewayConsistencyCheckMaxAttempts: 3, ParquetShardCache: parquetutil.CacheConfig{ - ParquetQueryableShardCacheSize: 100, + ParquetShardCacheSize: 100, }, ParquetQueryableDefaultBlockStore: "parquet", } diff --git a/pkg/util/parquetutil/cache.go b/pkg/util/parquetutil/cache.go index d9a4fc67ecb..952e84bea3b 100644 --- a/pkg/util/parquetutil/cache.go +++ b/pkg/util/parquetutil/cache.go @@ -29,19 +29,19 @@ type cacheMetrics struct { func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics { return &cacheMetrics{ hits: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_parquet_queryable_cache_hits_total", + Name: "cortex_parquet_cache_hits_total", Help: "Total number of parquet cache hits", }, []string{"name"}), misses: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_parquet_queryable_cache_misses_total", + Name: "cortex_parquet_cache_misses_total", Help: "Total number of parquet cache misses", }, []string{"name"}), evictions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_parquet_queryable_cache_evictions_total", + Name: "cortex_parquet_cache_evictions_total", Help: "Total number of parquet cache evictions", }, []string{"name"}), size: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "cortex_parquet_queryable_cache_item_count", + Name: "cortex_parquet_cache_item_count", Help: "Current number of cached parquet items", }, []string{"name"}), } @@ -61,23 +61,23 @@ type Cache[T any] struct { } type CacheConfig struct { - ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` - ParquetQueryableShardCacheTTL time.Duration `yaml:"parquet_queryable_shard_cache_ttl"` - MaintenanceInterval time.Duration `yaml:"-"` + ParquetShardCacheSize int `yaml:"parquet_shard_cache_size"` + ParquetShardCacheTTL time.Duration `yaml:"parquet_shard_cache_ttl"` + MaintenanceInterval time.Duration `yaml:"-"` } func (cfg *CacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.IntVar(&cfg.ParquetQueryableShardCacheSize, prefix+"parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.") - f.DurationVar(&cfg.ParquetQueryableShardCacheTTL, prefix+"parquet-queryable-shard-cache-ttl", 24*time.Hour, "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.") + f.IntVar(&cfg.ParquetShardCacheSize, prefix+"parquet-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet shard cache. 0 to disable.") + f.DurationVar(&cfg.ParquetShardCacheTTL, prefix+"parquet-shard-cache-ttl", 24*time.Hour, "[Experimental] TTL of the Parquet shard cache. 0 to no TTL.") cfg.MaintenanceInterval = defaultMaintenanceInterval } func NewCache[T any](cfg *CacheConfig, name string, reg prometheus.Registerer) (CacheInterface[T], error) { - if cfg.ParquetQueryableShardCacheSize <= 0 { + if cfg.ParquetShardCacheSize <= 0 { return &noopCache[T]{}, nil } metrics := newCacheMetrics(reg) - cache, err := lru.NewWithEvict(cfg.ParquetQueryableShardCacheSize, func(key string, value *cacheEntry[T]) { + cache, err := lru.NewWithEvict(cfg.ParquetShardCacheSize, func(key string, value *cacheEntry[T]) { metrics.evictions.WithLabelValues(name).Inc() metrics.size.WithLabelValues(name).Dec() }) @@ -89,11 +89,11 @@ func NewCache[T any](cfg *CacheConfig, name string, reg prometheus.Registerer) ( cache: cache, name: name, metrics: metrics, - ttl: cfg.ParquetQueryableShardCacheTTL, + ttl: cfg.ParquetShardCacheTTL, stopCh: make(chan struct{}), } - if cfg.ParquetQueryableShardCacheTTL > 0 { + if cfg.ParquetShardCacheTTL > 0 { go c.maintenanceLoop(cfg.MaintenanceInterval) } diff --git a/pkg/util/parquetutil/cache_test.go b/pkg/util/parquetutil/cache_test.go index c9ad1f48ec5..b71b77207bf 100644 --- a/pkg/util/parquetutil/cache_test.go +++ b/pkg/util/parquetutil/cache_test.go @@ -13,9 +13,9 @@ import ( func Test_Cache_LRUEviction(t *testing.T) { reg := prometheus.NewRegistry() cfg := &CacheConfig{ - ParquetQueryableShardCacheSize: 2, - ParquetQueryableShardCacheTTL: 0, - MaintenanceInterval: time.Minute, + ParquetShardCacheSize: 2, + ParquetShardCacheTTL: 0, + MaintenanceInterval: time.Minute, } cache, err := NewCache[string](cfg, "test", reg) require.NoError(t, err) @@ -36,27 +36,27 @@ func Test_Cache_LRUEviction(t *testing.T) { require.Equal(t, "", val2) require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions - # TYPE cortex_parquet_queryable_cache_evictions_total counter - cortex_parquet_queryable_cache_evictions_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits - # TYPE cortex_parquet_queryable_cache_hits_total counter - cortex_parquet_queryable_cache_hits_total{name="test"} 3 - # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items - # TYPE cortex_parquet_queryable_cache_item_count gauge - cortex_parquet_queryable_cache_item_count{name="test"} 2 - # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses - # TYPE cortex_parquet_queryable_cache_misses_total counter - cortex_parquet_queryable_cache_misses_total{name="test"} 1 + # HELP cortex_parquet_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_cache_evictions_total counter + cortex_parquet_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_cache_hits_total counter + cortex_parquet_cache_hits_total{name="test"} 3 + # HELP cortex_parquet_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_cache_item_count gauge + cortex_parquet_cache_item_count{name="test"} 2 + # HELP cortex_parquet_cache_misses_total Total number of parquet cache misses + # TYPE cortex_parquet_cache_misses_total counter + cortex_parquet_cache_misses_total{name="test"} 1 `))) } func Test_Cache_TTLEvictionByGet(t *testing.T) { reg := prometheus.NewRegistry() cfg := &CacheConfig{ - ParquetQueryableShardCacheSize: 10, - ParquetQueryableShardCacheTTL: 100 * time.Millisecond, - MaintenanceInterval: time.Minute, + ParquetShardCacheSize: 10, + ParquetShardCacheTTL: 100 * time.Millisecond, + MaintenanceInterval: time.Minute, } cache, err := NewCache[string](cfg, "test", reg) @@ -75,27 +75,27 @@ func Test_Cache_TTLEvictionByGet(t *testing.T) { require.Equal(t, "", val) require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions - # TYPE cortex_parquet_queryable_cache_evictions_total counter - cortex_parquet_queryable_cache_evictions_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits - # TYPE cortex_parquet_queryable_cache_hits_total counter - cortex_parquet_queryable_cache_hits_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items - # TYPE cortex_parquet_queryable_cache_item_count gauge - cortex_parquet_queryable_cache_item_count{name="test"} 0 - # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses - # TYPE cortex_parquet_queryable_cache_misses_total counter - cortex_parquet_queryable_cache_misses_total{name="test"} 1 + # HELP cortex_parquet_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_cache_evictions_total counter + cortex_parquet_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_cache_hits_total counter + cortex_parquet_cache_hits_total{name="test"} 1 + # HELP cortex_parquet_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_cache_item_count gauge + cortex_parquet_cache_item_count{name="test"} 0 + # HELP cortex_parquet_cache_misses_total Total number of parquet cache misses + # TYPE cortex_parquet_cache_misses_total counter + cortex_parquet_cache_misses_total{name="test"} 1 `))) } func Test_Cache_TTLEvictionByLoop(t *testing.T) { reg := prometheus.NewRegistry() cfg := &CacheConfig{ - ParquetQueryableShardCacheSize: 10, - ParquetQueryableShardCacheTTL: 100 * time.Millisecond, - MaintenanceInterval: 100 * time.Millisecond, + ParquetShardCacheSize: 10, + ParquetShardCacheTTL: 100 * time.Millisecond, + MaintenanceInterval: 100 * time.Millisecond, } cache, err := NewCache[string](cfg, "test", reg) @@ -117,14 +117,14 @@ func Test_Cache_TTLEvictionByLoop(t *testing.T) { } require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions - # TYPE cortex_parquet_queryable_cache_evictions_total counter - cortex_parquet_queryable_cache_evictions_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits - # TYPE cortex_parquet_queryable_cache_hits_total counter - cortex_parquet_queryable_cache_hits_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items - # TYPE cortex_parquet_queryable_cache_item_count gauge - cortex_parquet_queryable_cache_item_count{name="test"} 0 + # HELP cortex_parquet_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_cache_evictions_total counter + cortex_parquet_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_cache_hits_total counter + cortex_parquet_cache_hits_total{name="test"} 1 + # HELP cortex_parquet_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_cache_item_count gauge + cortex_parquet_cache_item_count{name="test"} 0 `))) } diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 66fbc2d76db..30efc2eb96c 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -5840,17 +5840,17 @@ "type": "boolean", "x-cli-flag": "querier.parquet-queryable-fallback-disabled" }, - "parquet_queryable_shard_cache_size": { + "parquet_shard_cache_size": { "default": 512, - "description": "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.", + "description": "[Experimental] Maximum size of the Parquet shard cache. 0 to disable.", "type": "number", - "x-cli-flag": "querier.parquet-queryable-shard-cache-size" + "x-cli-flag": "querier.parquet-shard-cache-size" }, - "parquet_queryable_shard_cache_ttl": { + "parquet_shard_cache_ttl": { "default": "24h0m0s", - "description": "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.", + "description": "[Experimental] TTL of the Parquet shard cache. 0 to no TTL.", "type": "string", - "x-cli-flag": "querier.parquet-queryable-shard-cache-ttl", + "x-cli-flag": "querier.parquet-shard-cache-ttl", "x-format": "duration" }, "per_step_stats_enabled": {