@@ -21,8 +21,10 @@ import (
2121 "tailscale.com/ipn/ipnext"
2222 "tailscale.com/ipn/localapi"
2323 "tailscale.com/net/udprelay"
24+ "tailscale.com/net/udprelay/endpoint"
2425 "tailscale.com/net/udprelay/status"
2526 "tailscale.com/tailcfg"
27+ "tailscale.com/types/key"
2628 "tailscale.com/types/logger"
2729 "tailscale.com/types/ptr"
2830 "tailscale.com/util/eventbus"
@@ -68,25 +70,41 @@ func servePeerRelayDebugSessions(h *localapi.Handler, w http.ResponseWriter, r *
6870// extension. It is registered with [ipnext.RegisterExtension] if the package is
6971// imported.
7072func newExtension (logf logger.Logf , sb ipnext.SafeBackend ) (ipnext.Extension , error ) {
71- return & extension {
73+ e := & extension {
74+ newServerFn : func (logf logger.Logf , port int , overrideAddrs []netip.Addr ) (relayServer , error ) {
75+ return udprelay .NewServer (logf , port , overrideAddrs )
76+ },
7277 logf : logger .WithPrefix (logf , featureName + ": " ),
73- bus : sb .Sys ().Bus .Get (),
74- }, nil
78+ }
79+ e .ec = sb .Sys ().Bus .Get ().Client ("relayserver.extension" )
80+ e .respPub = eventbus.Publish [magicsock.UDPRelayAllocResp ](e .ec )
81+ eventbus .SubscribeFunc (e .ec , e .onDERPMapView )
82+ eventbus .SubscribeFunc (e .ec , e .onAllocReq )
83+ return e , nil
84+ }
85+
86+ // relayServer is an interface for [udprelay.Server].
87+ type relayServer interface {
88+ Close () error
89+ AllocateEndpoint (discoA , discoB key.DiscoPublic ) (endpoint.ServerEndpoint , error )
90+ GetSessions () []status.ServerSession
91+ SetDERPMapView (tailcfg.DERPMapView )
7592}
7693
7794// extension is an [ipnext.Extension] managing the relay server on platforms
7895// that import this package.
7996type extension struct {
80- logf logger.Logf
81- bus * eventbus.Bus
97+ newServerFn func (logf logger.Logf , port int , overrideAddrs []netip.Addr ) (relayServer , error ) // swappable for tests
98+ logf logger.Logf
99+ ec * eventbus.Client
100+ respPub * eventbus.Publisher [magicsock.UDPRelayAllocResp ]
82101
83- mu sync.Mutex // guards the following fields
84- shutdown bool
85-
86- port * int // ipn.Prefs.RelayServerPort, nil if disabled
87- eventSubs * eventbus.Monitor // nil if not connected to eventbus
88- debugSessionsCh chan chan []status.ServerSession // non-nil if consumeEventbusTopics is running
89- hasNodeAttrDisableRelayServer bool // tailcfg.NodeAttrDisableRelayServer
102+ mu sync.Mutex // guards the following fields
103+ shutdown bool // true if Shutdown() has been called
104+ rs relayServer // nil when disabled
105+ port * int // ipn.Prefs.RelayServerPort, nil if disabled
106+ derpMapView tailcfg.DERPMapView // latest seen over the eventbus
107+ hasNodeAttrDisableRelayServer bool // [tailcfg.NodeAttrDisableRelayServer]
90108}
91109
92110// Name implements [ipnext.Extension].
@@ -104,26 +122,83 @@ func (e *extension) Init(host ipnext.Host) error {
104122 return nil
105123}
106124
107- // handleBusLifetimeLocked handles the lifetime of consumeEventbusTopics.
108- func (e * extension ) handleBusLifetimeLocked () {
109- busShouldBeRunning := ! e .shutdown && e .port != nil && ! e .hasNodeAttrDisableRelayServer
110- if ! busShouldBeRunning {
111- e .disconnectFromBusLocked ()
125+ func (e * extension ) onDERPMapView (view tailcfg.DERPMapView ) {
126+ e .mu .Lock ()
127+ defer e .mu .Unlock ()
128+ e .derpMapView = view
129+ if e .rs != nil {
130+ e .rs .SetDERPMapView (view )
131+ }
132+ }
133+
134+ func (e * extension ) onAllocReq (req magicsock.UDPRelayAllocReq ) {
135+ e .mu .Lock ()
136+ defer e .mu .Unlock ()
137+ if e .shutdown {
138+ return
139+ }
140+ if e .rs == nil {
141+ if ! e .relayServerShouldBeRunningLocked () {
142+ return
143+ }
144+ e .tryStartRelayServerLocked ()
145+ if e .rs == nil {
146+ return
147+ }
148+ }
149+ se , err := e .rs .AllocateEndpoint (req .Message .ClientDisco [0 ], req .Message .ClientDisco [1 ])
150+ if err != nil {
151+ e .logf ("error allocating endpoint: %v" , err )
152+ return
153+ }
154+ e .respPub .Publish (magicsock.UDPRelayAllocResp {
155+ ReqRxFromNodeKey : req .RxFromNodeKey ,
156+ ReqRxFromDiscoKey : req .RxFromDiscoKey ,
157+ Message : & disco.AllocateUDPRelayEndpointResponse {
158+ Generation : req .Message .Generation ,
159+ UDPRelayEndpoint : disco.UDPRelayEndpoint {
160+ ServerDisco : se .ServerDisco ,
161+ ClientDisco : se .ClientDisco ,
162+ LamportID : se .LamportID ,
163+ VNI : se .VNI ,
164+ BindLifetime : se .BindLifetime .Duration ,
165+ SteadyStateLifetime : se .SteadyStateLifetime .Duration ,
166+ AddrPorts : se .AddrPorts ,
167+ },
168+ },
169+ })
170+ }
171+
172+ func (e * extension ) tryStartRelayServerLocked () {
173+ rs , err := e .newServerFn (e .logf , * e .port , overrideAddrs ())
174+ if err != nil {
175+ e .logf ("error initializing server: %v" , err )
112176 return
113- } else if e .eventSubs != nil {
114- return // already running
115177 }
178+ e .rs = rs
179+ e .rs .SetDERPMapView (e .derpMapView )
180+ }
116181
117- ec := e .bus .Client ("relayserver.extension" )
118- e .debugSessionsCh = make (chan chan []status.ServerSession )
119- e .eventSubs = ptr .To (ec .Monitor (e .consumeEventbusTopics (ec , * e .port )))
182+ func (e * extension ) relayServerShouldBeRunningLocked () bool {
183+ return ! e .shutdown && e .port != nil && ! e .hasNodeAttrDisableRelayServer
184+ }
185+
186+ // handleRelayServerLifetimeLocked handles the lifetime of [e.rs].
187+ func (e * extension ) handleRelayServerLifetimeLocked () {
188+ if ! e .relayServerShouldBeRunningLocked () {
189+ e .stopRelayServerLocked ()
190+ return
191+ } else if e .rs != nil {
192+ return // already running
193+ }
194+ e .tryStartRelayServerLocked ()
120195}
121196
122197func (e * extension ) selfNodeViewChanged (nodeView tailcfg.NodeView ) {
123198 e .mu .Lock ()
124199 defer e .mu .Unlock ()
125200 e .hasNodeAttrDisableRelayServer = nodeView .HasCap (tailcfg .NodeAttrDisableRelayServer )
126- e .handleBusLifetimeLocked ()
201+ e .handleRelayServerLifetimeLocked ()
127202}
128203
129204func (e * extension ) profileStateChanged (_ ipn.LoginProfileView , prefs ipn.PrefsView , sameNode bool ) {
@@ -133,13 +208,13 @@ func (e *extension) profileStateChanged(_ ipn.LoginProfileView, prefs ipn.PrefsV
133208 enableOrDisableServer := ok != (e .port != nil )
134209 portChanged := ok && e .port != nil && newPort != * e .port
135210 if enableOrDisableServer || portChanged || ! sameNode {
136- e .disconnectFromBusLocked ()
211+ e .stopRelayServerLocked ()
137212 e .port = nil
138213 if ok {
139214 e .port = ptr .To (newPort )
140215 }
141216 }
142- e .handleBusLifetimeLocked ()
217+ e .handleRelayServerLifetimeLocked ()
143218}
144219
145220// overrideAddrs returns TS_DEBUG_RELAY_SERVER_ADDRS as []netip.Addr, if set. It
@@ -162,88 +237,23 @@ var overrideAddrs = sync.OnceValue(func() (ret []netip.Addr) {
162237 return
163238})
164239
165- // consumeEventbusTopics serves endpoint allocation requests over the eventbus.
166- // It also serves [relayServer] debug information on a channel.
167- // consumeEventbusTopics must never acquire [extension.mu], which can be held
168- // by other goroutines while waiting to receive on [extension.eventSubs] or the
169- // inner [extension.debugSessionsCh] channel.
170- func (e * extension ) consumeEventbusTopics (ec * eventbus.Client , port int ) func (* eventbus.Client ) {
171- reqSub := eventbus.Subscribe [magicsock.UDPRelayAllocReq ](ec )
172- respPub := eventbus.Publish [magicsock.UDPRelayAllocResp ](ec )
173- debugSessionsCh := e .debugSessionsCh
174-
175- return func (ec * eventbus.Client ) {
176- rs , err := udprelay .NewServer (e .logf , port , overrideAddrs ())
177- if err != nil {
178- e .logf ("error initializing server: %v" , err )
179- }
180-
181- defer func () {
182- if rs != nil {
183- rs .Close ()
184- }
185- }()
186- for {
187- select {
188- case <- ec .Done ():
189- return
190- case respCh := <- debugSessionsCh :
191- if rs == nil {
192- respCh <- nil
193- continue
194- }
195- sessions := rs .GetSessions ()
196- respCh <- sessions
197- case req := <- reqSub .Events ():
198- if rs == nil {
199- // The server may have previously failed to initialize if
200- // the configured port was in use, try again.
201- rs , err = udprelay .NewServer (e .logf , port , overrideAddrs ())
202- if err != nil {
203- e .logf ("error initializing server: %v" , err )
204- continue
205- }
206- }
207- se , err := rs .AllocateEndpoint (req .Message .ClientDisco [0 ], req .Message .ClientDisco [1 ])
208- if err != nil {
209- e .logf ("error allocating endpoint: %v" , err )
210- continue
211- }
212- respPub .Publish (magicsock.UDPRelayAllocResp {
213- ReqRxFromNodeKey : req .RxFromNodeKey ,
214- ReqRxFromDiscoKey : req .RxFromDiscoKey ,
215- Message : & disco.AllocateUDPRelayEndpointResponse {
216- Generation : req .Message .Generation ,
217- UDPRelayEndpoint : disco.UDPRelayEndpoint {
218- ServerDisco : se .ServerDisco ,
219- ClientDisco : se .ClientDisco ,
220- LamportID : se .LamportID ,
221- VNI : se .VNI ,
222- BindLifetime : se .BindLifetime .Duration ,
223- SteadyStateLifetime : se .SteadyStateLifetime .Duration ,
224- AddrPorts : se .AddrPorts ,
225- },
226- },
227- })
228- }
229- }
230- }
231- }
232-
233- func (e * extension ) disconnectFromBusLocked () {
234- if e .eventSubs != nil {
235- e .eventSubs .Close ()
236- e .eventSubs = nil
237- e .debugSessionsCh = nil
240+ func (e * extension ) stopRelayServerLocked () {
241+ if e .rs != nil {
242+ e .rs .Close ()
238243 }
244+ e .rs = nil
239245}
240246
241247// Shutdown implements [ipnlocal.Extension].
242248func (e * extension ) Shutdown () error {
249+ // [extension.mu] must not be held when closing the [eventbus.Client]. Close
250+ // blocks until all [eventbus.SubscribeFunc]'s have returned, and the ones
251+ // used in this package also acquire [extension.mu]. See #17894.
252+ e .ec .Close ()
243253 e .mu .Lock ()
244254 defer e .mu .Unlock ()
245- e .disconnectFromBusLocked ()
246255 e .shutdown = true
256+ e .stopRelayServerLocked ()
247257 return nil
248258}
249259
@@ -253,23 +263,14 @@ func (e *extension) Shutdown() error {
253263func (e * extension ) serverStatus () status.ServerStatus {
254264 e .mu .Lock ()
255265 defer e .mu .Unlock ()
256-
257266 st := status.ServerStatus {
258267 UDPPort : nil ,
259268 Sessions : nil ,
260269 }
261- if e .port == nil || e . eventSubs == nil {
270+ if e .rs == nil {
262271 return st
263272 }
264273 st .UDPPort = ptr .To (* e .port )
265-
266- ch := make (chan []status.ServerSession )
267- select {
268- case e .debugSessionsCh <- ch :
269- resp := <- ch
270- st .Sessions = resp
271- return st
272- case <- e .eventSubs .Done ():
273- return st
274- }
274+ st .Sessions = e .rs .GetSessions ()
275+ return st
275276}
0 commit comments