Skip to content

Commit 3a3b330

Browse files
committed
parquet store gateway poc
Signed-off-by: yeya24 <[email protected]>
1 parent 746b40e commit 3a3b330

File tree

10 files changed

+1918
-94
lines changed

10 files changed

+1918
-94
lines changed

integration/parquet_gateway_test.go

Lines changed: 646 additions & 0 deletions
Large diffs are not rendered by default.

pkg/storage/tsdb/config.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ var (
6161
ErrBlockDiscoveryStrategy = errors.New("invalid block discovery strategy")
6262
ErrInvalidTokenBucketBytesLimiterMode = errors.New("invalid token bucket bytes limiter mode")
6363
ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio = errors.New("lazy expanded posting group max key series ratio needs to be equal or greater than 0")
64+
ErrInvalidBucketStoreType = errors.New("invalid bucket store type")
6465
)
6566

6667
// BlocksStorageConfig holds the config information for the blocks storage.
@@ -292,6 +293,7 @@ type BucketStoreConfig struct {
292293
IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"`
293294
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
294295
BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"`
296+
BucketStoreType string `yaml:"bucket_store_type"`
295297

296298
// Chunk pool.
297299
MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"`
@@ -378,6 +380,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
378380
f.Float64Var(&cfg.LazyExpandedPostingGroupMaxKeySeriesRatio, "blocks-storage.bucket-store.lazy-expanded-posting-group-max-key-series-ratio", 100, "Mark posting group as lazy if it fetches more keys than R * max series the query should fetch. With R set to 100, a posting group which fetches 100K keys will be marked as lazy if the current query only fetches 1000 series. This config is only valid if lazy expanded posting is enabled. 0 disables the limit.")
379381
f.IntVar(&cfg.SeriesBatchSize, "blocks-storage.bucket-store.series-batch-size", store.SeriesBatchSize, "Controls how many series to fetch per batch in Store Gateway. Default value is 10000.")
380382
f.StringVar(&cfg.BlockDiscoveryStrategy, "blocks-storage.bucket-store.block-discovery-strategy", string(ConcurrentDiscovery), "One of "+strings.Join(supportedBlockDiscoveryStrategies, ", ")+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations. bucket_index strategy can be used in Compactor only and utilizes the existing bucket index to fetch block IDs to sync. This avoids iterating the bucket but can be impacted by delays of cleaner creating bucket index.")
383+
f.StringVar(&cfg.BucketStoreType, "blocks-storage.bucket-store.bucket-store-type", "tsdb", "Type of bucket store to use (tsdb or parquet).")
381384
f.StringVar(&cfg.TokenBucketBytesLimiter.Mode, "blocks-storage.bucket-store.token-bucket-bytes-limiter.mode", string(TokenBucketBytesLimiterDisabled), fmt.Sprintf("Token bucket bytes limiter mode. Supported values are: %s", strings.Join(supportedTokenBucketBytesLimiterModes, ", ")))
382385
f.Int64Var(&cfg.TokenBucketBytesLimiter.InstanceTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.instance-token-bucket-size", int64(820*units.Mebibyte), "Instance token bucket size")
383386
f.Int64Var(&cfg.TokenBucketBytesLimiter.UserTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.user-token-bucket-size", int64(615*units.Mebibyte), "User token bucket size")
@@ -415,6 +418,9 @@ func (cfg *BucketStoreConfig) Validate() error {
415418
if !util.StringsContain(supportedTokenBucketBytesLimiterModes, cfg.TokenBucketBytesLimiter.Mode) {
416419
return ErrInvalidTokenBucketBytesLimiterMode
417420
}
421+
if !util.StringsContain(supportedBucketStoreTypes, cfg.BucketStoreType) {
422+
return ErrInvalidBucketStoreType
423+
}
418424
if cfg.LazyExpandedPostingGroupMaxKeySeriesRatio < 0 {
419425
return ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio
420426
}
@@ -450,6 +456,11 @@ var supportedBlockDiscoveryStrategies = []string{
450456
string(BucketIndexDiscovery),
451457
}
452458

459+
var supportedBucketStoreTypes = []string{
460+
"tsdb",
461+
"parquet",
462+
}
463+
453464
type TokenBucketBytesLimiterMode string
454465

455466
const (

pkg/storegateway/bucket_stores.go

Lines changed: 53 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,23 @@ import (
4444
"github.com/cortexproject/cortex/pkg/util/validation"
4545
)
4646

47-
// BucketStores is a multi-tenant wrapper of Thanos BucketStore.
48-
type BucketStores struct {
47+
// BucketStoreType names a bucket store implementation (see TSDBBucketStore and ParquetBucketStore).
48+
type BucketStoreType string
49+
50+
const (
51+
TSDBBucketStore BucketStoreType = "tsdb"
52+
ParquetBucketStore BucketStoreType = "parquet"
53+
)
54+
55+
// BucketStores defines the methods that every bucket stores implementation must provide.
56+
type BucketStores interface {
57+
storepb.StoreServer
58+
SyncBlocks(ctx context.Context) error
59+
InitialSync(ctx context.Context) error
60+
}
61+
62+
// ThanosBucketStores is a multi-tenant wrapper of Thanos BucketStore.
63+
type ThanosBucketStores struct {
4964
logger log.Logger
5065
cfg tsdb.BlocksStorageConfig
5166
limits *validation.Overrides
@@ -74,7 +89,7 @@ type BucketStores struct {
7489
storesMu sync.RWMutex
7590
stores map[string]*store.BucketStore
7691

77-
// Keeps the last sync error for the bucket store for each tenant.
92+
// Keeps the last sync error for the bucket store for each tenant.
7893
storesErrorsMu sync.RWMutex
7994
storesErrors map[string]error
8095

@@ -86,8 +101,7 @@ type BucketStores struct {
86101
userScanner users.Scanner
87102

88103
// Keeps number of inflight requests
89-
inflightRequestCnt int
90-
inflightRequestMu sync.RWMutex
104+
inflightRequests *util.InflightRequestTracker
91105

92106
// Metrics.
93107
syncTimes prometheus.Histogram
@@ -99,7 +113,19 @@ type BucketStores struct {
99113
var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway")
100114

101115
// NewBucketStores makes a new BucketStores of the type selected by cfg.BucketStore.BucketStoreType.
102-
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
116+
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (BucketStores, error) {
117+
switch cfg.BucketStore.BucketStoreType {
118+
case string(ParquetBucketStore):
119+
return newParquetBucketStores(cfg, bucketClient, limits, logger, reg)
120+
case string(TSDBBucketStore):
121+
return newThanosBucketStores(cfg, shardingStrategy, bucketClient, limits, logLevel, logger, reg)
122+
default:
123+
return nil, fmt.Errorf("unsupported bucket store type: %s", cfg.BucketStore.BucketStoreType)
124+
}
125+
}
126+
127+
// newThanosBucketStores creates the TSDB-based BucketStores implementation (ThanosBucketStores).
128+
func newThanosBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*ThanosBucketStores, error) {
103129
matchers := tsdb.NewMatchers()
104130
cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, tsdb.ParquetLabelsCacheConfig{}, matchers, bucketClient, logger, reg)
105131
if err != nil {
@@ -114,7 +140,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
114140
Help: "Number of maximum concurrent queries allowed.",
115141
}).Set(float64(cfg.BucketStore.MaxConcurrent))
116142

117-
u := &BucketStores{
143+
u := &ThanosBucketStores{
118144
logger: logger,
119145
cfg: cfg,
120146
limits: limits,
@@ -128,6 +154,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
128154
queryGate: queryGate,
129155
partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg),
130156
userTokenBuckets: make(map[string]*util.TokenBucket),
157+
inflightRequests: util.NewInflightRequestTracker(),
131158
syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
132159
Name: "cortex_bucket_stores_blocks_sync_seconds",
133160
Help: "The total time it takes to perform a sync stores",
@@ -187,7 +214,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
187214
}
188215

189216
// InitialSync does an initial synchronization of blocks for all users.
190-
func (u *BucketStores) InitialSync(ctx context.Context) error {
217+
func (u *ThanosBucketStores) InitialSync(ctx context.Context) error {
191218
level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")
192219

193220
if err := u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, s *store.BucketStore) error {
@@ -202,13 +229,13 @@ func (u *BucketStores) InitialSync(ctx context.Context) error {
202229
}
203230

204231
// SyncBlocks synchronizes the stores state with the Bucket store for every user.
205-
func (u *BucketStores) SyncBlocks(ctx context.Context) error {
232+
func (u *ThanosBucketStores) SyncBlocks(ctx context.Context) error {
206233
return u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, s *store.BucketStore) error {
207234
return s.SyncBlocks(ctx)
208235
})
209236
}
210237

211-
func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *store.BucketStore) error) error {
238+
func (u *ThanosBucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *store.BucketStore) error) error {
212239
retries := backoff.New(ctx, backoff.Config{
213240
MinBackoff: 1 * time.Second,
214241
MaxBackoff: 10 * time.Second,
@@ -232,7 +259,7 @@ func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(co
232259
return lastErr
233260
}
234261

235-
func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Context, *store.BucketStore) error) (returnErr error) {
262+
func (u *ThanosBucketStores) syncUsersBlocks(ctx context.Context, f func(context.Context, *store.BucketStore) error) (returnErr error) {
236263
defer func(start time.Time) {
237264
u.syncTimes.Observe(time.Since(start).Seconds())
238265
if returnErr == nil {
@@ -330,7 +357,7 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte
330357
}
331358

332359
// Series makes a series request to the underlying user bucket store.
333-
func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
360+
func (u *ThanosBucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
334361
spanLog, spanCtx := spanlogger.New(srv.Context(), "BucketStores.Series")
335362
defer spanLog.Finish()
336363

@@ -356,12 +383,12 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
356383

357384
maxInflightRequests := u.cfg.BucketStore.MaxInflightRequests
358385
if maxInflightRequests > 0 {
359-
if u.getInflightRequestCnt() >= maxInflightRequests {
386+
if u.inflightRequests.Count() >= maxInflightRequests {
360387
return ErrTooManyInflightRequests
361388
}
362389

363-
u.incrementInflightRequestCnt()
364-
defer u.decrementInflightRequestCnt()
390+
u.inflightRequests.Inc()
391+
defer u.inflightRequests.Dec()
365392
}
366393

367394
err = store.Series(req, spanSeriesServer{
@@ -372,26 +399,8 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
372399
return err
373400
}
374401

375-
func (u *BucketStores) getInflightRequestCnt() int {
376-
u.inflightRequestMu.RLock()
377-
defer u.inflightRequestMu.RUnlock()
378-
return u.inflightRequestCnt
379-
}
380-
381-
func (u *BucketStores) incrementInflightRequestCnt() {
382-
u.inflightRequestMu.Lock()
383-
u.inflightRequestCnt++
384-
u.inflightRequestMu.Unlock()
385-
}
386-
387-
func (u *BucketStores) decrementInflightRequestCnt() {
388-
u.inflightRequestMu.Lock()
389-
u.inflightRequestCnt--
390-
u.inflightRequestMu.Unlock()
391-
}
392-
393402
// LabelNames implements the Storegateway proto service.
394-
func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
403+
func (u *ThanosBucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
395404
spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames")
396405
defer spanLog.Finish()
397406

@@ -421,7 +430,7 @@ func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRe
421430
}
422431

423432
// LabelValues implements the Storegateway proto service.
424-
func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
433+
func (u *ThanosBucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
425434
spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelValues")
426435
defer spanLog.Finish()
427436

@@ -450,7 +459,7 @@ func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValues
450459

451460
// scanUsers scans the bucket and returns the list of found users. It includes active and deleting users
452461
// but not deleted users.
453-
func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) {
462+
func (u *ThanosBucketStores) scanUsers(ctx context.Context) ([]string, error) {
454463
activeUsers, deletingUsers, _, err := u.userScanner.ScanUsers(ctx)
455464
if err != nil {
456465
return nil, err
@@ -477,13 +486,13 @@ func deduplicateUsers(users []string) []string {
477486
return uniqueUsers
478487
}
479488

480-
func (u *BucketStores) getStore(userID string) *store.BucketStore {
489+
func (u *ThanosBucketStores) getStore(userID string) *store.BucketStore {
481490
u.storesMu.RLock()
482491
defer u.storesMu.RUnlock()
483492
return u.stores[userID]
484493
}
485494

486-
func (u *BucketStores) getStoreError(userID string) error {
495+
func (u *ThanosBucketStores) getStoreError(userID string) error {
487496
u.storesErrorsMu.RLock()
488497
defer u.storesErrorsMu.RUnlock()
489498
return u.storesErrors[userID]
@@ -499,7 +508,7 @@ var (
499508
// If bucket store doesn't exist, returns errBucketStoreNotFound.
500509
// If bucket store is not empty, returns errBucketStoreNotEmpty.
501510
// Otherwise returns error from closing the bucket store.
502-
func (u *BucketStores) closeEmptyBucketStore(userID string) error {
511+
func (u *ThanosBucketStores) closeEmptyBucketStore(userID string) error {
503512
u.storesMu.Lock()
504513
unlockInDefer := true
505514
defer func() {
@@ -537,11 +546,11 @@ func isEmptyBucketStore(bs *store.BucketStore) bool {
537546
return min == math.MaxInt64 && max == math.MinInt64
538547
}
539548

540-
func (u *BucketStores) syncDirForUser(userID string) string {
549+
func (u *ThanosBucketStores) syncDirForUser(userID string) string {
541550
return filepath.Join(u.cfg.BucketStore.SyncDir, userID)
542551
}
543552

544-
func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, error) {
553+
func (u *ThanosBucketStores) getOrCreateStore(userID string) (*store.BucketStore, error) {
545554
// Check if the store already exists.
546555
bs := u.getStore(userID)
547556
if bs != nil {
@@ -721,7 +730,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
721730

722731
// deleteLocalFilesForExcludedTenants removes local "sync" directories for tenants that are not included in the current
723732
// shard.
724-
func (u *BucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[string]struct{}) {
733+
func (u *ThanosBucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[string]struct{}) {
725734
files, err := os.ReadDir(u.cfg.BucketStore.SyncDir)
726735
if err != nil {
727736
return
@@ -760,13 +769,13 @@ func (u *BucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[str
760769
}
761770
}
762771

763-
func (u *BucketStores) getUserTokenBucket(userID string) *util.TokenBucket {
772+
func (u *ThanosBucketStores) getUserTokenBucket(userID string) *util.TokenBucket {
764773
u.userTokenBucketsMu.RLock()
765774
defer u.userTokenBucketsMu.RUnlock()
766775
return u.userTokenBuckets[userID]
767776
}
768777

769-
func (u *BucketStores) getTokensToRetrieve(tokens uint64, dataType store.StoreDataType) int64 {
778+
func (u *ThanosBucketStores) getTokensToRetrieve(tokens uint64, dataType store.StoreDataType) int64 {
770779
tokensToRetrieve := float64(tokens)
771780
switch dataType {
772781
case store.PostingsFetched:

0 commit comments

Comments
 (0)