diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 732324fec1f..a6e0a29e67f 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -289,10 +289,13 @@ querier: # CLI flag: -querier.enable-parquet-queryable [enable_parquet_queryable: | default = false] - # [Experimental] Maximum size of the Parquet queryable shard cache. 0 to - # disable. - # CLI flag: -querier.parquet-queryable-shard-cache-size - [parquet_queryable_shard_cache_size: | default = 512] + # [Experimental] Maximum size of the Parquet shard cache. 0 to disable. + # CLI flag: -querier.parquet-shard-cache-size + [parquet_shard_cache_size: | default = 512] + + # [Experimental] TTL of the Parquet shard cache. 0 to no TTL. + # CLI flag: -querier.parquet-shard-cache-ttl + [parquet_shard_cache_ttl: | default = 24h] # [Experimental] Parquet queryable's default block store to query. Valid # options are tsdb and parquet. If it is set to tsdb, parquet queryable always @@ -307,10 +310,6 @@ querier: # queryable. # CLI flag: -querier.parquet-queryable-fallback-disabled [parquet_queryable_fallback_disabled: | default = false] - - # [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. - # CLI flag: -querier.parquet-queryable-shard-cache-ttl - [parquet_queryable_shard_cache_ttl: | default = 24h] ``` ### `blocks_storage_config` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 58d1c89ebb9..75257b54541 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4807,10 +4807,13 @@ thanos_engine: # CLI flag: -querier.enable-parquet-queryable [enable_parquet_queryable: | default = false] -# [Experimental] Maximum size of the Parquet queryable shard cache. 0 to -# disable. -# CLI flag: -querier.parquet-queryable-shard-cache-size -[parquet_queryable_shard_cache_size: | default = 512] +# [Experimental] Maximum size of the Parquet shard cache. 0 to disable. +# CLI flag: -querier.parquet-shard-cache-size +[parquet_shard_cache_size: | default = 512] + +# [Experimental] TTL of the Parquet shard cache. 0 to no TTL. +# CLI flag: -querier.parquet-shard-cache-ttl +[parquet_shard_cache_ttl: | default = 24h] # [Experimental] Parquet queryable's default block store to query. Valid options # are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback @@ -4824,10 +4827,6 @@ thanos_engine: # need to make sure Parquet files are created before it is queryable. # CLI flag: -querier.parquet-queryable-fallback-disabled [parquet_queryable_fallback_disabled: | default = false] - -# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. -# CLI flag: -querier.parquet-queryable-shard-cache-ttl -[parquet_queryable_shard_cache_ttl: | default = 24h] ``` ### `query_frontend_config` diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 40be5ff8997..77f578225cf 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -7,7 +7,6 @@ import ( "time" "github.com/go-kit/log" - lru "github.com/hashicorp/golang-lru/v2" "github.com/opentracing/opentracing-go" "github.com/parquet-go/parquet-go" "github.com/pkg/errors" @@ -21,6 +20,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/annotations" + "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" "golang.org/x/sync/errgroup" @@ -33,6 +33,7 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/multierror" + "github.com/cortexproject/cortex/pkg/util/parquetutil" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/users" "github.com/cortexproject/cortex/pkg/util/validation" @@ -51,8 +52,6 @@ const ( parquetBlockStore blockStoreType = "parquet" ) -const defaultMaintenanceInterval = time.Minute - var ( validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore} ) @@ -99,7 +98,7 @@ type parquetQueryableWithFallback struct { fallbackDisabled bool queryStoreAfter time.Duration parquetQueryable storage.Queryable - cache cacheInterface[parquet_storage.ParquetShard] + cache parquetutil.CacheInterface[parquet_storage.ParquetShard] blockStorageQueryable *BlocksStoreQueryable finder BlocksFinder @@ -135,7 +134,7 @@ func NewParquetQueryable( return nil, err } - cache, err := newCache[parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, config.ParquetQueryableShardCacheTTL, defaultMaintenanceInterval, newCacheMetrics(reg)) + cache, err := parquetutil.NewCache[parquet_storage.ParquetShard](&config.ParquetShardCache, "parquet-shards", extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg)) if err != nil { return nil, err } @@ -623,157 +622,6 @@ func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHint return &shardMatcherLabelsFilter{shardMatcher: sm}, true } -type cacheInterface[T any] interface { - Get(path string) T - Set(path string, reader T) - Close() -} - -type cacheMetrics struct { - hits *prometheus.CounterVec - misses *prometheus.CounterVec - evictions *prometheus.CounterVec - size *prometheus.GaugeVec -} - -func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics { - return &cacheMetrics{ - hits: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_parquet_queryable_cache_hits_total", - Help: "Total number of parquet cache hits", - }, []string{"name"}), - misses: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_parquet_queryable_cache_misses_total", - Help: "Total number of parquet cache misses", - }, []string{"name"}), - evictions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_parquet_queryable_cache_evictions_total", - Help: "Total number of parquet cache evictions", - }, []string{"name"}), - size: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "cortex_parquet_queryable_cache_item_count", - Help: "Current number of cached parquet items", - }, []string{"name"}), - } -} - -type cacheEntry[T any] struct { - value T - expiresAt time.Time -} - -type Cache[T any] struct { - cache *lru.Cache[string, *cacheEntry[T]] - name string - metrics *cacheMetrics - ttl time.Duration - stopCh chan struct{} -} - -func newCache[T any](name string, size int, ttl, maintenanceInterval time.Duration, metrics *cacheMetrics) (cacheInterface[T], error) { - if size <= 0 { - return &noopCache[T]{}, nil - } - cache, err := lru.NewWithEvict(size, func(key string, value *cacheEntry[T]) { - metrics.evictions.WithLabelValues(name).Inc() - metrics.size.WithLabelValues(name).Dec() - }) - if err != nil { - return nil, err - } - - c := &Cache[T]{ - cache: cache, - name: name, - metrics: metrics, - ttl: ttl, - stopCh: make(chan struct{}), - } - - if ttl > 0 { - go c.maintenanceLoop(maintenanceInterval) - } - - return c, nil -} - -func (c *Cache[T]) maintenanceLoop(interval time.Duration) { - ticker := time.NewTicker(interval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - now := time.Now() - keys := c.cache.Keys() - for _, key := range keys { - if entry, ok := c.cache.Peek(key); ok { - // we use a Peek() because the Get() change LRU order. - if !entry.expiresAt.IsZero() && now.After(entry.expiresAt) { - c.cache.Remove(key) - } - } - } - case <-c.stopCh: - return - } - } -} - -func (c *Cache[T]) Get(path string) (r T) { - if entry, ok := c.cache.Get(path); ok { - isExpired := !entry.expiresAt.IsZero() && time.Now().After(entry.expiresAt) - - if isExpired { - c.cache.Remove(path) - c.metrics.misses.WithLabelValues(c.name).Inc() - return - } - - c.metrics.hits.WithLabelValues(c.name).Inc() - return entry.value - } - c.metrics.misses.WithLabelValues(c.name).Inc() - return -} - -func (c *Cache[T]) Set(path string, reader T) { - if !c.cache.Contains(path) { - c.metrics.size.WithLabelValues(c.name).Inc() - } - - var expiresAt time.Time - if c.ttl > 0 { - expiresAt = time.Now().Add(c.ttl) - } - - entry := &cacheEntry[T]{ - value: reader, - expiresAt: expiresAt, - } - - c.cache.Add(path, entry) -} - -func (c *Cache[T]) Close() { - close(c.stopCh) -} - -type noopCache[T any] struct { -} - -func (n noopCache[T]) Get(_ string) (r T) { - return -} - -func (n noopCache[T]) Set(_ string, _ T) { - -} - -func (n noopCache[T]) Close() { - -} - var ( shardInfoCtxKey contextKey = 1 ) diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 250b1831442..70c2a5cdacd 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -1,7 +1,6 @@ package querier import ( - "bytes" "context" "fmt" "math/rand" @@ -15,7 +14,6 @@ import ( "github.com/oklog/ulid/v2" "github.com/prometheus-community/parquet-common/convert" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" @@ -41,6 +39,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/parquetutil" "github.com/cortexproject/cortex/pkg/util/services" cortex_testutil "github.com/cortexproject/cortex/pkg/util/testutil" "github.com/cortexproject/cortex/pkg/util/validation" @@ -402,8 +401,10 @@ func TestParquetQueryable_Limits(t *testing.T) { QueryStoreAfter: 0, StoreGatewayQueryStatsEnabled: false, StoreGatewayConsistencyCheckMaxAttempts: 3, - ParquetQueryableShardCacheSize: 100, - ParquetQueryableDefaultBlockStore: "parquet", + ParquetShardCache: parquetutil.CacheConfig{ + ParquetShardCacheSize: 100, + }, + ParquetQueryableDefaultBlockStore: "parquet", } storageCfg := cortex_tsdb.BlocksStorageConfig{ @@ -883,110 +884,3 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) { }) }) } - -func Test_Cache_LRUEviction(t *testing.T) { - reg := prometheus.NewRegistry() - metrics := newCacheMetrics(reg) - cache, err := newCache[string]("test", 2, 0, time.Minute, metrics) - require.NoError(t, err) - defer cache.Close() - - cache.Set("key1", "value1") - cache.Set("key2", "value2") - - _ = cache.Get("key1") // hit - // "key2" deleted by LRU eviction - cache.Set("key3", "value3") - - val1 := cache.Get("key1") // hit - require.Equal(t, "value1", val1) - val3 := cache.Get("key3") // hit - require.Equal(t, "value3", val3) - val2 := cache.Get("key2") // miss - require.Equal(t, "", val2) - - require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions - # TYPE cortex_parquet_queryable_cache_evictions_total counter - cortex_parquet_queryable_cache_evictions_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits - # TYPE cortex_parquet_queryable_cache_hits_total counter - cortex_parquet_queryable_cache_hits_total{name="test"} 3 - # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items - # TYPE cortex_parquet_queryable_cache_item_count gauge - cortex_parquet_queryable_cache_item_count{name="test"} 2 - # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses - # TYPE cortex_parquet_queryable_cache_misses_total counter - cortex_parquet_queryable_cache_misses_total{name="test"} 1 - `))) -} - -func Test_Cache_TTLEvictionByGet(t *testing.T) { - reg := prometheus.NewRegistry() - metrics := newCacheMetrics(reg) - - cache, err := newCache[string]("test", 10, 100*time.Millisecond, time.Minute, metrics) - require.NoError(t, err) - defer cache.Close() - - cache.Set("key1", "value1") - - val := cache.Get("key1") - require.Equal(t, "value1", val) - - // sleep longer than TTL - time.Sleep(150 * time.Millisecond) - - val = cache.Get("key1") - require.Equal(t, "", val) - - require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions - # TYPE cortex_parquet_queryable_cache_evictions_total counter - cortex_parquet_queryable_cache_evictions_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits - # TYPE cortex_parquet_queryable_cache_hits_total counter - cortex_parquet_queryable_cache_hits_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items - # TYPE cortex_parquet_queryable_cache_item_count gauge - cortex_parquet_queryable_cache_item_count{name="test"} 0 - # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses - # TYPE cortex_parquet_queryable_cache_misses_total counter - cortex_parquet_queryable_cache_misses_total{name="test"} 1 - `))) -} - -func Test_Cache_TTLEvictionByLoop(t *testing.T) { - reg := prometheus.NewRegistry() - metrics := newCacheMetrics(reg) - - cache, err := newCache[string]("test", 10, 100*time.Millisecond, 100*time.Millisecond, metrics) - require.NoError(t, err) - defer cache.Close() - - cache.Set("key1", "value1") - - val := cache.Get("key1") - require.Equal(t, "value1", val) - - // sleep longer than TTL - time.Sleep(150 * time.Millisecond) - - if c, ok := cache.(*Cache[string]); ok { - // should delete by maintenance loop - _, ok := c.cache.Peek("key1") - require.False(t, ok) - } - - require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions - # TYPE cortex_parquet_queryable_cache_evictions_total counter - cortex_parquet_queryable_cache_evictions_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits - # TYPE cortex_parquet_queryable_cache_hits_total counter - cortex_parquet_queryable_cache_hits_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items - # TYPE cortex_parquet_queryable_cache_item_count gauge - cortex_parquet_queryable_cache_item_count{name="test"} 0 - `))) -} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index a6912ea024a..636300d64df 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -32,6 +32,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/parquetutil" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/users" "github.com/cortexproject/cortex/pkg/util/validation" @@ -92,11 +93,10 @@ type Config struct { EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"` // Query Parquet files if available - EnableParquetQueryable bool `yaml:"enable_parquet_queryable"` - ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` - ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` - ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` - ParquetQueryableShardCacheTTL time.Duration `yaml:"parquet_queryable_shard_cache_ttl"` + EnableParquetQueryable bool `yaml:"enable_parquet_queryable"` + ParquetShardCache parquetutil.CacheConfig `yaml:",inline"` + ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` + ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"` } @@ -147,8 +147,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.") f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.") f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.") - f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.") - f.DurationVar(&cfg.ParquetQueryableShardCacheTTL, "querier.parquet-queryable-shard-cache-ttl", 24*time.Hour, "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.") + cfg.ParquetShardCache.RegisterFlagsWithPrefix("querier.", f) f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "[Experimental] Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.") f.BoolVar(&cfg.DistributedExecEnabled, "querier.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.") f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.") diff --git a/pkg/util/parquetutil/cache.go b/pkg/util/parquetutil/cache.go new file mode 100644 index 00000000000..952e84bea3b --- /dev/null +++ b/pkg/util/parquetutil/cache.go @@ -0,0 +1,176 @@ +package parquetutil + +import ( + "flag" + "time" + + lru "github.com/hashicorp/golang-lru/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + defaultMaintenanceInterval = time.Minute +) + +type CacheInterface[T any] interface { + Get(path string) T + Set(path string, reader T) + Close() +} + +type cacheMetrics struct { + hits *prometheus.CounterVec + misses *prometheus.CounterVec + evictions *prometheus.CounterVec + size *prometheus.GaugeVec +} + +func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics { + return &cacheMetrics{ + hits: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_cache_hits_total", + Help: "Total number of parquet cache hits", + }, []string{"name"}), + misses: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_cache_misses_total", + Help: "Total number of parquet cache misses", + }, []string{"name"}), + evictions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_cache_evictions_total", + Help: "Total number of parquet cache evictions", + }, []string{"name"}), + size: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_parquet_cache_item_count", + Help: "Current number of cached parquet items", + }, []string{"name"}), + } +} + +type cacheEntry[T any] struct { + value T + expiresAt time.Time +} + +type Cache[T any] struct { + cache *lru.Cache[string, *cacheEntry[T]] + name string + metrics *cacheMetrics + ttl time.Duration + stopCh chan struct{} +} + +type CacheConfig struct { + ParquetShardCacheSize int `yaml:"parquet_shard_cache_size"` + ParquetShardCacheTTL time.Duration `yaml:"parquet_shard_cache_ttl"` + MaintenanceInterval time.Duration `yaml:"-"` +} + +func (cfg *CacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.IntVar(&cfg.ParquetShardCacheSize, prefix+"parquet-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet shard cache. 0 to disable.") + f.DurationVar(&cfg.ParquetShardCacheTTL, prefix+"parquet-shard-cache-ttl", 24*time.Hour, "[Experimental] TTL of the Parquet shard cache. 0 to no TTL.") + cfg.MaintenanceInterval = defaultMaintenanceInterval +} + +func NewCache[T any](cfg *CacheConfig, name string, reg prometheus.Registerer) (CacheInterface[T], error) { + if cfg.ParquetShardCacheSize <= 0 { + return &noopCache[T]{}, nil + } + metrics := newCacheMetrics(reg) + cache, err := lru.NewWithEvict(cfg.ParquetShardCacheSize, func(key string, value *cacheEntry[T]) { + metrics.evictions.WithLabelValues(name).Inc() + metrics.size.WithLabelValues(name).Dec() + }) + if err != nil { + return nil, err + } + + c := &Cache[T]{ + cache: cache, + name: name, + metrics: metrics, + ttl: cfg.ParquetShardCacheTTL, + stopCh: make(chan struct{}), + } + + if cfg.ParquetShardCacheTTL > 0 { + go c.maintenanceLoop(cfg.MaintenanceInterval) + } + + return c, nil +} + +func (c *Cache[T]) maintenanceLoop(interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + now := time.Now() + keys := c.cache.Keys() + for _, key := range keys { + if entry, ok := c.cache.Peek(key); ok { + // we use a Peek() because the Get() change LRU order. + if !entry.expiresAt.IsZero() && now.After(entry.expiresAt) { + c.cache.Remove(key) + } + } + } + case <-c.stopCh: + return + } + } +} + +func (c *Cache[T]) Get(path string) (r T) { + if entry, ok := c.cache.Get(path); ok { + isExpired := !entry.expiresAt.IsZero() && time.Now().After(entry.expiresAt) + + if isExpired { + c.cache.Remove(path) + c.metrics.misses.WithLabelValues(c.name).Inc() + return + } + + c.metrics.hits.WithLabelValues(c.name).Inc() + return entry.value + } + c.metrics.misses.WithLabelValues(c.name).Inc() + return +} + +func (c *Cache[T]) Set(path string, reader T) { + if !c.cache.Contains(path) { + c.metrics.size.WithLabelValues(c.name).Inc() + } + + var expiresAt time.Time + if c.ttl > 0 { + expiresAt = time.Now().Add(c.ttl) + } + + entry := &cacheEntry[T]{ + value: reader, + expiresAt: expiresAt, + } + + c.cache.Add(path, entry) +} + +func (c *Cache[T]) Close() { + close(c.stopCh) +} + +type noopCache[T any] struct { +} + +func (n noopCache[T]) Get(_ string) (r T) { + return +} + +func (n noopCache[T]) Set(_ string, _ T) { + +} + +func (n noopCache[T]) Close() {} diff --git a/pkg/util/parquetutil/cache_test.go b/pkg/util/parquetutil/cache_test.go new file mode 100644 index 00000000000..b71b77207bf --- /dev/null +++ b/pkg/util/parquetutil/cache_test.go @@ -0,0 +1,130 @@ +package parquetutil + +import ( + "bytes" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +func Test_Cache_LRUEviction(t *testing.T) { + reg := prometheus.NewRegistry() + cfg := &CacheConfig{ + ParquetShardCacheSize: 2, + ParquetShardCacheTTL: 0, + MaintenanceInterval: time.Minute, + } + cache, err := NewCache[string](cfg, "test", reg) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + cache.Set("key2", "value2") + + _ = cache.Get("key1") // hit + // "key2" deleted by LRU eviction + cache.Set("key3", "value3") + + val1 := cache.Get("key1") // hit + require.Equal(t, "value1", val1) + val3 := cache.Get("key3") // hit + require.Equal(t, "value3", val3) + val2 := cache.Get("key2") // miss + require.Equal(t, "", val2) + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_parquet_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_cache_evictions_total counter + cortex_parquet_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_cache_hits_total counter + cortex_parquet_cache_hits_total{name="test"} 3 + # HELP cortex_parquet_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_cache_item_count gauge + cortex_parquet_cache_item_count{name="test"} 2 + # HELP cortex_parquet_cache_misses_total Total number of parquet cache misses + # TYPE cortex_parquet_cache_misses_total counter + cortex_parquet_cache_misses_total{name="test"} 1 + `))) +} + +func Test_Cache_TTLEvictionByGet(t *testing.T) { + reg := prometheus.NewRegistry() + cfg := &CacheConfig{ + ParquetShardCacheSize: 10, + ParquetShardCacheTTL: 100 * time.Millisecond, + MaintenanceInterval: time.Minute, + } + + cache, err := NewCache[string](cfg, "test", reg) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + + val := cache.Get("key1") + require.Equal(t, "value1", val) + + // sleep longer than TTL + time.Sleep(150 * time.Millisecond) + + val = cache.Get("key1") + require.Equal(t, "", val) + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_parquet_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_cache_evictions_total counter + cortex_parquet_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_cache_hits_total counter + cortex_parquet_cache_hits_total{name="test"} 1 + # HELP cortex_parquet_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_cache_item_count gauge + cortex_parquet_cache_item_count{name="test"} 0 + # HELP cortex_parquet_cache_misses_total Total number of parquet cache misses + # TYPE cortex_parquet_cache_misses_total counter + cortex_parquet_cache_misses_total{name="test"} 1 + `))) +} + +func Test_Cache_TTLEvictionByLoop(t *testing.T) { + reg := prometheus.NewRegistry() + cfg := &CacheConfig{ + ParquetShardCacheSize: 10, + ParquetShardCacheTTL: 100 * time.Millisecond, + MaintenanceInterval: 100 * time.Millisecond, + } + + cache, err := NewCache[string](cfg, "test", reg) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + + val := cache.Get("key1") + require.Equal(t, "value1", val) + + // sleep longer than TTL + time.Sleep(150 * time.Millisecond) + + if c, ok := cache.(*Cache[string]); ok { + // should delete by maintenance loop + _, ok := c.cache.Peek("key1") + require.False(t, ok) + } + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_parquet_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_cache_evictions_total counter + cortex_parquet_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_cache_hits_total counter + cortex_parquet_cache_hits_total{name="test"} 1 + # HELP cortex_parquet_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_cache_item_count gauge + cortex_parquet_cache_item_count{name="test"} 0 + `))) +} diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 66fbc2d76db..30efc2eb96c 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -5840,17 +5840,17 @@ "type": "boolean", "x-cli-flag": "querier.parquet-queryable-fallback-disabled" }, - "parquet_queryable_shard_cache_size": { + "parquet_shard_cache_size": { "default": 512, - "description": "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.", + "description": "[Experimental] Maximum size of the Parquet shard cache. 0 to disable.", "type": "number", - "x-cli-flag": "querier.parquet-queryable-shard-cache-size" + "x-cli-flag": "querier.parquet-shard-cache-size" }, - "parquet_queryable_shard_cache_ttl": { + "parquet_shard_cache_ttl": { "default": "24h0m0s", - "description": "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.", + "description": "[Experimental] TTL of the Parquet shard cache. 0 to no TTL.", "type": "string", - "x-cli-flag": "querier.parquet-queryable-shard-cache-ttl", + "x-cli-flag": "querier.parquet-shard-cache-ttl", "x-format": "duration" }, "per_step_stats_enabled": {