@@ -36,11 +36,76 @@ func (allMatcher) MatchString(string) bool {
3636 return true
3737}
3838
39- // callback holds regexp pattern match and GossipCallback method.
39+ // This is how multiple callback registrations are handled:
40+ /*
41+ +------------------+ +------------------+ +------------------+
42+ | Callback Reg 1 | | Callback Reg 2 | ... | Callback Reg N |
43+ | Pattern: "key1.*"| | Pattern: "key2.*"| ... | Pattern: "keyN.*"|
44+ +------------------+ +------------------+ +------------------+
45+ | | |
46+ v v v
47+ +------------------+ +------------------+ +------------------+
48+ | callbackWorker 1 | | callbackWorker 2 | ... | callbackWorker N |
49+ | - callbackCh | | - callbackCh | ... | - callbackCh |
50+ | - stopperCh | | - stopperCh | ... | - stopperCh |
51+ | - workQueue | | - workQueue | ... | - workQueue |
52+ +------------------+ +------------------+ +------------------+
53+ | | |
54+ v v v
55+ +------------------+ +------------------+ +------------------+
56+ | Worker Goroutine | | Worker Goroutine | ... | Worker Goroutine |
57+ | 1 | | 2 | ... | N |
58+ +------------------+ +------------------+ +------------------+
59+ */
60+ // When a new info is added, it is checked against all the callback matchers.
61+ // The work is added inside each workQueue which matches the info key.
62+ // Each worker goroutine independently processes its own workQueue in FIFO
63+ // order.
64+
65+ // callback holds regexp pattern match, GossipCallback method, and a queue of
66+ // remaining work items.
4067type callback struct {
4168 matcher stringMatcher
4269 method Callback
4370 redundant bool
71+ // cw contains all the information needed to orchestrate, schedule, and run
72+ // callbacks for this specific matcher.
73+ cw * callbackWork
74+ }
75+
76+ // callbackWorkItem is a struct that contains the information needed to run
77+ // a callback.
78+ type callbackWorkItem struct {
79+ // schedulingTime is the time when the callback was scheduled.
80+ schedulingTime time.Time
81+ method Callback
82+ key string
83+ content roachpb.Value
84+ }
85+
86+ type callbackWork struct {
87+ // callbackCh is used to signal the callback worker to execute the work.
88+ callbackCh chan struct {}
89+ // stopperCh is used to signal the callback worker to stop.
90+ stopperCh chan struct {}
91+ mu struct {
92+ syncutil.Mutex
93+ // workQueue contains the queue of callbacks that need to be called.
94+ workQueue []callbackWorkItem
95+ }
96+ }
97+
98+ func newCallbackWork () * callbackWork {
99+ return & callbackWork {
100+ callbackCh : make (chan struct {}, 1 ),
101+ stopperCh : make (chan struct {}, 1 ),
102+ mu : struct {
103+ syncutil.Mutex
104+ workQueue []callbackWorkItem
105+ }{
106+ workQueue : make ([]callbackWorkItem , 0 ),
107+ },
108+ }
44109}
45110
46111// infoStore objects manage maps of Info objects. They maintain a
@@ -64,10 +129,6 @@ type infoStore struct {
64129 NodeAddr util.UnresolvedAddr `json:"-"` // Address of node owning this info store: "host:port"
65130 highWaterStamps map [roachpb.NodeID ]int64 // Per-node information for gossip peers
66131 callbacks []* callback
67-
68- callbackWorkMu syncutil.Mutex // Protects callbackWork
69- callbackWork []func ()
70- callbackCh chan struct {} // Channel to signal the callback goroutine
71132}
72133
73134var monoTime struct {
@@ -167,34 +228,58 @@ func newInfoStore(
167228 Infos : make (infoMap ),
168229 NodeAddr : nodeAddr ,
169230 highWaterStamps : map [roachpb.NodeID ]int64 {},
170- callbackCh : make (chan struct {}, 1 ),
171231 }
232+ return is
233+ }
172234
235+ // launchCallbackWorker launches a worker goroutine that is responsible for
236+ // executing callbacks for one registered callback pattern.
237+ func (is * infoStore ) launchCallbackWorker (ambient log.AmbientContext , cw * callbackWork ) {
173238 bgCtx := ambient .AnnotateCtx (context .Background ())
174- _ = is .stopper .RunAsyncTask (bgCtx , "infostore " , func (ctx context.Context ) {
239+ _ = is .stopper .RunAsyncTask (bgCtx , "callback worker " , func (ctx context.Context ) {
175240 for {
176241 for {
177- is . callbackWorkMu .Lock ()
178- work := is . callbackWork
179- is . callbackWork = nil
180- is . callbackWorkMu .Unlock ()
242+ cw . mu .Lock ()
243+ wq := cw . mu . workQueue
244+ cw . mu . workQueue = nil
245+ cw . mu .Unlock ()
181246
182- if len (work ) == 0 {
247+ if len (wq ) == 0 {
183248 break
184249 }
185- for _ , w := range work {
186- w ()
250+
251+ // Execute all the callbacks in the queue, making sure to update the
252+ // metrics accordingly.
253+ for _ , work := range wq {
254+ afterQueue := timeutil .Now ()
255+ queueDur := afterQueue .Sub (work .schedulingTime )
256+ is .metrics .CallbacksPending .Dec (1 )
257+ if queueDur >= minCallbackDurationToRecord {
258+ is .metrics .CallbacksPendingDuration .RecordValue (queueDur .Nanoseconds ())
259+ }
260+
261+ work .method (work .key , work .content )
262+
263+ afterProcess := timeutil .Now ()
264+ processDur := afterProcess .Sub (afterQueue )
265+ is .metrics .CallbacksProcessed .Inc (1 )
266+ if processDur > minCallbackDurationToRecord {
267+ is .metrics .CallbacksProcessingDuration .RecordValue (processDur .Nanoseconds ())
268+ }
269+ afterQueue = afterProcess // update for next iteration
187270 }
188271 }
189272
190273 select {
191- case <- is .callbackCh :
274+ case <- cw .callbackCh :
275+ // New work has just arrived.
192276 case <- is .stopper .ShouldQuiesce ():
193277 return
278+ case <- cw .stopperCh :
279+ return
194280 }
195281 }
196282 })
197- return is
198283}
199284
200285// newInfo allocates and returns a new info object using the specified
@@ -312,17 +397,24 @@ func (is *infoStore) registerCallback(
312397 opt .apply (cb )
313398 }
314399
400+ cb .cw = newCallbackWork ()
401+ is .launchCallbackWorker (is .AmbientContext , cb .cw )
315402 is .callbacks = append (is .callbacks , cb )
403+
316404 if err := is .visitInfos (func (key string , i * Info ) error {
317405 if matcher .MatchString (key ) {
318- is .runCallbacks (key , i .Value , method )
406+ is .runCallbacks (key , i .Value , cb )
319407 }
320408 return nil
321409 }, true /* deleteExpired */ ); err != nil {
322410 panic (err )
323411 }
324412
325413 return func () {
414+ // Stop the callback worker's goroutine.
415+ cb .cw .stopperCh <- struct {}{}
416+
417+ // Remove the callback from the list.
326418 for i , targetCB := range is .callbacks {
327419 if targetCB == cb {
328420 numCBs := len (is .callbacks )
@@ -339,51 +431,41 @@ func (is *infoStore) registerCallback(
339431// matching each callback's regular expression against the key and invoking
340432// the corresponding callback method on a match.
341433func (is * infoStore ) processCallbacks (key string , content roachpb.Value , changed bool ) {
342- var matches []Callback
434+ var callbacks []* callback
343435 for _ , cb := range is .callbacks {
344436 if (changed || cb .redundant ) && cb .matcher .MatchString (key ) {
345- matches = append (matches , cb . method )
437+ callbacks = append (callbacks , cb )
346438 }
347439 }
348- is .runCallbacks (key , content , matches ... )
440+ is .runCallbacks (key , content , callbacks ... )
349441}
350442
351- func (is * infoStore ) runCallbacks (key string , content roachpb.Value , callbacks ... Callback ) {
443+ // runCallbacks receives a list of callbacks and contents that match the key.
444+ // It adds work to the callback work slices, and signals the associated callback
445+ // workers to execute the work.
446+ func (is * infoStore ) runCallbacks (key string , content roachpb.Value , callbacks ... * callback ) {
352447 // Add the callbacks to the callback work list.
353448 beforeQueue := timeutil .Now ()
354449 is .metrics .CallbacksPending .Inc (int64 (len (callbacks )))
355- f := func () {
356- afterQueue := timeutil .Now ()
357- for _ , method := range callbacks {
358- queueDur := afterQueue .Sub (beforeQueue )
359- is .metrics .CallbacksPending .Dec (1 )
360- if queueDur >= minCallbackDurationToRecord {
361- is .metrics .CallbacksPendingDuration .RecordValue (queueDur .Nanoseconds ())
362- }
363-
364- method (key , content )
365-
366- afterProcess := timeutil .Now ()
367- processDur := afterProcess .Sub (afterQueue )
368- is .metrics .CallbacksProcessed .Inc (1 )
369- if processDur > minCallbackDurationToRecord {
370- is .metrics .CallbacksProcessingDuration .RecordValue (processDur .Nanoseconds ())
371- }
372- afterQueue = afterProcess // update for next iteration
450+ for _ , cb := range callbacks {
451+ cb .cw .mu .Lock ()
452+ cb .cw .mu .workQueue = append (cb .cw .mu .workQueue , callbackWorkItem {
453+ schedulingTime : beforeQueue ,
454+ method : cb .method ,
455+ key : key ,
456+ content : content ,
457+ })
458+ cb .cw .mu .Unlock ()
459+
460+ // Signal the associated callback worker goroutine. Callbacks run in a
461+ // goroutine to avoid mutex reentry. We also guarantee callbacks are run
462+ // in order such that if a key is updated twice in succession, the second
463+ // callback will never be run before the first.
464+ select {
465+ case cb .cw .callbackCh <- struct {}{}:
466+ default :
373467 }
374468 }
375- is .callbackWorkMu .Lock ()
376- is .callbackWork = append (is .callbackWork , f )
377- is .callbackWorkMu .Unlock ()
378-
379- // Signal the callback goroutine. Callbacks run in a goroutine to avoid mutex
380- // reentry. We also guarantee callbacks are run in order such that if a key
381- // is updated twice in succession, the second callback will never be run
382- // before the first.
383- select {
384- case is .callbackCh <- struct {}{}:
385- default :
386- }
387469}
388470
389471// visitInfos implements a visitor pattern to run the visitInfo function against
0 commit comments