Skip to content

Commit 12cad85

Browse files
authored
[client] Fix/ice handshake (#4281)
In this PR, speed up the GRPC message processing, force the recreation of the ICE agent when getting a new, remote offer (do not wait for local STUN timeout).
1 parent 6a3846a commit 12cad85

File tree

13 files changed

+544
-338
lines changed

13 files changed

+544
-338
lines changed

client/internal/engine.go

Lines changed: 49 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,11 @@ import (
5555
nbssh "github.com/netbirdio/netbird/client/ssh"
5656
"github.com/netbirdio/netbird/client/system"
5757
nbdns "github.com/netbirdio/netbird/dns"
58+
"github.com/netbirdio/netbird/route"
5859
mgm "github.com/netbirdio/netbird/shared/management/client"
5960
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
6061
auth "github.com/netbirdio/netbird/shared/relay/auth/hmac"
6162
relayClient "github.com/netbirdio/netbird/shared/relay/client"
62-
"github.com/netbirdio/netbird/route"
6363
signal "github.com/netbirdio/netbird/shared/signal/client"
6464
sProto "github.com/netbirdio/netbird/shared/signal/proto"
6565
"github.com/netbirdio/netbird/util"
@@ -254,6 +254,7 @@ func NewEngine(
254254
}
255255
engine.stateManager = statemanager.New(path)
256256

257+
log.Infof("I am: %s", config.WgPrivateKey.PublicKey().String())
257258
return engine
258259
}
259260

@@ -1330,52 +1331,17 @@ func (e *Engine) receiveSignalEvents() {
13301331
}
13311332

13321333
switch msg.GetBody().Type {
1333-
case sProto.Body_OFFER:
1334-
remoteCred, err := signal.UnMarshalCredential(msg)
1335-
if err != nil {
1336-
return err
1337-
}
1338-
1339-
var rosenpassPubKey []byte
1340-
rosenpassAddr := ""
1341-
if msg.GetBody().GetRosenpassConfig() != nil {
1342-
rosenpassPubKey = msg.GetBody().GetRosenpassConfig().GetRosenpassPubKey()
1343-
rosenpassAddr = msg.GetBody().GetRosenpassConfig().GetRosenpassServerAddr()
1344-
}
1345-
conn.OnRemoteOffer(peer.OfferAnswer{
1346-
IceCredentials: peer.IceCredentials{
1347-
UFrag: remoteCred.UFrag,
1348-
Pwd: remoteCred.Pwd,
1349-
},
1350-
WgListenPort: int(msg.GetBody().GetWgListenPort()),
1351-
Version: msg.GetBody().GetNetBirdVersion(),
1352-
RosenpassPubKey: rosenpassPubKey,
1353-
RosenpassAddr: rosenpassAddr,
1354-
RelaySrvAddress: msg.GetBody().GetRelayServerAddress(),
1355-
})
1356-
case sProto.Body_ANSWER:
1357-
remoteCred, err := signal.UnMarshalCredential(msg)
1334+
case sProto.Body_OFFER, sProto.Body_ANSWER:
1335+
offerAnswer, err := convertToOfferAnswer(msg)
13581336
if err != nil {
13591337
return err
13601338
}
13611339

1362-
var rosenpassPubKey []byte
1363-
rosenpassAddr := ""
1364-
if msg.GetBody().GetRosenpassConfig() != nil {
1365-
rosenpassPubKey = msg.GetBody().GetRosenpassConfig().GetRosenpassPubKey()
1366-
rosenpassAddr = msg.GetBody().GetRosenpassConfig().GetRosenpassServerAddr()
1340+
if msg.Body.Type == sProto.Body_OFFER {
1341+
conn.OnRemoteOffer(*offerAnswer)
1342+
} else {
1343+
conn.OnRemoteAnswer(*offerAnswer)
13671344
}
1368-
conn.OnRemoteAnswer(peer.OfferAnswer{
1369-
IceCredentials: peer.IceCredentials{
1370-
UFrag: remoteCred.UFrag,
1371-
Pwd: remoteCred.Pwd,
1372-
},
1373-
WgListenPort: int(msg.GetBody().GetWgListenPort()),
1374-
Version: msg.GetBody().GetNetBirdVersion(),
1375-
RosenpassPubKey: rosenpassPubKey,
1376-
RosenpassAddr: rosenpassAddr,
1377-
RelaySrvAddress: msg.GetBody().GetRelayServerAddress(),
1378-
})
13791345
case sProto.Body_CANDIDATE:
13801346
candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload)
13811347
if err != nil {
@@ -2073,3 +2039,44 @@ func createFile(path string) error {
20732039
}
20742040
return file.Close()
20752041
}
2042+
2043+
func convertToOfferAnswer(msg *sProto.Message) (*peer.OfferAnswer, error) {
2044+
remoteCred, err := signal.UnMarshalCredential(msg)
2045+
if err != nil {
2046+
return nil, err
2047+
}
2048+
2049+
var (
2050+
rosenpassPubKey []byte
2051+
rosenpassAddr string
2052+
)
2053+
if cfg := msg.GetBody().GetRosenpassConfig(); cfg != nil {
2054+
rosenpassPubKey = cfg.GetRosenpassPubKey()
2055+
rosenpassAddr = cfg.GetRosenpassServerAddr()
2056+
}
2057+
2058+
// Handle optional SessionID
2059+
var sessionID *peer.ICESessionID
2060+
if sessionBytes := msg.GetBody().GetSessionId(); sessionBytes != nil {
2061+
if id, err := peer.ICESessionIDFromBytes(sessionBytes); err != nil {
2062+
log.Warnf("Invalid session ID in message: %v", err)
2063+
sessionID = nil // Set to nil if conversion fails
2064+
} else {
2065+
sessionID = &id
2066+
}
2067+
}
2068+
2069+
offerAnswer := peer.OfferAnswer{
2070+
IceCredentials: peer.IceCredentials{
2071+
UFrag: remoteCred.UFrag,
2072+
Pwd: remoteCred.Pwd,
2073+
},
2074+
WgListenPort: int(msg.GetBody().GetWgListenPort()),
2075+
Version: msg.GetBody().GetNetBirdVersion(),
2076+
RosenpassPubKey: rosenpassPubKey,
2077+
RosenpassAddr: rosenpassAddr,
2078+
RelaySrvAddress: msg.GetBody().GetRelayServerAddress(),
2079+
SessionID: sessionID,
2080+
}
2081+
return &offerAnswer, nil
2082+
}

client/internal/peer/conn.go

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import (
2424
"github.com/netbirdio/netbird/client/internal/peer/id"
2525
"github.com/netbirdio/netbird/client/internal/peer/worker"
2626
"github.com/netbirdio/netbird/client/internal/stdnet"
27-
relayClient "github.com/netbirdio/netbird/shared/relay/client"
2827
"github.com/netbirdio/netbird/route"
28+
relayClient "github.com/netbirdio/netbird/shared/relay/client"
2929
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
3030
)
3131

@@ -200,19 +200,11 @@ func (conn *Conn) Open(engineCtx context.Context) error {
200200
conn.wg.Add(1)
201201
go func() {
202202
defer conn.wg.Done()
203+
203204
conn.waitInitialRandomSleepTime(conn.ctx)
204205
conn.semaphore.Done(conn.ctx)
205206

206-
conn.dumpState.SendOffer()
207-
if err := conn.handshaker.sendOffer(); err != nil {
208-
conn.Log.Errorf("failed to send initial offer: %v", err)
209-
}
210-
211-
conn.wg.Add(1)
212-
go func() {
213-
conn.guard.Start(conn.ctx, conn.onGuardEvent)
214-
conn.wg.Done()
215-
}()
207+
conn.guard.Start(conn.ctx, conn.onGuardEvent)
216208
}()
217209
conn.opened = true
218210
return nil
@@ -274,10 +266,10 @@ func (conn *Conn) Close(signalToRemote bool) {
274266

275267
// OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
276268
// doesn't block, discards the message if connection wasn't ready
277-
func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) bool {
269+
func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) {
278270
conn.dumpState.RemoteAnswer()
279271
conn.Log.Infof("OnRemoteAnswer, priority: %s, status ICE: %s, status relay: %s", conn.currentConnPriority, conn.statusICE, conn.statusRelay)
280-
return conn.handshaker.OnRemoteAnswer(answer)
272+
conn.handshaker.OnRemoteAnswer(answer)
281273
}
282274

283275
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
@@ -296,10 +288,10 @@ func (conn *Conn) SetOnDisconnected(handler func(remotePeer string)) {
296288
conn.onDisconnected = handler
297289
}
298290

299-
func (conn *Conn) OnRemoteOffer(offer OfferAnswer) bool {
291+
func (conn *Conn) OnRemoteOffer(offer OfferAnswer) {
300292
conn.dumpState.RemoteOffer()
301293
conn.Log.Infof("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay)
302-
return conn.handshaker.OnRemoteOffer(offer)
294+
conn.handshaker.OnRemoteOffer(offer)
303295
}
304296

305297
// WgConfig returns the WireGuard config
@@ -548,7 +540,6 @@ func (conn *Conn) onRelayDisconnected() {
548540
}
549541

550542
func (conn *Conn) onGuardEvent() {
551-
conn.Log.Debugf("send offer to peer")
552543
conn.dumpState.SendOffer()
553544
if err := conn.handshaker.SendOffer(); err != nil {
554545
conn.Log.Errorf("failed to send offer: %v", err)
@@ -672,7 +663,7 @@ func (conn *Conn) isConnectedOnAllWay() (connected bool) {
672663
}
673664
}()
674665

675-
if conn.statusICE.Get() == worker.StatusDisconnected {
666+
if conn.statusICE.Get() == worker.StatusDisconnected && !conn.workerICE.InProgress() {
676667
return false
677668
}
678669

client/internal/peer/conn_test.go

Lines changed: 46 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package peer
22

33
import (
4+
"context"
45
"fmt"
56
"os"
6-
"sync"
77
"testing"
88
"time"
99

@@ -79,31 +79,30 @@ func TestConn_OnRemoteOffer(t *testing.T) {
7979
return
8080
}
8181

82-
wg := sync.WaitGroup{}
83-
wg.Add(2)
84-
go func() {
85-
<-conn.handshaker.remoteOffersCh
86-
wg.Done()
87-
}()
88-
89-
go func() {
90-
for {
91-
accepted := conn.OnRemoteOffer(OfferAnswer{
92-
IceCredentials: IceCredentials{
93-
UFrag: "test",
94-
Pwd: "test",
95-
},
96-
WgListenPort: 0,
97-
Version: "",
98-
})
99-
if accepted {
100-
wg.Done()
101-
return
102-
}
103-
}
104-
}()
82+
onNewOffeChan := make(chan struct{})
83+
84+
conn.handshaker.AddOnNewOfferListener(func(remoteOfferAnswer *OfferAnswer) {
85+
onNewOffeChan <- struct{}{}
86+
})
87+
88+
conn.OnRemoteOffer(OfferAnswer{
89+
IceCredentials: IceCredentials{
90+
UFrag: "test",
91+
Pwd: "test",
92+
},
93+
WgListenPort: 0,
94+
Version: "",
95+
})
96+
97+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
98+
defer cancel()
10599

106-
wg.Wait()
100+
select {
101+
case <-onNewOffeChan:
102+
// success
103+
case <-ctx.Done():
104+
t.Error("expected to receive a new offer notification, but timed out")
105+
}
107106
}
108107

109108
func TestConn_OnRemoteAnswer(t *testing.T) {
@@ -119,31 +118,29 @@ func TestConn_OnRemoteAnswer(t *testing.T) {
119118
return
120119
}
121120

122-
wg := sync.WaitGroup{}
123-
wg.Add(2)
124-
go func() {
125-
<-conn.handshaker.remoteAnswerCh
126-
wg.Done()
127-
}()
128-
129-
go func() {
130-
for {
131-
accepted := conn.OnRemoteAnswer(OfferAnswer{
132-
IceCredentials: IceCredentials{
133-
UFrag: "test",
134-
Pwd: "test",
135-
},
136-
WgListenPort: 0,
137-
Version: "",
138-
})
139-
if accepted {
140-
wg.Done()
141-
return
142-
}
143-
}
144-
}()
121+
onNewOffeChan := make(chan struct{})
122+
123+
conn.handshaker.AddOnNewOfferListener(func(remoteOfferAnswer *OfferAnswer) {
124+
onNewOffeChan <- struct{}{}
125+
})
145126

146-
wg.Wait()
127+
conn.OnRemoteAnswer(OfferAnswer{
128+
IceCredentials: IceCredentials{
129+
UFrag: "test",
130+
Pwd: "test",
131+
},
132+
WgListenPort: 0,
133+
Version: "",
134+
})
135+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
136+
defer cancel()
137+
138+
select {
139+
case <-onNewOffeChan:
140+
// success
141+
case <-ctx.Done():
142+
t.Error("expected to receive a new offer notification, but timed out")
143+
}
147144
}
148145

149146
func TestConn_presharedKey(t *testing.T) {

0 commit comments

Comments
 (0)