@@ -22,6 +22,7 @@ import (
22
22
"testing"
23
23
"time"
24
24
25
+ "github.com/google/go-cmp/cmp"
25
26
"github.com/prometheus/client_golang/prometheus/testutil"
26
27
"github.com/stretchr/testify/assert"
27
28
"github.com/stretchr/testify/require"
@@ -572,105 +573,115 @@ func TestWatchFutureRev(t *testing.T) {
572
573
}
573
574
574
575
func TestWatchRestore (t * testing.T ) {
575
- test := func (delay time.Duration ) func (t * testing.T ) {
576
- return func (t * testing.T ) {
577
- b , _ := betesting .NewDefaultTmpBackend (t )
578
- s := New (zaptest .NewLogger (t ), b , & lease.FakeLessor {}, StoreConfig {})
579
- defer cleanup (s , b )
576
+ resyncDelay := watchResyncPeriod * 3 / 2
580
577
581
- testKey := []byte ("foo" )
582
- testValue := []byte ("bar" )
583
- w := s .NewWatchStream ()
584
- defer w .Close ()
585
- w .Watch (0 , testKey , nil , 1 )
578
+ t .Run ("NoResync" , func (t * testing.T ) {
579
+ testWatchRestore (t , 0 , 0 )
580
+ })
581
+ t .Run ("ResyncBefore" , func (t * testing.T ) {
582
+ testWatchRestore (t , resyncDelay , 0 )
583
+ })
584
+ t .Run ("ResyncAfter" , func (t * testing.T ) {
585
+ testWatchRestore (t , 0 , resyncDelay )
586
+ })
587
+
588
+ t .Run ("ResyncBeforeAndAfter" , func (t * testing.T ) {
589
+ testWatchRestore (t , resyncDelay , resyncDelay )
590
+ })
591
+ }
586
592
587
- time .Sleep (delay )
588
- wantRev := s .Put (testKey , testValue , lease .NoLease )
593
+ func testWatchRestore (t * testing.T , delayBeforeRestore , delayAfterRestore time.Duration ) {
594
+ b , _ := betesting .NewDefaultTmpBackend (t )
595
+ s := New (zaptest .NewLogger (t ), b , & lease.FakeLessor {}, StoreConfig {})
596
+ defer cleanup (s , b )
589
597
590
- s .Restore (b )
591
- events := readEventsForSecond (w .Chan ())
592
- if len (events ) != 1 {
593
- t .Errorf ("Expected only one event, got %d" , len (events ))
594
- }
595
- if events [0 ].Kv .ModRevision != wantRev {
596
- t .Errorf ("Expected revision to match, got %d, want %d" , events [0 ].Kv .ModRevision , wantRev )
597
- }
598
- }
598
+ testKey := []byte ("foo" )
599
+ testValue := []byte ("bar" )
600
+
601
+ tcs := []struct {
602
+ name string
603
+ startRevision int64
604
+ wantEvents []mvccpb.Event
605
+ }{
606
+ {
607
+ name : "zero revision" ,
608
+ startRevision : 0 ,
609
+ wantEvents : []mvccpb.Event {
610
+ {Type : mvccpb .PUT , Kv : & mvccpb.KeyValue {Key : testKey , Value : testValue , CreateRevision : 2 , ModRevision : 2 , Version : 1 }},
611
+ {Type : mvccpb .DELETE , Kv : & mvccpb.KeyValue {Key : testKey , ModRevision : 3 }},
612
+ },
613
+ },
614
+ {
615
+ name : "revsion before first write" ,
616
+ startRevision : 1 ,
617
+ wantEvents : []mvccpb.Event {
618
+ {Type : mvccpb .PUT , Kv : & mvccpb.KeyValue {Key : testKey , Value : testValue , CreateRevision : 2 , ModRevision : 2 , Version : 1 }},
619
+ {Type : mvccpb .DELETE , Kv : & mvccpb.KeyValue {Key : testKey , ModRevision : 3 }},
620
+ },
621
+ },
622
+ {
623
+ name : "revision of first write" ,
624
+ startRevision : 2 ,
625
+ wantEvents : []mvccpb.Event {
626
+ {Type : mvccpb .PUT , Kv : & mvccpb.KeyValue {Key : testKey , Value : testValue , CreateRevision : 2 , ModRevision : 2 , Version : 1 }},
627
+ {Type : mvccpb .DELETE , Kv : & mvccpb.KeyValue {Key : testKey , ModRevision : 3 }},
628
+ },
629
+ },
630
+ {
631
+ name : "current revision" ,
632
+ startRevision : 3 ,
633
+ wantEvents : []mvccpb.Event {
634
+ {Type : mvccpb .DELETE , Kv : & mvccpb.KeyValue {Key : testKey , ModRevision : 3 }},
635
+ },
636
+ },
637
+ {
638
+ name : "future revision" ,
639
+ startRevision : 4 ,
640
+ wantEvents : []mvccpb.Event {},
641
+ },
642
+ }
643
+ watchers := []WatchStream {}
644
+ for i , tc := range tcs {
645
+ w := s .NewWatchStream ()
646
+ defer w .Close ()
647
+ watchers = append (watchers , w )
648
+ w .Watch (WatchID (i + 1 ), testKey , nil , tc .startRevision )
599
649
}
600
650
601
- t .Run ("Normal" , test (0 ))
602
- t .Run ("RunSyncWatchLoopBeforeRestore" , test (time .Millisecond * 120 )) // longer than default waitDuration
651
+ s .Put (testKey , testValue , lease .NoLease )
652
+ time .Sleep (delayBeforeRestore )
653
+ s .Restore (b )
654
+ time .Sleep (delayAfterRestore )
655
+ s .DeleteRange (testKey , nil )
656
+
657
+ for i , tc := range tcs {
658
+ t .Run (tc .name , func (t * testing.T ) {
659
+ events := readEventsForSecond (t , watchers [i ].Chan ())
660
+ if diff := cmp .Diff (tc .wantEvents , events ); diff != "" {
661
+ t .Errorf ("unexpected events (-want +got):\n %s" , diff )
662
+ }
663
+ })
664
+ }
603
665
}
604
666
605
- func readEventsForSecond (ws <- chan WatchResponse ) (events []mvccpb.Event ) {
667
+ func readEventsForSecond (t * testing.T , ws <- chan WatchResponse ) []mvccpb.Event {
668
+ events := []mvccpb.Event {}
669
+ deadline := time .After (time .Second )
606
670
for {
607
671
select {
608
672
case resp := <- ws :
673
+ if len (resp .Events ) == 0 {
674
+ t .Fatalf ("Events should never be empty, resp: %+v" , resp )
675
+ }
609
676
events = append (events , resp .Events ... )
610
- case <- time .After (time .Second ):
677
+ case <- deadline :
678
+ return events
679
+ case <- time .After (watchResyncPeriod ):
611
680
return events
612
681
}
613
682
}
614
683
}
615
684
616
- // TestWatchRestoreSyncedWatcher tests such a case that:
617
- // 1. watcher is created with a future revision "math.MaxInt64 - 2"
618
- // 2. watcher with a future revision is added to "synced" watcher group
619
- // 3. restore/overwrite storage with snapshot of a higher lasat revision
620
- // 4. restore operation moves "synced" to "unsynced" watcher group
621
- // 5. choose the watcher from step 1, without panic
622
- func TestWatchRestoreSyncedWatcher (t * testing.T ) {
623
- b1 , _ := betesting .NewDefaultTmpBackend (t )
624
- s1 := New (zaptest .NewLogger (t ), b1 , & lease.FakeLessor {}, StoreConfig {})
625
- defer cleanup (s1 , b1 )
626
-
627
- b2 , _ := betesting .NewDefaultTmpBackend (t )
628
- s2 := New (zaptest .NewLogger (t ), b2 , & lease.FakeLessor {}, StoreConfig {})
629
- defer cleanup (s2 , b2 )
630
-
631
- testKey , testValue := []byte ("foo" ), []byte ("bar" )
632
- rev := s1 .Put (testKey , testValue , lease .NoLease )
633
- startRev := rev + 2
634
-
635
- // create a watcher with a future revision
636
- // add to "synced" watcher group (startRev > s.store.currentRev)
637
- w1 := s1 .NewWatchStream ()
638
- defer w1 .Close ()
639
-
640
- w1 .Watch (0 , testKey , nil , startRev )
641
-
642
- // make "s2" ends up with a higher last revision
643
- s2 .Put (testKey , testValue , lease .NoLease )
644
- s2 .Put (testKey , testValue , lease .NoLease )
645
-
646
- // overwrite storage with higher revisions
647
- if err := s1 .Restore (b2 ); err != nil {
648
- t .Fatal (err )
649
- }
650
-
651
- // wait for next "syncWatchersLoop" iteration
652
- // and the unsynced watcher should be chosen
653
- time .Sleep (2 * time .Second )
654
-
655
- // trigger events for "startRev"
656
- s1 .Put (testKey , testValue , lease .NoLease )
657
-
658
- select {
659
- case resp := <- w1 .Chan ():
660
- if resp .Revision != startRev {
661
- t .Fatalf ("resp.Revision expect %d, got %d" , startRev , resp .Revision )
662
- }
663
- if len (resp .Events ) != 1 {
664
- t .Fatalf ("len(resp.Events) expect 1, got %d" , len (resp .Events ))
665
- }
666
- if resp .Events [0 ].Kv .ModRevision != startRev {
667
- t .Fatalf ("resp.Events[0].Kv.ModRevision expect %d, got %d" , startRev , resp .Events [0 ].Kv .ModRevision )
668
- }
669
- case <- time .After (time .Second ):
670
- t .Fatal ("failed to receive event in 1 second" )
671
- }
672
- }
673
-
674
685
// TestWatchBatchUnsynced tests batching on unsynced watchers
675
686
func TestWatchBatchUnsynced (t * testing.T ) {
676
687
tcs := []struct {
0 commit comments