Commit dba8748

gossip: parallelize callback handling
This commit introduces parallel callback handling in the gossip package by:

1. Moving callback work management from a single global queue to per-callback-registration work queues.
2. Launching a dedicated worker goroutine for each callback registration.
3. Maintaining the guarantee that callbacks for the same key are executed sequentially. This guarantee only applies within a single callback registration; there are no ordering guarantees between different callback registrations.

The changes improve performance by allowing different callbacks to execute concurrently while preserving the per-registration ordering guarantees. Each registration now has its own worker goroutine, a channel for signaling new work, and a mutex-protected work queue.

This also changes how the storeGossiper waits for gossip to propagate. The storeGossiper used to subscribe to store updates and then rely on the single-threaded execution of gossip callbacks: once its own callback was called, all callbacks registered before it had necessarily been called as well. With parallel callback execution across different registrations, a more robust way to ensure propagation of the gossiped info is needed. This is done by introducing a new gossip method that creates a WaitGroup and increments it once for every matching callback; the WaitGroup is decremented at the end of each callback's execution, ensuring that the update has definitely been propagated.

Benchmark results before/after:

```
name                    old time/op    new time/op    delta
CallbackParallelism-12    5.69ms ± 0%    1.15ms ± 0%  -79.87%  (p=0.000 n=14+12)

name                    old alloc/op   new alloc/op   delta
CallbackParallelism-12      696B ±68%      897B ±12%  +28.83%  (p=0.020 n=15+15)

name                    old allocs/op  new allocs/op  delta
CallbackParallelism-12      10.0 ± 0%      13.0 ± 0%  +30.00%  (p=0.000 n=15+12)
```

Fixes: #144187

Release note: None
1 parent 5ae5230 commit dba8748
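The core of the change is the per-registration worker pattern described in the commit message. Below is a minimal, self-contained Go sketch of that pattern using illustrative names (workItem, worker) rather than the actual gossip types (callbackWorkItem, callbackWork): each registration owns a mutex-protected queue drained in FIFO order by a single goroutine, so items for one registration run sequentially while separate registrations run in parallel.

```go
package main

import (
	"fmt"
	"sync"
)

// workItem and worker are illustrative stand-ins for the commit's
// callbackWorkItem and callbackWork types; they are not the real gossip code.
type workItem struct {
	key string
	run func(key string)
}

type worker struct {
	notifyCh chan struct{} // capacity 1: coalesces "new work" signals
	stopCh   chan struct{}
	mu       sync.Mutex
	queue    []workItem
}

func newWorker() *worker {
	w := &worker{
		notifyCh: make(chan struct{}, 1),
		stopCh:   make(chan struct{}),
	}
	go w.loop()
	return w
}

// loop drains the queue in FIFO order. One goroutine per worker means items
// enqueued on the same worker always run sequentially; different workers
// (i.e. different registrations) run concurrently.
func (w *worker) loop() {
	for {
		for {
			w.mu.Lock()
			batch := w.queue
			w.queue = nil
			w.mu.Unlock()
			if len(batch) == 0 {
				break
			}
			for _, item := range batch {
				item.run(item.key)
			}
		}
		select {
		case <-w.notifyCh:
			// New work arrived; loop around and drain again.
		case <-w.stopCh:
			return
		}
	}
}

func (w *worker) enqueue(item workItem) {
	w.mu.Lock()
	w.queue = append(w.queue, item)
	w.mu.Unlock()
	// Non-blocking send: if a wake-up is already pending, dropping this one is
	// fine because the loop re-reads the whole queue on every pass.
	select {
	case w.notifyCh <- struct{}{}:
	default:
	}
}

func main() {
	w := newWorker()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		key := fmt.Sprintf("key%d", i)
		w.enqueue(workItem{key: key, run: func(k string) {
			defer wg.Done()
			fmt.Println("processed", k)
		}})
	}
	wg.Wait() // all three items have run, in the order they were enqueued
	close(w.stopCh)
}
```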

6 files changed (+201, -101 lines)

pkg/gossip/gossip.go

Lines changed: 60 additions & 0 deletions
@@ -1565,6 +1565,66 @@ func (g *Gossip) findClient(match func(*client) bool) *client {
   return nil
 }
 
+// TestingAddInfoProtoAndWaitForAllCallbacks adds an info proto, and waits for all
+// matching callbacks to get called before returning. It's only intended to be
+// used for tests that assert on the result of the gossip propagation.
+func (g *Gossip) TestingAddInfoProtoAndWaitForAllCallbacks(
+  key string, msg protoutil.Message, ttl time.Duration,
+) error {
+  // Take the lock to avoid races where a callback could be added while this
+  // method is waiting for matching callbacks to be called.
+  g.mu.Lock()
+  defer g.mu.Unlock()
+
+  wg := &sync.WaitGroup{}
+
+  // Increment the wait group once per matching callback. It will be decremented
+  // once the processing is complete.
+  for _, cb := range g.mu.is.callbacks {
+    if cb.matcher.MatchString(key) {
+      wg.Add(1)
+    }
+  }
+
+  // Add the target info to the infoStore. This will trigger the registered
+  // callbacks to be called.
+  bytes, err := protoutil.Marshal(msg)
+  if err != nil {
+    return err
+  }
+  if err := g.addInfoLocked(key, bytes, ttl); err != nil {
+    return err
+  }
+
+  // At this point, we know that the callbacks that will be called have been
+  // added to the work queues. Now, we can append an entry item at the end of
+  // the matching callback's work queue that will decrement the wait group that
+  // was incremented earlier. This ensures that ALL matching callbacks have
+  // been called.
+  for _, cb := range g.mu.is.callbacks {
+    if cb.matcher.MatchString(key) {
+      cb.cw.mu.Lock()
+      cb.cw.mu.workQueue = append(cb.cw.mu.workQueue, callbackWorkItem{
+        method: func(_ string, _ roachpb.Value) {
+          wg.Done()
+        },
+        schedulingTime: timeutil.Now(),
+      })
+      cb.cw.mu.Unlock()
+    }
+
+    // Make sure to notify the callback worker that there is work to do.
+    select {
+    case cb.cw.callbackCh <- struct{}{}:
+    default:
+    }
+  }
+
+  // Wait for all the callbacks to finish processing.
+  wg.Wait()
+  return nil
+}
+
 // A firstRangeMissingError indicates that the first range has not yet
 // been gossiped. This will be the case for a node which hasn't yet
 // joined the gossip network.
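As a hedged sketch of how a test could use the new method: the helper below is hypothetical (not part of this commit); `g` and `desc` are assumed to come from the test's existing gossip fixture, and the zero TTL mirrors what the updated allocator test passes.

```go
package gossiptest // hypothetical test package

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/gossip"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// waitForStoreGossip is a hypothetical helper; g and desc would be provided
// by the test's existing gossip setup.
func waitForStoreGossip(t *testing.T, g *gossip.Gossip, desc roachpb.StoreDescriptor) {
	if err := g.TestingAddInfoProtoAndWaitForAllCallbacks(
		gossip.MakeStoreDescKey(desc.StoreID), // key matched by store-descriptor callbacks
		&desc,
		0, // ttl; the updated allocator test passes 0 here
	); err != nil {
		t.Fatal(err)
	}
	// Once this returns, every callback whose pattern matches the key has
	// finished processing this update, so assertions on derived state are safe.
}
```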

pkg/gossip/infostore.go

Lines changed: 133 additions & 51 deletions
@@ -36,11 +36,76 @@ func (allMatcher) MatchString(string) bool {
   return true
 }
 
-// callback holds regexp pattern match and GossipCallback method.
+// This is how multiple callback registrations are handled:
+/*
+  +------------------+    +------------------+       +------------------+
+  | Callback Reg 1   |    | Callback Reg 2   |  ...  | Callback Reg N   |
+  | Pattern: "key1.*"|    | Pattern: "key2.*"|  ...  | Pattern: "keyN.*"|
+  +------------------+    +------------------+       +------------------+
+           |                       |                          |
+           v                       v                          v
+  +------------------+    +------------------+       +------------------+
+  | callbackWorker 1 |    | callbackWorker 2 |  ...  | callbackWorker N |
+  | - callbackCh     |    | - callbackCh     |  ...  | - callbackCh     |
+  | - stopperCh      |    | - stopperCh      |  ...  | - stopperCh     |
+  | - workQueue      |    | - workQueue      |  ...  | - workQueue      |
+  +------------------+    +------------------+       +------------------+
+           |                       |                          |
+           v                       v                          v
+  +------------------+    +------------------+       +------------------+
+  | Worker Goroutine |    | Worker Goroutine |  ...  | Worker Goroutine |
+  |        1         |    |        2         |  ...  |        N         |
+  +------------------+    +------------------+       +------------------+
+*/
+// When a new info is added, it is checked against all the callback matchers.
+// The work is added inside each workQueue which matches the info key.
+// Each worker goroutine independently processes its own workQueue in FIFO
+// order.
+
+// callback holds regexp pattern match, GossipCallback method, and a queue of
+// remaining work items.
 type callback struct {
   matcher   stringMatcher
   method    Callback
   redundant bool
+  // cw contains all the information needed to orchestrate, schedule, and run
+  // callbacks for this specific matcher.
+  cw *callbackWork
+}
+
+// callbackWorkItem is a struct that contains the information needed to run
+// a callback.
+type callbackWorkItem struct {
+  // schedulingTime is the time when the callback was scheduled.
+  schedulingTime time.Time
+  method         Callback
+  key            string
+  content        roachpb.Value
+}
+
+type callbackWork struct {
+  // callbackCh is used to signal the callback worker to execute the work.
+  callbackCh chan struct{}
+  // stopperCh is used to signal the callback worker to stop.
+  stopperCh chan struct{}
+  mu struct {
+    syncutil.Mutex
+    // workQueue contains the queue of callbacks that need to be called.
+    workQueue []callbackWorkItem
+  }
+}
+
+func newCallbackWork() *callbackWork {
+  return &callbackWork{
+    callbackCh: make(chan struct{}, 1),
+    stopperCh:  make(chan struct{}, 1),
+    mu: struct {
+      syncutil.Mutex
+      workQueue []callbackWorkItem
+    }{
+      workQueue: make([]callbackWorkItem, 0),
+    },
+  }
 }
 
 // infoStore objects manage maps of Info objects. They maintain a
@@ -64,10 +129,6 @@ type infoStore struct {
   NodeAddr        util.UnresolvedAddr      `json:"-"` // Address of node owning this info store: "host:port"
   highWaterStamps map[roachpb.NodeID]int64 // Per-node information for gossip peers
   callbacks       []*callback
-
-  callbackWorkMu syncutil.Mutex // Protects callbackWork
-  callbackWork   []func()
-  callbackCh     chan struct{} // Channel to signal the callback goroutine
 }
 
 var monoTime struct {
@@ -167,34 +228,58 @@ func newInfoStore(
     Infos:           make(infoMap),
     NodeAddr:        nodeAddr,
     highWaterStamps: map[roachpb.NodeID]int64{},
-    callbackCh:      make(chan struct{}, 1),
   }
+  return is
+}
 
+// launchCallbackWorker launches a worker goroutine that is responsible for
+// executing callbacks for one registered callback pattern.
+func (is *infoStore) launchCallbackWorker(ambient log.AmbientContext, cw *callbackWork) {
   bgCtx := ambient.AnnotateCtx(context.Background())
-  _ = is.stopper.RunAsyncTask(bgCtx, "infostore", func(ctx context.Context) {
+  _ = is.stopper.RunAsyncTask(bgCtx, "callback worker", func(ctx context.Context) {
     for {
       for {
-        is.callbackWorkMu.Lock()
-        work := is.callbackWork
-        is.callbackWork = nil
-        is.callbackWorkMu.Unlock()
+        cw.mu.Lock()
+        wq := cw.mu.workQueue
+        cw.mu.workQueue = nil
+        cw.mu.Unlock()
 
-        if len(work) == 0 {
+        if len(wq) == 0 {
           break
         }
-        for _, w := range work {
-          w()
+
+        // Execute all the callbacks in the queue, making sure to update the
+        // metrics accordingly.
+        for _, work := range wq {
+          afterQueue := timeutil.Now()
+          queueDur := afterQueue.Sub(work.schedulingTime)
+          is.metrics.CallbacksPending.Dec(1)
+          if queueDur >= minCallbackDurationToRecord {
+            is.metrics.CallbacksPendingDuration.RecordValue(queueDur.Nanoseconds())
+          }
+
+          work.method(work.key, work.content)
+
+          afterProcess := timeutil.Now()
+          processDur := afterProcess.Sub(afterQueue)
+          is.metrics.CallbacksProcessed.Inc(1)
+          if processDur > minCallbackDurationToRecord {
+            is.metrics.CallbacksProcessingDuration.RecordValue(processDur.Nanoseconds())
+          }
+          afterQueue = afterProcess // update for next iteration
         }
       }
 
       select {
-      case <-is.callbackCh:
+      case <-cw.callbackCh:
+        // New work has just arrived.
       case <-is.stopper.ShouldQuiesce():
         return
+      case <-cw.stopperCh:
+        return
       }
     }
   })
-  return is
 }
 
 // newInfo allocates and returns a new info object using the specified
@@ -312,17 +397,24 @@ func (is *infoStore) registerCallback(
     opt.apply(cb)
   }
 
+  cb.cw = newCallbackWork()
+  is.launchCallbackWorker(is.AmbientContext, cb.cw)
   is.callbacks = append(is.callbacks, cb)
+
   if err := is.visitInfos(func(key string, i *Info) error {
     if matcher.MatchString(key) {
-      is.runCallbacks(key, i.Value, method)
+      is.runCallbacks(key, i.Value, cb)
     }
     return nil
   }, true /* deleteExpired */); err != nil {
     panic(err)
   }
 
   return func() {
+    // Stop the callback worker's goroutine.
+    cb.cw.stopperCh <- struct{}{}
+
+    // Remove the callback from the list.
     for i, targetCB := range is.callbacks {
       if targetCB == cb {
         numCBs := len(is.callbacks)
@@ -339,51 +431,41 @@ func (is *infoStore) registerCallback(
 // matching each callback's regular expression against the key and invoking
 // the corresponding callback method on a match.
 func (is *infoStore) processCallbacks(key string, content roachpb.Value, changed bool) {
-  var matches []Callback
+  var callbacks []*callback
   for _, cb := range is.callbacks {
     if (changed || cb.redundant) && cb.matcher.MatchString(key) {
-      matches = append(matches, cb.method)
+      callbacks = append(callbacks, cb)
     }
   }
-  is.runCallbacks(key, content, matches...)
+  is.runCallbacks(key, content, callbacks...)
 }
 
-func (is *infoStore) runCallbacks(key string, content roachpb.Value, callbacks ...Callback) {
+// runCallbacks receives a list of callbacks and contents that match the key.
+// It adds work to the callback work slices, and signals the associated callback
+// workers to execute the work.
+func (is *infoStore) runCallbacks(key string, content roachpb.Value, callbacks ...*callback) {
   // Add the callbacks to the callback work list.
   beforeQueue := timeutil.Now()
   is.metrics.CallbacksPending.Inc(int64(len(callbacks)))
-  f := func() {
-    afterQueue := timeutil.Now()
-    for _, method := range callbacks {
-      queueDur := afterQueue.Sub(beforeQueue)
-      is.metrics.CallbacksPending.Dec(1)
-      if queueDur >= minCallbackDurationToRecord {
-        is.metrics.CallbacksPendingDuration.RecordValue(queueDur.Nanoseconds())
-      }
-
-      method(key, content)
-
-      afterProcess := timeutil.Now()
-      processDur := afterProcess.Sub(afterQueue)
-      is.metrics.CallbacksProcessed.Inc(1)
-      if processDur > minCallbackDurationToRecord {
-        is.metrics.CallbacksProcessingDuration.RecordValue(processDur.Nanoseconds())
-      }
-      afterQueue = afterProcess // update for next iteration
+  for _, cb := range callbacks {
+    cb.cw.mu.Lock()
+    cb.cw.mu.workQueue = append(cb.cw.mu.workQueue, callbackWorkItem{
+      schedulingTime: beforeQueue,
+      method:         cb.method,
+      key:            key,
+      content:        content,
+    })
+    cb.cw.mu.Unlock()
+
+    // Signal the associated callback worker goroutine. Callbacks run in a
+    // goroutine to avoid mutex reentry. We also guarantee callbacks are run
+    // in order such that if a key is updated twice in succession, the second
+    // callback will never be run before the first.
+    select {
+    case cb.cw.callbackCh <- struct{}{}:
+    default:
     }
   }
-  is.callbackWorkMu.Lock()
-  is.callbackWork = append(is.callbackWork, f)
-  is.callbackWorkMu.Unlock()
-
-  // Signal the callback goroutine. Callbacks run in a goroutine to avoid mutex
-  // reentry. We also guarantee callbacks are run in order such that if a key
-  // is updated twice in succession, the second callback will never be run
-  // before the first.
-  select {
-  case is.callbackCh <- struct{}{}:
-  default:
-  }
 }
 
 // visitInfos implements a visitor pattern to run the visitInfo function against
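One design detail worth calling out from runCallbacks and the worker loop above: the wake-up channel has capacity 1 and is written with a non-blocking send, so any number of enqueues collapse into at most one pending signal, and no work is lost because the worker re-reads the whole queue on every wake-up. A tiny standalone illustration of that signaling pattern (not the gossip code itself):

```go
package main

import "fmt"

func main() {
	// Capacity-1 channel used purely as a "there is work" flag, mirroring
	// callbackCh in the commit.
	notify := make(chan struct{}, 1)

	signal := func() {
		select {
		case notify <- struct{}{}: // first signal is buffered
		default: // later signals are dropped; one pending wake-up is enough
		}
	}

	signal()
	signal()
	signal()

	// Only one token is pending even though signal ran three times; the worker
	// that receives it drains the entire queue, so nothing is missed.
	fmt.Println(len(notify)) // prints 1
}
```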

pkg/gossip/infostore_test.go

Lines changed: 0 additions & 2 deletions
@@ -688,8 +688,6 @@ func TestCallbacksCalledSequentially(t *testing.T) {
 // BenchmarkCallbackParallelism benchmarks the parallelism of the callback
 // worker. It registers multiple callbacks, and executes a fake workload
 // that sleeps for a short duration to simulate work done in the callback.
-// If we implement a parallel execution of the callback workers, we should
-// see a significant speedup.
 func BenchmarkCallbackParallelism(b *testing.B) {
   ctx := context.Background()
   is, stopper := newTestInfoStore()
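The benchmark itself is not shown in this diff. As a rough, self-contained sketch of the kind of measurement its comment describes (multiple workers, a fake workload that sleeps), one could write something like the following; the names and constants are illustrative and it does not touch the real infoStore API.

```go
package gossip_test // illustrative placement; not the actual test file

import (
	"sync"
	"testing"
	"time"
)

// BenchmarkFanOutWorkers gives each of N workers one sleepy work item per
// iteration and waits for all of them, so time/op shrinks as the workers
// overlap. It is a stand-in for the idea behind BenchmarkCallbackParallelism.
func BenchmarkFanOutWorkers(b *testing.B) {
	const numWorkers = 16
	const workDur = 100 * time.Microsecond

	workCh := make([]chan func(), numWorkers)
	for i := range workCh {
		workCh[i] = make(chan func(), 1)
		go func(ch chan func()) {
			for f := range ch {
				f()
			}
		}(workCh[i])
	}

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		var wg sync.WaitGroup
		wg.Add(numWorkers)
		for _, ch := range workCh {
			ch <- func() {
				time.Sleep(workDur) // fake workload standing in for a callback
				wg.Done()
			}
		}
		wg.Wait()
	}
	b.StopTimer()

	// Shut the workers down once the measurement is finished.
	for _, ch := range workCh {
		close(ch)
	}
}
```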

pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go

Lines changed: 1 addition & 6 deletions
@@ -9272,10 +9272,6 @@ func exampleRebalancing(
   }, nil)
 
   var wg sync.WaitGroup
-  g.RegisterCallback(gossip.MakePrefixPattern(gossip.KeyStoreDescPrefix),
-    func(_ string, _ roachpb.Value) { wg.Done() },
-    // Redundant callbacks are required by this test.
-    gossip.Redundant)
 
   // Initialize testStores.
   initTestStores(
@@ -9298,13 +9294,12 @@ func exampleRebalancing(
   const generations = 100
   for i := 0; i < generations; i++ {
     // First loop through test stores and add data.
-    wg.Add(len(testStores))
     for j := 0; j < len(testStores); j++ {
       // Add a pretend range to the testStore if there's already one.
       if testStores[j].Capacity.RangeCount > 0 {
         testStores[j].add(alloc.randGen.Int63n(1<<20), 0)
       }
-      if err := g.AddInfoProto(
+      if err := g.TestingAddInfoProtoAndWaitForAllCallbacks(
         gossip.MakeStoreDescKey(roachpb.StoreID(j)),
         &testStores[j].StoreDescriptor,
         0,

pkg/testutils/gossiputil/BUILD.bazel

Lines changed: 0 additions & 1 deletion
@@ -9,6 +9,5 @@ go_library(
     deps = [
         "//pkg/gossip",
         "//pkg/roachpb",
-        "//pkg/util/syncutil",
     ],
 )
