Skip to content

Commit 2d4d060

Browse files
authored
fixing bug in chunk cache that can cause panic during shutdown (#4398)
* fixing bug in chunk cache that can cause panic during shutdown Signed-off-by: Roger Steneteg <[email protected]> * adding separate quit channel for stopping chunkfetcher to avoid send on closed channel during stop Signed-off-by: Roger Steneteg <[email protected]>
1 parent a4c1bed commit 2d4d060

File tree

5 files changed

+133
-29
lines changed

5 files changed

+133
-29
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
* [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335
4646
* [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304
4747
* [BUGFIX] Ingester: fixed ingester stuck on start up (LEAVING ring state) when `-ingester.heartbeat-period=0` and `-ingester.unregister-on-shutdown=false`. #4366
48+
* [BUGFIX] Ingester: fixed panic during shutdown while fetching batches from cache. #4397
4849
* [BUGFIX] Querier: After query-frontend restart, querier may have lower than configured concurrency. #4417
4950

5051
## 1.10.0 / 2021-08-03

pkg/chunk/cache/cache_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,25 @@ func testChunkFetcher(t *testing.T, c cache.Cache, keys []string, chunks []chunk
125125
require.Equal(t, chunks, found)
126126
}
127127

128+
// testChunkFetcherStop checks that stopping the fetcher while chunks are being fetched doesn't result in an error
129+
func testChunkFetcherStop(t *testing.T, c cache.Cache, keys []string, chunks []chunk.Chunk) {
130+
fetcher, err := chunk.NewChunkFetcher(c, false, chunk.NewMockStorage())
131+
require.NoError(t, err)
132+
133+
done := make(chan struct{})
134+
go func() {
135+
defer close(done)
136+
if _, err := fetcher.FetchChunks(context.Background(), chunks, keys); err != nil {
137+
// Since we stop fetcher while FetchChunks is running, we may not get everything back
138+
// which requires the fetcher to fetch keys from storage, which is missing the keys
139+
// so an error here is expected. Need to check the error because of the lint check.
140+
require.NotNil(t, err)
141+
}
142+
}()
143+
fetcher.Stop()
144+
<-done
145+
}
146+
128147
type byExternalKey []chunk.Chunk
129148

130149
func (a byExternalKey) Len() int { return len(a) }
@@ -155,6 +174,11 @@ func testCache(t *testing.T, cache cache.Cache) {
155174
t.Run("Fetcher", func(t *testing.T) {
156175
testChunkFetcher(t, cache, keys, chunks)
157176
})
177+
t.Run("FetcherStop", func(t *testing.T) {
178+
// Refill the cache to avoid a nil pointer error when the fetch falls back to storage for missing keys
179+
keys, chunks = fillCache(t, cache)
180+
testChunkFetcherStop(t, cache, keys, chunks)
181+
})
158182
}
159183

160184
func TestMemcache(t *testing.T) {

pkg/chunk/cache/memcached.go

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ type Memcached struct {
5555

5656
wg sync.WaitGroup
5757
inputCh chan *work
58+
quit chan struct{}
5859

5960
logger log.Logger
6061
}
@@ -83,19 +84,24 @@ func NewMemcached(cfg MemcachedConfig, client MemcachedClient, name string, reg
8384
}
8485

8586
c.inputCh = make(chan *work)
87+
c.quit = make(chan struct{})
8688
c.wg.Add(cfg.Parallelism)
8789

8890
for i := 0; i < cfg.Parallelism; i++ {
8991
go func() {
90-
for input := range c.inputCh {
91-
res := &result{
92-
batchID: input.batchID,
92+
defer c.wg.Done()
93+
for {
94+
select {
95+
case <-c.quit:
96+
return
97+
case input := <-c.inputCh:
98+
res := &result{
99+
batchID: input.batchID,
100+
}
101+
res.found, res.bufs, res.missed = c.fetch(input.ctx, input.keys)
102+
input.resultCh <- res
93103
}
94-
res.found, res.bufs, res.missed = c.fetch(input.ctx, input.keys)
95-
input.resultCh <- res
96104
}
97-
98-
c.wg.Done()
99105
}()
100106
}
101107

@@ -187,11 +193,15 @@ func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found
187193
go func() {
188194
for i, j := 0, 0; i < len(keys); i += batchSize {
189195
batchKeys := keys[i:math.Min(i+batchSize, len(keys))]
190-
c.inputCh <- &work{
196+
select {
197+
case <-c.quit:
198+
return
199+
case c.inputCh <- &work{
191200
keys: batchKeys,
192201
ctx: ctx,
193202
resultCh: resultsCh,
194203
batchID: j,
204+
}:
195205
}
196206
j++
197207
}
@@ -205,13 +215,21 @@ func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found
205215

206216
// We need to order found by the input keys order.
207217
results := make([]*result, numResults)
218+
loopResults:
208219
for i := 0; i < numResults; i++ {
209-
result := <-resultsCh
210-
results[result.batchID] = result
220+
select {
221+
case <-c.quit:
222+
break loopResults
223+
case result := <-resultsCh:
224+
results[result.batchID] = result
225+
}
211226
}
212227
close(resultsCh)
213228

214229
for _, result := range results {
230+
if result == nil {
231+
continue
232+
}
215233
found = append(found, result.found...)
216234
bufs = append(bufs, result.bufs...)
217235
missed = append(missed, result.missed...)
@@ -239,11 +257,15 @@ func (c *Memcached) Store(ctx context.Context, keys []string, bufs [][]byte) {
239257

240258
// Stop signals the background workers to quit and waits for them to exit.
241259
func (c *Memcached) Stop() {
242-
if c.inputCh == nil {
260+
if c.quit == nil {
243261
return
244262
}
245263

246-
close(c.inputCh)
264+
select {
265+
case <-c.quit:
266+
default:
267+
close(c.quit)
268+
}
247269
c.wg.Wait()
248270
}
249271

pkg/chunk/cache/memcached_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,3 +156,39 @@ func testMemcacheFailing(t *testing.T, memcache *cache.Memcached) {
156156
}
157157
}
158158
}
159+
160+
func TestMemcacheStop(t *testing.T) {
161+
t.Run("unbatched", func(t *testing.T) {
162+
client := newMockMemcacheFailing()
163+
memcache := cache.NewMemcached(cache.MemcachedConfig{}, client,
164+
"test", nil, log.NewNopLogger())
165+
166+
testMemcachedStopping(t, memcache)
167+
})
168+
169+
t.Run("batched", func(t *testing.T) {
170+
client := newMockMemcacheFailing()
171+
memcache := cache.NewMemcached(cache.MemcachedConfig{
172+
BatchSize: 10,
173+
Parallelism: 5,
174+
}, client, "test", nil, log.NewNopLogger())
175+
176+
testMemcachedStopping(t, memcache)
177+
})
178+
}
179+
180+
func testMemcachedStopping(t *testing.T, memcache *cache.Memcached) {
181+
numKeys := 1000
182+
ctx := context.Background()
183+
keys := make([]string, 0, numKeys)
184+
bufs := make([][]byte, 0, numKeys)
185+
for i := 0; i < numKeys; i++ {
186+
keys = append(keys, fmt.Sprint(i))
187+
bufs = append(bufs, []byte(fmt.Sprint(i)))
188+
}
189+
190+
memcache.Store(ctx, keys, bufs)
191+
192+
go memcache.Fetch(ctx, keys)
193+
memcache.Stop()
194+
}

pkg/chunk/chunk_store_utils.go

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ type Fetcher struct {
8686

8787
wait sync.WaitGroup
8888
decodeRequests chan decodeRequest
89+
quit chan struct{}
8990
}
9091

9192
type decodeRequest struct {
@@ -105,6 +106,7 @@ func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, storage Client) (*Fetc
105106
cache: cacher,
106107
cacheStubs: cacheStubs,
107108
decodeRequests: make(chan decodeRequest),
109+
quit: make(chan struct{}),
108110
}
109111

110112
c.wait.Add(chunkDecodeParallelism)
@@ -117,22 +119,32 @@ func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, storage Client) (*Fetc
117119

118120
// Stop the ChunkFetcher.
119121
func (c *Fetcher) Stop() {
120-
close(c.decodeRequests)
122+
select {
123+
case <-c.quit:
124+
default:
125+
close(c.quit)
126+
}
127+
121128
c.wait.Wait()
122129
c.cache.Stop()
123130
}
124131

125132
func (c *Fetcher) worker() {
126133
defer c.wait.Done()
127134
decodeContext := NewDecodeContext()
128-
for req := range c.decodeRequests {
129-
err := req.chunk.Decode(decodeContext, req.buf)
130-
if err != nil {
131-
cacheCorrupt.Inc()
132-
}
133-
req.responses <- decodeResponse{
134-
chunk: req.chunk,
135-
err: err,
135+
for {
136+
select {
137+
case <-c.quit:
138+
return
139+
case req := <-c.decodeRequests:
140+
err := req.chunk.Decode(decodeContext, req.buf)
141+
if err != nil {
142+
cacheCorrupt.Inc()
143+
}
144+
req.responses <- decodeResponse{
145+
chunk: req.chunk,
146+
err: err,
147+
}
136148
}
137149
}
138150
}
@@ -230,22 +242,31 @@ func (c *Fetcher) processCacheResponse(ctx context.Context, chunks []Chunk, keys
230242

231243
go func() {
232244
for _, request := range requests {
233-
c.decodeRequests <- request
245+
select {
246+
case <-c.quit:
247+
return
248+
case c.decodeRequests <- request:
249+
}
234250
}
235251
}()
236252

237253
var (
238254
err error
239255
found []Chunk
240256
)
241-
for i := 0; i < len(requests); i++ {
242-
response := <-responses
243257

244-
// Don't exit early, as we don't want to block the workers.
245-
if response.err != nil {
246-
err = response.err
247-
} else {
248-
found = append(found, response.chunk)
258+
loopResponses:
259+
for i := 0; i < len(requests); i++ {
260+
select {
261+
case <-c.quit:
262+
break loopResponses
263+
case response := <-responses:
264+
// Don't exit early, as we don't want to block the workers.
265+
if response.err != nil {
266+
err = response.err
267+
} else {
268+
found = append(found, response.chunk)
269+
}
249270
}
250271
}
251272
return found, missing, err

0 commit comments

Comments
 (0)