 	"context"
 	"fmt"
 	"math"
-	"net/http"
 	"os"
 	"path/filepath"
 	"strings"
@@ -35,6 +34,7 @@ import (
 
 	"github.com/cortexproject/cortex/pkg/storage/bucket"
 	"github.com/cortexproject/cortex/pkg/storage/tsdb"
+	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/backoff"
 	cortex_errors "github.com/cortexproject/cortex/pkg/util/errors"
 	util_log "github.com/cortexproject/cortex/pkg/util/log"
@@ -73,6 +73,11 @@ type BucketStores struct {
 	storesErrorsMu sync.RWMutex
 	storesErrors map[string]error
 
+	instanceTokenBucket *util.TokenBucket
+
+	userTokenBucketsMu sync.RWMutex
+	userTokenBuckets map[string]*util.TokenBucket
+
 	// Keeps number of inflight requests
 	inflightRequestCnt int
 	inflightRequestMu sync.RWMutex
@@ -115,6 +120,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
 		metaFetcherMetrics: NewMetadataFetcherMetrics(),
 		queryGate: queryGate,
 		partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg),
+		userTokenBuckets: make(map[string]*util.TokenBucket),
 		syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
 			Name: "cortex_bucket_stores_blocks_sync_seconds",
 			Help: "The total time it takes to perform a sync stores",
@@ -144,6 +150,13 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
 		return nil, errors.Wrap(err, "create chunks bytes pool")
 	}
 
+	if u.cfg.BucketStore.TokenBucketBytesLimiter.Mode != string(tsdb.TokenBucketBytesLimiterDisabled) {
+		u.instanceTokenBucket = util.NewTokenBucket(cfg.BucketStore.TokenBucketBytesLimiter.InstanceTokenBucketSize, promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+			Name: "cortex_bucket_stores_instance_token_bucket_remaining",
+			Help: "Number of tokens left in instance token bucket.",
+		}))
+	}
+
 	if reg != nil {
 		reg.MustRegister(u.bucketStoreMetrics, u.metaFetcherMetrics)
 	}
@@ -475,6 +488,12 @@ func (u *BucketStores) closeEmptyBucketStore(userID string) error {
 	unlockInDefer = false
 	u.storesMu.Unlock()
 
+	if u.cfg.BucketStore.TokenBucketBytesLimiter.Mode != string(tsdb.TokenBucketBytesLimiterDisabled) {
+		u.userTokenBucketsMu.Lock()
+		delete(u.userTokenBuckets, userID)
+		u.userTokenBucketsMu.Unlock()
+	}
+
 	u.metaFetcherMetrics.RemoveUserRegistry(userID)
 	u.bucketStoreMetrics.RemoveUserRegistry(userID)
 	return bs.Close()
@@ -612,13 +631,19 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
 		bucketStoreOpts = append(bucketStoreOpts, store.WithDebugLogging())
 	}
 
+	if u.cfg.BucketStore.TokenBucketBytesLimiter.Mode != string(tsdb.TokenBucketBytesLimiterDisabled) {
+		u.userTokenBucketsMu.Lock()
+		u.userTokenBuckets[userID] = util.NewTokenBucket(u.cfg.BucketStore.TokenBucketBytesLimiter.UserTokenBucketSize, nil)
+		u.userTokenBucketsMu.Unlock()
+	}
+
 	bs, err := store.NewBucketStore(
 		userBkt,
 		fetcher,
 		u.syncDirForUser(userID),
 		newChunksLimiterFactory(u.limits, userID),
 		newSeriesLimiterFactory(u.limits, userID),
-		newBytesLimiterFactory(u.limits, userID),
+		newBytesLimiterFactory(u.limits, userID, u.getUserTokenBucket(userID), u.instanceTokenBucket, u.cfg.BucketStore.TokenBucketBytesLimiter, u.getTokensToRetrieve),
		u.partitioner,
 		u.cfg.BucketStore.BlockSyncConcurrency,
 		false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers
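The token-bucket-aware implementation of newBytesLimiterFactory is not shown in this diff. As a rough, self-contained sketch of the technique the new arguments suggest (a per-user bucket checked together with a shared per-instance bucket, charged with data-type-scaled token counts), the snippet below uses a stand-in tokenBucket type and a reserveBytes helper that are illustrative only; they are not the util.TokenBucket API or the actual Cortex limiter code.

// Illustrative sketch only: a stand-in tokenBucket, not util.TokenBucket.
package main

import (
	"fmt"
	"sync"
)

type tokenBucket struct {
	mu     sync.Mutex
	tokens int64
}

// retrieve deducts n tokens and reports whether the bucket had enough
// capacity before the deduction.
func (b *tokenBucket) retrieve(n int64) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	ok := b.tokens >= n
	b.tokens -= n
	return ok
}

// reserveBytes charges the user bucket first and then the instance bucket,
// scaling the byte count by a per-data-type factor, and fails when either
// bucket is exhausted (an "enforced" mode; a dry-run mode would only record).
func reserveBytes(user, instance *tokenBucket, bytes uint64, factor float64) error {
	tokens := int64(float64(bytes) * factor)
	if !user.retrieve(tokens) {
		return fmt.Errorf("user token bucket exhausted")
	}
	if !instance.retrieve(tokens) {
		return fmt.Errorf("instance token bucket exhausted")
	}
	return nil
}

func main() {
	user := &tokenBucket{tokens: 1 << 20}     // example per-user bucket size
	instance := &tokenBucket{tokens: 4 << 20} // example per-instance bucket size

	fmt.Println(reserveBytes(user, instance, 512<<10, 0.5)) // <nil>: both buckets have capacity
	fmt.Println(reserveBytes(user, instance, 8<<20, 1.0))   // error: user bucket exhausted
}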
@@ -680,6 +705,31 @@ func (u *BucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[str
 	}
 }
 
+func (u *BucketStores) getUserTokenBucket(userID string) *util.TokenBucket {
+	u.userTokenBucketsMu.RLock()
+	defer u.userTokenBucketsMu.RUnlock()
+	return u.userTokenBuckets[userID]
+}
+
+func (u *BucketStores) getTokensToRetrieve(tokens uint64, dataType store.StoreDataType) int64 {
+	tokensToRetrieve := float64(tokens)
+	switch dataType {
+	case store.PostingsFetched:
+		tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.FetchedPostingsTokenFactor
+	case store.PostingsTouched:
+		tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.TouchedPostingsTokenFactor
+	case store.SeriesFetched:
+		tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.FetchedSeriesTokenFactor
+	case store.SeriesTouched:
+		tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.TouchedSeriesTokenFactor
+	case store.ChunksFetched:
+		tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.FetchedChunksTokenFactor
+	case store.ChunksTouched:
+		tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.TouchedChunksTokenFactor
+	}
+	return int64(tokensToRetrieve)
+}
+
 func getUserIDFromGRPCContext(ctx context.Context) string {
 	meta, ok := metadata.FromIncomingContext(ctx)
 	if !ok {
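To make the scaling in getTokensToRetrieve concrete (the factor values below are hypothetical examples, not Cortex defaults): 1 MiB of fetched postings with a factor of 0.5 is charged 524288 tokens, while 1 MiB of touched series with a factor of 2 is charged 2097152 tokens.

// Hypothetical factor values, illustrating getTokensToRetrieve's arithmetic only.
package main

import "fmt"

func main() {
	const fetchedPostingsFactor = 0.5 // assumed example, not a default
	const touchedSeriesFactor = 2.0   // assumed example, not a default

	bytes := uint64(1 << 20) // 1 MiB of data read by a request

	fmt.Println(int64(float64(bytes) * fetchedPostingsFactor)) // 524288 tokens
	fmt.Println(int64(float64(bytes) * touchedSeriesFactor))   // 2097152 tokens
}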
@@ -730,50 +780,3 @@ type spanSeriesServer struct {
 func (s spanSeriesServer) Context() context.Context {
 	return s.ctx
 }
-
-type limiter struct {
-	limiter *store.Limiter
-}
-
-func (c *limiter) Reserve(num uint64) error {
-	return c.ReserveWithType(num, 0)
-}
-
-func (c *limiter) ReserveWithType(num uint64, _ store.StoreDataType) error {
-	err := c.limiter.Reserve(num)
-	if err != nil {
-		return httpgrpc.Errorf(http.StatusUnprocessableEntity, err.Error())
-	}
-
-	return nil
-}
-
-func newChunksLimiterFactory(limits *validation.Overrides, userID string) store.ChunksLimiterFactory {
-	return func(failedCounter prometheus.Counter) store.ChunksLimiter {
-		// Since limit overrides could be live reloaded, we have to get the current user's limit
-		// each time a new limiter is instantiated.
-		return &limiter{
-			limiter: store.NewLimiter(uint64(limits.MaxChunksPerQueryFromStore(userID)), failedCounter),
-		}
-	}
-}
-
-func newSeriesLimiterFactory(limits *validation.Overrides, userID string) store.SeriesLimiterFactory {
-	return func(failedCounter prometheus.Counter) store.SeriesLimiter {
-		// Since limit overrides could be live reloaded, we have to get the current user's limit
-		// each time a new limiter is instantiated.
-		return &limiter{
-			limiter: store.NewLimiter(uint64(limits.MaxFetchedSeriesPerQuery(userID)), failedCounter),
-		}
-	}
-}
-
-func newBytesLimiterFactory(limits *validation.Overrides, userID string) store.BytesLimiterFactory {
-	return func(failedCounter prometheus.Counter) store.BytesLimiter {
-		// Since limit overrides could be live reloaded, we have to get the current user's limit
-		// each time a new limiter is instantiated.
-		return &limiter{
-			limiter: store.NewLimiter(uint64(limits.MaxDownloadedBytesPerRequest(userID)), failedCounter),
-		}
-	}
-}