Skip to content

Commit f64b0a4

Browse files
committed
Refactor: move parquet shard cache to parquet util pkg
Signed-off-by: SungJin1212 <[email protected]>
1 parent 56bc703 commit f64b0a4

File tree

7 files changed

+329
-282
lines changed

7 files changed

+329
-282
lines changed

docs/blocks-storage/querier.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,10 @@ querier:
294294
# CLI flag: -querier.parquet-queryable-shard-cache-size
295295
[parquet_queryable_shard_cache_size: <int> | default = 512]
296296

297+
# [Experimental] TTL of the Parquet queryable shard cache. Set to 0 to disable the TTL.
298+
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
299+
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]
300+
297301
# [Experimental] Parquet queryable's default block store to query. Valid
298302
# options are tsdb and parquet. If it is set to tsdb, parquet queryable always
299303
# falls back to the store gateway.
@@ -307,10 +311,6 @@ querier:
307311
# queryable.
308312
# CLI flag: -querier.parquet-queryable-fallback-disabled
309313
[parquet_queryable_fallback_disabled: <boolean> | default = false]
310-
311-
# [Experimental] TTL of the Parquet queryable shard cache. Set to 0 to disable the TTL.
312-
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
313-
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]
314314
```
315315
316316
### `blocks_storage_config`

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4812,6 +4812,10 @@ thanos_engine:
48124812
# CLI flag: -querier.parquet-queryable-shard-cache-size
48134813
[parquet_queryable_shard_cache_size: <int> | default = 512]
48144814

4815+
# [Experimental] TTL of the Parquet queryable shard cache. Set to 0 to disable the TTL.
4816+
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
4817+
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]
4818+
48154819
# [Experimental] Parquet queryable's default block store to query. Valid options
48164820
# are tsdb and parquet. If it is set to tsdb, parquet queryable always falls back
48174821
# to store gateway.
@@ -4824,10 +4828,6 @@ thanos_engine:
48244828
# need to make sure Parquet files are created before it is queryable.
48254829
# CLI flag: -querier.parquet-queryable-fallback-disabled
48264830
[parquet_queryable_fallback_disabled: <boolean> | default = false]
4827-
4828-
# [Experimental] TTL of the Parquet queryable shard cache. Set to 0 to disable the TTL.
4829-
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
4830-
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]
48314831
```
48324832
48334833
### `query_frontend_config`

pkg/querier/parquet_queryable.go

Lines changed: 4 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"time"
88

99
"github.com/go-kit/log"
10-
lru "github.com/hashicorp/golang-lru/v2"
1110
"github.com/opentracing/opentracing-go"
1211
"github.com/parquet-go/parquet-go"
1312
"github.com/pkg/errors"
@@ -21,6 +20,7 @@ import (
2120
"github.com/prometheus/prometheus/storage"
2221
"github.com/prometheus/prometheus/tsdb/chunkenc"
2322
"github.com/prometheus/prometheus/util/annotations"
23+
"github.com/thanos-io/thanos/pkg/extprom"
2424
"github.com/thanos-io/thanos/pkg/store/storepb"
2525
"github.com/thanos-io/thanos/pkg/strutil"
2626
"golang.org/x/sync/errgroup"
@@ -33,6 +33,7 @@ import (
3333
"github.com/cortexproject/cortex/pkg/util"
3434
"github.com/cortexproject/cortex/pkg/util/limiter"
3535
"github.com/cortexproject/cortex/pkg/util/multierror"
36+
"github.com/cortexproject/cortex/pkg/util/parquetutil"
3637
"github.com/cortexproject/cortex/pkg/util/services"
3738
"github.com/cortexproject/cortex/pkg/util/users"
3839
"github.com/cortexproject/cortex/pkg/util/validation"
@@ -51,8 +52,6 @@ const (
5152
parquetBlockStore blockStoreType = "parquet"
5253
)
5354

54-
const defaultMaintenanceInterval = time.Minute
55-
5655
var (
5756
validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore}
5857
)
@@ -99,7 +98,7 @@ type parquetQueryableWithFallback struct {
9998
fallbackDisabled bool
10099
queryStoreAfter time.Duration
101100
parquetQueryable storage.Queryable
102-
cache cacheInterface[parquet_storage.ParquetShard]
101+
cache parquetutil.CacheInterface[parquet_storage.ParquetShard]
103102
blockStorageQueryable *BlocksStoreQueryable
104103

105104
finder BlocksFinder
@@ -135,7 +134,7 @@ func NewParquetQueryable(
135134
return nil, err
136135
}
137136

138-
cache, err := newCache[parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, config.ParquetQueryableShardCacheTTL, defaultMaintenanceInterval, newCacheMetrics(reg))
137+
cache, err := parquetutil.NewCache[parquet_storage.ParquetShard](&config.ParquetShardCache, "parquet-shards", extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg))
139138
if err != nil {
140139
return nil, err
141140
}
@@ -618,157 +617,6 @@ func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHint
618617
return &shardMatcherLabelsFilter{shardMatcher: sm}, true
619618
}
620619

621-
type cacheInterface[T any] interface {
622-
Get(path string) T
623-
Set(path string, reader T)
624-
Close()
625-
}
626-
627-
type cacheMetrics struct {
628-
hits *prometheus.CounterVec
629-
misses *prometheus.CounterVec
630-
evictions *prometheus.CounterVec
631-
size *prometheus.GaugeVec
632-
}
633-
634-
func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics {
635-
return &cacheMetrics{
636-
hits: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
637-
Name: "cortex_parquet_queryable_cache_hits_total",
638-
Help: "Total number of parquet cache hits",
639-
}, []string{"name"}),
640-
misses: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
641-
Name: "cortex_parquet_queryable_cache_misses_total",
642-
Help: "Total number of parquet cache misses",
643-
}, []string{"name"}),
644-
evictions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
645-
Name: "cortex_parquet_queryable_cache_evictions_total",
646-
Help: "Total number of parquet cache evictions",
647-
}, []string{"name"}),
648-
size: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
649-
Name: "cortex_parquet_queryable_cache_item_count",
650-
Help: "Current number of cached parquet items",
651-
}, []string{"name"}),
652-
}
653-
}
654-
655-
type cacheEntry[T any] struct {
656-
value T
657-
expiresAt time.Time
658-
}
659-
660-
type Cache[T any] struct {
661-
cache *lru.Cache[string, *cacheEntry[T]]
662-
name string
663-
metrics *cacheMetrics
664-
ttl time.Duration
665-
stopCh chan struct{}
666-
}
667-
668-
func newCache[T any](name string, size int, ttl, maintenanceInterval time.Duration, metrics *cacheMetrics) (cacheInterface[T], error) {
669-
if size <= 0 {
670-
return &noopCache[T]{}, nil
671-
}
672-
cache, err := lru.NewWithEvict(size, func(key string, value *cacheEntry[T]) {
673-
metrics.evictions.WithLabelValues(name).Inc()
674-
metrics.size.WithLabelValues(name).Dec()
675-
})
676-
if err != nil {
677-
return nil, err
678-
}
679-
680-
c := &Cache[T]{
681-
cache: cache,
682-
name: name,
683-
metrics: metrics,
684-
ttl: ttl,
685-
stopCh: make(chan struct{}),
686-
}
687-
688-
if ttl > 0 {
689-
go c.maintenanceLoop(maintenanceInterval)
690-
}
691-
692-
return c, nil
693-
}
694-
695-
func (c *Cache[T]) maintenanceLoop(interval time.Duration) {
696-
ticker := time.NewTicker(interval)
697-
defer ticker.Stop()
698-
699-
for {
700-
select {
701-
case <-ticker.C:
702-
now := time.Now()
703-
keys := c.cache.Keys()
704-
for _, key := range keys {
705-
if entry, ok := c.cache.Peek(key); ok {
706-
// Use Peek() rather than Get() because Get() changes the LRU order.
707-
if !entry.expiresAt.IsZero() && now.After(entry.expiresAt) {
708-
c.cache.Remove(key)
709-
}
710-
}
711-
}
712-
case <-c.stopCh:
713-
return
714-
}
715-
}
716-
}
717-
718-
func (c *Cache[T]) Get(path string) (r T) {
719-
if entry, ok := c.cache.Get(path); ok {
720-
isExpired := !entry.expiresAt.IsZero() && time.Now().After(entry.expiresAt)
721-
722-
if isExpired {
723-
c.cache.Remove(path)
724-
c.metrics.misses.WithLabelValues(c.name).Inc()
725-
return
726-
}
727-
728-
c.metrics.hits.WithLabelValues(c.name).Inc()
729-
return entry.value
730-
}
731-
c.metrics.misses.WithLabelValues(c.name).Inc()
732-
return
733-
}
734-
735-
func (c *Cache[T]) Set(path string, reader T) {
736-
if !c.cache.Contains(path) {
737-
c.metrics.size.WithLabelValues(c.name).Inc()
738-
}
739-
740-
var expiresAt time.Time
741-
if c.ttl > 0 {
742-
expiresAt = time.Now().Add(c.ttl)
743-
}
744-
745-
entry := &cacheEntry[T]{
746-
value: reader,
747-
expiresAt: expiresAt,
748-
}
749-
750-
c.cache.Add(path, entry)
751-
}
752-
753-
func (c *Cache[T]) Close() {
754-
close(c.stopCh)
755-
}
756-
757-
type noopCache[T any] struct {
758-
}
759-
760-
func (n noopCache[T]) Get(_ string) (r T) {
761-
return
762-
}
763-
764-
func (n noopCache[T]) Set(_ string, _ T) {
765-
766-
}
767-
768-
func (n noopCache[T]) Close() {
769-
770-
}
771-
772620
var (
773621
shardInfoCtxKey contextKey = 1
774622
)

0 commit comments

Comments
 (0)