Skip to content

Commit f425870

Browse files
authored
[client] Avoid duplicated agent close (#4383)
1 parent f9d64a0 commit f425870

File tree

2 files changed

+32
-11
lines changed

2 files changed

+32
-11
lines changed

client/internal/peer/ice/agent.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package ice
22

33
import (
4+
"sync"
45
"time"
56

67
"github.com/pion/ice/v3"
@@ -23,7 +24,20 @@ const (
2324
iceRelayAcceptanceMinWaitDefault = 2 * time.Second
2425
)
2526

26-
func NewAgent(iFaceDiscover stdnet.ExternalIFaceDiscover, config Config, candidateTypes []ice.CandidateType, ufrag string, pwd string) (*ice.Agent, error) {
27+
type ThreadSafeAgent struct {
28+
*ice.Agent
29+
once sync.Once
30+
}
31+
32+
func (a *ThreadSafeAgent) Close() error {
33+
var err error
34+
a.once.Do(func() {
35+
err = a.Agent.Close()
36+
})
37+
return err
38+
}
39+
40+
func NewAgent(iFaceDiscover stdnet.ExternalIFaceDiscover, config Config, candidateTypes []ice.CandidateType, ufrag string, pwd string) (*ThreadSafeAgent, error) {
2741
iceKeepAlive := iceKeepAlive()
2842
iceDisconnectedTimeout := iceDisconnectedTimeout()
2943
iceFailedTimeout := iceFailedTimeout()
@@ -61,7 +75,12 @@ func NewAgent(iFaceDiscover stdnet.ExternalIFaceDiscover, config Config, candida
6175
agentConfig.NetworkTypes = []ice.NetworkType{ice.NetworkTypeUDP4}
6276
}
6377

64-
return ice.NewAgent(agentConfig)
78+
agent, err := ice.NewAgent(agentConfig)
79+
if err != nil {
80+
return nil, err
81+
}
82+
83+
return &ThreadSafeAgent{Agent: agent}, nil
6584
}
6685

6786
func GenerateICECredentials() (string, string, error) {

client/internal/peer/worker_ice.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type WorkerICE struct {
4242
statusRecorder *Status
4343
hasRelayOnLocally bool
4444

45-
agent *ice.Agent
45+
agent *icemaker.ThreadSafeAgent
4646
agentDialerCancel context.CancelFunc
4747
agentConnecting bool // while it is true, drop all incoming offers
4848
lastSuccess time.Time // with this avoid the too frequent ICE agent recreation
@@ -121,6 +121,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
121121
if err := w.agent.Close(); err != nil {
122122
w.log.Warnf("failed to close ICE agent: %s", err)
123123
}
124+
w.agent = nil
124125
// todo consider to switch to Relay connection while establishing a new ICE connection
125126
}
126127

@@ -195,7 +196,7 @@ func (w *WorkerICE) Close() {
195196
w.agent = nil
196197
}
197198

198-
func (w *WorkerICE) reCreateAgent(dialerCancel context.CancelFunc, candidates []ice.CandidateType) (*ice.Agent, error) {
199+
func (w *WorkerICE) reCreateAgent(dialerCancel context.CancelFunc, candidates []ice.CandidateType) (*icemaker.ThreadSafeAgent, error) {
199200
agent, err := icemaker.NewAgent(w.iFaceDiscover, w.config.ICEConfig, candidates, w.localUfrag, w.localPwd)
200201
if err != nil {
201202
return nil, fmt.Errorf("create agent: %w", err)
@@ -230,7 +231,7 @@ func (w *WorkerICE) SessionID() ICESessionID {
230231
// will block until connection succeeded
231232
// but it won't release if ICE Agent went into Disconnected or Failed state,
232233
// so we have to cancel it with the provided context once agent detected a broken connection
233-
func (w *WorkerICE) connect(ctx context.Context, agent *ice.Agent, remoteOfferAnswer *OfferAnswer) {
234+
func (w *WorkerICE) connect(ctx context.Context, agent *icemaker.ThreadSafeAgent, remoteOfferAnswer *OfferAnswer) {
234235
w.log.Debugf("gather candidates")
235236
if err := agent.GatherCandidates(); err != nil {
236237
w.log.Warnf("failed to gather candidates: %s", err)
@@ -239,7 +240,7 @@ func (w *WorkerICE) connect(ctx context.Context, agent *ice.Agent, remoteOfferAn
239240
}
240241

241242
w.log.Debugf("turn agent dial")
242-
remoteConn, err := w.turnAgentDial(ctx, remoteOfferAnswer)
243+
remoteConn, err := w.turnAgentDial(ctx, agent, remoteOfferAnswer)
243244
if err != nil {
244245
w.log.Debugf("failed to dial the remote peer: %s", err)
245246
w.closeAgent(agent, w.agentDialerCancel)
@@ -290,13 +291,14 @@ func (w *WorkerICE) connect(ctx context.Context, agent *ice.Agent, remoteOfferAn
290291
w.conn.onICEConnectionIsReady(selectedPriority(pair), ci)
291292
}
292293

293-
func (w *WorkerICE) closeAgent(agent *ice.Agent, cancel context.CancelFunc) {
294+
func (w *WorkerICE) closeAgent(agent *icemaker.ThreadSafeAgent, cancel context.CancelFunc) {
294295
cancel()
295296
if err := agent.Close(); err != nil {
296297
w.log.Warnf("failed to close ICE agent: %s", err)
297298
}
298299

299300
w.muxAgent.Lock()
301+
// todo review does it make sense to generate new session ID all the time when w.agent==agent
300302
sessionID, err := NewICESessionID()
301303
if err != nil {
302304
w.log.Errorf("failed to create new session ID: %s", err)
@@ -379,7 +381,7 @@ func (w *WorkerICE) onICESelectedCandidatePair(c1 ice.Candidate, c2 ice.Candidat
379381
w.config.Key)
380382
}
381383

382-
func (w *WorkerICE) onConnectionStateChange(agent *ice.Agent, dialerCancel context.CancelFunc) func(ice.ConnectionState) {
384+
func (w *WorkerICE) onConnectionStateChange(agent *icemaker.ThreadSafeAgent, dialerCancel context.CancelFunc) func(ice.ConnectionState) {
383385
return func(state ice.ConnectionState) {
384386
w.log.Debugf("ICE ConnectionState has changed to %s", state.String())
385387
switch state {
@@ -412,12 +414,12 @@ func (w *WorkerICE) shouldSendExtraSrflxCandidate(candidate ice.Candidate) bool
412414
return false
413415
}
414416

415-
func (w *WorkerICE) turnAgentDial(ctx context.Context, remoteOfferAnswer *OfferAnswer) (*ice.Conn, error) {
417+
func (w *WorkerICE) turnAgentDial(ctx context.Context, agent *icemaker.ThreadSafeAgent, remoteOfferAnswer *OfferAnswer) (*ice.Conn, error) {
416418
isControlling := w.config.LocalKey > w.config.Key
417419
if isControlling {
418-
return w.agent.Dial(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
420+
return agent.Dial(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
419421
} else {
420-
return w.agent.Accept(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
422+
return agent.Accept(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
421423
}
422424
}
423425

0 commit comments

Comments
 (0)