Skip to content

Commit 302c89e

Browse files
committed
mvcc: avoid double decrement of watcher gauge on close/cancel race
This occurs specifically when the watch is for a compacted revision, as there's a possible interleaving of cancel/close that invokes the `cancelWatch` function twice. It's fairly difficult to provoke the race condition but it is possible to observe on `main` the racing test can fail with a negative gauge: ``` $ go test ./... -run TestNewWatcherCountGauge/compacted_watch,_close/cancel_race --- FAIL: TestNewWatcherCountGauge (0.34s) watchable_store_test.go:86: # HELP etcd_debugging_mvcc_watcher_total Total number of watchers. # TYPE etcd_debugging_mvcc_watcher_total gauge -etcd_debugging_mvcc_watcher_total -1 +etcd_debugging_mvcc_watcher_total 0 FAIL FAIL go.etcd.io/etcd/server/v3/storage/mvcc 0.830s ? go.etcd.io/etcd/server/v3/storage/mvcc/testutil [no test files] FAIL ``` It seems as though it is partially expected for the cancel function to be invoked multiple times and to handle that safely (i.e., the existing `ch == nil` check) - the bug here is that in the `if/else if` branches it comes "too late", and multiple invocations where `wa.compacted` is true will both decrement the counter. Shifting the case up one ensures that we can't follow that decrement branch multiple times. In fact, it seems logically more sensible to put this `wa.ch == nil` case _first_, as a guard for the function being invoked multiple times, but moving i before the sync/unsynced watch set delete functions could have a greater inadvertent functional impact (i.e., if we never deleted cancelled watches from these sets it would presumably introduce a leak), so from an abundance of caution I've made the smallest change I think will fix my issue. Signed-off-by: Kieran Gorman <[email protected]>
1 parent 7b35142 commit 302c89e

File tree

2 files changed

+174
-3
lines changed

2 files changed

+174
-3
lines changed

server/storage/mvcc/watchable_store.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,12 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
162162
} else if s.synced.delete(wa) {
163163
watcherGauge.Dec()
164164
break
165-
} else if wa.compacted {
166-
watcherGauge.Dec()
167-
break
168165
} else if wa.ch == nil {
169166
// already canceled (e.g., cancel/close race)
170167
break
168+
} else if wa.compacted {
169+
watcherGauge.Dec()
170+
break
171171
}
172172

173173
if !wa.victim {

server/storage/mvcc/watchable_store_test.go

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ package mvcc
1717
import (
1818
"fmt"
1919
"reflect"
20+
"strings"
2021
"sync"
2122
"testing"
2223
"time"
2324

25+
"github.com/prometheus/client_golang/prometheus/testutil"
2426
"github.com/stretchr/testify/assert"
2527
"github.com/stretchr/testify/require"
2628
"go.uber.org/zap/zaptest"
@@ -73,6 +75,175 @@ func TestNewWatcherCancel(t *testing.T) {
7375
}
7476
}
7577

78+
func TestNewWatcherCountGauge(t *testing.T) {
79+
expectWatchGauge := func(watchers int) {
80+
expected := fmt.Sprintf(`# HELP etcd_debugging_mvcc_watcher_total Total number of watchers.
81+
# TYPE etcd_debugging_mvcc_watcher_total gauge
82+
etcd_debugging_mvcc_watcher_total %d
83+
`, watchers)
84+
err := testutil.CollectAndCompare(watcherGauge, strings.NewReader(expected), "etcd_debugging_mvcc_watcher_total")
85+
if err != nil {
86+
t.Error(err)
87+
}
88+
}
89+
90+
t.Run("regular watch", func(t *testing.T) {
91+
b, _ := betesting.NewDefaultTmpBackend(t)
92+
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
93+
defer cleanup(s, b)
94+
95+
// watcherGauge is a package variable and its value may change depending on
96+
// the execution of other tests
97+
initialGaugeState := int(testutil.ToFloat64(watcherGauge))
98+
99+
testKey := []byte("foo")
100+
testValue := []byte("bar")
101+
s.Put(testKey, testValue, lease.NoLease)
102+
103+
// we expect the gauge state to still be in its initial state
104+
expectWatchGauge(initialGaugeState)
105+
106+
w := s.NewWatchStream()
107+
defer w.Close()
108+
109+
wt, _ := w.Watch(0, testKey, nil, 0)
110+
111+
// after creating watch, the gauge state should have increased
112+
expectWatchGauge(initialGaugeState + 1)
113+
114+
if err := w.Cancel(wt); err != nil {
115+
t.Error(err)
116+
}
117+
118+
// after cancelling watch, the gauge state should have decreased
119+
expectWatchGauge(initialGaugeState)
120+
121+
w.Cancel(wt)
122+
123+
// cancelling the watch twice shouldn't decrement the counter twice
124+
expectWatchGauge(initialGaugeState)
125+
})
126+
127+
t.Run("compacted watch", func(t *testing.T) {
128+
b, _ := betesting.NewDefaultTmpBackend(t)
129+
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
130+
defer cleanup(s, b)
131+
132+
// watcherGauge is a package variable and its value may change depending on
133+
// the execution of other tests
134+
initialGaugeState := int(testutil.ToFloat64(watcherGauge))
135+
136+
testKey := []byte("foo")
137+
testValue := []byte("bar")
138+
139+
s.Put(testKey, testValue, lease.NoLease)
140+
rev := s.Put(testKey, testValue, lease.NoLease)
141+
142+
// compact up to the revision of the key we just put
143+
_, err := s.Compact(traceutil.TODO(), rev)
144+
if err != nil {
145+
t.Error(err)
146+
}
147+
148+
// we expect the gauge state to still be in its initial state
149+
expectWatchGauge(initialGaugeState)
150+
151+
w := s.NewWatchStream()
152+
defer w.Close()
153+
154+
wt, _ := w.Watch(0, testKey, nil, rev-1)
155+
156+
// wait for the watcher to be marked as compacted
157+
select {
158+
case resp := <-w.Chan():
159+
if resp.CompactRevision == 0 {
160+
t.Errorf("resp.Compacted = %v, want %v", resp.CompactRevision, rev)
161+
}
162+
case <-time.After(time.Second):
163+
t.Fatalf("failed to receive response (timeout)")
164+
}
165+
166+
// after creating watch, the gauge state should have increased
167+
expectWatchGauge(initialGaugeState + 1)
168+
169+
if err := w.Cancel(wt); err != nil {
170+
t.Error(err)
171+
}
172+
173+
// after cancelling watch, the gauge state should have decreased
174+
expectWatchGauge(initialGaugeState)
175+
176+
w.Cancel(wt)
177+
178+
// cancelling the watch twice shouldn't decrement the counter twice
179+
expectWatchGauge(initialGaugeState)
180+
})
181+
182+
t.Run("compacted watch, close/cancel race", func(t *testing.T) {
183+
b, _ := betesting.NewDefaultTmpBackend(t)
184+
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
185+
defer cleanup(s, b)
186+
187+
// watcherGauge is a package variable and its value may change depending on
188+
// the execution of other tests
189+
initialGaugeState := int(testutil.ToFloat64(watcherGauge))
190+
191+
testKey := []byte("foo")
192+
testValue := []byte("bar")
193+
194+
s.Put(testKey, testValue, lease.NoLease)
195+
rev := s.Put(testKey, testValue, lease.NoLease)
196+
197+
// compact up to the revision of the key we just put
198+
_, err := s.Compact(traceutil.TODO(), rev)
199+
if err != nil {
200+
t.Error(err)
201+
}
202+
203+
// we expect the gauge state to still be in its initial state
204+
expectWatchGauge(initialGaugeState)
205+
206+
w := s.NewWatchStream()
207+
208+
wt, _ := w.Watch(0, testKey, nil, rev-1)
209+
210+
// wait for the watcher to be marked as compacted
211+
select {
212+
case resp := <-w.Chan():
213+
if resp.CompactRevision == 0 {
214+
t.Errorf("resp.Compacted = %v, want %v", resp.CompactRevision, rev)
215+
}
216+
case <-time.After(time.Second):
217+
t.Fatalf("failed to receive response (timeout)")
218+
}
219+
220+
// after creating watch, the gauge state should have increased
221+
expectWatchGauge(initialGaugeState + 1)
222+
223+
// now race cancelling and closing the watcher and watch stream.
224+
// in rare scenarios the watcher cancel function can be invoked
225+
// multiple times, leading to a potentially negative gauge state,
226+
// see: https://github.com/etcd-io/etcd/issues/19577
227+
wg := sync.WaitGroup{}
228+
wg.Add(2)
229+
230+
go func() {
231+
w.Cancel(wt)
232+
wg.Done()
233+
}()
234+
235+
go func() {
236+
w.Close()
237+
wg.Done()
238+
}()
239+
240+
wg.Wait()
241+
242+
// the gauge should be decremented to its original state
243+
expectWatchGauge(initialGaugeState)
244+
})
245+
}
246+
76247
// TestCancelUnsynced tests if running CancelFunc removes watchers from unsynced.
77248
func TestCancelUnsynced(t *testing.T) {
78249
b, _ := betesting.NewDefaultTmpBackend(t)

0 commit comments

Comments
 (0)