Skip to content

Commit 8ff28cd

Browse files
committed
Limit lazyRespSet memory buffer size
1 parent 1aeb3f8 commit 8ff28cd

File tree

4 files changed

+91
-27
lines changed

4 files changed

+91
-27
lines changed

cmd/thanos/query.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,9 @@ func registerQuery(app *extkingpin.App) {
247247

248248
rewriteAggregationLabelTo := cmd.Flag("query.aggregation-label-value-override", "The value override for __rollup__ label for aggregated metrics. If set to x, all queries on aggregated metrics will have a __rollup__=x matcher. Leave empty to disable this behavior. Default is empty.").Default("").String()
249249

250+
lazyRetrievalBufferSize := cmd.Flag("query.lazy-retrieval-buffer-size", "The fixed buffer size (in terms of how many store responses) for lazy retrieval strategy. This is to limit the memory usage.").
251+
Default("20").Int()
252+
250253
var storeRateLimits store.SeriesSelectLimits
251254
storeRateLimits.RegisterFlags(cmd)
252255

@@ -387,6 +390,7 @@ func registerQuery(app *extkingpin.App) {
387390
*tenantLabel,
388391
*enableGroupReplicaPartialStrategy,
389392
*rewriteAggregationLabelTo,
393+
*lazyRetrievalBufferSize,
390394
)
391395
})
392396
}
@@ -472,6 +476,7 @@ func runQuery(
472476
tenantLabel string,
473477
groupReplicaPartialResponseStrategy bool,
474478
rewriteAggregationLabelTo string,
479+
lazyRetrievalBufferSize int,
475480
) error {
476481
comp := component.Query
477482
if alertQueryURL == "" {
@@ -559,6 +564,7 @@ func runQuery(
559564
store.WithTSDBSelector(tsdbSelector),
560565
store.WithProxyStoreDebugLogging(debugLogging),
561566
store.WithQuorumChunkDedup(queryDeduplicationFunc == dedup.AlgorithmQuorum),
567+
store.WithLazyRetrievalBufferSize(lazyRetrievalBufferSize),
562568
}
563569

564570
// Parse and sanitize the provided replica labels flags.

pkg/store/bucket.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1623,6 +1623,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
16231623
shardMatcher,
16241624
false,
16251625
s.metrics.emptyPostingCount.WithLabelValues(tenant),
1626+
100,
16261627
)
16271628
}
16281629

pkg/store/proxy.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,15 @@ type ProxyStore struct {
9393
selectorLabels labels.Labels
9494
buffers sync.Pool
9595

96-
responseTimeout time.Duration
97-
metrics *proxyStoreMetrics
98-
retrievalStrategy RetrievalStrategy
99-
debugLogging bool
100-
tsdbSelector *TSDBSelector
101-
quorumChunkDedup bool
102-
enableDedup bool
103-
matcherConverter *storepb.MatcherConverter
96+
responseTimeout time.Duration
97+
metrics *proxyStoreMetrics
98+
retrievalStrategy RetrievalStrategy
99+
debugLogging bool
100+
tsdbSelector *TSDBSelector
101+
quorumChunkDedup bool
102+
enableDedup bool
103+
matcherConverter *storepb.MatcherConverter
104+
lazyRetrievalBufferSize int
104105
}
105106

106107
type proxyStoreMetrics struct {
@@ -137,6 +138,12 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(*
137138
// ProxyStoreOption are functions that configure ProxyStore.
138139
type ProxyStoreOption func(s *ProxyStore)
139140

141+
func WithLazyRetrievalBufferSize(buferSize int) ProxyStoreOption {
142+
return func(s *ProxyStore) {
143+
s.lazyRetrievalBufferSize = buferSize
144+
}
145+
}
146+
140147
// WithProxyStoreDebugLogging toggles debug logging.
141148
func WithProxyStoreDebugLogging(enable bool) ProxyStoreOption {
142149
return func(s *ProxyStore) {
@@ -385,11 +392,16 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
385392
}
386393
}
387394
defer logGroupReplicaErrors()
388-
395+
lazyRetrievalBufferSize := s.lazyRetrievalBufferSize
396+
if lazyRetrievalBufferSize <= 0 {
397+
// Use 1 as default value for lazy retrieval buffer size.
398+
// Unit tests hit this path so that corner cases can be tested with buffer size 1.
399+
lazyRetrievalBufferSize = 1
400+
}
389401
for _, st := range stores {
390402
st := st
391403

392-
respSet, err := newAsyncRespSet(ctx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses)
404+
respSet, err := newAsyncRespSet(ctx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses, lazyRetrievalBufferSize)
393405
if err != nil {
394406
level.Warn(s.logger).Log("msg", "Store failure", "group", st.GroupKey(), "replica", st.ReplicaKey(), "err", err)
395407
s.metrics.storeFailureCount.WithLabelValues(st.GroupKey(), st.ReplicaKey()).Inc()

pkg/store/proxy_merge.go

Lines changed: 62 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,20 @@ type lazyRespSet struct {
255255
frameTimeout time.Duration
256256

257257
// Internal bookkeeping.
258-
dataOrFinishEvent *sync.Cond
259-
bufferedResponses []*storepb.SeriesResponse
258+
dataOrFinishEvent *sync.Cond
259+
// This event firing means the buffer has a slot for more data.
260+
bufferSlotEvent *sync.Cond
261+
fixedBufferSize int
262+
// This is a ring buffer of size fixedBufferSize.
263+
// A ring buffer of size N can hold N - 1 elements at most in order to distinguish being empty from being full.
264+
bufferedResponses []*storepb.SeriesResponse
265+
// ringHead points to the first element in the ring buffer.
266+
// ringTail points to the slot after the last element in the ring buffer.
267+
// If ringHead == ringTail then the buffer is empty.
268+
// If ringHead == (ringTail + 1) % fixedBufferSize then the buffer is full.
269+
ringHead int
270+
ringTail int
271+
closed bool
260272
bufferedResponsesMtx *sync.Mutex
261273
lastResp *storepb.SeriesResponse
262274

@@ -266,24 +278,32 @@ type lazyRespSet struct {
266278
shardMatcher *storepb.ShardMatcher
267279
}
268280

281+
func (l *lazyRespSet) isEmpty() bool {
282+
return l.ringHead == l.ringTail
283+
}
284+
285+
func (l *lazyRespSet) isFull() bool {
286+
return (l.ringTail+1)%l.fixedBufferSize == l.ringHead
287+
}
288+
269289
func (l *lazyRespSet) Empty() bool {
270290
l.bufferedResponsesMtx.Lock()
271291
defer l.bufferedResponsesMtx.Unlock()
272292

273293
// NOTE(GiedriusS): need to wait here for at least one
274294
// response so that we could build the heap properly.
275-
if l.noMoreData && len(l.bufferedResponses) == 0 {
295+
if l.noMoreData && l.isEmpty() {
276296
return true
277297
}
278298

279-
for len(l.bufferedResponses) == 0 {
299+
for l.isEmpty() {
280300
l.dataOrFinishEvent.Wait()
281-
if l.noMoreData && len(l.bufferedResponses) == 0 {
301+
if l.noMoreData && l.isEmpty() {
282302
break
283303
}
284304
}
285305

286-
return len(l.bufferedResponses) == 0 && l.noMoreData
306+
return l.isEmpty() && l.noMoreData
287307
}
288308

289309
// Next either blocks until more data is available or reads
@@ -295,23 +315,24 @@ func (l *lazyRespSet) Next() bool {
295315

296316
l.initialized = true
297317

298-
if l.noMoreData && len(l.bufferedResponses) == 0 {
318+
if l.noMoreData && l.isEmpty() {
299319
l.lastResp = nil
300320

301321
return false
302322
}
303323

304-
for len(l.bufferedResponses) == 0 {
324+
for l.isEmpty() {
305325
l.dataOrFinishEvent.Wait()
306-
if l.noMoreData && len(l.bufferedResponses) == 0 {
326+
if l.noMoreData && l.isEmpty() {
307327
break
308328
}
309329
}
310330

311-
if len(l.bufferedResponses) > 0 {
312-
l.lastResp = l.bufferedResponses[0]
331+
if !l.isEmpty() {
332+
l.lastResp = l.bufferedResponses[l.ringHead]
313333
if l.initialized {
314-
l.bufferedResponses = l.bufferedResponses[1:]
334+
l.ringHead = (l.ringHead + 1) % l.fixedBufferSize
335+
l.bufferSlotEvent.Signal()
315336
}
316337
return true
317338
}
@@ -338,8 +359,12 @@ func newLazyRespSet(
338359
shardMatcher *storepb.ShardMatcher,
339360
applySharding bool,
340361
emptyStreamResponses prometheus.Counter,
362+
fixedBufferSize int,
341363
) respSet {
342-
bufferedResponses := []*storepb.SeriesResponse{}
364+
// A ring buffer of size N can hold N - 1 elements at most in order to distinguish being empty from being full.
365+
// That's why the size is increased by 1 internally.
366+
fixedBufferSize++
367+
bufferedResponses := make([]*storepb.SeriesResponse, fixedBufferSize)
343368
bufferedResponsesMtx := &sync.Mutex{}
344369
dataAvailable := sync.NewCond(bufferedResponsesMtx)
345370

@@ -351,9 +376,14 @@ func newLazyRespSet(
351376
closeSeries: closeSeries,
352377
span: span,
353378
dataOrFinishEvent: dataAvailable,
379+
bufferSlotEvent: sync.NewCond(bufferedResponsesMtx),
354380
bufferedResponsesMtx: bufferedResponsesMtx,
355381
bufferedResponses: bufferedResponses,
356382
shardMatcher: shardMatcher,
383+
fixedBufferSize: fixedBufferSize,
384+
ringHead: 0,
385+
ringTail: 0,
386+
closed: false,
357387
}
358388
respSet.storeLabels = make(map[string]struct{})
359389
for _, ls := range storeLabelSets {
@@ -406,11 +436,16 @@ func newLazyRespSet(
406436
} else {
407437
rerr = errors.Wrapf(err, "receive series from %s", st)
408438
}
409-
410439
l.span.SetTag("err", rerr.Error())
411440

412441
l.bufferedResponsesMtx.Lock()
413-
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr))
442+
for l.isFull() && !l.closed {
443+
l.bufferSlotEvent.Wait()
444+
}
445+
if !l.closed {
446+
l.bufferedResponses[l.ringTail] = storepb.NewWarnSeriesResponse(rerr)
447+
l.ringTail = (l.ringTail + 1) % l.fixedBufferSize
448+
}
414449
l.noMoreData = true
415450
l.dataOrFinishEvent.Signal()
416451
l.bufferedResponsesMtx.Unlock()
@@ -429,8 +464,14 @@ func newLazyRespSet(
429464
}
430465

431466
l.bufferedResponsesMtx.Lock()
432-
l.bufferedResponses = append(l.bufferedResponses, resp)
433-
l.dataOrFinishEvent.Signal()
467+
for l.isFull() && !l.closed {
468+
l.bufferSlotEvent.Wait()
469+
}
470+
if !l.closed {
471+
l.bufferedResponses[l.ringTail] = resp
472+
l.ringTail = (l.ringTail + 1) % l.fixedBufferSize
473+
l.dataOrFinishEvent.Signal()
474+
}
434475
l.bufferedResponsesMtx.Unlock()
435476
return true
436477
}
@@ -474,6 +515,7 @@ func newAsyncRespSet(
474515
shardInfo *storepb.ShardInfo,
475516
logger log.Logger,
476517
emptyStreamResponses prometheus.Counter,
518+
fixedBufferSizeForLazyRetrieval int,
477519
) (respSet, error) {
478520

479521
var (
@@ -535,6 +577,7 @@ func newAsyncRespSet(
535577
shardMatcher,
536578
applySharding,
537579
emptyStreamResponses,
580+
fixedBufferSizeForLazyRetrieval,
538581
), nil
539582
case EagerRetrieval:
540583
span.SetTag("retrival_strategy", EagerRetrieval)
@@ -560,6 +603,8 @@ func (l *lazyRespSet) Close() {
560603
defer l.bufferedResponsesMtx.Unlock()
561604

562605
l.closeSeries()
606+
l.closed = true
607+
l.bufferSlotEvent.Signal()
563608
l.noMoreData = true
564609
l.dataOrFinishEvent.Signal()
565610

0 commit comments

Comments
 (0)