@@ -479,32 +479,48 @@ func TestLeastUseful(t *testing.T) {
479
479
}
480
480
481
481
type callbackRecord struct {
482
- keys []string
483
- wg * sync.WaitGroup
482
+ key string
483
+ value roachpb.Value
484
+ timestamp int64
485
+ }
486
+
487
+ type callbackRecords struct {
488
+ records []callbackRecord
489
+ wg * sync.WaitGroup
484
490
syncutil.Mutex
485
491
}
486
492
487
- func (cr * callbackRecord ) Add (key string , _ roachpb.Value ) {
493
+ func (cr * callbackRecords ) Add (key string , value roachpb.Value , origTs int64 ) {
488
494
cr .Lock ()
489
495
defer cr .Unlock ()
490
- cr .keys = append (cr .keys , key )
496
+ cr .records = append (cr .records , callbackRecord { key , value , origTs } )
491
497
cr .wg .Done ()
492
498
}
493
499
494
- func (cr * callbackRecord ) Keys () []string {
500
+ func (cr * callbackRecords ) Keys () []string {
495
501
cr .Lock ()
496
502
defer cr .Unlock ()
497
- return append ([]string (nil ), cr .keys ... )
503
+ keys := make ([]string , len (cr .records ))
504
+ for i , record := range cr .records {
505
+ keys [i ] = record .key
506
+ }
507
+ return keys
508
+ }
509
+
510
+ func (cr * callbackRecords ) Records () []callbackRecord {
511
+ cr .Lock ()
512
+ defer cr .Unlock ()
513
+ return append ([]callbackRecord (nil ), cr .records ... )
498
514
}
499
515
500
516
func TestCallbacks (t * testing.T ) {
501
517
defer leaktest .AfterTest (t )()
502
518
is , stopper := newTestInfoStore ()
503
519
defer stopper .Stop (context .Background ())
504
520
wg := & sync.WaitGroup {}
505
- cb1 := callbackRecord {wg : wg }
506
- cb2 := callbackRecord {wg : wg }
507
- cbAll := callbackRecord {wg : wg }
521
+ cb1 := callbackRecords {wg : wg }
522
+ cb2 := callbackRecords {wg : wg }
523
+ cbAll := callbackRecords {wg : wg }
508
524
509
525
unregisterCB1 := is .registerCallback ("key1" , cb1 .Add )
510
526
is .registerCallback ("key2" , cb2 .Add )
@@ -603,18 +619,17 @@ func TestCallbacks(t *testing.T) {
603
619
}
604
620
}
605
621
606
- // TestRegisterCallback verifies that a callback is invoked when
607
- // registered if there are items which match its regexp in the
608
- // infostore.
622
+ // TestRegisterCallback verifies that a callback is invoked correctly when
623
+ // registered if there are items which match its regexp in the infostore.
609
624
func TestRegisterCallback (t * testing.T ) {
610
625
defer leaktest .AfterTest (t )()
611
626
is , stopper := newTestInfoStore ()
612
627
defer stopper .Stop (context .Background ())
613
628
wg := & sync.WaitGroup {}
614
- cb := callbackRecord {wg : wg }
629
+ cb := callbackRecords {wg : wg }
615
630
616
- i1 := is .newInfo (nil , time .Second )
617
- i2 := is .newInfo (nil , time .Second )
631
+ i1 := is .newInfo ([] byte ( "val1" ) , time .Second )
632
+ i2 := is .newInfo ([] byte ( "val2" ) , time .Second )
618
633
if err := is .addInfo ("key1" , i1 ); err != nil {
619
634
t .Fatal (err )
620
635
}
@@ -623,13 +638,35 @@ func TestRegisterCallback(t *testing.T) {
623
638
}
624
639
625
640
wg .Add (2 )
641
+ // Register a callback after the infos are added.
626
642
is .registerCallback ("key.*" , cb .Add )
627
643
wg .Wait ()
628
- actKeys := cb .Keys ()
629
- sort .Strings (actKeys )
630
- if expKeys := []string {"key1" , "key2" }; ! reflect .DeepEqual (actKeys , expKeys ) {
631
- t .Errorf ("expected %v, got %v" , expKeys , cb .Keys ())
644
+ actRecords := cb .Records ()
645
+ // Sort records by key since callback order is not guaranteed.
646
+ sort .Slice (actRecords , func (i , j int ) bool {
647
+ return actRecords [i ].key < actRecords [j ].key
648
+ })
649
+
650
+ expectedRecords := []callbackRecord {
651
+ {key : "key1" , value : i1 .Value , timestamp : i1 .OrigStamp },
652
+ {key : "key2" , value : i2 .Value , timestamp : i2 .OrigStamp },
653
+ }
654
+
655
+ require .Equal (t , expectedRecords , actRecords )
656
+
657
+ // Verify callback fires for new matching info
658
+ i3 := is .newInfo ([]byte ("val3" ), time .Second )
659
+ wg .Add (1 )
660
+ if err := is .addInfo ("key3" , i3 ); err != nil {
661
+ t .Fatal (err )
632
662
}
663
+ wg .Wait ()
664
+ actRecords = cb .Records ()
665
+ sort .Slice (actRecords , func (i , j int ) bool {
666
+ return actRecords [i ].key < actRecords [j ].key
667
+ })
668
+ expectedRecords = append (expectedRecords , callbackRecord {key : "key3" , value : i3 .Value , timestamp : i3 .OrigStamp })
669
+ require .Equal (t , expectedRecords , actRecords )
633
670
}
634
671
635
672
// TestCallbacksCalledSequentially verifies that callbacks are called in a
@@ -650,14 +687,14 @@ func TestCallbacksCalledSequentially(t *testing.T) {
650
687
651
688
// Create a callback generator that will generate a callback that will
652
689
// assert that the keys are in sequential order.
653
- callbackGenerator := func () func (key string , _ roachpb.Value ) {
690
+ callbackGenerator := func () func (key string , _ roachpb.Value , _ int64 ) {
654
691
// Add the number of updates to the wait group.
655
692
wg .Add (numUpdates )
656
693
// Initially, the expected next key is 0.
657
694
expectedNextKey := 0
658
695
659
696
// Return a callback that will assert the key is in sequential order.
660
- return func (key string , _ roachpb.Value ) {
697
+ return func (key string , _ roachpb.Value , _ int64 ) {
661
698
// Convert key to int and assert it matches the expected value.
662
699
keyInt , err := strconv .Atoi (key )
663
700
require .NoError (t , err )
@@ -672,7 +709,7 @@ func TestCallbacksCalledSequentially(t *testing.T) {
672
709
// halfway through the test to assert that it doesn't impact the
673
710
// sequential order of the other callbacks.
674
711
is .registerCallback (".*" , callbackGenerator ())
675
- unregister := is .registerCallback (".*" , func (key string , _ roachpb.Value ) {})
712
+ unregister := is .registerCallback (".*" , func (key string , _ roachpb.Value , _ int64 ) {})
676
713
is .registerCallback (".*" , callbackGenerator ())
677
714
678
715
for i := range numUpdates {
@@ -694,7 +731,7 @@ func BenchmarkCallbackParallelism(b *testing.B) {
694
731
defer stopper .Stop (ctx )
695
732
wg := & sync.WaitGroup {}
696
733
697
- callback := func (key string , val roachpb.Value ) {
734
+ callback := func (key string , val roachpb.Value , _ int64 ) {
698
735
// Sleep for a short duration to simulate work done in callback.
699
736
time .Sleep (time .Millisecond )
700
737
wg .Done ()
0 commit comments