Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,13 @@ querier:
# CLI flag: -querier.enable-parquet-queryable
[enable_parquet_queryable: <boolean> | default = false]

# [Experimental] Maximum size of the Parquet queryable shard cache. 0 to
# disable.
# CLI flag: -querier.parquet-queryable-shard-cache-size
[parquet_queryable_shard_cache_size: <int> | default = 512]
# [Experimental] Maximum size of the Parquet shard cache. 0 to disable.
# CLI flag: -querier.parquet-shard-cache-size
[parquet_shard_cache_size: <int> | default = 512]

# [Experimental] TTL of the Parquet shard cache. 0 for no TTL.
# CLI flag: -querier.parquet-shard-cache-ttl
[parquet_shard_cache_ttl: <duration> | default = 24h]

# [Experimental] Parquet queryable's default block store to query. Valid
# options are tsdb and parquet. If it is set to tsdb, parquet queryable always
Expand All @@ -307,10 +310,6 @@ querier:
# queryable.
# CLI flag: -querier.parquet-queryable-fallback-disabled
[parquet_queryable_fallback_disabled: <boolean> | default = false]

# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]
```

### `blocks_storage_config`
Expand Down
15 changes: 7 additions & 8 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4807,10 +4807,13 @@ thanos_engine:
# CLI flag: -querier.enable-parquet-queryable
[enable_parquet_queryable: <boolean> | default = false]

# [Experimental] Maximum size of the Parquet queryable shard cache. 0 to
# disable.
# CLI flag: -querier.parquet-queryable-shard-cache-size
[parquet_queryable_shard_cache_size: <int> | default = 512]
# [Experimental] Maximum size of the Parquet shard cache. 0 to disable.
# CLI flag: -querier.parquet-shard-cache-size
[parquet_shard_cache_size: <int> | default = 512]

# [Experimental] TTL of the Parquet shard cache. 0 for no TTL.
# CLI flag: -querier.parquet-shard-cache-ttl
[parquet_shard_cache_ttl: <duration> | default = 24h]

# [Experimental] Parquet queryable's default block store to query. Valid options
# are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback
Expand All @@ -4824,10 +4827,6 @@ thanos_engine:
# need to make sure Parquet files are created before it is queryable.
# CLI flag: -querier.parquet-queryable-fallback-disabled
[parquet_queryable_fallback_disabled: <boolean> | default = false]

# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]
```

### `query_frontend_config`
Expand Down
160 changes: 4 additions & 156 deletions pkg/querier/parquet_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/go-kit/log"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/opentracing/opentracing-go"
"github.com/parquet-go/parquet-go"
"github.com/pkg/errors"
Expand All @@ -21,6 +20,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/strutil"
"golang.org/x/sync/errgroup"
Expand All @@ -33,6 +33,7 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/multierror"
"github.com/cortexproject/cortex/pkg/util/parquetutil"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/users"
"github.com/cortexproject/cortex/pkg/util/validation"
Expand All @@ -51,8 +52,6 @@ const (
parquetBlockStore blockStoreType = "parquet"
)

const defaultMaintenanceInterval = time.Minute

var (
validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore}
)
Expand Down Expand Up @@ -99,7 +98,7 @@ type parquetQueryableWithFallback struct {
fallbackDisabled bool
queryStoreAfter time.Duration
parquetQueryable storage.Queryable
cache cacheInterface[parquet_storage.ParquetShard]
cache parquetutil.CacheInterface[parquet_storage.ParquetShard]
blockStorageQueryable *BlocksStoreQueryable

finder BlocksFinder
Expand Down Expand Up @@ -135,7 +134,7 @@ func NewParquetQueryable(
return nil, err
}

cache, err := newCache[parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, config.ParquetQueryableShardCacheTTL, defaultMaintenanceInterval, newCacheMetrics(reg))
cache, err := parquetutil.NewCache[parquet_storage.ParquetShard](&config.ParquetShardCache, "parquet-shards", extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -623,157 +622,6 @@ func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHint
return &shardMatcherLabelsFilter{shardMatcher: sm}, true
}

// cacheInterface is the generic contract for a cache holding values of type T
// that are addressed by a string path. In this file it is implemented by
// Cache[T] and noopCache[T].
type cacheInterface[T any] interface {
// Get returns the value stored for path. Both implementations in this file
// return the zero value of T when they have no usable entry.
Get(path string) T
// Set stores reader under path.
Set(path string, reader T)
// Close shuts the cache down. For Cache[T] this closes the channel that
// stops the background maintenance loop; for noopCache[T] it does nothing.
Close()
}

// cacheMetrics groups the Prometheus collectors updated by Cache. Every vector
// is created by newCacheMetrics with a single "name" label.
type cacheMetrics struct {
// hits is incremented by Cache.Get when a non-expired entry is returned.
hits *prometheus.CounterVec
// misses is incremented by Cache.Get when the key is absent or expired.
misses *prometheus.CounterVec
// evictions is incremented from the LRU eviction callback set up in newCache.
evictions *prometheus.CounterVec
// size is incremented by Cache.Set for keys not already present and
// decremented from the LRU eviction callback.
size *prometheus.GaugeVec
}

// newCacheMetrics builds the parquet cache collectors through promauto using
// reg. Each collector carries one label, "name", so that several caches can
// share the same metric families.
func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics {
	const labelName = "name"
	factory := promauto.With(reg)

	m := &cacheMetrics{}
	m.hits = factory.NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_parquet_queryable_cache_hits_total",
		Help: "Total number of parquet cache hits",
	}, []string{labelName})
	m.misses = factory.NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_parquet_queryable_cache_misses_total",
		Help: "Total number of parquet cache misses",
	}, []string{labelName})
	m.evictions = factory.NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_parquet_queryable_cache_evictions_total",
		Help: "Total number of parquet cache evictions",
	}, []string{labelName})
	m.size = factory.NewGaugeVec(prometheus.GaugeOpts{
		Name: "cortex_parquet_queryable_cache_item_count",
		Help: "Current number of cached parquet items",
	}, []string{labelName})
	return m
}

// cacheEntry is the value stored in the LRU: the cached item together with
// its expiry deadline.
type cacheEntry[T any] struct {
// value is the cached item handed back by Cache.Get.
value T
// expiresAt is the instant after which the entry is treated as expired.
// Cache.Set leaves it as the zero time when the cache TTL is not positive,
// and both Cache.Get and the maintenance loop treat a zero time as
// "never expires".
expiresAt time.Time
}

// Cache is an LRU-backed implementation of cacheInterface with an optional
// per-entry TTL and Prometheus metrics.
type Cache[T any] struct {
// cache is the underlying LRU keyed by path.
cache *lru.Cache[string, *cacheEntry[T]]
// name is the value used for the "name" label on every metric update.
name string
// metrics holds the hit/miss/eviction/size collectors.
metrics *cacheMetrics
// ttl is added to the current time in Set to compute an entry's expiry;
// a non-positive ttl means entries get no expiry.
ttl time.Duration
// stopCh is closed by Close to make maintenanceLoop return.
stopCh chan struct{}
}

// newCache constructs a cache named name that holds at most size entries.
//
// A non-positive size yields a noopCache, i.e. caching is disabled. Otherwise
// an LRU is created whose eviction callback increments the evictions counter
// and decrements the size gauge for name. When ttl is positive a goroutine
// running maintenanceLoop is started with maintenanceInterval as its tick
// period; it runs until Close is called on the returned cache.
//
// The returned error is whatever lru.NewWithEvict reports.
func newCache[T any](name string, size int, ttl, maintenanceInterval time.Duration, metrics *cacheMetrics) (cacheInterface[T], error) {
	if size <= 0 {
		return &noopCache[T]{}, nil
	}

	// Keep the metrics in step with every entry the LRU drops.
	onEvict := func(_ string, _ *cacheEntry[T]) {
		metrics.evictions.WithLabelValues(name).Inc()
		metrics.size.WithLabelValues(name).Dec()
	}

	backing, err := lru.NewWithEvict(size, onEvict)
	if err != nil {
		return nil, err
	}

	result := &Cache[T]{
		cache:   backing,
		name:    name,
		metrics: metrics,
		ttl:     ttl,
		stopCh:  make(chan struct{}),
	}

	// Without a TTL nothing ever expires, so no background sweep is needed.
	if ttl > 0 {
		go result.maintenanceLoop(maintenanceInterval)
	}

	return result, nil
}

// maintenanceLoop removes expired entries once per interval until stopCh is
// closed. It blocks, so it is meant to be run in its own goroutine.
func (c *Cache[T]) maintenanceLoop(interval time.Duration) {
	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		select {
		case <-c.stopCh:
			return
		case <-tick.C:
			sweepTime := time.Now()
			for _, k := range c.cache.Keys() {
				// Peek rather than Get: Get would change the LRU order.
				e, found := c.cache.Peek(k)
				if !found {
					continue
				}
				// A zero deadline means the entry has no expiry.
				if e.expiresAt.IsZero() || !sweepTime.After(e.expiresAt) {
					continue
				}
				c.cache.Remove(k)
			}
		}
	}
}

// Get looks up path in the cache. A present, non-expired entry counts as a hit
// and its value is returned. A missing entry counts as a miss; an expired
// entry is removed from the LRU and also counts as a miss. On a miss the zero
// value of T is returned.
func (c *Cache[T]) Get(path string) (r T) {
	entry, found := c.cache.Get(path)
	if !found {
		c.metrics.misses.WithLabelValues(c.name).Inc()
		return r
	}

	// A zero deadline means the entry never expires.
	stillValid := entry.expiresAt.IsZero() || !time.Now().After(entry.expiresAt)
	if stillValid {
		c.metrics.hits.WithLabelValues(c.name).Inc()
		return entry.value
	}

	// Expired: drop it right away instead of waiting for the next sweep.
	c.cache.Remove(path)
	c.metrics.misses.WithLabelValues(c.name).Inc()
	return r
}

// Set stores reader under path. The size gauge is incremented only when path
// is not already present. When the cache has a positive TTL the entry's
// deadline is set to now plus that TTL; otherwise the entry gets no deadline.
func (c *Cache[T]) Set(path string, reader T) {
	alreadyCached := c.cache.Contains(path)
	if !alreadyCached {
		c.metrics.size.WithLabelValues(c.name).Inc()
	}

	item := &cacheEntry[T]{value: reader}
	if c.ttl > 0 {
		item.expiresAt = time.Now().Add(c.ttl)
	}

	c.cache.Add(path, item)
}

// Close signals the maintenance loop (if one was started) to stop by closing
// stopCh.
//
// Calling Close again after it has returned is a no-op: closing an already
// closed channel would panic, so the channel is only closed when it is still
// open. Concurrent calls to Close are not synchronized with each other.
func (c *Cache[T]) Close() {
	select {
	case <-c.stopCh:
		// Already closed by an earlier Close; nothing left to do.
	default:
		close(c.stopCh)
	}
}

// noopCache is a cache that stores nothing. newCache returns it when the
// configured size is not positive.
type noopCache[T any] struct{}

// Get always reports a miss by returning the zero value of T.
func (noopCache[T]) Get(string) T {
	var zero T
	return zero
}

// Set discards the value.
func (noopCache[T]) Set(string, T) {}

// Close does nothing; there is nothing to release.
func (noopCache[T]) Close() {}

// shardInfoCtxKey is a contextKey with the value 1.
// NOTE(review): presumably the context.Context key under which shard
// information is stored; its uses are outside this view, so confirm.
var (
shardInfoCtxKey contextKey = 1
)
Expand Down
Loading
Loading