@@ -255,8 +255,20 @@ type lazyRespSet struct {
255255 frameTimeout time.Duration
256256
257257 // Internal bookkeeping.
258- dataOrFinishEvent * sync.Cond
259- bufferedResponses []* storepb.SeriesResponse
258+ dataOrFinishEvent * sync.Cond
259+ // This event firing means the buffer has a slot for more data.
260+ bufferSlotEvent * sync.Cond
261+ fixedBufferSize int
262+ // This a ring buffer of size fixedBufferSize.
263+ // A ring buffer of size N can hold N - 1 elements at most in order to distinguish being empty from being full.
264+ bufferedResponses []* storepb.SeriesResponse
265+ // ringHead points to the first element in the ring buffer.
266+ // ringTail points to the slot after the last element in the ring buffer.
267+ // if ringHead == ringTail then the buffer is empty.
268+ // if ringHead == (ringTail + 1) % fixedBufferSize then the buffer is full.
269+ ringHead int
270+ ringTail int
271+ closed bool
260272 bufferedResponsesMtx * sync.Mutex
261273 lastResp * storepb.SeriesResponse
262274
@@ -266,24 +278,32 @@ type lazyRespSet struct {
266278 shardMatcher * storepb.ShardMatcher
267279}
268280
281+ func (l * lazyRespSet ) isEmpty () bool {
282+ return l .ringHead == l .ringTail
283+ }
284+
285+ func (l * lazyRespSet ) isFull () bool {
286+ return (l .ringTail + 1 )% l .fixedBufferSize == l .ringHead
287+ }
288+
269289func (l * lazyRespSet ) Empty () bool {
270290 l .bufferedResponsesMtx .Lock ()
271291 defer l .bufferedResponsesMtx .Unlock ()
272292
273293 // NOTE(GiedriusS): need to wait here for at least one
274294 // response so that we could build the heap properly.
275- if l .noMoreData && len ( l . bufferedResponses ) == 0 {
295+ if l .noMoreData && l . isEmpty () {
276296 return true
277297 }
278298
279- for len ( l . bufferedResponses ) == 0 {
299+ for l . isEmpty () {
280300 l .dataOrFinishEvent .Wait ()
281- if l .noMoreData && len ( l . bufferedResponses ) == 0 {
301+ if l .noMoreData && l . isEmpty () {
282302 break
283303 }
284304 }
285305
286- return len ( l . bufferedResponses ) == 0 && l .noMoreData
306+ return l . isEmpty () && l .noMoreData
287307}
288308
289309// Next either blocks until more data is available or reads
@@ -295,23 +315,24 @@ func (l *lazyRespSet) Next() bool {
295315
296316 l .initialized = true
297317
298- if l .noMoreData && len ( l . bufferedResponses ) == 0 {
318+ if l .noMoreData && l . isEmpty () {
299319 l .lastResp = nil
300320
301321 return false
302322 }
303323
304- for len ( l . bufferedResponses ) == 0 {
324+ for l . isEmpty () {
305325 l .dataOrFinishEvent .Wait ()
306- if l .noMoreData && len ( l . bufferedResponses ) == 0 {
326+ if l .noMoreData && l . isEmpty () {
307327 break
308328 }
309329 }
310330
311- if len ( l . bufferedResponses ) > 0 {
312- l .lastResp = l .bufferedResponses [0 ]
331+ if ! l . isEmpty () {
332+ l .lastResp = l .bufferedResponses [l . ringHead ]
313333 if l .initialized {
314- l .bufferedResponses = l .bufferedResponses [1 :]
334+ l .ringHead = (l .ringHead + 1 ) % l .fixedBufferSize
335+ l .bufferSlotEvent .Signal ()
315336 }
316337 return true
317338 }
@@ -338,8 +359,12 @@ func newLazyRespSet(
338359 shardMatcher * storepb.ShardMatcher ,
339360 applySharding bool ,
340361 emptyStreamResponses prometheus.Counter ,
362+ fixedBufferSize int ,
341363) respSet {
342- bufferedResponses := []* storepb.SeriesResponse {}
364+ // A ring buffer of size N can hold N - 1 elements at most in order to distinguish being empty from being full.
365+ // That's why the size is increased by 1 internally.
366+ fixedBufferSize ++
367+ bufferedResponses := make ([]* storepb.SeriesResponse , fixedBufferSize )
343368 bufferedResponsesMtx := & sync.Mutex {}
344369 dataAvailable := sync .NewCond (bufferedResponsesMtx )
345370
@@ -351,9 +376,14 @@ func newLazyRespSet(
351376 closeSeries : closeSeries ,
352377 span : span ,
353378 dataOrFinishEvent : dataAvailable ,
379+ bufferSlotEvent : sync .NewCond (bufferedResponsesMtx ),
354380 bufferedResponsesMtx : bufferedResponsesMtx ,
355381 bufferedResponses : bufferedResponses ,
356382 shardMatcher : shardMatcher ,
383+ fixedBufferSize : fixedBufferSize ,
384+ ringHead : 0 ,
385+ ringTail : 0 ,
386+ closed : false ,
357387 }
358388 respSet .storeLabels = make (map [string ]struct {})
359389 for _ , ls := range storeLabelSets {
@@ -406,11 +436,16 @@ func newLazyRespSet(
406436 } else {
407437 rerr = errors .Wrapf (err , "receive series from %s" , st )
408438 }
409-
410439 l .span .SetTag ("err" , rerr .Error ())
411440
412441 l .bufferedResponsesMtx .Lock ()
413- l .bufferedResponses = append (l .bufferedResponses , storepb .NewWarnSeriesResponse (rerr ))
442+ for l .isFull () && ! l .closed {
443+ l .bufferSlotEvent .Wait ()
444+ }
445+ if ! l .closed {
446+ l .bufferedResponses [l .ringTail ] = storepb .NewWarnSeriesResponse (rerr )
447+ l .ringTail = (l .ringTail + 1 ) % l .fixedBufferSize
448+ }
414449 l .noMoreData = true
415450 l .dataOrFinishEvent .Signal ()
416451 l .bufferedResponsesMtx .Unlock ()
@@ -429,8 +464,14 @@ func newLazyRespSet(
429464 }
430465
431466 l .bufferedResponsesMtx .Lock ()
432- l .bufferedResponses = append (l .bufferedResponses , resp )
433- l .dataOrFinishEvent .Signal ()
467+ for l .isFull () && ! l .closed {
468+ l .bufferSlotEvent .Wait ()
469+ }
470+ if ! l .closed {
471+ l .bufferedResponses [l .ringTail ] = resp
472+ l .ringTail = (l .ringTail + 1 ) % l .fixedBufferSize
473+ l .dataOrFinishEvent .Signal ()
474+ }
434475 l .bufferedResponsesMtx .Unlock ()
435476 return true
436477 }
@@ -474,6 +515,7 @@ func newAsyncRespSet(
474515 shardInfo * storepb.ShardInfo ,
475516 logger log.Logger ,
476517 emptyStreamResponses prometheus.Counter ,
518+ lazyRetrievalMaxBufferedResponses int ,
477519) (respSet , error ) {
478520
479521 var (
@@ -525,6 +567,11 @@ func newAsyncRespSet(
525567 switch retrievalStrategy {
526568 case LazyRetrieval :
527569 span .SetTag ("retrival_strategy" , LazyRetrieval )
570+ if lazyRetrievalMaxBufferedResponses < 1 {
571+ // Some unit and e2e tests hit this path.
572+ lazyRetrievalMaxBufferedResponses = 1
573+ }
574+
528575 return newLazyRespSet (
529576 span ,
530577 frameTimeout ,
@@ -535,6 +582,7 @@ func newAsyncRespSet(
535582 shardMatcher ,
536583 applySharding ,
537584 emptyStreamResponses ,
585+ lazyRetrievalMaxBufferedResponses ,
538586 ), nil
539587 case EagerRetrieval :
540588 span .SetTag ("retrival_strategy" , EagerRetrieval )
@@ -560,6 +608,8 @@ func (l *lazyRespSet) Close() {
560608 defer l .bufferedResponsesMtx .Unlock ()
561609
562610 l .closeSeries ()
611+ l .closed = true
612+ l .bufferSlotEvent .Signal ()
563613 l .noMoreData = true
564614 l .dataOrFinishEvent .Signal ()
565615
0 commit comments