Skip to content

Commit d487716

Browse files
committed
gossip: fix pending callbacks not dropping to zero
We have seen cases before where there was an issue with the pending callbacks metric that it's stuck at some non-zero number. This commit fixes that by: 1) If the infoStore stopper() is quiescing, we should not add more items to the work queue, because they could not be processed, and the pending callbacks metrics won't get decremented. 2) Only increment the pending callbacks metric when we actually add items to the work queue, and not before. Otherwise, there could be a race condition where we increment the pending callbacks metric, and then the infostore Stopper gets quiesced before we added any items to the work queue. This means that we won't decrement the pending callbacks metric. 3) When the callback gets cancelled, or when the infostore stopper gets quiesced, we decrement the pending callbacks metric by the number of items in the work queue. Otherwise, we might not decrement the number of pending callbacks. Fixes: #147982 Release note: None
1 parent 44853b2 commit d487716

File tree

2 files changed

+92
-1
lines changed

2 files changed

+92
-1
lines changed

pkg/gossip/gossip_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,3 +1140,70 @@ func TestGossipBatching(t *testing.T) {
11401140
require.LessOrEqual(t, serverMessagesSentCount, upperBoundMessages)
11411141
require.LessOrEqual(t, clientMessagesSentCount, upperBoundMessages)
11421142
}
1143+
1144+
// TestCallbacksPendingMetricGoesToZeroOnStop verifies that the CallbacksPending
1145+
// metric is correctly decremented when a callback is unregistered with pending work
1146+
// or when the stopper is stopped.
1147+
func TestCallbacksPendingMetricGoesToZeroOnStop(t *testing.T) {
1148+
defer leaktest.AfterTest(t)()
1149+
1150+
testCases := []struct {
1151+
name string
1152+
cleanup func(g *Gossip, unregister func(), stopper *stop.Stopper, ctx context.Context)
1153+
}{
1154+
{
1155+
name: "unregister callback",
1156+
cleanup: func(g *Gossip, unregister func(), stopper *stop.Stopper, ctx context.Context) {
1157+
unregister()
1158+
},
1159+
},
1160+
{
1161+
name: "stopper shutdown",
1162+
cleanup: func(g *Gossip, unregister func(), stopper *stop.Stopper, ctx context.Context) {
1163+
stopper.Stop(ctx)
1164+
},
1165+
},
1166+
}
1167+
1168+
for _, tc := range testCases {
1169+
t.Run(tc.name, func(t *testing.T) {
1170+
ctx := context.Background()
1171+
stopper := stop.NewStopper()
1172+
defer stopper.Stop(ctx)
1173+
g := NewTest(1, stopper, metric.NewRegistry())
1174+
1175+
unregister := g.RegisterCallback("test.*", func(key string, val roachpb.Value) {
1176+
// Do nothing.
1177+
})
1178+
1179+
// Add 100 infos to the gossip that will be processed by the callback.
1180+
for i := 0; i < 100; i++ {
1181+
slice := []byte("b1")
1182+
require.NoError(t, g.AddInfo(fmt.Sprintf("test.key%d", i), slice, time.Hour))
1183+
}
1184+
1185+
// Execute the cleanup action (either unregister or stopper.Stop)
1186+
// We do this in a goroutine to help cause interesting potential race conditions.
1187+
go func() {
1188+
tc.cleanup(g, unregister, stopper, ctx)
1189+
}()
1190+
1191+
// Add another 100 infos to the gossip that will be processed by the callback.
1192+
// We do this in a goroutine to help cause interesting potential race conditions.
1193+
go func() {
1194+
for i := 0; i < 100; i++ {
1195+
slice := []byte("b2")
1196+
require.NoError(t, g.AddInfo(fmt.Sprintf("test.key%d", i), slice, time.Hour))
1197+
}
1198+
}()
1199+
1200+
// Wait for the pending callbacks metric to go to 0.
1201+
testutils.SucceedsSoon(t, func() error {
1202+
if g.mu.is.metrics.CallbacksPending.Value() != 0 {
1203+
return fmt.Errorf("CallbacksPending should be 0, got %d", g.mu.is.metrics.CallbacksPending.Value())
1204+
}
1205+
return nil
1206+
})
1207+
})
1208+
}
1209+
}

pkg/gossip/infostore.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,11 +232,27 @@ func newInfoStore(
232232
return is
233233
}
234234

235+
// cleanupCallbackMetric decrements the callback metric by the number of remaining
236+
// items in the queue since they will never be processed. This is called when the
237+
// callback worker is stopped to avoid having the wrong metric value.
238+
// This should only be called when no new work can be added to the queue.
239+
func (is *infoStore) cleanupCallbackMetric(cw *callbackWork) {
240+
cw.mu.Lock()
241+
remainingItems := len(cw.mu.workQueue)
242+
if remainingItems > 0 {
243+
is.metrics.CallbacksPending.Dec(int64(remainingItems))
244+
}
245+
cw.mu.Unlock()
246+
}
247+
235248
// launchCallbackWorker launches a worker goroutine that is responsible for
236249
// executing callbacks for one registered callback pattern.
237250
func (is *infoStore) launchCallbackWorker(ambient log.AmbientContext, cw *callbackWork) {
238251
bgCtx := ambient.AnnotateCtx(context.Background())
239252
_ = is.stopper.RunAsyncTask(bgCtx, "callback worker", func(ctx context.Context) {
253+
// If we exit the loop, we are never going to process the work in the queues anymore, so
254+
// clean up the pending callbacks metric.
255+
defer is.cleanupCallbackMetric(cw)
240256
for {
241257
for {
242258
cw.mu.Lock()
@@ -444,11 +460,19 @@ func (is *infoStore) processCallbacks(key string, content roachpb.Value, changed
444460
// It adds work to the callback work slices, and signals the associated callback
445461
// workers to execute the work.
446462
func (is *infoStore) runCallbacks(key string, content roachpb.Value, callbacks ...*callback) {
463+
// Check if the stopper is quiescing. If so, do not add the callbacks to the
464+
// callback work list because they won't be processed anyways.
465+
select {
466+
case <-is.stopper.ShouldQuiesce():
467+
return
468+
default:
469+
}
470+
447471
// Add the callbacks to the callback work list.
448472
beforeQueue := timeutil.Now()
449-
is.metrics.CallbacksPending.Inc(int64(len(callbacks)))
450473
for _, cb := range callbacks {
451474
cb.cw.mu.Lock()
475+
is.metrics.CallbacksPending.Inc(1)
452476
cb.cw.mu.workQueue = append(cb.cw.mu.workQueue, callbackWorkItem{
453477
schedulingTime: beforeQueue,
454478
method: cb.method,

0 commit comments

Comments
 (0)