Skip to content

Commit 90ae985

Browse files
committed
Simplify how we handle ping and pong
Signed-off-by: Šimon Brandner <[email protected]>
1 parent 098be79 commit 90ae985

File tree

5 files changed

+52
-72
lines changed

5 files changed

+52
-72
lines changed

pkg/conference/matrix_message_processor.go

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,25 +52,24 @@ func (c *Conference) onNewParticipant(participantID ParticipantID, inviteEvent *
5252
} else {
5353
messageSink := common.NewMessageSink(participantID, c.peerMessages)
5454

55-
pingInterval := time.Duration(c.config.PingInterval) * time.Second
56-
keepAliveDeadline := time.Duration(c.config.KeepAliveTimeout) * time.Second
57-
sendPing := func() {
58-
participant.sendDataChannelMessage(event.Event{
59-
Type: event.FocusCallPing,
60-
Content: event.Content{},
61-
})
62-
}
63-
onDeadLine := func() {
64-
messageSink.Send(peer.LeftTheCall{Reason: event.CallHangupKeepAliveTimeout})
65-
}
6655
peer, answer, err := peer.NewPeer(
6756
inviteEvent.Offer.SDP,
6857
messageSink,
6958
logger,
70-
pingInterval,
71-
keepAliveDeadline,
72-
sendPing,
73-
onDeadLine,
59+
peer.PingPongConfig{
60+
Interval: time.Duration(c.config.PingInterval) * time.Second,
61+
Deadline: time.Duration(c.config.KeepAliveTimeout) * time.Second,
62+
PongChannel: make(chan peer.Pong, common.UnboundedChannelSize),
63+
SendPing: func() {
64+
participant.sendDataChannelMessage(event.Event{
65+
Type: event.FocusCallPing,
66+
Content: event.Content{},
67+
})
68+
},
69+
OnDeadLine: func() {
70+
messageSink.Send(peer.LeftTheCall{Reason: event.CallHangupKeepAliveTimeout})
71+
},
72+
},
7473
)
7574
if err != nil {
7675
logger.WithError(err).Errorf("Failed to process SDP offer")

pkg/peer/keepalive.go

Lines changed: 0 additions & 31 deletions
This file was deleted.

pkg/peer/peer.go

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"errors"
55
"io"
66
"sync"
7-
"time"
87

98
"github.com/matrix-org/waterfall/pkg/common"
109
"github.com/pion/rtcp"
@@ -33,12 +32,7 @@ type Peer[ID comparable] struct {
3332
logger *logrus.Entry
3433
peerConnection *webrtc.PeerConnection
3534
sink *common.MessageSink[ID, MessageContent]
36-
37-
pong chan Pong
38-
sendPing func()
39-
onDeadLine func()
40-
pingInterval time.Duration
41-
keepAliveDeadline time.Duration
35+
pingPongConfig PingPongConfig
4236

4337
dataChannelMutex sync.Mutex
4438
dataChannel *webrtc.DataChannel
@@ -49,10 +43,7 @@ func NewPeer[ID comparable](
4943
sdpOffer string,
5044
sink *common.MessageSink[ID, MessageContent],
5145
logger *logrus.Entry,
52-
pingInterval time.Duration,
53-
keepAliveDeadline time.Duration,
54-
sendPing func(),
55-
onDeadLine func(),
46+
pingPongConfig PingPongConfig,
5647
) (*Peer[ID], *webrtc.SessionDescription, error) {
5748
peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{})
5849
if err != nil {
@@ -64,12 +55,7 @@ func NewPeer[ID comparable](
6455
logger: logger,
6556
peerConnection: peerConnection,
6657
sink: sink,
67-
68-
pong: make(chan Pong, common.UnboundedChannelSize),
69-
pingInterval: pingInterval,
70-
keepAliveDeadline: keepAliveDeadline,
71-
sendPing: sendPing,
72-
onDeadLine: onDeadLine,
58+
pingPongConfig: pingPongConfig,
7359
}
7460

7561
peerConnection.OnTrack(peer.onRtpTrackReceived)
@@ -84,6 +70,7 @@ func NewPeer[ID comparable](
8470
if sdpAnswer, err := peer.ProcessSDPOffer(sdpOffer); err != nil {
8571
return nil, nil, err
8672
} else {
73+
startPingPong(pingPongConfig)
8774
return peer, sdpAnswer, nil
8875
}
8976
}
@@ -263,5 +250,5 @@ func (p *Peer[ID]) ProcessSDPOffer(sdpOffer string) (*webrtc.SessionDescription,
263250
// We need to update the last heartbeat time. If the peer is not active for too long, we will
264251
// consider peer's connection as stalled and will close it.
265252
func (p *Peer[ID]) ProcessPong() {
266-
p.pong <- Pong{}
253+
p.pingPongConfig.PongChannel <- Pong{}
267254
}

pkg/peer/ping_pong.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package peer
2+
3+
import "time"
4+
5+
type Pong struct{}
6+
7+
type PingPongConfig struct {
8+
Interval time.Duration
9+
Deadline time.Duration
10+
PongChannel chan Pong
11+
SendPing func()
12+
OnDeadLine func()
13+
}
14+
15+
// Starts a goroutine that will execute `onDeadLine` closure in case nothing has been published
16+
// to the `heartBeat` channel for `deadline` duration. The goroutine stops once the channel is closed.
17+
func startPingPong(config PingPongConfig) {
18+
go func() {
19+
for range time.Tick(config.Interval) {
20+
config.SendPing()
21+
22+
select {
23+
case <-time.After(config.Deadline):
24+
config.OnDeadLine()
25+
return
26+
case _, ok := <-config.PongChannel:
27+
if !ok {
28+
return
29+
}
30+
}
31+
}
32+
}()
33+
}

pkg/peer/webrtc.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,14 +133,6 @@ func (p *Peer[ID]) onDataChannelReady(dc *webrtc.DataChannel) {
133133
dc.OnOpen(func() {
134134
p.logger.Info("Data channel opened")
135135
p.sink.Send(DataChannelAvailable{})
136-
137-
startKeepAlive(
138-
p.pingInterval,
139-
p.keepAliveDeadline,
140-
p.pong,
141-
p.sendPing,
142-
p.onDeadLine,
143-
)
144136
})
145137

146138
dc.OnMessage(func(msg webrtc.DataChannelMessage) {

0 commit comments

Comments
 (0)