Skip to content

Commit 2829682

Browse files
peer: implement heartbeat handling for keepalive
Note that we start the keep-alive routine as soon as the peer is created, rather than once it is connected. The keep-alive deadline is fairly long, so if no heartbeat message arrives within that deadline, we can consider the connection stalled either way. Starting the keep-alive timer only once the peer is connected would merely spare the second or so that a peer normally needs to establish a connection.
1 parent 6e6b4d5 commit 2829682

File tree

7 files changed

+52
-8
lines changed

7 files changed

+52
-8
lines changed

pkg/conference/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@ package conference
33
// Configuration for the group conferences (calls).
44
type Config struct {
55
// Keep-alive timeout for WebRTC connections. If no keep-alive has been received
6-
// from the client for this duration, the connection is considered dead.
6+
// from the client for this duration (in seconds), the connection is considered dead.
77
KeepAliveTimeout int `yaml:"timeout"`
88
}

pkg/conference/matrix.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package conference
22

33
import (
4+
"time"
5+
46
"github.com/matrix-org/waterfall/pkg/common"
57
"github.com/matrix-org/waterfall/pkg/peer"
68
"github.com/pion/webrtc/v3"
@@ -50,7 +52,8 @@ func (c *Conference) onNewParticipant(participantID ParticipantID, inviteEvent *
5052
} else {
5153
messageSink := common.NewMessageSink(participantID, c.peerMessages)
5254

53-
peer, answer, err := peer.NewPeer(inviteEvent.Offer.SDP, messageSink, logger)
55+
keepAliveDeadline := time.Duration(c.config.KeepAliveTimeout) * time.Second
56+
peer, answer, err := peer.NewPeer(inviteEvent.Offer.SDP, messageSink, logger, keepAliveDeadline)
5457
if err != nil {
5558
logger.WithError(err).Errorf("Failed to process SDP offer")
5659
return err

pkg/conference/processor.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ func (c *Conference) processPeerMessage(message common.Message[ParticipantID, pe
5151
participant.logger.Info("Joined the call")
5252

5353
case peer.LeftTheCall:
54-
participant.logger.Info("Left the call")
54+
participant.logger.Info("Left the call: %s", msg.Reason)
5555
c.removeParticipant(message.Sender)
56-
c.signaling.SendHangup(participant.asMatrixRecipient(), event.CallHangupUnknownError)
56+
c.signaling.SendHangup(participant.asMatrixRecipient(), msg.Reason)
5757

5858
case peer.NewTrackPublished:
5959
participant.logger.Infof("Published new track: %s", msg.Track.ID())
@@ -195,9 +195,9 @@ func (c *Conference) handleDataChannelMessage(participant *Participant, sfuMessa
195195
case event.SFUOperationUnpublish:
196196
participant.logger.Info("Received unpublish over DC")
197197

198-
// TODO: Clarify the semantics of unpublish.
199198
case event.SFUOperationAlive:
200-
// FIXME: Handle the heartbeat message here (updating the last timestamp etc).
199+
participant.peer.ProcessHeartbeat()
200+
201201
case event.SFUOperationMetadata:
202202
participant.streamMetadata = sfuMessage.Metadata
203203
c.resendMetadataToAllExcept(participant.id)

pkg/peer/keepalive.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package peer
2+
3+
import "time"
4+
5+
// HeartBeat is the value published on the keep-alive channel every time the remote
// side signals that it is still alive. It carries no data.
type HeartBeat struct{}

// startKeepAlive starts a goroutine that executes the `onDeadLine` closure in case
// nothing has been published to the `heartBeat` channel for `deadline` duration.
// Every received heartbeat restarts the countdown from `deadline`.
//
// The goroutine stops in one of two ways: right after `onDeadLine` has been called
// (it is called at most once), or once the `heartBeat` channel is closed, in which
// case `onDeadLine` is not called.
func startKeepAlive(deadline time.Duration, heartBeat <-chan HeartBeat, onDeadLine func()) {
	go func() {
		// One timer is re-armed on every heartbeat. Calling `time.After` on each
		// loop iteration would allocate a new timer per heartbeat, and before
		// Go 1.23 every such timer stays alive until it expires.
		timer := time.NewTimer(deadline)
		defer timer.Stop()

		for {
			select {
			case <-timer.C:
				onDeadLine()
				return
			case _, ok := <-heartBeat:
				if !ok {
					return
				}

				// The timer may have expired while we were receiving the heartbeat.
				// Its value has not been consumed in that case, so drain it before
				// re-arming; otherwise the stale tick would trigger the deadline.
				if !timer.Stop() {
					<-timer.C
				}
				timer.Reset(deadline)
			}
		}
	}()
}

pkg/peer/messages.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package peer
22

33
import (
44
"github.com/pion/webrtc/v3"
5+
"maunium.net/go/mautrix/event"
56
)
67

78
// Due to the limitation of Go, we're using the `interface{}` to be able to use switch the actual
@@ -10,7 +11,9 @@ type MessageContent = interface{}
1011

1112
type JoinedTheCall struct{}
1213

13-
type LeftTheCall struct{}
14+
type LeftTheCall struct {
15+
Reason event.CallHangupReason
16+
}
1417

1518
type NewTrackPublished struct {
1619
Track *webrtc.TrackLocalStaticRTP

pkg/peer/peer.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ package peer
33
import (
44
"errors"
55
"sync"
6+
"time"
67

78
"github.com/matrix-org/waterfall/pkg/common"
89
"github.com/pion/webrtc/v3"
910
"github.com/sirupsen/logrus"
11+
"maunium.net/go/mautrix/event"
1012
)
1113

1214
var (
@@ -28,6 +30,7 @@ type Peer[ID comparable] struct {
2830
logger *logrus.Entry
2931
peerConnection *webrtc.PeerConnection
3032
sink *common.MessageSink[ID, MessageContent]
33+
heartbeat chan HeartBeat
3134

3235
dataChannelMutex sync.Mutex
3336
dataChannel *webrtc.DataChannel
@@ -38,6 +41,7 @@ func NewPeer[ID comparable](
3841
sdpOffer string,
3942
sink *common.MessageSink[ID, MessageContent],
4043
logger *logrus.Entry,
44+
keepAliveDeadline time.Duration,
4145
) (*Peer[ID], *webrtc.SessionDescription, error) {
4246
peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{})
4347
if err != nil {
@@ -49,6 +53,7 @@ func NewPeer[ID comparable](
4953
logger: logger,
5054
peerConnection: peerConnection,
5155
sink: sink,
56+
heartbeat: make(chan HeartBeat, common.UnboundedChannelSize),
5257
}
5358

5459
peerConnection.OnTrack(peer.onRtpTrackReceived)
@@ -63,6 +68,8 @@ func NewPeer[ID comparable](
6368
if sdpAnswer, err := peer.ProcessSDPOffer(sdpOffer); err != nil {
6469
return nil, nil, err
6570
} else {
71+
onDeadline := func() { peer.sink.Send(LeftTheCall{event.CallHangupKeepAliveTimeout}) }
72+
startKeepAlive(keepAliveDeadline, peer.heartbeat, onDeadline)
6673
return peer, sdpAnswer, nil
6774
}
6875
}
@@ -192,3 +199,10 @@ func (p *Peer[ID]) ProcessSDPOffer(sdpOffer string) (*webrtc.SessionDescription,
192199

193200
return &answer, nil
194201
}
202+
203+
// New heartbeat received (keep-alive message that is periodically sent by the remote peer).
204+
// We need to update the last heartbeat time. If the peer is not active for too long, we will
205+
// consider peer's connection as stalled and will close it.
206+
func (p *Peer[ID]) ProcessHeartbeat() {
207+
p.heartbeat <- HeartBeat{}
208+
}

pkg/peer/webrtc.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/pion/rtcp"
99
"github.com/pion/webrtc/v3"
10+
"maunium.net/go/mautrix/event"
1011
)
1112

1213
// A callback that is called once we receive first RTP packets from a track, i.e.
@@ -127,7 +128,7 @@ func (p *Peer[ID]) onConnectionStateChanged(state webrtc.PeerConnectionState) {
127128

128129
switch state {
129130
case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateClosed:
130-
p.sink.Send(LeftTheCall{})
131+
p.sink.Send(LeftTheCall{event.CallHangupUserHangup})
131132
case webrtc.PeerConnectionStateConnected:
132133
p.sink.Send(JoinedTheCall{})
133134
}

0 commit comments

Comments
 (0)