Skip to content

Commit 4346615

Browse files
authored
logtail: avoid racing eventbus subscriptions with Shutdown (tailscale#17639)
When the eventbus is enabled, set up the subscription for change deltas at the beginning when the client is created, rather than waiting for the first awaitInternetUp check. Otherwise, it is possible for a check to race with the client close in Shutdown, which triggers a panic. Updates tailscale#17638 Change-Id: I461c07939eca46699072b14b1814ecf28eec750c Signed-off-by: M. J. Fromberger <[email protected]>
1 parent fd0e541 commit 4346615

File tree

1 file changed

+19
-20
lines changed

1 file changed

+19
-20
lines changed

logtail/logtail.go

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
124124

125125
if cfg.Bus != nil {
126126
l.eventClient = cfg.Bus.Client("logtail.Logger")
127+
l.changeDeltaSub = eventbus.Subscribe[netmon.ChangeDelta](l.eventClient)
127128
}
128129
l.SetSockstatsLabel(sockstats.LabelLogtailLogger)
129130
l.compressLogs = cfg.CompressLogs
@@ -162,6 +163,7 @@ type Logger struct {
162163
httpDoCalls atomic.Int32
163164
sockstatsLabel atomicSocktatsLabel
164165
eventClient *eventbus.Client
166+
changeDeltaSub *eventbus.Subscriber[netmon.ChangeDelta]
165167

166168
procID uint32
167169
includeProcSequence bool
@@ -427,8 +429,23 @@ func (l *Logger) internetUp() bool {
427429

428430
func (l *Logger) awaitInternetUp(ctx context.Context) {
429431
if l.eventClient != nil {
430-
l.awaitInternetUpBus(ctx)
431-
return
432+
for {
433+
if l.internetUp() {
434+
return
435+
}
436+
select {
437+
case <-ctx.Done():
438+
return // give up
439+
case <-l.changeDeltaSub.Done():
440+
return // give up (closing down)
441+
case delta := <-l.changeDeltaSub.Events():
442+
if delta.New.AnyInterfaceUp() || l.internetUp() {
443+
fmt.Fprintf(l.stderr, "logtail: internet back up\n")
444+
return
445+
}
446+
fmt.Fprintf(l.stderr, "logtail: network changed, but is not up")
447+
}
448+
}
432449
}
433450
upc := make(chan bool, 1)
434451
defer l.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) {
@@ -449,24 +466,6 @@ func (l *Logger) awaitInternetUp(ctx context.Context) {
449466
}
450467
}
451468

452-
func (l *Logger) awaitInternetUpBus(ctx context.Context) {
453-
if l.internetUp() {
454-
return
455-
}
456-
sub := eventbus.Subscribe[netmon.ChangeDelta](l.eventClient)
457-
defer sub.Close()
458-
select {
459-
case delta := <-sub.Events():
460-
if delta.New.AnyInterfaceUp() {
461-
fmt.Fprintf(l.stderr, "logtail: internet back up\n")
462-
return
463-
}
464-
fmt.Fprintf(l.stderr, "logtail: network changed, but is not up")
465-
case <-ctx.Done():
466-
return
467-
}
468-
}
469-
470469
// upload uploads body to the log server.
471470
// origlen indicates the pre-compression body length.
472471
// origlen of -1 indicates that the body is not compressed.

0 commit comments

Comments
 (0)