Skip to content

Commit 68602ca

Browse files
authored
Merge pull request #152467 from iskettaneh/backport25.2-144557
release-25.2: gossip: parallelize callback handling
2 parents c8a218a + 3872b5e commit 68602ca

File tree

7 files changed

+379
-100
lines changed

7 files changed

+379
-100
lines changed

pkg/gossip/gossip.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1565,6 +1565,66 @@ func (g *Gossip) findClient(match func(*client) bool) *client {
15651565
return nil
15661566
}
15671567

1568+
// TestingAddInfoProtoAndWaitForAllCallbacks adds an info proto, and waits for all
1569+
// matching callbacks to get called before returning. It's only intended to be
1570+
// used for tests that assert on the result of the gossip propagation.
1571+
func (g *Gossip) TestingAddInfoProtoAndWaitForAllCallbacks(
1572+
key string, msg protoutil.Message, ttl time.Duration,
1573+
) error {
1574+
// Take the lock to avoid races where a callback could be added while this
1575+
// method is waiting for matching callbacks to be called.
1576+
g.mu.Lock()
1577+
defer g.mu.Unlock()
1578+
1579+
wg := &sync.WaitGroup{}
1580+
1581+
// Increment the wait group once per matching callback. It will be decremented
1582+
// once the processing is complete.
1583+
for _, cb := range g.mu.is.callbacks {
1584+
if cb.matcher.MatchString(key) {
1585+
wg.Add(1)
1586+
}
1587+
}
1588+
1589+
// Add the target info to the infoStore. This will trigger the registered
1590+
// callbacks to be called.
1591+
bytes, err := protoutil.Marshal(msg)
1592+
if err != nil {
1593+
return err
1594+
}
1595+
if err := g.addInfoLocked(key, bytes, ttl); err != nil {
1596+
return err
1597+
}
1598+
1599+
// At this point, we know that the callbacks that will be called have been
1600+
// added to the work queues. Now, we can append an entry item at the end of
1601+
// the matching callback's work queue that will decrement the wait group that
1602+
// was incremented earlier. This ensures that ALL matching callbacks have
1603+
// been called.
1604+
for _, cb := range g.mu.is.callbacks {
1605+
if cb.matcher.MatchString(key) {
1606+
cb.cw.mu.Lock()
1607+
cb.cw.mu.workQueue = append(cb.cw.mu.workQueue, callbackWorkItem{
1608+
method: func(_ string, _ roachpb.Value) {
1609+
wg.Done()
1610+
},
1611+
schedulingTime: timeutil.Now(),
1612+
})
1613+
cb.cw.mu.Unlock()
1614+
}
1615+
1616+
// Make sure to notify the callback worker that there is work to do.
1617+
select {
1618+
case cb.cw.callbackCh <- struct{}{}:
1619+
default:
1620+
}
1621+
}
1622+
1623+
// Wait for all the callbacks to finish processing.
1624+
wg.Wait()
1625+
return nil
1626+
}
1627+
15681628
// A firstRangeMissingError indicates that the first range has not yet
15691629
// been gossiped. This will be the case for a node which hasn't yet
15701630
// joined the gossip network.

pkg/gossip/gossip_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,3 +1140,70 @@ func TestGossipBatching(t *testing.T) {
11401140
require.LessOrEqual(t, serverMessagesSentCount, upperBoundMessages)
11411141
require.LessOrEqual(t, clientMessagesSentCount, upperBoundMessages)
11421142
}
1143+
1144+
// TestCallbacksPendingMetricGoesToZeroOnStop verifies that the CallbacksPending
1145+
// metric is correctly decremented when a callback is unregistered with pending work
1146+
// or when the stopper is stopped.
1147+
func TestCallbacksPendingMetricGoesToZeroOnStop(t *testing.T) {
1148+
defer leaktest.AfterTest(t)()
1149+
1150+
testCases := []struct {
1151+
name string
1152+
cleanup func(g *Gossip, unregister func(), stopper *stop.Stopper, ctx context.Context)
1153+
}{
1154+
{
1155+
name: "unregister callback",
1156+
cleanup: func(g *Gossip, unregister func(), stopper *stop.Stopper, ctx context.Context) {
1157+
unregister()
1158+
},
1159+
},
1160+
{
1161+
name: "stopper shutdown",
1162+
cleanup: func(g *Gossip, unregister func(), stopper *stop.Stopper, ctx context.Context) {
1163+
stopper.Stop(ctx)
1164+
},
1165+
},
1166+
}
1167+
1168+
for _, tc := range testCases {
1169+
t.Run(tc.name, func(t *testing.T) {
1170+
ctx := context.Background()
1171+
stopper := stop.NewStopper()
1172+
defer stopper.Stop(ctx)
1173+
g := NewTest(1, stopper, metric.NewRegistry())
1174+
1175+
unregister := g.RegisterCallback("test.*", func(key string, val roachpb.Value) {
1176+
// Do nothing.
1177+
})
1178+
1179+
// Add 100 infos to the gossip that will be processed by the callback.
1180+
for i := 0; i < 100; i++ {
1181+
slice := []byte("b1")
1182+
require.NoError(t, g.AddInfo(fmt.Sprintf("test.key%d", i), slice, time.Hour))
1183+
}
1184+
1185+
// Execute the cleanup action (either unregister or stopper.Stop)
1186+
// We do this in a goroutine to help cause interesting potential race conditions.
1187+
go func() {
1188+
tc.cleanup(g, unregister, stopper, ctx)
1189+
}()
1190+
1191+
// Add another 100 infos to the gossip that will be processed by the callback.
1192+
// We do this in a goroutine to help cause interesting potential race conditions.
1193+
go func() {
1194+
for i := 0; i < 100; i++ {
1195+
slice := []byte("b2")
1196+
require.NoError(t, g.AddInfo(fmt.Sprintf("test.key%d", i), slice, time.Hour))
1197+
}
1198+
}()
1199+
1200+
// Wait for the pending callbacks metric to go to 0.
1201+
testutils.SucceedsSoon(t, func() error {
1202+
if g.mu.is.metrics.CallbacksPending.Value() != 0 {
1203+
return fmt.Errorf("CallbacksPending should be 0, got %d", g.mu.is.metrics.CallbacksPending.Value())
1204+
}
1205+
return nil
1206+
})
1207+
})
1208+
}
1209+
}

0 commit comments

Comments
 (0)