Skip to content

Commit 13c6311

Browse files
committed
Addressed comments
1 parent 8ff28cd commit 13c6311

File tree

3 files changed

+23
-12
lines changed

3 files changed

+23
-12
lines changed

cmd/thanos/query.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ func registerQuery(app *extkingpin.App) {
247247

248248
rewriteAggregationLabelTo := cmd.Flag("query.aggregation-label-value-override", "The value override for __rollup__ label for aggregated metrics. If set to x, all queries on aggregated metrics will have a __rollup__=x matcher. Leave empty to disable this behavior. Default is empty.").Default("").String()
249249

250-
lazyRetrievalBufferSize := cmd.Flag("query.lazy-retrieval-buffer-size", "The fixed buffer size (in terms of how many store responses) for lazy retrieval strategy. This is to limit the memory usage.").
250+
lazyRetrievalMaxBufferedResponses := cmd.Flag("query.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled.").
251251
Default("20").Int()
252252

253253
var storeRateLimits store.SeriesSelectLimits
@@ -390,7 +390,7 @@ func registerQuery(app *extkingpin.App) {
390390
*tenantLabel,
391391
*enableGroupReplicaPartialStrategy,
392392
*rewriteAggregationLabelTo,
393-
*lazyRetrievalBufferSize,
393+
*lazyRetrievalMaxBufferedResponses,
394394
)
395395
})
396396
}
@@ -476,7 +476,7 @@ func runQuery(
476476
tenantLabel string,
477477
groupReplicaPartialResponseStrategy bool,
478478
rewriteAggregationLabelTo string,
479-
lazyRetrievalBufferSize int,
479+
lazyRetrievalMaxBufferedResponses int,
480480
) error {
481481
comp := component.Query
482482
if alertQueryURL == "" {
@@ -564,7 +564,7 @@ func runQuery(
564564
store.WithTSDBSelector(tsdbSelector),
565565
store.WithProxyStoreDebugLogging(debugLogging),
566566
store.WithQuorumChunkDedup(queryDeduplicationFunc == dedup.AlgorithmQuorum),
567-
store.WithLazyRetrievalBufferSize(lazyRetrievalBufferSize),
567+
store.WithLazyRetrievalMaxBufferedResponsesForProxy(lazyRetrievalMaxBufferedResponses),
568568
}
569569

570570
// Parse and sanitize the provided replica labels flags.

pkg/store/bucket.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,8 @@ type BucketStore struct {
422422
enabledLazyExpandedPostings bool
423423

424424
sortingStrategy sortingStrategy
425+
// This flag limits memory usage when lazy retrieval strategy, newLazyRespSet(), is used.
426+
lazyRetrievalMaxBufferedResponses int
425427

426428
blockEstimatedMaxSeriesFunc BlockEstimator
427429
blockEstimatedMaxChunkFunc BlockEstimator
@@ -561,6 +563,14 @@ func WithDontResort(true bool) BucketStoreOption {
561563
}
562564
}
563565

566+
func WithLazyRetrievalMaxBufferedResponsesForBucket(n int) BucketStoreOption {
567+
return func(s *BucketStore) {
568+
if true {
569+
s.lazyRetrievalMaxBufferedResponses = n
570+
}
571+
}
572+
}
573+
564574
// WithIndexHeaderLazyDownloadStrategy specifies what block to lazy download its index header.
565575
// Only used when lazy mmap is enabled at the same time.
566576
func WithIndexHeaderLazyDownloadStrategy(strategy indexheader.LazyDownloadIndexHeaderFunc) BucketStoreOption {
@@ -612,6 +622,7 @@ func NewBucketStore(
612622
enableChunkHashCalculation: enableChunkHashCalculation,
613623
seriesBatchSize: SeriesBatchSize,
614624
sortingStrategy: sortingStrategyStore,
625+
lazyRetrievalMaxBufferedResponses: 10,
615626
indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader,
616627
requestLoggerFunc: NoopRequestLoggerFunc,
617628
}
@@ -1623,7 +1634,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
16231634
shardMatcher,
16241635
false,
16251636
s.metrics.emptyPostingCount.WithLabelValues(tenant),
1626-
100,
1637+
s.lazyRetrievalMaxBufferedResponses,
16271638
)
16281639
}
16291640

pkg/store/proxy.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ type ProxyStore struct {
101101
quorumChunkDedup bool
102102
enableDedup bool
103103
matcherConverter *storepb.MatcherConverter
104-
lazyRetrievalBufferSize int
104+
lazyRetrievalMaxBufferedResponses int
105105
}
106106

107107
type proxyStoreMetrics struct {
@@ -138,9 +138,9 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(*
138138
// BucketStoreOption are functions that configure BucketStore.
139139
type ProxyStoreOption func(s *ProxyStore)
140140

141-
func WithLazyRetrievalBufferSize(buferSize int) ProxyStoreOption {
141+
func WithLazyRetrievalMaxBufferedResponsesForProxy(buferSize int) ProxyStoreOption {
142142
return func(s *ProxyStore) {
143-
s.lazyRetrievalBufferSize = buferSize
143+
s.lazyRetrievalMaxBufferedResponses = buferSize
144144
}
145145
}
146146

@@ -392,16 +392,16 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
392392
}
393393
}
394394
defer logGroupReplicaErrors()
395-
lazyRetrievalBufferSize := s.lazyRetrievalBufferSize
396-
if lazyRetrievalBufferSize <= 0 {
395+
lazyRetrievalMaxBufferedResponses := s.lazyRetrievalMaxBufferedResponses
396+
if lazyRetrievalMaxBufferedResponses <= 0 {
397397
// Use 1 as default value for lazy retrieval buffer size.
398398
// Unit tests hit this path so that corner cases can be tested with buffer size 1.
399-
lazyRetrievalBufferSize = 1
399+
lazyRetrievalMaxBufferedResponses = 1
400400
}
401401
for _, st := range stores {
402402
st := st
403403

404-
respSet, err := newAsyncRespSet(ctx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses, lazyRetrievalBufferSize)
404+
respSet, err := newAsyncRespSet(ctx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses, lazyRetrievalMaxBufferedResponses)
405405
if err != nil {
406406
level.Warn(s.logger).Log("msg", "Store failure", "group", st.GroupKey(), "replica", st.ReplicaKey(), "err", err)
407407
s.metrics.storeFailureCount.WithLabelValues(st.GroupKey(), st.ReplicaKey()).Inc()

0 commit comments

Comments
 (0)