Skip to content

Commit b6eabd4

Browse files
creachadair authored and tomhjp committed
util/eventbus: allow logging of slow subscribers (tailscale#17705)
Add options to the eventbus.Bus to plumb in a logger.

Route that logger into the subscriber machinery, and trigger a log message to it when a subscriber fails to respond to its delivered events for 5s or more. The log message includes the package, filename, and line number of the call site that created the subscription.

Add tests that verify this works.

Updates tailscale#17680

Change-Id: I0546516476b1e13e6a9cf79f19db2fe55e56c698
Signed-off-by: M. J. Fromberger <[email protected]>
(cherry picked from commit 061e626)
1 parent 6e2f2bb commit b6eabd4

File tree

10 files changed

+185
-13
lines changed

10 files changed

+185
-13
lines changed

flake.nix

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,5 +151,5 @@
151151
});
152152
};
153153
}
154-
# nix-direnv cache busting line: sha256-rV3C2Vi48FCifGt58OdEO4+Av0HRIs8sUJVvp/gEBLw=
154+
# nix-direnv cache busting line: sha256-AUOjLomba75qfzb9Vxc0Sktyeces6hBSuOMgboWcDnE=
155155

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ require (
4242
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
4343
github.com/golang/snappy v0.0.4
4444
github.com/golangci/golangci-lint v1.57.1
45-
github.com/google/go-cmp v0.6.0
45+
github.com/google/go-cmp v0.7.0
4646
github.com/google/go-containerregistry v0.20.3
4747
github.com/google/go-tpm v0.9.4
4848
github.com/google/gopacket v1.1.19

go.mod.sri

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
sha256-rV3C2Vi48FCifGt58OdEO4+Av0HRIs8sUJVvp/gEBLw=
1+
sha256-AUOjLomba75qfzb9Vxc0Sktyeces6hBSuOMgboWcDnE=

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -490,8 +490,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
490490
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
491491
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
492492
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
493-
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
494-
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
493+
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
494+
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
495495
github.com/google/go-containerregistry v0.20.3 h1:oNx7IdTI936V8CQRveCjaxOiegWwvM7kqkbXTpyiovI=
496496
github.com/google/go-containerregistry v0.20.3/go.mod h1:w00pIgBRDVUDFM6bq+Qx8lwNWK+cxgCuX1vd3PIBDNI=
497497
github.com/google/go-github/v66 v66.0.0 h1:ADJsaXj9UotwdgK8/iFZtv7MLc8E8WBl62WLd/D/9+M=

shell.nix

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@
1616
) {
1717
src = ./.;
1818
}).shellNix
19-
# nix-direnv cache busting line: sha256-rV3C2Vi48FCifGt58OdEO4+Av0HRIs8sUJVvp/gEBLw=
19+
# nix-direnv cache busting line: sha256-AUOjLomba75qfzb9Vxc0Sktyeces6hBSuOMgboWcDnE=

util/eventbus/bus.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ package eventbus
55

66
import (
77
"context"
8+
"log"
89
"reflect"
910
"slices"
1011
"sync"
1112

13+
"tailscale.com/types/logger"
1214
"tailscale.com/util/set"
1315
)
1416

@@ -30,6 +32,7 @@ type Bus struct {
3032
write chan PublishedEvent
3133
snapshot chan chan []PublishedEvent
3234
routeDebug hook[RoutedEvent]
35+
logf logger.Logf
3336

3437
topicsMu sync.Mutex
3538
topics map[reflect.Type][]*subscribeState
@@ -40,19 +43,42 @@ type Bus struct {
4043
clients set.Set[*Client]
4144
}
4245

43-
// New returns a new bus. Use [Publish] to make event publishers,
44-
// and [Subscribe] and [SubscribeFunc] to make event subscribers.
45-
func New() *Bus {
46+
// New returns a new bus with default options. It is equivalent to
47+
// calling [NewWithOptions] with zero [BusOptions].
48+
// The zero BusOptions leaves Logf nil, so BusOptions.logger falls back to log.Printf.
func New() *Bus { return NewWithOptions(BusOptions{}) }
49+
50+
// NewWithOptions returns a new [Bus] with the specified [BusOptions].
51+
// Use [Bus.Client] to construct clients on the bus.
52+
// Use [Publish] to make event publishers.
53+
// Use [Subscribe] and [SubscribeFunc] to make event subscribers.
54+
func NewWithOptions(opts BusOptions) *Bus {
4655
ret := &Bus{
4756
write: make(chan PublishedEvent),
4857
snapshot: make(chan chan []PublishedEvent),
4958
topics: map[reflect.Type][]*subscribeState{},
5059
clients: set.Set[*Client]{},
60+
// Resolve the logger once at construction; opts.logger() substitutes
// log.Printf when opts.Logf is nil, so the stored value is non-nil.
logf: opts.logger(),
5161
}
5262
// Hand the pump loop to runWorker and keep the returned handle in ret.router.
ret.router = runWorker(ret.pump)
5363
return ret
5464
}
5565

66+
// BusOptions are optional parameters for a [Bus]. A zero value is ready for
67+
// use and provides defaults as described.
68+
type BusOptions struct {
69+
// Logf, if non-nil, is used for debug logs emitted by the bus and clients,
70+
// publishers, and subscribers under its care. If it is nil, logs are sent
71+
// to [log.Printf].
//
// Subscribers receive this logger wrapped with a prefix naming the
// package, file, and line of the call that created the subscription.
72+
Logf logger.Logf
73+
}
74+
75+
// logger returns the effective log function for these options: o.Logf when it
// is set, otherwise [log.Printf]. The result is never nil.
func (o BusOptions) logger() logger.Logf {
76+
if o.Logf == nil {
77+
return log.Printf
78+
}
79+
return o.Logf
80+
}
81+
5682
// Client returns a new client with no subscriptions. Use [Subscribe]
5783
// to receive events, and [Publish] to emit events.
5884
//
@@ -166,6 +192,9 @@ func (b *Bus) pump(ctx context.Context) {
166192
}
167193
}
168194

195+
// logger returns a [logger.Logf] to which logs related to bus activity should be written.
196+
// For a Bus built by NewWithOptions the result is non-nil: the constructor
// stores either the caller's Logf or log.Printf.
func (b *Bus) logger() logger.Logf { return b.logf }
197+
169198
func (b *Bus) dest(t reflect.Type) []*subscribeState {
170199
b.topicsMu.Lock()
171200
defer b.topicsMu.Unlock()

util/eventbus/bus_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
package eventbus_test
55

66
import (
7+
"bytes"
78
"errors"
89
"fmt"
10+
"log"
11+
"regexp"
912
"testing"
1013
"testing/synctest"
1114
"time"
@@ -436,6 +439,76 @@ func TestMonitor(t *testing.T) {
436439
t.Run("Wait", testMon(t, func(c *eventbus.Client, m eventbus.Monitor) { c.Close(); m.Wait() }))
437440
}
438441

442+
// TestSlowSubs verifies that a subscriber which takes longer than the
// slow-subscriber timeout (5s) to accept or process an event causes the bus to
// log a "subscriber for <type> is slow" message, prefixed with the package,
// file, and line of the call that created the subscription.
//
// Both subscriber flavors are covered: a channel-based Subscriber that delays
// receiving, and a SubscriberFunc whose callback delays returning.
func TestSlowSubs(t *testing.T) {
443+
// swapLogBuf redirects the standard logger's output to a fresh buffer for
// the duration of the test and restores the previous writer on cleanup.
// The buses below are created with eventbus.New, so they log via log.Printf.
swapLogBuf := func(t *testing.T) *bytes.Buffer {
444+
logBuf := new(bytes.Buffer)
445+
save := log.Writer()
446+
log.SetOutput(logBuf)
447+
t.Cleanup(func() { log.SetOutput(save) })
448+
return logBuf
449+
}
450+
451+
t.Run("Subscriber", func(t *testing.T) {
452+
// synctest.Test runs the body with a fake clock, so the multi-second
// sleeps below do not consume real time.
synctest.Test(t, func(t *testing.T) {
453+
buf := swapLogBuf(t)
454+
455+
b := eventbus.New()
456+
defer b.Close()
457+
458+
pc := b.Client("pub")
459+
p := eventbus.Publish[EventA](pc)
460+
461+
sc := b.Client("sub")
462+
s := eventbus.Subscribe[EventA](sc)
463+
464+
// Delay reading from the subscriber's channel past the 5s threshold.
go func() {
465+
time.Sleep(6 * time.Second) // trigger the slow check at 5s.
466+
t.Logf("Subscriber accepted %v", <-s.Events())
467+
}()
468+
469+
p.Publish(EventA{12345})
470+
471+
time.Sleep(7 * time.Second) // advance time...
472+
synctest.Wait() // subscriber is done
473+
474+
// The expected prefix names this test package and file because the
// Subscribe call above is the recorded call site.
want := regexp.MustCompile(`^.* tailscale.com/util/eventbus_test bus_test.go:\d+: ` +
475+
`subscriber for eventbus_test.EventA is slow.*`)
476+
if got := buf.String(); !want.MatchString(got) {
477+
t.Errorf("Wrong log output\ngot: %q\nwant: %s", got, want)
478+
}
479+
})
480+
})
481+
482+
t.Run("SubscriberFunc", func(t *testing.T) {
483+
synctest.Test(t, func(t *testing.T) {
484+
buf := swapLogBuf(t)
485+
486+
b := eventbus.New()
487+
defer b.Close()
488+
489+
pc := b.Client("pub")
490+
p := eventbus.Publish[EventB](pc)
491+
492+
sc := b.Client("sub")
493+
// Here the slowness is in the callback itself rather than in a
// delayed channel receive.
eventbus.SubscribeFunc[EventB](sc, func(e EventB) {
494+
time.Sleep(6 * time.Second) // trigger the slow check at 5s.
495+
t.Logf("SubscriberFunc processed %v", e)
496+
})
497+
498+
p.Publish(EventB{67890})
499+
500+
time.Sleep(7 * time.Second) // advance time...
501+
synctest.Wait() // subscriber is done
502+
503+
want := regexp.MustCompile(`^.* tailscale.com/util/eventbus_test bus_test.go:\d+: ` +
504+
`subscriber for eventbus_test.EventB is slow.*`)
505+
if got := buf.String(); !want.MatchString(got) {
506+
t.Errorf("Wrong log output\ngot: %q\nwant: %s", got, want)
507+
}
508+
})
509+
})
510+
}
511+
439512
func TestRegression(t *testing.T) {
440513
bus := eventbus.New()
441514
t.Cleanup(bus.Close)

util/eventbus/client.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"reflect"
88
"sync"
99

10+
"tailscale.com/types/logger"
1011
"tailscale.com/util/set"
1112
)
1213

@@ -29,6 +30,8 @@ type Client struct {
2930

3031
func (c *Client) Name() string { return c.name }
3132

33+
// logger returns the log function of the bus this client belongs to.
func (c *Client) logger() logger.Logf { return c.bus.logger() }
34+
3235
// Close closes the client. It implicitly closes all publishers and
3336
// subscribers obtained from this client.
3437
func (c *Client) Close() {
@@ -142,7 +145,7 @@ func Subscribe[T any](c *Client) *Subscriber[T] {
142145
}
143146

144147
r := c.subscribeStateLocked()
145-
s := newSubscriber[T](r)
148+
s := newSubscriber[T](r, logfForCaller(c.logger()))
146149
r.addSubscriber(s)
147150
return s
148151
}
@@ -165,7 +168,7 @@ func SubscribeFunc[T any](c *Client, f func(T)) *SubscriberFunc[T] {
165168
}
166169

167170
r := c.subscribeStateLocked()
168-
s := newSubscriberFunc[T](r, f)
171+
s := newSubscriberFunc[T](r, f, logfForCaller(c.logger()))
169172
r.addSubscriber(s)
170173
return s
171174
}

util/eventbus/debug.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,22 @@ package eventbus
66
import (
77
"cmp"
88
"fmt"
9+
"path/filepath"
910
"reflect"
11+
"runtime"
1012
"slices"
13+
"strings"
1114
"sync"
1215
"sync/atomic"
16+
"time"
17+
18+
"tailscale.com/types/logger"
1319
)
1420

21+
// slowSubscriberTimeout is a timeout after which a subscriber that does not
22+
// accept a pending event will be flagged as being slow.
23+
const slowSubscriberTimeout = 5 * time.Second
24+
1525
// A Debugger offers access to a bus's privileged introspection and
1626
// debugging facilities.
1727
//
@@ -204,3 +214,29 @@ type DebugTopic struct {
204214
Publisher string
205215
Subscribers []string
206216
}
217+
218+
// logfForCaller returns a [logger.Logf] that prefixes its output with the
219+
// package, filename, and line number of the caller's caller.
220+
// If logf == nil, it returns [logger.Discard].
221+
// If the caller location could not be determined, it returns logf unmodified.
222+
func logfForCaller(logf logger.Logf) logger.Logf {
223+
if logf == nil {
224+
return logger.Discard
225+
}
226+
// The ok result of runtime.Caller is ignored; the nil check on FuncForPC
// below acts as the failure path.
// NOTE(review): this assumes a failed Caller yields a pc that FuncForPC
// cannot resolve — confirm.
pc, fpath, line, _ := runtime.Caller(2) // +1 for my caller, +1 for theirs
227+
if f := runtime.FuncForPC(pc); f != nil {
228+
// Prefix format: "<package path> <file base name>:<line>: ".
return logger.WithPrefix(logf, fmt.Sprintf("%s %s:%d: ", funcPackageName(f.Name()), filepath.Base(fpath), line))
229+
}
230+
return logf
231+
}
232+
233+
// funcPackageName returns the package-path portion of a qualified function
// name such as the one reported by [runtime.Func.Name].
//
// It repeatedly trims the final ".suffix" from funcName until no "." remains
// after the last "/" (or after index 0 when there is no "/"). For example,
// "a/b/pkg.(*T).Method.func1" yields "a/b/pkg", and "main.run" yields "main".
// Dots that occur before the last "/" (as in "example.com/x") are left alone.
func funcPackageName(funcName string) string {
234+
// ls marks the boundary left of which dots belong to the import path and
// must not be trimmed; max clamps the "not found" result (-1) to 0.
ls := max(strings.LastIndex(funcName, "/"), 0)
235+
for {
236+
i := strings.LastIndex(funcName, ".")
237+
if i <= ls {
238+
return funcName
239+
}
240+
funcName = funcName[:i]
241+
}
242+
}

util/eventbus/subscribe.go

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import (
88
"fmt"
99
"reflect"
1010
"sync"
11+
"time"
12+
13+
"tailscale.com/types/logger"
1114
)
1215

1316
type DeliveredEvent struct {
@@ -182,12 +185,18 @@ type Subscriber[T any] struct {
182185
stop stopFlag
183186
read chan T
184187
unregister func()
188+
logf logger.Logf
189+
slow *time.Timer // used to detect slow subscriber service
185190
}
186191

187-
func newSubscriber[T any](r *subscribeState) *Subscriber[T] {
192+
// newSubscriber constructs a channel-based Subscriber for events of type T.
//
// r is the subscribeState from which the subscriber removes itself when its
// unregister hook runs. logf receives the "subscriber ... is slow" reports
// emitted by dispatch.
func newSubscriber[T any](r *subscribeState, logf logger.Logf) *Subscriber[T] {
193+
// Create the timer once and stop it right away; dispatch re-arms it with
// Reset for each delivery and stops it again when the delivery ends.
slow := time.NewTimer(0)
194+
slow.Stop() // reset in dispatch
188195
return &Subscriber[T]{
189196
read: make(chan T),
190197
unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
198+
logf: logf,
199+
slow: slow,
191200
}
192201
}
193202

@@ -212,6 +221,11 @@ func (s *Subscriber[T]) monitor(debugEvent T) {
212221

213222
func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool {
214223
t := vals.Peek().Event.(T)
224+
225+
start := time.Now()
226+
s.slow.Reset(slowSubscriberTimeout)
227+
defer s.slow.Stop()
228+
215229
for {
216230
// Keep the cases in this select in sync with subscribeState.pump
217231
// above. The only difference should be that this select
@@ -226,6 +240,9 @@ func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent
226240
return false
227241
case ch := <-snapshot:
228242
ch <- vals.Snapshot()
243+
case <-s.slow.C:
244+
s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start))
245+
s.slow.Reset(slowSubscriberTimeout)
229246
}
230247
}
231248
}
@@ -260,12 +277,18 @@ type SubscriberFunc[T any] struct {
260277
stop stopFlag
261278
read func(T)
262279
unregister func()
280+
logf logger.Logf
281+
slow *time.Timer // used to detect slow subscriber service
263282
}
264283

265-
func newSubscriberFunc[T any](r *subscribeState, f func(T)) *SubscriberFunc[T] {
284+
// newSubscriberFunc constructs a callback-based SubscriberFunc for events of
// type T.
//
// r is the subscribeState from which the subscriber removes itself when its
// unregister hook runs. f is the callback stored as the subscriber's read
// function. logf receives the "subscriber ... is slow" reports emitted by
// dispatch.
func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] {
285+
// Create the timer once and stop it right away; dispatch re-arms it with
// Reset for each delivery and stops it again when the delivery ends.
slow := time.NewTimer(0)
286+
slow.Stop() // reset in dispatch
266287
return &SubscriberFunc[T]{
267288
read: f,
268289
unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
290+
logf: logf,
291+
slow: slow,
269292
}
270293
}
271294

@@ -285,6 +308,11 @@ func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredE
285308
t := vals.Peek().Event.(T)
286309
callDone := make(chan struct{})
287310
go s.runCallback(t, callDone)
311+
312+
start := time.Now()
313+
s.slow.Reset(slowSubscriberTimeout)
314+
defer s.slow.Stop()
315+
288316
// Keep the cases in this select in sync with subscribeState.pump
289317
// above. The only difference should be that this select
290318
// delivers a value by calling s.read.
@@ -299,6 +327,9 @@ func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredE
299327
return false
300328
case ch := <-snapshot:
301329
ch <- vals.Snapshot()
330+
case <-s.slow.C:
331+
s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start))
332+
s.slow.Reset(slowSubscriberTimeout)
302333
}
303334
}
304335
}

0 commit comments

Comments
 (0)