Skip to content

Commit 50c3726

Browse files
serathius and k8s-infra-cherrypick-robot
authored and committed
Avoid lowering revision of watchers in the future after restore
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent 61dae25 commit 50c3726

File tree

2 files changed

+100
-87
lines changed

2 files changed

+100
-87
lines changed

server/storage/mvcc/watchable_store.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ var (
3939

4040
// maxWatchersPerSync is the number of watchers to sync in a single batch
4141
maxWatchersPerSync = 512
42+
43+
// watchResyncPeriod is the period of executing resync.
44+
watchResyncPeriod = 100 * time.Millisecond
4245
)
4346

4447
func ChanBufLen() int { return chanBufLen }
@@ -221,8 +224,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
221224
func (s *watchableStore) syncWatchersLoop() {
222225
defer s.wg.Done()
223226

224-
waitDuration := 100 * time.Millisecond
225-
delayTicker := time.NewTicker(waitDuration)
227+
delayTicker := time.NewTicker(watchResyncPeriod)
226228
defer delayTicker.Stop()
227229
var evs []mvccpb.Event
228230

@@ -238,7 +240,7 @@ func (s *watchableStore) syncWatchersLoop() {
238240
}
239241
syncDuration := time.Since(st)
240242

241-
delayTicker.Reset(waitDuration)
243+
delayTicker.Reset(watchResyncPeriod)
242244
// more work pending?
243245
if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
244246
// be fair to other store operations by yielding time taken
@@ -370,7 +372,7 @@ func (s *watchableStore) syncWatchers(evs []mvccpb.Event) (int, []mvccpb.Event)
370372
// Next retry of syncWatchers would try to resend the compacted watch response to w.ch
371373
continue
372374
}
373-
w.minRev = curRev + 1
375+
w.minRev = max(curRev+1, w.minRev)
374376

375377
eb, ok := wb[w]
376378
if !ok {

server/storage/mvcc/watchable_store_test.go

Lines changed: 94 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"testing"
2323
"time"
2424

25+
"github.com/google/go-cmp/cmp"
2526
"github.com/prometheus/client_golang/prometheus/testutil"
2627
"github.com/stretchr/testify/assert"
2728
"github.com/stretchr/testify/require"
@@ -572,105 +573,115 @@ func TestWatchFutureRev(t *testing.T) {
572573
}
573574

574575
func TestWatchRestore(t *testing.T) {
575-
test := func(delay time.Duration) func(t *testing.T) {
576-
return func(t *testing.T) {
577-
b, _ := betesting.NewDefaultTmpBackend(t)
578-
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
579-
defer cleanup(s, b)
576+
resyncDelay := watchResyncPeriod * 3 / 2
580577

581-
testKey := []byte("foo")
582-
testValue := []byte("bar")
583-
w := s.NewWatchStream()
584-
defer w.Close()
585-
w.Watch(0, testKey, nil, 1)
578+
t.Run("NoResync", func(t *testing.T) {
579+
testWatchRestore(t, 0, 0)
580+
})
581+
t.Run("ResyncBefore", func(t *testing.T) {
582+
testWatchRestore(t, resyncDelay, 0)
583+
})
584+
t.Run("ResyncAfter", func(t *testing.T) {
585+
testWatchRestore(t, 0, resyncDelay)
586+
})
587+
588+
t.Run("ResyncBeforeAndAfter", func(t *testing.T) {
589+
testWatchRestore(t, resyncDelay, resyncDelay)
590+
})
591+
}
586592

587-
time.Sleep(delay)
588-
wantRev := s.Put(testKey, testValue, lease.NoLease)
593+
func testWatchRestore(t *testing.T, delayBeforeRestore, delayAfterRestore time.Duration) {
594+
b, _ := betesting.NewDefaultTmpBackend(t)
595+
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
596+
defer cleanup(s, b)
589597

590-
s.Restore(b)
591-
events := readEventsForSecond(w.Chan())
592-
if len(events) != 1 {
593-
t.Errorf("Expected only one event, got %d", len(events))
594-
}
595-
if events[0].Kv.ModRevision != wantRev {
596-
t.Errorf("Expected revision to match, got %d, want %d", events[0].Kv.ModRevision, wantRev)
597-
}
598-
}
598+
testKey := []byte("foo")
599+
testValue := []byte("bar")
600+
601+
tcs := []struct {
602+
name string
603+
startRevision int64
604+
wantEvents []mvccpb.Event
605+
}{
606+
{
607+
name: "zero revision",
608+
startRevision: 0,
609+
wantEvents: []mvccpb.Event{
610+
{Type: mvccpb.PUT, Kv: &mvccpb.KeyValue{Key: testKey, Value: testValue, CreateRevision: 2, ModRevision: 2, Version: 1}},
611+
{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: testKey, ModRevision: 3}},
612+
},
613+
},
614+
{
615+
name: "revsion before first write",
616+
startRevision: 1,
617+
wantEvents: []mvccpb.Event{
618+
{Type: mvccpb.PUT, Kv: &mvccpb.KeyValue{Key: testKey, Value: testValue, CreateRevision: 2, ModRevision: 2, Version: 1}},
619+
{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: testKey, ModRevision: 3}},
620+
},
621+
},
622+
{
623+
name: "revision of first write",
624+
startRevision: 2,
625+
wantEvents: []mvccpb.Event{
626+
{Type: mvccpb.PUT, Kv: &mvccpb.KeyValue{Key: testKey, Value: testValue, CreateRevision: 2, ModRevision: 2, Version: 1}},
627+
{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: testKey, ModRevision: 3}},
628+
},
629+
},
630+
{
631+
name: "current revision",
632+
startRevision: 3,
633+
wantEvents: []mvccpb.Event{
634+
{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: testKey, ModRevision: 3}},
635+
},
636+
},
637+
{
638+
name: "future revision",
639+
startRevision: 4,
640+
wantEvents: []mvccpb.Event{},
641+
},
642+
}
643+
watchers := []WatchStream{}
644+
for i, tc := range tcs {
645+
w := s.NewWatchStream()
646+
defer w.Close()
647+
watchers = append(watchers, w)
648+
w.Watch(WatchID(i+1), testKey, nil, tc.startRevision)
599649
}
600650

601-
t.Run("Normal", test(0))
602-
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
651+
s.Put(testKey, testValue, lease.NoLease)
652+
time.Sleep(delayBeforeRestore)
653+
s.Restore(b)
654+
time.Sleep(delayAfterRestore)
655+
s.DeleteRange(testKey, nil)
656+
657+
for i, tc := range tcs {
658+
t.Run(tc.name, func(t *testing.T) {
659+
events := readEventsForSecond(t, watchers[i].Chan())
660+
if diff := cmp.Diff(tc.wantEvents, events); diff != "" {
661+
t.Errorf("unexpected events (-want +got):\n%s", diff)
662+
}
663+
})
664+
}
603665
}
604666

605-
func readEventsForSecond(ws <-chan WatchResponse) (events []mvccpb.Event) {
667+
func readEventsForSecond(t *testing.T, ws <-chan WatchResponse) []mvccpb.Event {
668+
events := []mvccpb.Event{}
669+
deadline := time.After(time.Second)
606670
for {
607671
select {
608672
case resp := <-ws:
673+
if len(resp.Events) == 0 {
674+
t.Fatalf("Events should never be empty, resp: %+v", resp)
675+
}
609676
events = append(events, resp.Events...)
610-
case <-time.After(time.Second):
677+
case <-deadline:
678+
return events
679+
case <-time.After(watchResyncPeriod):
611680
return events
612681
}
613682
}
614683
}
615684

616-
// TestWatchRestoreSyncedWatcher tests the following case:
617-
// 1. watcher is created with a future revision "math.MaxInt64 - 2"
618-
// 2. watcher with a future revision is added to "synced" watcher group
619-
// 3. restore/overwrite storage with snapshot of a higher last revision
620-
// 4. restore operation moves "synced" to "unsynced" watcher group
621-
// 5. choose the watcher from step 1, without panic
622-
func TestWatchRestoreSyncedWatcher(t *testing.T) {
623-
b1, _ := betesting.NewDefaultTmpBackend(t)
624-
s1 := New(zaptest.NewLogger(t), b1, &lease.FakeLessor{}, StoreConfig{})
625-
defer cleanup(s1, b1)
626-
627-
b2, _ := betesting.NewDefaultTmpBackend(t)
628-
s2 := New(zaptest.NewLogger(t), b2, &lease.FakeLessor{}, StoreConfig{})
629-
defer cleanup(s2, b2)
630-
631-
testKey, testValue := []byte("foo"), []byte("bar")
632-
rev := s1.Put(testKey, testValue, lease.NoLease)
633-
startRev := rev + 2
634-
635-
// create a watcher with a future revision
636-
// add to "synced" watcher group (startRev > s.store.currentRev)
637-
w1 := s1.NewWatchStream()
638-
defer w1.Close()
639-
640-
w1.Watch(0, testKey, nil, startRev)
641-
642-
// make "s2" end up with a higher last revision
643-
s2.Put(testKey, testValue, lease.NoLease)
644-
s2.Put(testKey, testValue, lease.NoLease)
645-
646-
// overwrite storage with higher revisions
647-
if err := s1.Restore(b2); err != nil {
648-
t.Fatal(err)
649-
}
650-
651-
// wait for next "syncWatchersLoop" iteration
652-
// and the unsynced watcher should be chosen
653-
time.Sleep(2 * time.Second)
654-
655-
// trigger events for "startRev"
656-
s1.Put(testKey, testValue, lease.NoLease)
657-
658-
select {
659-
case resp := <-w1.Chan():
660-
if resp.Revision != startRev {
661-
t.Fatalf("resp.Revision expect %d, got %d", startRev, resp.Revision)
662-
}
663-
if len(resp.Events) != 1 {
664-
t.Fatalf("len(resp.Events) expect 1, got %d", len(resp.Events))
665-
}
666-
if resp.Events[0].Kv.ModRevision != startRev {
667-
t.Fatalf("resp.Events[0].Kv.ModRevision expect %d, got %d", startRev, resp.Events[0].Kv.ModRevision)
668-
}
669-
case <-time.After(time.Second):
670-
t.Fatal("failed to receive event in 1 second")
671-
}
672-
}
673-
674685
// TestWatchBatchUnsynced tests batching on unsynced watchers
675686
func TestWatchBatchUnsynced(t *testing.T) {
676687
tcs := []struct {

0 commit comments

Comments
 (0)