Skip to content

Commit ef1aca7

Browse files
committed
Merge tag 'v1.90.9' into sunos-1.90
Release 1.90.9
2 parents 77dd234 + 66826a4 commit ef1aca7

File tree

5 files changed

+122
-9
lines changed

5 files changed

+122
-9
lines changed

VERSION.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.90.8
1+
1.90.9

util/eventbus/bus.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,14 @@ func (b *Bus) Close() {
120120
}
121121

122122
func (b *Bus) pump(ctx context.Context) {
123-
var vals queue[PublishedEvent]
123+
// Limit how many published events we can buffer in the PublishedEvent queue.
124+
//
125+
// Subscribers have unbounded DeliveredEvent queues (see tailscale/tailscale#18020),
126+
// so this queue doesn't need to be unbounded. Keeping it bounded may also help
127+
// catch cases where subscribers stop pumping events completely, such as due to a bug
128+
// in [subscribeState.pump], [Subscriber.dispatch], or [SubscriberFunc.dispatch]).
129+
const maxPublishedEvents = 16
130+
vals := queue[PublishedEvent]{capacity: maxPublishedEvents}
124131
acceptCh := func() chan PublishedEvent {
125132
if vals.Full() {
126133
return nil

util/eventbus/bus_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"fmt"
1010
"log"
1111
"regexp"
12+
"sync"
1213
"testing"
1314
"testing/synctest"
1415
"time"
@@ -546,6 +547,105 @@ func TestRegression(t *testing.T) {
546547
})
547548
}
548549

550+
func TestPublishWithMutex(t *testing.T) {
551+
testPublishWithMutex(t, 1024) // arbitrary large number of events
552+
}
553+
554+
// testPublishWithMutex publishes the specified number of events,
555+
// acquiring and releasing a mutex around each publish and each
556+
// subscriber event receive.
557+
//
558+
// The test fails if it loses any events or times out due to a deadlock.
559+
// Unfortunately, a goroutine waiting on a mutex held by a durably blocked
560+
// goroutine is not itself considered durably blocked, so [synctest] cannot
561+
// detect this deadlock on its own.
562+
func testPublishWithMutex(t *testing.T, n int) {
563+
synctest.Test(t, func(t *testing.T) {
564+
b := eventbus.New()
565+
defer b.Close()
566+
567+
c := b.Client("TestClient")
568+
569+
evts := make([]any, n)
570+
for i := range evts {
571+
evts[i] = EventA{Counter: i}
572+
}
573+
exp := expectEvents(t, evts...)
574+
575+
var mu sync.Mutex
576+
eventbus.SubscribeFunc[EventA](c, func(e EventA) {
577+
// Acquire the same mutex as the publisher.
578+
mu.Lock()
579+
mu.Unlock()
580+
581+
// Mark event as received, so we can check for lost events.
582+
exp.Got(e)
583+
})
584+
585+
p := eventbus.Publish[EventA](c)
586+
go func() {
587+
// Publish events, acquiring the mutex around each publish.
588+
for i := range n {
589+
mu.Lock()
590+
p.Publish(EventA{Counter: i})
591+
mu.Unlock()
592+
}
593+
}()
594+
595+
synctest.Wait()
596+
597+
if !exp.Empty() {
598+
t.Errorf("unexpected extra events: %+v", exp.want)
599+
}
600+
})
601+
}
602+
603+
func TestPublishFromSubscriber(t *testing.T) {
604+
testPublishFromSubscriber(t, 1024) // arbitrary large number of events
605+
}
606+
607+
// testPublishFromSubscriber publishes the specified number of EventA events.
608+
// Each EventA causes the subscriber to publish an EventB.
609+
// The test fails if it loses any events or if a deadlock occurs.
610+
func testPublishFromSubscriber(t *testing.T, n int) {
611+
synctest.Test(t, func(t *testing.T) {
612+
b := eventbus.New()
613+
defer b.Close()
614+
615+
c := b.Client("TestClient")
616+
617+
// Ultimately we expect to receive n EventB events
618+
// published as a result of receiving n EventA events.
619+
evts := make([]any, n)
620+
for i := range evts {
621+
evts[i] = EventB{Counter: i}
622+
}
623+
exp := expectEvents(t, evts...)
624+
625+
pubA := eventbus.Publish[EventA](c)
626+
pubB := eventbus.Publish[EventB](c)
627+
628+
eventbus.SubscribeFunc[EventA](c, func(e EventA) {
629+
// Upon receiving EventA, publish EventB.
630+
pubB.Publish(EventB{Counter: e.Counter})
631+
})
632+
eventbus.SubscribeFunc[EventB](c, func(e EventB) {
633+
// Mark EventB as received.
634+
exp.Got(e)
635+
})
636+
637+
for i := range n {
638+
pubA.Publish(EventA{Counter: i})
639+
}
640+
641+
synctest.Wait()
642+
643+
if !exp.Empty() {
644+
t.Errorf("unexpected extra events: %+v", exp.want)
645+
}
646+
})
647+
}
648+
549649
type queueChecker struct {
550650
t *testing.T
551651
want []any

util/eventbus/queue.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,18 @@ import (
77
"slices"
88
)
99

10-
const maxQueuedItems = 16
11-
12-
// queue is an ordered queue of length up to maxQueuedItems.
10+
// queue is an ordered queue of length up to capacity,
11+
// if capacity is non-zero. Otherwise it is unbounded.
1312
type queue[T any] struct {
14-
vals []T
15-
start int
13+
vals []T
14+
start int
15+
capacity int // zero means unbounded
1616
}
1717

1818
// canAppend reports whether a value can be appended to q.vals without
1919
// shifting values around.
2020
func (q *queue[T]) canAppend() bool {
21-
return cap(q.vals) < maxQueuedItems || len(q.vals) < cap(q.vals)
21+
return q.capacity == 0 || cap(q.vals) < q.capacity || len(q.vals) < cap(q.vals)
2222
}
2323

2424
func (q *queue[T]) Full() bool {

wgengine/userspace.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"tailscale.com/util/checkchange"
5151
"tailscale.com/util/clientmetric"
5252
"tailscale.com/util/eventbus"
53+
"tailscale.com/util/execqueue"
5354
"tailscale.com/util/mak"
5455
"tailscale.com/util/set"
5556
"tailscale.com/util/testenv"
@@ -97,6 +98,8 @@ type userspaceEngine struct {
9798
eventBus *eventbus.Bus
9899
eventClient *eventbus.Client
99100

101+
linkChangeQueue execqueue.ExecQueue
102+
100103
logf logger.Logf
101104
wgLogger *wglog.Logger // a wireguard-go logging wrapper
102105
reqCh chan struct{}
@@ -543,7 +546,7 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
543546
if f, ok := feature.HookProxyInvalidateCache.GetOk(); ok {
544547
f()
545548
}
546-
e.linkChange(&cd)
549+
e.linkChangeQueue.Add(func() { e.linkChange(&cd) })
547550
})
548551
e.eventClient = ec
549552
e.logf("Engine created.")
@@ -1258,6 +1261,9 @@ func (e *userspaceEngine) RequestStatus() {
12581261

12591262
func (e *userspaceEngine) Close() {
12601263
e.eventClient.Close()
1264+
// TODO(cmol): Should we wait for it too?
1265+
// Same question raised in appconnector.go.
1266+
e.linkChangeQueue.Shutdown()
12611267
e.mu.Lock()
12621268
if e.closing {
12631269
e.mu.Unlock()

0 commit comments

Comments
 (0)