Skip to content

Commit 786ca6f

Browse files
authored
Do not block Offer processing from relay worker (#4435)
- do not miss ICE offers when relay worker busy - close p2p connection before recreate agent
1 parent dfebdf1 commit 786ca6f

File tree

4 files changed

+110
-13
lines changed

4 files changed

+110
-13
lines changed

client/internal/peer/handshaker.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,21 +43,14 @@ type OfferAnswer struct {
4343
SessionID *ICESessionID
4444
}
4545

46-
func (oa *OfferAnswer) SessionIDString() string {
47-
if oa.SessionID == nil {
48-
return "unknown"
49-
}
50-
return oa.SessionID.String()
51-
}
52-
5346
type Handshaker struct {
5447
mu sync.Mutex
5548
log *log.Entry
5649
config ConnConfig
5750
signaler *Signaler
5851
ice *WorkerICE
5952
relay *WorkerRelay
60-
onNewOfferListeners []func(*OfferAnswer)
53+
onNewOfferListeners []*OfferListener
6154

6255
// remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection
6356
remoteOffersCh chan OfferAnswer
@@ -78,7 +71,8 @@ func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *W
7871
}
7972

8073
func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAnswer)) {
81-
h.onNewOfferListeners = append(h.onNewOfferListeners, offer)
74+
l := NewOfferListener(offer)
75+
h.onNewOfferListeners = append(h.onNewOfferListeners, l)
8276
}
8377

8478
func (h *Handshaker) Listen(ctx context.Context) {
@@ -91,13 +85,13 @@ func (h *Handshaker) Listen(ctx context.Context) {
9185
continue
9286
}
9387
for _, listener := range h.onNewOfferListeners {
94-
listener(&remoteOfferAnswer)
88+
listener.Notify(&remoteOfferAnswer)
9589
}
9690
h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
9791
case remoteOfferAnswer := <-h.remoteAnswerCh:
9892
h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
9993
for _, listener := range h.onNewOfferListeners {
100-
listener(&remoteOfferAnswer)
94+
listener.Notify(&remoteOfferAnswer)
10195
}
10296
case <-ctx.Done():
10397
h.log.Infof("stop listening for remote offers and answers")
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package peer
2+
3+
import (
4+
"sync"
5+
)
6+
7+
type callbackFunc func(remoteOfferAnswer *OfferAnswer)
8+
9+
func (oa *OfferAnswer) SessionIDString() string {
10+
if oa.SessionID == nil {
11+
return "unknown"
12+
}
13+
return oa.SessionID.String()
14+
}
15+
16+
type OfferListener struct {
17+
fn callbackFunc
18+
running bool
19+
latest *OfferAnswer
20+
mu sync.Mutex
21+
}
22+
23+
func NewOfferListener(fn callbackFunc) *OfferListener {
24+
return &OfferListener{
25+
fn: fn,
26+
}
27+
}
28+
29+
func (o *OfferListener) Notify(remoteOfferAnswer *OfferAnswer) {
30+
o.mu.Lock()
31+
defer o.mu.Unlock()
32+
33+
// Store the latest offer
34+
o.latest = remoteOfferAnswer
35+
36+
// If already running, the running goroutine will pick up this latest value
37+
if o.running {
38+
return
39+
}
40+
41+
// Start processing
42+
o.running = true
43+
44+
// Process in a goroutine to avoid blocking the caller
45+
go func(remoteOfferAnswer *OfferAnswer) {
46+
for {
47+
o.fn(remoteOfferAnswer)
48+
49+
o.mu.Lock()
50+
if o.latest == nil {
51+
// No more work to do
52+
o.running = false
53+
o.mu.Unlock()
54+
return
55+
}
56+
remoteOfferAnswer = o.latest
57+
// Clear the latest to mark it as being processed
58+
o.latest = nil
59+
o.mu.Unlock()
60+
}
61+
}(remoteOfferAnswer)
62+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package peer
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
func Test_newOfferListener(t *testing.T) {
9+
dummyOfferAnswer := &OfferAnswer{}
10+
runChan := make(chan struct{}, 10)
11+
12+
longRunningFn := func(remoteOfferAnswer *OfferAnswer) {
13+
time.Sleep(1 * time.Second)
14+
runChan <- struct{}{}
15+
}
16+
17+
hl := NewOfferListener(longRunningFn)
18+
19+
hl.Notify(dummyOfferAnswer)
20+
hl.Notify(dummyOfferAnswer)
21+
hl.Notify(dummyOfferAnswer)
22+
23+
// Wait for exactly 2 callbacks
24+
for i := 0; i < 2; i++ {
25+
select {
26+
case <-runChan:
27+
case <-time.After(3 * time.Second):
28+
t.Fatal("Timeout waiting for callback")
29+
}
30+
}
31+
32+
// Verify no additional callbacks happen
33+
select {
34+
case <-runChan:
35+
t.Fatal("Unexpected additional callback")
36+
case <-time.After(100 * time.Millisecond):
37+
t.Log("Correctly received exactly 2 callbacks")
38+
}
39+
}

client/internal/peer/worker_ice.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
122122
w.log.Warnf("failed to close ICE agent: %s", err)
123123
}
124124
w.agent = nil
125-
// todo consider to switch to Relay connection while establishing a new ICE connection
126125
}
127126

128127
var preferredCandidateTypes []ice.CandidateType
@@ -410,7 +409,10 @@ func (w *WorkerICE) onConnectionStateChange(agent *icemaker.ThreadSafeAgent, dia
410409
case ice.ConnectionStateConnected:
411410
w.lastKnownState = ice.ConnectionStateConnected
412411
return
413-
case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected:
412+
case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected, ice.ConnectionStateClosed:
413+
// ice.ConnectionStateClosed happens when we recreate the agent. For the P2P to TURN switch important to
414+
// notify the conn.onICEStateDisconnected changes to update the current used priority
415+
414416
if w.lastKnownState == ice.ConnectionStateConnected {
415417
w.lastKnownState = ice.ConnectionStateDisconnected
416418
w.conn.onICEStateDisconnected()

0 commit comments

Comments
 (0)