Skip to content

Commit 9c6d985

Browse files
committed
Refactor: move parquet shard cache to parquet util pkg
Signed-off-by: SungJin1212 <[email protected]>
1 parent 194fbcb commit 9c6d985

File tree

7 files changed

+329
-282
lines changed

7 files changed

+329
-282
lines changed

docs/blocks-storage/querier.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,10 @@ querier:
294294
# CLI flag: -querier.parquet-queryable-shard-cache-size
295295
[parquet_queryable_shard_cache_size: <int> | default = 512]
296296

297+
# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.
298+
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
299+
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]
300+
297301
# [Experimental] Parquet queryable's default block store to query. Valid
298302
# options are tsdb and parquet. If it is set to tsdb, parquet queryable always
299303
# fallback to store gateway.
@@ -307,10 +311,6 @@ querier:
307311
# queryable.
308312
# CLI flag: -querier.parquet-queryable-fallback-disabled
309313
[parquet_queryable_fallback_disabled: <boolean> | default = false]
310-
311-
# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.
312-
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
313-
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]
314314
```
315315
316316
### `blocks_storage_config`

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4812,6 +4812,10 @@ thanos_engine:
48124812
# CLI flag: -querier.parquet-queryable-shard-cache-size
48134813
[parquet_queryable_shard_cache_size: <int> | default = 512]
48144814

4815+
# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.
4816+
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
4817+
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]
4818+
48154819
# [Experimental] Parquet queryable's default block store to query. Valid options
48164820
# are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback
48174821
# to store gateway.
@@ -4824,10 +4828,6 @@ thanos_engine:
48244828
# need to make sure Parquet files are created before it is queryable.
48254829
# CLI flag: -querier.parquet-queryable-fallback-disabled
48264830
[parquet_queryable_fallback_disabled: <boolean> | default = false]
4827-
4828-
# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.
4829-
# CLI flag: -querier.parquet-queryable-shard-cache-ttl
4830-
[parquet_queryable_shard_cache_ttl: <duration> | default = 24h]
48314831
```
48324832
48334833
### `query_frontend_config`

pkg/querier/parquet_queryable.go

Lines changed: 4 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"time"
88

99
"github.com/go-kit/log"
10-
lru "github.com/hashicorp/golang-lru/v2"
1110
"github.com/opentracing/opentracing-go"
1211
"github.com/parquet-go/parquet-go"
1312
"github.com/pkg/errors"
@@ -21,6 +20,7 @@ import (
2120
"github.com/prometheus/prometheus/storage"
2221
"github.com/prometheus/prometheus/tsdb/chunkenc"
2322
"github.com/prometheus/prometheus/util/annotations"
23+
"github.com/thanos-io/thanos/pkg/extprom"
2424
"github.com/thanos-io/thanos/pkg/store/storepb"
2525
"github.com/thanos-io/thanos/pkg/strutil"
2626
"golang.org/x/sync/errgroup"
@@ -33,6 +33,7 @@ import (
3333
"github.com/cortexproject/cortex/pkg/util"
3434
"github.com/cortexproject/cortex/pkg/util/limiter"
3535
"github.com/cortexproject/cortex/pkg/util/multierror"
36+
"github.com/cortexproject/cortex/pkg/util/parquetutil"
3637
"github.com/cortexproject/cortex/pkg/util/services"
3738
"github.com/cortexproject/cortex/pkg/util/users"
3839
"github.com/cortexproject/cortex/pkg/util/validation"
@@ -51,8 +52,6 @@ const (
5152
parquetBlockStore blockStoreType = "parquet"
5253
)
5354

54-
const defaultMaintenanceInterval = time.Minute
55-
5655
var (
5756
validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore}
5857
)
@@ -99,7 +98,7 @@ type parquetQueryableWithFallback struct {
9998
fallbackDisabled bool
10099
queryStoreAfter time.Duration
101100
parquetQueryable storage.Queryable
102-
cache cacheInterface[parquet_storage.ParquetShard]
101+
cache parquetutil.CacheInterface[parquet_storage.ParquetShard]
103102
blockStorageQueryable *BlocksStoreQueryable
104103

105104
finder BlocksFinder
@@ -135,7 +134,7 @@ func NewParquetQueryable(
135134
return nil, err
136135
}
137136

138-
cache, err := newCache[parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, config.ParquetQueryableShardCacheTTL, defaultMaintenanceInterval, newCacheMetrics(reg))
137+
cache, err := parquetutil.NewCache[parquet_storage.ParquetShard](&config.ParquetShardCache, "parquet-shards", extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg))
139138
if err != nil {
140139
return nil, err
141140
}
@@ -623,157 +622,6 @@ func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHint
623622
return &shardMatcherLabelsFilter{shardMatcher: sm}, true
624623
}
625624

626-
type cacheInterface[T any] interface {
627-
Get(path string) T
628-
Set(path string, reader T)
629-
Close()
630-
}
631-
632-
type cacheMetrics struct {
633-
hits *prometheus.CounterVec
634-
misses *prometheus.CounterVec
635-
evictions *prometheus.CounterVec
636-
size *prometheus.GaugeVec
637-
}
638-
639-
func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics {
640-
return &cacheMetrics{
641-
hits: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
642-
Name: "cortex_parquet_queryable_cache_hits_total",
643-
Help: "Total number of parquet cache hits",
644-
}, []string{"name"}),
645-
misses: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
646-
Name: "cortex_parquet_queryable_cache_misses_total",
647-
Help: "Total number of parquet cache misses",
648-
}, []string{"name"}),
649-
evictions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
650-
Name: "cortex_parquet_queryable_cache_evictions_total",
651-
Help: "Total number of parquet cache evictions",
652-
}, []string{"name"}),
653-
size: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
654-
Name: "cortex_parquet_queryable_cache_item_count",
655-
Help: "Current number of cached parquet items",
656-
}, []string{"name"}),
657-
}
658-
}
659-
660-
type cacheEntry[T any] struct {
661-
value T
662-
expiresAt time.Time
663-
}
664-
665-
type Cache[T any] struct {
666-
cache *lru.Cache[string, *cacheEntry[T]]
667-
name string
668-
metrics *cacheMetrics
669-
ttl time.Duration
670-
stopCh chan struct{}
671-
}
672-
673-
func newCache[T any](name string, size int, ttl, maintenanceInterval time.Duration, metrics *cacheMetrics) (cacheInterface[T], error) {
674-
if size <= 0 {
675-
return &noopCache[T]{}, nil
676-
}
677-
cache, err := lru.NewWithEvict(size, func(key string, value *cacheEntry[T]) {
678-
metrics.evictions.WithLabelValues(name).Inc()
679-
metrics.size.WithLabelValues(name).Dec()
680-
})
681-
if err != nil {
682-
return nil, err
683-
}
684-
685-
c := &Cache[T]{
686-
cache: cache,
687-
name: name,
688-
metrics: metrics,
689-
ttl: ttl,
690-
stopCh: make(chan struct{}),
691-
}
692-
693-
if ttl > 0 {
694-
go c.maintenanceLoop(maintenanceInterval)
695-
}
696-
697-
return c, nil
698-
}
699-
700-
func (c *Cache[T]) maintenanceLoop(interval time.Duration) {
701-
ticker := time.NewTicker(interval)
702-
defer ticker.Stop()
703-
704-
for {
705-
select {
706-
case <-ticker.C:
707-
now := time.Now()
708-
keys := c.cache.Keys()
709-
for _, key := range keys {
710-
if entry, ok := c.cache.Peek(key); ok {
711-
// we use a Peek() because the Get() change LRU order.
712-
if !entry.expiresAt.IsZero() && now.After(entry.expiresAt) {
713-
c.cache.Remove(key)
714-
}
715-
}
716-
}
717-
case <-c.stopCh:
718-
return
719-
}
720-
}
721-
}
722-
723-
func (c *Cache[T]) Get(path string) (r T) {
724-
if entry, ok := c.cache.Get(path); ok {
725-
isExpired := !entry.expiresAt.IsZero() && time.Now().After(entry.expiresAt)
726-
727-
if isExpired {
728-
c.cache.Remove(path)
729-
c.metrics.misses.WithLabelValues(c.name).Inc()
730-
return
731-
}
732-
733-
c.metrics.hits.WithLabelValues(c.name).Inc()
734-
return entry.value
735-
}
736-
c.metrics.misses.WithLabelValues(c.name).Inc()
737-
return
738-
}
739-
740-
func (c *Cache[T]) Set(path string, reader T) {
741-
if !c.cache.Contains(path) {
742-
c.metrics.size.WithLabelValues(c.name).Inc()
743-
}
744-
745-
var expiresAt time.Time
746-
if c.ttl > 0 {
747-
expiresAt = time.Now().Add(c.ttl)
748-
}
749-
750-
entry := &cacheEntry[T]{
751-
value: reader,
752-
expiresAt: expiresAt,
753-
}
754-
755-
c.cache.Add(path, entry)
756-
}
757-
758-
func (c *Cache[T]) Close() {
759-
close(c.stopCh)
760-
}
761-
762-
type noopCache[T any] struct {
763-
}
764-
765-
func (n noopCache[T]) Get(_ string) (r T) {
766-
return
767-
}
768-
769-
func (n noopCache[T]) Set(_ string, _ T) {
770-
771-
}
772-
773-
func (n noopCache[T]) Close() {
774-
775-
}
776-
777625
var (
778626
shardInfoCtxKey contextKey = 1
779627
)

0 commit comments

Comments
 (0)