Skip to content

Commit 68075c9

Browse files
authored
Merge pull request #20241 from serathius/skip-future-progress-notification-release-3.6
[release-3.6] Skip sending progress notification for watch with starting revision in the future
2 parents c093b79 + 9fc770f commit 68075c9

File tree

3 files changed

+83
-38
lines changed

3 files changed

+83
-38
lines changed

server/etcdserver/api/v3rpc/watch.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -305,12 +305,7 @@ func (sws *serverWatchStream) recvLoop() error {
305305

306306
filters := FiltersFromRequest(creq)
307307

308-
wsrev := sws.watchStream.Rev()
309-
rev := creq.StartRevision
310-
if rev == 0 {
311-
rev = wsrev + 1
312-
}
313-
id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
308+
id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, creq.StartRevision, filters...)
314309
if err == nil {
315310
sws.mu.Lock()
316311
if creq.ProgressNotify {
@@ -328,7 +323,7 @@ func (sws *serverWatchStream) recvLoop() error {
328323
}
329324

330325
wr := &pb.WatchResponse{
331-
Header: sws.newResponseHeader(wsrev),
326+
Header: sws.newResponseHeader(sws.watchStream.Rev()),
332327
WatchId: int64(id),
333328
Created: true,
334329
Canceled: err != nil,

server/storage/mvcc/watchable_store.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,13 @@ func (s *watchableStore) NewWatchStream() WatchStream {
122122

123123
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
124124
wa := &watcher{
125-
key: key,
126-
end: end,
127-
minRev: startRev,
128-
id: id,
129-
ch: ch,
130-
fcs: fcs,
125+
key: key,
126+
end: end,
127+
startRev: startRev,
128+
minRev: startRev,
129+
id: id,
130+
ch: ch,
131+
fcs: fcs,
131132
}
132133

133134
s.mu.Lock()
@@ -532,11 +533,15 @@ func (s *watchableStore) progressIfSync(watchers map[WatchID]*watcher, responseW
532533
s.mu.RLock()
533534
defer s.mu.RUnlock()
534535

536+
rev := s.rev()
535537
// Any watcher unsynced?
536538
for _, w := range watchers {
537539
if _, ok := s.synced.watchers[w]; !ok {
538540
return false
539541
}
542+
if rev < w.startRev {
543+
return false
544+
}
540545
}
541546

542547
// If all watchers are synchronised, send out progress
@@ -545,7 +550,7 @@ func (s *watchableStore) progressIfSync(watchers map[WatchID]*watcher, responseW
545550
// notification will be broadcasted client-side if required
546551
// (see dispatchEvent in client/v3/watch.go)
547552
for _, w := range watchers {
548-
w.send(WatchResponse{WatchID: responseWatchID, Revision: s.rev()})
553+
w.send(WatchResponse{WatchID: responseWatchID, Revision: rev})
549554
return true
550555
}
551556
return true
@@ -572,6 +577,7 @@ type watcher struct {
572577
// except when the watcher were to be moved from "synced" watcher group
573578
restore bool
574579

580+
startRev int64
575581
// minRev is the minimum revision update the watcher will accept
576582
minRev int64
577583
id WatchID

server/storage/mvcc/watcher_test.go

Lines changed: 68 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -289,48 +289,92 @@ func TestWatchStreamCancelWatcherByID(t *testing.T) {
289289
}
290290
}
291291

292-
// TestWatcherRequestProgress ensures synced watcher can correctly
293-
// report its correct progress.
294-
func TestWatcherRequestProgress(t *testing.T) {
292+
func TestWatcherRequestProgressBadId(t *testing.T) {
295293
b, _ := betesting.NewDefaultTmpBackend(t)
296294
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
297295

298296
defer cleanup(s, b)
299-
300-
testKey := []byte("foo")
301-
notTestKey := []byte("bad")
302-
testValue := []byte("bar")
303-
s.Put(testKey, testValue, lease.NoLease)
304-
305297
w := s.NewWatchStream()
306-
307298
badID := WatchID(1000)
308299
w.RequestProgress(badID)
309300
select {
310301
case resp := <-w.Chan():
311302
t.Fatalf("unexpected %+v", resp)
312303
default:
313304
}
305+
}
314306

315-
id, _ := w.Watch(0, notTestKey, nil, 1)
316-
w.RequestProgress(id)
317-
select {
318-
case resp := <-w.Chan():
319-
t.Fatalf("unexpected %+v", resp)
320-
default:
307+
func TestWatcherRequestProgress(t *testing.T) {
308+
testKey := []byte("foo")
309+
notTestKey := []byte("bad")
310+
testValue := []byte("bar")
311+
tcs := []struct {
312+
name string
313+
startRev int64
314+
expectProgressBeforeSync bool
315+
expectProgressAfterSync bool
316+
}{
317+
{
318+
name: "Zero revision",
319+
startRev: 0,
320+
expectProgressBeforeSync: true,
321+
expectProgressAfterSync: true,
322+
},
323+
{
324+
name: "Old revision",
325+
startRev: 1,
326+
expectProgressAfterSync: true,
327+
},
328+
{
329+
name: "Current revision",
330+
startRev: 2,
331+
expectProgressAfterSync: true,
332+
},
333+
{
334+
name: "Current revision plus one",
335+
startRev: 3,
336+
},
337+
{
338+
name: "Current revision plus two",
339+
startRev: 4,
340+
},
321341
}
342+
for _, tc := range tcs {
343+
t.Run(tc.name, func(t *testing.T) {
344+
b, _ := betesting.NewDefaultTmpBackend(t)
345+
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
322346

323-
s.syncWatchers([]mvccpb.Event{})
347+
defer cleanup(s, b)
348+
349+
s.Put(testKey, testValue, lease.NoLease)
350+
351+
w := s.NewWatchStream()
352+
353+
id, _ := w.Watch(0, notTestKey, nil, tc.startRev)
354+
w.RequestProgress(id)
355+
asssertProgressSent(t, w, id, tc.expectProgressBeforeSync)
356+
s.syncWatchers([]mvccpb.Event{})
357+
w.RequestProgress(id)
358+
asssertProgressSent(t, w, id, tc.expectProgressAfterSync)
359+
})
360+
}
361+
}
324362

325-
w.RequestProgress(id)
326-
wrs := WatchResponse{WatchID: id, Revision: 2}
363+
func asssertProgressSent(t *testing.T, stream WatchStream, id WatchID, expectProgress bool) {
327364
select {
328-
case resp := <-w.Chan():
329-
if !reflect.DeepEqual(resp, wrs) {
330-
t.Fatalf("got %+v, expect %+v", resp, wrs)
365+
case resp := <-stream.Chan():
366+
if expectProgress {
367+
wrs := WatchResponse{WatchID: id, Revision: 2}
368+
if !reflect.DeepEqual(resp, wrs) {
369+
t.Fatalf("got %+v, expect %+v", resp, wrs)
370+
}
371+
} else {
372+
t.Fatalf("unexpected response %+v", resp)
373+
}
374+
default:
375+
if expectProgress {
376+
t.Fatalf("failed to receive progress")
331377
}
332-
case <-time.After(time.Second):
333-
t.Fatal("failed to receive progress")
334378
}
335379
}
336380

0 commit comments

Comments
 (0)