Skip to content

Commit 1af9ff2

Browse files
craig[bot]wenyihu6
andcommitted
Merge #150597
150597: gossip: pass OrigStamp as part of gossip callback r=tbg a=wenyihu6 Previously, gossip callbacks were registered with a key string to receive updates when new gossip info for that key arrived. These callbacks took the key and value as arguments. This commit adds the originating timestamp (from gossip.Info) as an additional argument, representing the wall time when the info was first generated by the originating node. The new argument is not used by existing code, but mma will use it in future commits to determine whether store load info reflects certain replication changes. Epic: none Release note: none Co-authored-by: wenyihu6 <[email protected]>
2 parents 98d9e07 + b9cbd4e commit 1af9ff2

File tree

16 files changed

+99
-53
lines changed

16 files changed

+99
-53
lines changed

pkg/gossip/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ func (c *client) gossip(
312312
sendGossipChan := make(chan struct{}, 1)
313313

314314
// Register a callback for gossip updates.
315-
updateCallback := func(_ string, _ roachpb.Value) {
315+
updateCallback := func(_ string, _ roachpb.Value, _ int64) {
316316
select {
317317
case sendGossipChan <- struct{}{}:
318318
default:

pkg/gossip/gossip.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -756,7 +756,7 @@ func maxPeers(nodeCount int) int {
756756
// new addresses for each encountered host and to write the
757757
// set of gossip node addresses to persistent storage when it
758758
// changes.
759-
func (g *Gossip) updateNodeAddress(key string, content roachpb.Value) {
759+
func (g *Gossip) updateNodeAddress(key string, content roachpb.Value, _ int64) {
760760
ctx := g.AnnotateCtx(context.TODO())
761761
var desc roachpb.NodeDescriptor
762762
if err := content.GetProto(&desc); err != nil {
@@ -825,7 +825,7 @@ func (g *Gossip) removeNodeDescriptorLocked(nodeID roachpb.NodeID) {
825825
}
826826

827827
// updateStoreMap is a gossip callback which is used to update storeMap.
828-
func (g *Gossip) updateStoreMap(key string, content roachpb.Value) {
828+
func (g *Gossip) updateStoreMap(key string, content roachpb.Value, _ int64) {
829829
ctx := g.AnnotateCtx(context.TODO())
830830
var desc roachpb.StoreDescriptor
831831
if err := content.GetProto(&desc); err != nil {
@@ -1133,9 +1133,10 @@ func (g *Gossip) IterateInfos(prefix string, visit func(k string, info Info) err
11331133
return nil
11341134
}
11351135

1136-
// Callback is a callback method to be invoked on gossip update
1137-
// of info denoted by key.
1138-
type Callback func(string, roachpb.Value)
1136+
// Callback is a callback method to be invoked on gossip update of info denoted
1137+
// by key. origTimestamp is the info wall time when generated by the originating
1138+
// node.
1139+
type Callback func(key string, value roachpb.Value, origTimestamp int64)
11391140

11401141
// CallbackOption is a marker interface that callback options must implement.
11411142
type CallbackOption interface {
@@ -1616,7 +1617,7 @@ func (g *Gossip) TestingAddInfoProtoAndWaitForAllCallbacks(
16161617
if cb.matcher.MatchString(key) {
16171618
cb.cw.mu.Lock()
16181619
cb.cw.mu.workQueue = append(cb.cw.mu.workQueue, callbackWorkItem{
1619-
method: func(_ string, _ roachpb.Value) {
1620+
method: func(_ string, _ roachpb.Value, _ int64) {
16201621
wg.Done()
16211622
},
16221623
schedulingTime: timeutil.Now(),
@@ -1657,7 +1658,7 @@ func (g *Gossip) GetFirstRangeDescriptor() (*roachpb.RangeDescriptor, error) {
16571658

16581659
// OnFirstRangeChanged implements kvcoord.FirstRangeProvider.
16591660
func (g *Gossip) OnFirstRangeChanged(cb func(*roachpb.RangeDescriptor)) {
1660-
g.RegisterCallback(KeyFirstRangeDescriptor, func(_ string, value roachpb.Value) {
1661+
g.RegisterCallback(KeyFirstRangeDescriptor, func(_ string, value roachpb.Value, _ int64) {
16611662
ctx := context.Background()
16621663
desc := &roachpb.RangeDescriptor{}
16631664
if err := value.GetProto(desc); err != nil {

pkg/gossip/gossip_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1172,7 +1172,7 @@ func TestCallbacksPendingMetricGoesToZeroOnStop(t *testing.T) {
11721172
defer stopper.Stop(ctx)
11731173
g := NewTest(1, stopper, metric.NewRegistry())
11741174

1175-
unregister := g.RegisterCallback("test.*", func(key string, val roachpb.Value) {
1175+
unregister := g.RegisterCallback("test.*", func(key string, val roachpb.Value, _ int64) {
11761176
// Do nothing.
11771177
})
11781178

pkg/gossip/infostore.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,11 @@ type callbackWorkItem struct {
7979
// schedulingTime is the time when the callback was scheduled.
8080
schedulingTime time.Time
8181
method Callback
82-
key string
83-
content roachpb.Value
82+
// key, content, origTimestamp are the parameters that will be passed to the
83+
// callback method. They are based on the infos added to the infostore.
84+
key string
85+
content roachpb.Value
86+
origTimestamp int64
8487
}
8588

8689
type callbackWork struct {
@@ -274,7 +277,7 @@ func (is *infoStore) launchCallbackWorker(ambient log.AmbientContext, cw *callba
274277
is.metrics.CallbacksPendingDuration.RecordValue(queueDur.Nanoseconds())
275278
}
276279

277-
work.method(work.key, work.content)
280+
work.method(work.key, work.content, work.origTimestamp)
278281

279282
afterProcess := timeutil.Now()
280283
processDur := afterProcess.Sub(afterQueue)
@@ -362,7 +365,7 @@ func (is *infoStore) addInfo(key string, i *Info) error {
362365
ratchetHighWaterStamp(is.highWaterStamps, i.NodeID, i.OrigStamp)
363366
changed := existingInfo == nil ||
364367
!bytes.Equal(existingInfo.Value.RawBytes, i.Value.RawBytes)
365-
is.processCallbacks(key, i.Value, changed)
368+
is.processCallbacks(key, i.Value, i.OrigStamp, changed)
366369
return nil
367370
}
368371

@@ -419,7 +422,7 @@ func (is *infoStore) registerCallback(
419422

420423
if err := is.visitInfos(func(key string, i *Info) error {
421424
if matcher.MatchString(key) {
422-
is.runCallbacks(key, i.Value, cb)
425+
is.runCallbacks(key, i.Value, i.OrigStamp, cb)
423426
}
424427
return nil
425428
}, true /* deleteExpired */); err != nil {
@@ -446,20 +449,24 @@ func (is *infoStore) registerCallback(
446449
// processCallbacks processes callbacks for the specified key by
447450
// matching each callback's regular expression against the key and invoking
448451
// the corresponding callback method on a match.
449-
func (is *infoStore) processCallbacks(key string, content roachpb.Value, changed bool) {
452+
func (is *infoStore) processCallbacks(
453+
key string, content roachpb.Value, origTimestamp int64, changed bool,
454+
) {
450455
var callbacks []*callback
451456
for _, cb := range is.callbacks {
452457
if (changed || cb.redundant) && cb.matcher.MatchString(key) {
453458
callbacks = append(callbacks, cb)
454459
}
455460
}
456-
is.runCallbacks(key, content, callbacks...)
461+
is.runCallbacks(key, content, origTimestamp, callbacks...)
457462
}
458463

459464
// runCallbacks receives a list of callbacks and contents that match the key.
460465
// It adds work to the callback work slices, and signals the associated callback
461466
// workers to execute the work.
462-
func (is *infoStore) runCallbacks(key string, content roachpb.Value, callbacks ...*callback) {
467+
func (is *infoStore) runCallbacks(
468+
key string, content roachpb.Value, origTimestamp int64, callbacks ...*callback,
469+
) {
463470
// Check if the stopper is quiescing. If so, do not add the callbacks to the
464471
// callback work list because they won't be processed anyways.
465472
select {
@@ -478,6 +485,7 @@ func (is *infoStore) runCallbacks(key string, content roachpb.Value, callbacks .
478485
method: cb.method,
479486
key: key,
480487
content: content,
488+
origTimestamp: origTimestamp,
481489
})
482490
cb.cw.mu.Unlock()
483491

pkg/gossip/infostore_test.go

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -479,32 +479,48 @@ func TestLeastUseful(t *testing.T) {
479479
}
480480

481481
type callbackRecord struct {
482-
keys []string
483-
wg *sync.WaitGroup
482+
key string
483+
value roachpb.Value
484+
timestamp int64
485+
}
486+
487+
type callbackRecords struct {
488+
records []callbackRecord
489+
wg *sync.WaitGroup
484490
syncutil.Mutex
485491
}
486492

487-
func (cr *callbackRecord) Add(key string, _ roachpb.Value) {
493+
func (cr *callbackRecords) Add(key string, value roachpb.Value, origTs int64) {
488494
cr.Lock()
489495
defer cr.Unlock()
490-
cr.keys = append(cr.keys, key)
496+
cr.records = append(cr.records, callbackRecord{key, value, origTs})
491497
cr.wg.Done()
492498
}
493499

494-
func (cr *callbackRecord) Keys() []string {
500+
func (cr *callbackRecords) Keys() []string {
495501
cr.Lock()
496502
defer cr.Unlock()
497-
return append([]string(nil), cr.keys...)
503+
keys := make([]string, len(cr.records))
504+
for i, record := range cr.records {
505+
keys[i] = record.key
506+
}
507+
return keys
508+
}
509+
510+
func (cr *callbackRecords) Records() []callbackRecord {
511+
cr.Lock()
512+
defer cr.Unlock()
513+
return append([]callbackRecord(nil), cr.records...)
498514
}
499515

500516
func TestCallbacks(t *testing.T) {
501517
defer leaktest.AfterTest(t)()
502518
is, stopper := newTestInfoStore()
503519
defer stopper.Stop(context.Background())
504520
wg := &sync.WaitGroup{}
505-
cb1 := callbackRecord{wg: wg}
506-
cb2 := callbackRecord{wg: wg}
507-
cbAll := callbackRecord{wg: wg}
521+
cb1 := callbackRecords{wg: wg}
522+
cb2 := callbackRecords{wg: wg}
523+
cbAll := callbackRecords{wg: wg}
508524

509525
unregisterCB1 := is.registerCallback("key1", cb1.Add)
510526
is.registerCallback("key2", cb2.Add)
@@ -603,18 +619,17 @@ func TestCallbacks(t *testing.T) {
603619
}
604620
}
605621

606-
// TestRegisterCallback verifies that a callback is invoked when
607-
// registered if there are items which match its regexp in the
608-
// infostore.
622+
// TestRegisterCallback verifies that a callback is invoked correctly when
623+
// registered if there are items which match its regexp in the infostore.
609624
func TestRegisterCallback(t *testing.T) {
610625
defer leaktest.AfterTest(t)()
611626
is, stopper := newTestInfoStore()
612627
defer stopper.Stop(context.Background())
613628
wg := &sync.WaitGroup{}
614-
cb := callbackRecord{wg: wg}
629+
cb := callbackRecords{wg: wg}
615630

616-
i1 := is.newInfo(nil, time.Second)
617-
i2 := is.newInfo(nil, time.Second)
631+
i1 := is.newInfo([]byte("val1"), time.Second)
632+
i2 := is.newInfo([]byte("val2"), time.Second)
618633
if err := is.addInfo("key1", i1); err != nil {
619634
t.Fatal(err)
620635
}
@@ -623,13 +638,35 @@ func TestRegisterCallback(t *testing.T) {
623638
}
624639

625640
wg.Add(2)
641+
// Register a callback after the infos are added.
626642
is.registerCallback("key.*", cb.Add)
627643
wg.Wait()
628-
actKeys := cb.Keys()
629-
sort.Strings(actKeys)
630-
if expKeys := []string{"key1", "key2"}; !reflect.DeepEqual(actKeys, expKeys) {
631-
t.Errorf("expected %v, got %v", expKeys, cb.Keys())
644+
actRecords := cb.Records()
645+
// Sort records by key since callback order is not guaranteed.
646+
sort.Slice(actRecords, func(i, j int) bool {
647+
return actRecords[i].key < actRecords[j].key
648+
})
649+
650+
expectedRecords := []callbackRecord{
651+
{key: "key1", value: i1.Value, timestamp: i1.OrigStamp},
652+
{key: "key2", value: i2.Value, timestamp: i2.OrigStamp},
653+
}
654+
655+
require.Equal(t, expectedRecords, actRecords)
656+
657+
// Verify callback fires for new matching info
658+
i3 := is.newInfo([]byte("val3"), time.Second)
659+
wg.Add(1)
660+
if err := is.addInfo("key3", i3); err != nil {
661+
t.Fatal(err)
632662
}
663+
wg.Wait()
664+
actRecords = cb.Records()
665+
sort.Slice(actRecords, func(i, j int) bool {
666+
return actRecords[i].key < actRecords[j].key
667+
})
668+
expectedRecords = append(expectedRecords, callbackRecord{key: "key3", value: i3.Value, timestamp: i3.OrigStamp})
669+
require.Equal(t, expectedRecords, actRecords)
633670
}
634671

635672
// TestCallbacksCalledSequentially verifies that callbacks are called in a
@@ -650,14 +687,14 @@ func TestCallbacksCalledSequentially(t *testing.T) {
650687

651688
// Create a callback generator that will generate a callback that will
652689
// assert that the keys are in sequential order.
653-
callbackGenerator := func() func(key string, _ roachpb.Value) {
690+
callbackGenerator := func() func(key string, _ roachpb.Value, _ int64) {
654691
// Add the number of updates to the wait group.
655692
wg.Add(numUpdates)
656693
// Initially, the expected next key is 0.
657694
expectedNextKey := 0
658695

659696
// Return a callback that will assert the key is in sequential order.
660-
return func(key string, _ roachpb.Value) {
697+
return func(key string, _ roachpb.Value, _ int64) {
661698
// Convert key to int and assert it matches the expected value.
662699
keyInt, err := strconv.Atoi(key)
663700
require.NoError(t, err)
@@ -672,7 +709,7 @@ func TestCallbacksCalledSequentially(t *testing.T) {
672709
// halfway through the test to assert that it doesn't impact the
673710
// sequential order of the other callbacks.
674711
is.registerCallback(".*", callbackGenerator())
675-
unregister := is.registerCallback(".*", func(key string, _ roachpb.Value) {})
712+
unregister := is.registerCallback(".*", func(key string, _ roachpb.Value, _ int64) {})
676713
is.registerCallback(".*", callbackGenerator())
677714

678715
for i := range numUpdates {
@@ -694,7 +731,7 @@ func BenchmarkCallbackParallelism(b *testing.B) {
694731
defer stopper.Stop(ctx)
695732
wg := &sync.WaitGroup{}
696733

697-
callback := func(key string, val roachpb.Value) {
734+
callback := func(key string, val roachpb.Value, _ int64) {
698735
// Sleep for a short duration to simulate work done in callback.
699736
time.Sleep(time.Millisecond)
700737
wg.Done()

pkg/gossip/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ func (s *server) start(addr net.Addr) {
396396
// We require redundant callbacks here as the broadcast callback is
397397
// propagating gossip infos to other nodes and needs to propagate the new
398398
// expiration info.
399-
unregister := s.mu.is.registerCallback(".*", func(_ string, _ roachpb.Value) {
399+
unregister := s.mu.is.registerCallback(".*", func(_ string, _ roachpb.Value, _ int64) {
400400
broadcast()
401401
}, Redundant)
402402

pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8829,7 +8829,7 @@ func TestAllocatorFullDisks(t *testing.T) {
88298829

88308830
var wg sync.WaitGroup
88318831
g.RegisterCallback(gossip.MakePrefixPattern(gossip.KeyStoreDescPrefix),
8832-
func(_ string, _ roachpb.Value) { wg.Done() },
8832+
func(_ string, _ roachpb.Value, _ int64) { wg.Done() },
88338833
// Redundant callbacks are required by this test.
88348834
gossip.Redundant)
88358835

pkg/kv/kvserver/allocator/storepool/store_pool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ func (sp *StorePool) statusString(nl NodeLivenessFunc) redact.RedactableString {
482482
}
483483

484484
// storeGossipUpdate is the Gossip callback used to keep the StorePool up to date.
485-
func (sp *StorePool) storeGossipUpdate(_ string, content roachpb.Value) {
485+
func (sp *StorePool) storeGossipUpdate(_ string, content roachpb.Value, _ int64) {
486486
var storeDesc roachpb.StoreDescriptor
487487

488488
if err := content.GetProto(&storeDesc); err != nil {

pkg/kv/kvserver/client_split_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2817,7 +2817,7 @@ func TestStoreRangeGossipOnSplits(t *testing.T) {
28172817

28182818
var lastSD roachpb.StoreDescriptor
28192819
rangeCountCh := make(chan int32)
2820-
unregister := store.Gossip().RegisterCallback(storeKey, func(_ string, val roachpb.Value) {
2820+
unregister := store.Gossip().RegisterCallback(storeKey, func(_ string, val roachpb.Value, _ int64) {
28212821
var sd roachpb.StoreDescriptor
28222822
if err := val.GetProto(&sd); err != nil {
28232823
panic(err)

pkg/kv/kvserver/gossip_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func TestGossipFirstRange(t *testing.T) {
4040
descs := make(chan *roachpb.RangeDescriptor)
4141
unregister := tc.Servers[0].GossipI().(*gossip.Gossip).
4242
RegisterCallback(gossip.KeyFirstRangeDescriptor,
43-
func(_ string, content roachpb.Value) {
43+
func(_ string, content roachpb.Value, _ int64) {
4444
var desc roachpb.RangeDescriptor
4545
if err := content.GetProto(&desc); err != nil {
4646
select {

0 commit comments

Comments
 (0)