Skip to content

Commit 322b79c

Browse files
authored
Merge pull request #20067 from kjgorman/cherry-pick-19600-to-release-3.6
[release-3.6] mvcc: avoid double decrement of watcher gauge on close/cancel race (3.6 backport)
2 parents 8767dfa + 302c89e commit 322b79c

File tree

2 files changed

+174
-3
lines changed

2 files changed

+174
-3
lines changed

server/storage/mvcc/watchable_store.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,12 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
162162
} else if s.synced.delete(wa) {
163163
watcherGauge.Dec()
164164
break
165-
} else if wa.compacted {
166-
watcherGauge.Dec()
167-
break
168165
} else if wa.ch == nil {
169166
// already canceled (e.g., cancel/close race)
170167
break
168+
} else if wa.compacted {
169+
watcherGauge.Dec()
170+
break
171171
}
172172

173173
if !wa.victim {

server/storage/mvcc/watchable_store_test.go

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ package mvcc
1717
import (
1818
"fmt"
1919
"reflect"
20+
"strings"
2021
"sync"
2122
"testing"
2223
"time"
2324

25+
"github.com/prometheus/client_golang/prometheus/testutil"
2426
"github.com/stretchr/testify/assert"
2527
"github.com/stretchr/testify/require"
2628
"go.uber.org/zap/zaptest"
@@ -73,6 +75,175 @@ func TestNewWatcherCancel(t *testing.T) {
7375
}
7476
}
7577

78+
func TestNewWatcherCountGauge(t *testing.T) {
79+
expectWatchGauge := func(watchers int) {
80+
expected := fmt.Sprintf(`# HELP etcd_debugging_mvcc_watcher_total Total number of watchers.
81+
# TYPE etcd_debugging_mvcc_watcher_total gauge
82+
etcd_debugging_mvcc_watcher_total %d
83+
`, watchers)
84+
err := testutil.CollectAndCompare(watcherGauge, strings.NewReader(expected), "etcd_debugging_mvcc_watcher_total")
85+
if err != nil {
86+
t.Error(err)
87+
}
88+
}
89+
90+
t.Run("regular watch", func(t *testing.T) {
91+
b, _ := betesting.NewDefaultTmpBackend(t)
92+
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
93+
defer cleanup(s, b)
94+
95+
// watcherGauge is a package variable and its value may change depending on
96+
// the execution of other tests
97+
initialGaugeState := int(testutil.ToFloat64(watcherGauge))
98+
99+
testKey := []byte("foo")
100+
testValue := []byte("bar")
101+
s.Put(testKey, testValue, lease.NoLease)
102+
103+
// we expect the gauge state to still be in its initial state
104+
expectWatchGauge(initialGaugeState)
105+
106+
w := s.NewWatchStream()
107+
defer w.Close()
108+
109+
wt, _ := w.Watch(0, testKey, nil, 0)
110+
111+
// after creating watch, the gauge state should have increased
112+
expectWatchGauge(initialGaugeState + 1)
113+
114+
if err := w.Cancel(wt); err != nil {
115+
t.Error(err)
116+
}
117+
118+
// after cancelling watch, the gauge state should have decreased
119+
expectWatchGauge(initialGaugeState)
120+
121+
w.Cancel(wt)
122+
123+
// cancelling the watch twice shouldn't decrement the counter twice
124+
expectWatchGauge(initialGaugeState)
125+
})
126+
127+
t.Run("compacted watch", func(t *testing.T) {
128+
b, _ := betesting.NewDefaultTmpBackend(t)
129+
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
130+
defer cleanup(s, b)
131+
132+
// watcherGauge is a package variable and its value may change depending on
133+
// the execution of other tests
134+
initialGaugeState := int(testutil.ToFloat64(watcherGauge))
135+
136+
testKey := []byte("foo")
137+
testValue := []byte("bar")
138+
139+
s.Put(testKey, testValue, lease.NoLease)
140+
rev := s.Put(testKey, testValue, lease.NoLease)
141+
142+
// compact up to the revision of the key we just put
143+
_, err := s.Compact(traceutil.TODO(), rev)
144+
if err != nil {
145+
t.Error(err)
146+
}
147+
148+
// we expect the gauge state to still be in its initial state
149+
expectWatchGauge(initialGaugeState)
150+
151+
w := s.NewWatchStream()
152+
defer w.Close()
153+
154+
wt, _ := w.Watch(0, testKey, nil, rev-1)
155+
156+
// wait for the watcher to be marked as compacted
157+
select {
158+
case resp := <-w.Chan():
159+
if resp.CompactRevision == 0 {
160+
t.Errorf("resp.Compacted = %v, want %v", resp.CompactRevision, rev)
161+
}
162+
case <-time.After(time.Second):
163+
t.Fatalf("failed to receive response (timeout)")
164+
}
165+
166+
// after creating watch, the gauge state should have increased
167+
expectWatchGauge(initialGaugeState + 1)
168+
169+
if err := w.Cancel(wt); err != nil {
170+
t.Error(err)
171+
}
172+
173+
// after cancelling watch, the gauge state should have decreased
174+
expectWatchGauge(initialGaugeState)
175+
176+
w.Cancel(wt)
177+
178+
// cancelling the watch twice shouldn't decrement the counter twice
179+
expectWatchGauge(initialGaugeState)
180+
})
181+
182+
t.Run("compacted watch, close/cancel race", func(t *testing.T) {
183+
b, _ := betesting.NewDefaultTmpBackend(t)
184+
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
185+
defer cleanup(s, b)
186+
187+
// watcherGauge is a package variable and its value may change depending on
188+
// the execution of other tests
189+
initialGaugeState := int(testutil.ToFloat64(watcherGauge))
190+
191+
testKey := []byte("foo")
192+
testValue := []byte("bar")
193+
194+
s.Put(testKey, testValue, lease.NoLease)
195+
rev := s.Put(testKey, testValue, lease.NoLease)
196+
197+
// compact up to the revision of the key we just put
198+
_, err := s.Compact(traceutil.TODO(), rev)
199+
if err != nil {
200+
t.Error(err)
201+
}
202+
203+
// we expect the gauge state to still be in its initial state
204+
expectWatchGauge(initialGaugeState)
205+
206+
w := s.NewWatchStream()
207+
208+
wt, _ := w.Watch(0, testKey, nil, rev-1)
209+
210+
// wait for the watcher to be marked as compacted
211+
select {
212+
case resp := <-w.Chan():
213+
if resp.CompactRevision == 0 {
214+
t.Errorf("resp.Compacted = %v, want %v", resp.CompactRevision, rev)
215+
}
216+
case <-time.After(time.Second):
217+
t.Fatalf("failed to receive response (timeout)")
218+
}
219+
220+
// after creating watch, the gauge state should have increased
221+
expectWatchGauge(initialGaugeState + 1)
222+
223+
// now race cancelling and closing the watcher and watch stream.
224+
// in rare scenarios the watcher cancel function can be invoked
225+
// multiple times, leading to a potentially negative gauge state,
226+
// see: https://github.com/etcd-io/etcd/issues/19577
227+
wg := sync.WaitGroup{}
228+
wg.Add(2)
229+
230+
go func() {
231+
w.Cancel(wt)
232+
wg.Done()
233+
}()
234+
235+
go func() {
236+
w.Close()
237+
wg.Done()
238+
}()
239+
240+
wg.Wait()
241+
242+
// the gauge should be decremented to its original state
243+
expectWatchGauge(initialGaugeState)
244+
})
245+
}
246+
76247
// TestCancelUnsynced tests if running CancelFunc removes watchers from unsynced.
77248
func TestCancelUnsynced(t *testing.T) {
78249
b, _ := betesting.NewDefaultTmpBackend(t)

0 commit comments

Comments
 (0)