Skip to content

Commit 2b0597d

Browse files
Merge pull request #52 from matrix-org/refactoring
SFU Refactoring
2 parents 50b12b2 + 537f4c0 commit 2b0597d

39 files changed

+1928
-1583
lines changed

.golangci.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,6 @@ linters:
2929
- gomnd # we use status code numbers and for our use case it's not practical
3030
- godox # we have TODOs at this stage of the project, enable in future
3131
- forbidigo # we use things like fmt.Printf for debugging, enable in future
32+
- wsl # somehow this plugin causes more harm than use as it enables lots of things to be configured without causing spaghetti-code (grouping similar things together)
33+
- nlreturn # not always practical, it was disabled before strict lints were introduced, then added, now it's clear why it was disabled at the first place :)
3234
fast: true

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ COPY go.sum ./
1313
# source code do not invalidate our downloaded layer.
1414
RUN go mod download
1515

16-
COPY ./src ./src
16+
COPY ./pkg ./pkg
1717

18-
RUN go build -o /waterfall ./src
18+
RUN go build -o /waterfall ./pkg
1919

2020

2121
##

config.yaml.sample

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
homeserverurl: "http://localhost:8008"
2-
userid: "@sfu:shadowfax"
3-
accesstoken: "..."
4-
timeout: 30
1+
matrix:
2+
homeserverurl: "http://localhost:8008"
3+
userid: "@sfu:shadowfax"
4+
accesstoken: "..."
5+
conference:
6+
timeout: 30

docker-compose.yaml

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ services:
99
environment:
1010
# Set the `CONFIG` to the configuration you want.
1111
CONFIG: |
12-
homeserverurl: "http://localhost:8008"
13-
userid: "@sfu:shadowfax"
14-
accesstoken: "..."
15-
timeout: 30
12+
matrix:
13+
homeserverurl: "http://localhost:8008"
14+
userid: "@sfu:shadowfax"
15+
accesstoken: "..."
16+
conference:
17+
timeout: 30

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require github.com/pion/webrtc/v3 v3.1.31
77
require (
88
github.com/pion/rtcp v1.2.9
99
github.com/sirupsen/logrus v1.9.0
10+
golang.org/x/exp v0.0.0-20221114191408-850992195362
1011
gopkg.in/yaml.v3 v3.0.1
1112
maunium.net/go/mautrix v0.11.0
1213
)
@@ -34,7 +35,7 @@ require (
3435
github.com/tidwall/sjson v1.2.4 // indirect
3536
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
3637
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
37-
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
38+
golang.org/x/sys v0.1.0 // indirect
3839
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
3940
)
4041

go.sum

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
103103
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
104104
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
105105
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
106+
golang.org/x/exp v0.0.0-20221114191408-850992195362 h1:NoHlPRbyl1VFI6FjwHtPQCN7wAMXI6cKcqrmXhOOfBQ=
107+
golang.org/x/exp v0.0.0-20221114191408-850992195362/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
106108
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
107109
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
108110
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@@ -134,8 +136,9 @@ golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7w
134136
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
135137
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
136138
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
137-
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
138139
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
140+
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
141+
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
139142
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
140143
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
141144
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

pkg/common/channel.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package common
2+
3+
import "sync/atomic"
4+
5+
// In Go, unbounded channel means something different than what it means in Rust.
6+
// I.e. unlike Rust, "unbounded" in Go means that the channel has **no buffer**,
7+
// meaning that each attempt to send will block the channel until the receiver
8+
// reads it. Majority of primitives here in `waterfall` are designed under assumption
9+
// that sending is not blocking.
10+
const UnboundedChannelSize = 128
11+
12+
// Creates a new channel, returns two counterparts of it where one can only send and another can only receive.
13+
// Unlike traditional Go channels, these allow the receiver to mark the channel as closed which would then fail
14+
// to send any messages to the channel over `Send“.
15+
func NewChannel[M any]() (Sender[M], Receiver[M]) {
16+
channel := make(chan M, UnboundedChannelSize)
17+
closed := &atomic.Bool{}
18+
sender := Sender[M]{channel, closed}
19+
receiver := Receiver[M]{channel, closed}
20+
return sender, receiver
21+
}
22+
23+
// Sender counterpart of the channel.
24+
type Sender[M any] struct {
25+
// The channel itself.
26+
channel chan<- M
27+
// Atomic variable that indicates whether the channel is closed.
28+
receiverClosed *atomic.Bool
29+
}
30+
31+
// Tries to send a message if the channel is not closed.
32+
// Returns the message back if the channel is closed.
33+
func (s *Sender[M]) Send(message M) *M {
34+
if !s.receiverClosed.Load() {
35+
s.channel <- message
36+
return nil
37+
} else {
38+
return &message
39+
}
40+
}
41+
42+
// The receiver counterpart of the channel.
43+
type Receiver[M any] struct {
44+
// The channel itself. It's public, so that we can combine it in `select` statements.
45+
Channel <-chan M
46+
// Atomic variable that indicates whether the channel is closed.
47+
receiverClosed *atomic.Bool
48+
}
49+
50+
// Marks the channel as closed, which means that no messages could be sent via this channel.
51+
// Any attempt to send a message would result in an error. This is similar to closing the
52+
// channel except that we don't close the underlying channel (since in Go receivers can't
53+
// close the channel).
54+
//
55+
// This function reads (in a non-blocking way) all pending messages until blocking. Otherwise,
56+
// they will stay forver in a channel and get lost.
57+
func (r *Receiver[M]) Close() []M {
58+
r.receiverClosed.Store(true)
59+
60+
messages := make([]M, 0)
61+
for {
62+
msg, ok := <-r.Channel
63+
if !ok {
64+
break
65+
}
66+
messages = append(messages, msg)
67+
}
68+
69+
return messages
70+
}

pkg/common/message_sink.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package common
2+
3+
import (
4+
"errors"
5+
"sync/atomic"
6+
)
7+
8+
// MessageSink is a helper struct that allows to send messages to a message sink.
9+
// The MessageSink abstracts the message sink which has a certain sender, so that
10+
// the sender does not have to be specified every time a message is sent.
11+
// At the same it guarantees that the caller can't alter the `sender`, which means that
12+
// the sender can't impersonate another sender (and we guarantee this on a compile-time).
13+
type MessageSink[SenderType comparable, MessageType any] struct {
14+
// The sender of the messages. This is useful for multiple-producer-single-consumer scenarios.
15+
sender SenderType
16+
// The message sink to which the messages are sent.
17+
messageSink chan<- Message[SenderType, MessageType]
18+
// Atomic variable that indicates whether the message sink is sealed.
19+
// This is used to prevent sending messages to a sealed message sink.
20+
// The variable is atomic because it may be accessed from multiple goroutines.
21+
sealed atomic.Bool
22+
}
23+
24+
// Creates a new MessageSink. The function is generic allowing us to use it for multiple use cases.
25+
func NewMessageSink[S comparable, M any](sender S, messageSink chan<- Message[S, M]) *MessageSink[S, M] {
26+
return &MessageSink[S, M]{
27+
sender: sender,
28+
messageSink: messageSink,
29+
}
30+
}
31+
32+
// Sends a message to the message sink.
33+
func (s *MessageSink[S, M]) Send(message M) error {
34+
if s.sealed.Load() {
35+
return errors.New("The channel is sealed, you can't send any messages over it")
36+
}
37+
38+
s.messageSink <- Message[S, M]{
39+
Sender: s.sender,
40+
Content: message,
41+
}
42+
43+
return nil
44+
}
45+
46+
// Seals the channel, which means that no messages could be sent via this channel.
47+
// Any attempt to send a message would result in an error. This is similar to closing the
48+
// channel except that we don't close the underlying channel (since there might be other
49+
// senders that may want to use it).
50+
func (s *MessageSink[S, M]) Seal() {
51+
s.sealed.Store(true)
52+
}
53+
54+
// Messages that are sent from the peer to the conference in order to communicate with other peers.
55+
// Since each peer is isolated from others, it can't influence the state of other peers directly.
56+
type Message[SenderType comparable, MessageType any] struct {
57+
// The sender of the message.
58+
Sender SenderType
59+
// The content of the message.
60+
Content MessageType
61+
}

pkg/conference/config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package conference
2+
3+
// Configuration for the group conferences (calls).
4+
type Config struct {
5+
// Keep-alive timeout for WebRTC connections. If no keep-alive has been received
6+
// from the client for this duration, the connection is considered dead (in seconds).
7+
KeepAliveTimeout int `yaml:"timeout"`
8+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package conference
2+
3+
import (
4+
"github.com/pion/webrtc/v3"
5+
"golang.org/x/exp/slices"
6+
"maunium.net/go/mautrix/event"
7+
)
8+
9+
// Handle the `SFUMessage` event from the DataChannel message.
10+
func (c *Conference) processSelectDCMessage(participant *Participant, msg event.SFUMessage) {
11+
participant.logger.Info("Received select request over DC")
12+
13+
// Find tracks based on what we were asked for.
14+
tracks := c.getTracks(msg.Start)
15+
16+
// Let's check if we have all the tracks that we were asked for are there.
17+
// If not, we will list which are not available (later on we must inform participant
18+
// about it unless the participant retries it).
19+
if len(tracks) != len(msg.Start) {
20+
for _, expected := range msg.Start {
21+
found := slices.IndexFunc(tracks, func(track *webrtc.TrackLocalStaticRTP) bool {
22+
return track.StreamID() == expected.StreamID && track.ID() == expected.TrackID
23+
})
24+
25+
if found == -1 {
26+
c.logger.Warnf("Track not found: %s", expected.TrackID)
27+
}
28+
}
29+
}
30+
31+
// Subscribe to the found tracks.
32+
for _, track := range tracks {
33+
if err := participant.peer.SubscribeTo(track); err != nil {
34+
participant.logger.Errorf("Failed to subscribe to track: %v", err)
35+
return
36+
}
37+
}
38+
}
39+
40+
func (c *Conference) processAnswerDCMessage(participant *Participant, msg event.SFUMessage) {
41+
participant.logger.Info("Received SDP answer over DC")
42+
43+
if err := participant.peer.ProcessSDPAnswer(msg.SDP); err != nil {
44+
participant.logger.Errorf("Failed to set SDP answer: %v", err)
45+
return
46+
}
47+
}
48+
49+
func (c *Conference) processPublishDCMessage(participant *Participant, msg event.SFUMessage) {
50+
participant.logger.Info("Received SDP offer over DC")
51+
52+
answer, err := participant.peer.ProcessSDPOffer(msg.SDP)
53+
if err != nil {
54+
participant.logger.Errorf("Failed to set SDP offer: %v", err)
55+
return
56+
}
57+
58+
participant.streamMetadata = msg.Metadata
59+
60+
participant.sendDataChannelMessage(event.SFUMessage{
61+
Op: event.SFUOperationAnswer,
62+
SDP: answer.SDP,
63+
Metadata: c.getAvailableStreamsFor(participant.id),
64+
})
65+
}
66+
67+
func (c *Conference) processUnpublishDCMessage(participant *Participant) {
68+
participant.logger.Info("Received unpublish over DC")
69+
}
70+
71+
func (c *Conference) processAliveDCMessage(participant *Participant) {
72+
participant.peer.ProcessHeartbeat()
73+
}
74+
75+
func (c *Conference) processMetadataDCMessage(participant *Participant, msg event.SFUMessage) {
76+
participant.streamMetadata = msg.Metadata
77+
c.resendMetadataToAllExcept(participant.id)
78+
}

0 commit comments

Comments
 (0)