Skip to content

Commit 8821811

Browse files
committed
fix coroutine leak
1 parent 1aeb3f8 commit 8821811

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

pkg/store/proxy_merge.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -558,13 +558,13 @@ func newAsyncRespSet(
558558
func (l *lazyRespSet) Close() {
559559
l.bufferedResponsesMtx.Lock()
560560
defer l.bufferedResponsesMtx.Unlock()
561+
_ = l.cl.CloseSend()
561562

562563
l.closeSeries()
563564
l.noMoreData = true
564565
l.dataOrFinishEvent.Signal()
565566

566567
l.shardMatcher.Close()
567-
_ = l.cl.CloseSend()
568568
}
569569

570570
// eagerRespSet is a SeriesSet that blocks until all data is retrieved from
@@ -750,11 +750,11 @@ func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string]
750750
}
751751

752752
func (l *eagerRespSet) Close() {
753+
_ = l.cl.CloseSend()
753754
if l.closeSeries != nil {
754755
l.closeSeries()
755756
}
756757
l.shardMatcher.Close()
757-
_ = l.cl.CloseSend()
758758
}
759759

760760
func (l *eagerRespSet) At() *storepb.SeriesResponse {

pkg/store/storepb/inprocess.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"io"
99
"iter"
10+
"sync"
1011

1112
"google.golang.org/grpc"
1213
)
@@ -38,18 +39,22 @@ type inProcessClient struct {
3839
ctx context.Context
3940
next func() (*SeriesResponse, error, bool)
4041
stop func()
42+
mu sync.Mutex
4143
}
4244

4345
func newInProcessClient(ctx context.Context, next func() (*SeriesResponse, error, bool), stop func()) *inProcessClient {
4446
return &inProcessClient{
4547
ctx: ctx,
4648
next: next,
4749
stop: stop,
50+
mu: sync.Mutex{},
4851
}
4952
}
5053

5154
func (c *inProcessClient) Recv() (*SeriesResponse, error) {
55+
c.mu.Lock()
5256
resp, err, ok := c.next()
57+
c.mu.Unlock()
5358
if err != nil {
5459
c.stop()
5560
return nil, err
@@ -68,7 +73,9 @@ func (c *inProcessClient) Context() context.Context {
6873
}
6974

7075
func (c *inProcessClient) CloseSend() error {
76+
c.mu.Lock()
7177
c.stop()
78+
c.mu.Unlock()
7279
return nil
7380
}
7481

0 commit comments

Comments
 (0)