- 
                Notifications
    
You must be signed in to change notification settings  - Fork 5
 
SFU Refactoring #52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SFU Refactoring #52
Changes from all commits
bf6b987
              c5c6206
              ba2fa4b
              85aa12c
              ba6eb3f
              abef0c5
              532773f
              4d7970c
              83a49b5
              ce32481
              b47600c
              a8312b0
              0b75dfc
              e0f4cb7
              34ae746
              877f7c2
              e2d73f9
              bd023b6
              b53d197
              40cb5fe
              c629c7e
              3274194
              6b31a98
              b991a3a
              9d45674
              1c3b702
              82dac24
              415e242
              cc09692
              db900b2
              2f809b1
              01fef5c
              479863d
              7fbacad
              a4f420e
              67e469e
              ff687e2
              d5ffe81
              5563dd5
              ab53c8e
              0c8d528
              bd313c0
              1b7bdbe
              39deed0
              74014f0
              5fe188d
              88997da
              d2cce02
              ec67906
              4ef47e0
              ce110b5
              ae5da43
              73a6947
              5a78cca
              eecdde2
              e208b4b
              70a4ede
              fd545d0
              2649e1f
              0369070
              dc09318
              537f4c0
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -1,4 +1,6 @@ | ||
| homeserverurl: "http://localhost:8008" | ||
| userid: "@sfu:shadowfax" | ||
| accesstoken: "..." | ||
| timeout: 30 | ||
| matrix: | ||
| homeserverurl: "http://localhost:8008" | ||
| userid: "@sfu:shadowfax" | ||
| accesstoken: "..." | ||
| conference: | ||
| timeout: 30 | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| package common | ||
| 
     | 
||
| import "sync/atomic" | ||
| 
     | 
||
| // In Go, unbounded channel means something different than what it means in Rust. | ||
| // I.e. unlike Rust, "unbounded" in Go means that the channel has **no buffer**, | ||
| // meaning that each attempt to send will block the channel until the receiver | ||
| // reads it. Majority of primitives here in `waterfall` are designed under assumption | ||
| // that sending is not blocking. | ||
| const UnboundedChannelSize = 128 | ||
| 
     | 
||
| // Creates a new channel, returns two counterparts of it where one can only send and another can only receive. | ||
| // Unlike traditional Go channels, these allow the receiver to mark the channel as closed which would then fail | ||
| // to send any messages to the channel over `Send“. | ||
| func NewChannel[M any]() (Sender[M], Receiver[M]) { | ||
| channel := make(chan M, UnboundedChannelSize) | ||
| closed := &atomic.Bool{} | ||
| sender := Sender[M]{channel, closed} | ||
| receiver := Receiver[M]{channel, closed} | ||
| return sender, receiver | ||
| } | ||
| 
     | 
||
| // Sender counterpart of the channel. | ||
| type Sender[M any] struct { | ||
| // The channel itself. | ||
| channel chan<- M | ||
| // Atomic variable that indicates whether the channel is closed. | ||
| receiverClosed *atomic.Bool | ||
| } | ||
| 
     | 
||
| // Tries to send a message if the channel is not closed. | ||
| // Returns the message back if the channel is closed. | ||
| func (s *Sender[M]) Send(message M) *M { | ||
| if !s.receiverClosed.Load() { | ||
| s.channel <- message | ||
| return nil | ||
| } else { | ||
| return &message | ||
| } | ||
| } | ||
| 
     | 
||
| // The receiver counterpart of the channel. | ||
| type Receiver[M any] struct { | ||
| // The channel itself. It's public, so that we can combine it in `select` statements. | ||
| Channel <-chan M | ||
| // Atomic variable that indicates whether the channel is closed. | ||
| receiverClosed *atomic.Bool | ||
| } | ||
| 
     | 
||
| // Marks the channel as closed, which means that no messages could be sent via this channel. | ||
| // Any attempt to send a message would result in an error. This is similar to closing the | ||
| // channel except that we don't close the underlying channel (since in Go receivers can't | ||
| // close the channel). | ||
| // | ||
| // This function reads (in a non-blocking way) all pending messages until blocking. Otherwise, | ||
| // they will stay forver in a channel and get lost. | ||
| func (r *Receiver[M]) Close() []M { | ||
| r.receiverClosed.Store(true) | ||
| 
     | 
||
| messages := make([]M, 0) | ||
| for { | ||
| msg, ok := <-r.Channel | ||
| if !ok { | ||
| break | ||
| } | ||
| messages = append(messages, msg) | ||
| } | ||
| 
     | 
||
| return messages | ||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| package common | ||
| 
     | 
||
| import ( | ||
| "errors" | ||
| "sync/atomic" | ||
| ) | ||
| 
     | 
||
| // MessageSink is a helper struct that allows to send messages to a message sink. | ||
| // The MessageSink abstracts the message sink which has a certain sender, so that | ||
| // the sender does not have to be specified every time a message is sent. | ||
| // At the same it guarantees that the caller can't alter the `sender`, which means that | ||
| // the sender can't impersonate another sender (and we guarantee this on a compile-time). | ||
| type MessageSink[SenderType comparable, MessageType any] struct { | ||
| // The sender of the messages. This is useful for multiple-producer-single-consumer scenarios. | ||
| sender SenderType | ||
| // The message sink to which the messages are sent. | ||
| messageSink chan<- Message[SenderType, MessageType] | ||
| // Atomic variable that indicates whether the message sink is sealed. | ||
| // This is used to prevent sending messages to a sealed message sink. | ||
| // The variable is atomic because it may be accessed from multiple goroutines. | ||
| 
         There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For clarity's sake, could we define what 'sealed' means here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, will add a comment about that! Perhaps I need to rewrite the comment for this variable altogether.  | 
||
| sealed atomic.Bool | ||
| } | ||
| 
     | 
||
| // Creates a new MessageSink. The function is generic allowing us to use it for multiple use cases. | ||
| func NewMessageSink[S comparable, M any](sender S, messageSink chan<- Message[S, M]) *MessageSink[S, M] { | ||
| return &MessageSink[S, M]{ | ||
| sender: sender, | ||
| messageSink: messageSink, | ||
| } | ||
| } | ||
| 
     | 
||
| // Sends a message to the message sink. | ||
| func (s *MessageSink[S, M]) Send(message M) error { | ||
| if s.sealed.Load() { | ||
| return errors.New("The channel is sealed, you can't send any messages over it") | ||
| } | ||
| 
     | 
||
| s.messageSink <- Message[S, M]{ | ||
| Sender: s.sender, | ||
| Content: message, | ||
| } | ||
| 
     | 
||
| return nil | ||
| } | ||
| 
     | 
||
| // Seals the channel, which means that no messages could be sent via this channel. | ||
| // Any attempt to send a message would result in an error. This is similar to closing the | ||
| // channel except that we don't close the underlying channel (since there might be other | ||
| // senders that may want to use it). | ||
| func (s *MessageSink[S, M]) Seal() { | ||
| s.sealed.Store(true) | ||
| } | ||
| 
     | 
||
| // Messages that are sent from the peer to the conference in order to communicate with other peers. | ||
| // Since each peer is isolated from others, it can't influence the state of other peers directly. | ||
| type Message[SenderType comparable, MessageType any] struct { | ||
| // The sender of the message. | ||
| Sender SenderType | ||
| // The content of the message. | ||
| Content MessageType | ||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| package conference | ||
| 
     | 
||
| // Configuration for the group conferences (calls). | ||
| type Config struct { | ||
| // Keep-alive timeout for WebRTC connections. If no keep-alive has been received | ||
| // from the client for this duration, the connection is considered dead (in seconds). | ||
| KeepAliveTimeout int `yaml:"timeout"` | ||
| } | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| package conference | ||
| 
     | 
||
| import ( | ||
| "github.com/pion/webrtc/v3" | ||
| "golang.org/x/exp/slices" | ||
| "maunium.net/go/mautrix/event" | ||
| ) | ||
| 
     | 
||
| // Handle the `SFUMessage` event from the DataChannel message. | ||
| func (c *Conference) processSelectDCMessage(participant *Participant, msg event.SFUMessage) { | ||
| participant.logger.Info("Received select request over DC") | ||
| 
     | 
||
| // Find tracks based on what we were asked for. | ||
| tracks := c.getTracks(msg.Start) | ||
| 
     | 
||
| // Let's check if we have all the tracks that we were asked for are there. | ||
| // If not, we will list which are not available (later on we must inform participant | ||
| // about it unless the participant retries it). | ||
| if len(tracks) != len(msg.Start) { | ||
| for _, expected := range msg.Start { | ||
| found := slices.IndexFunc(tracks, func(track *webrtc.TrackLocalStaticRTP) bool { | ||
| return track.StreamID() == expected.StreamID && track.ID() == expected.TrackID | ||
| }) | ||
| 
     | 
||
| if found == -1 { | ||
| c.logger.Warnf("Track not found: %s", expected.TrackID) | ||
| } | ||
| } | ||
| } | ||
| 
     | 
||
| // Subscribe to the found tracks. | ||
| for _, track := range tracks { | ||
| if err := participant.peer.SubscribeTo(track); err != nil { | ||
| participant.logger.Errorf("Failed to subscribe to track: %v", err) | ||
| return | ||
| } | ||
| } | ||
| } | ||
| 
     | 
||
| func (c *Conference) processAnswerDCMessage(participant *Participant, msg event.SFUMessage) { | ||
| participant.logger.Info("Received SDP answer over DC") | ||
| 
     | 
||
| if err := participant.peer.ProcessSDPAnswer(msg.SDP); err != nil { | ||
| participant.logger.Errorf("Failed to set SDP answer: %v", err) | ||
| return | ||
| } | ||
| } | ||
| 
     | 
||
| func (c *Conference) processPublishDCMessage(participant *Participant, msg event.SFUMessage) { | ||
| participant.logger.Info("Received SDP offer over DC") | ||
| 
     | 
||
| answer, err := participant.peer.ProcessSDPOffer(msg.SDP) | ||
| if err != nil { | ||
| participant.logger.Errorf("Failed to set SDP offer: %v", err) | ||
| return | ||
| } | ||
| 
     | 
||
| participant.streamMetadata = msg.Metadata | ||
| 
     | 
||
| participant.sendDataChannelMessage(event.SFUMessage{ | ||
| Op: event.SFUOperationAnswer, | ||
| SDP: answer.SDP, | ||
| Metadata: c.getAvailableStreamsFor(participant.id), | ||
| }) | ||
| } | ||
| 
     | 
||
| func (c *Conference) processUnpublishDCMessage(participant *Participant) { | ||
| participant.logger.Info("Received unpublish over DC") | ||
| } | ||
| 
     | 
||
| func (c *Conference) processAliveDCMessage(participant *Participant) { | ||
| participant.peer.ProcessHeartbeat() | ||
| } | ||
| 
     | 
||
| func (c *Conference) processMetadataDCMessage(participant *Participant, msg event.SFUMessage) { | ||
| participant.streamMetadata = msg.Metadata | ||
| c.resendMetadataToAllExcept(participant.id) | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My understanding is that this is considered a bit of a code smell in Go, since a sender should never be trying to write to a closed channel. I guess this means we have something where we can't know this and need to check each time?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good that you've noticed it: tbh I did not like the part of writing this particular structure as it felt like it's not very elegant, but unfortunately I did not find a better way to write it.
The problem that I'm trying to solve is to indeed not write to the closed channel, but the problem is that from Go's standpoint the channel is not closed since the channels could only be closed in Go from the sender's side, but not from the receiver's side.
A practical example of where it happens in our code:
conf_id,and simply sends the message to the conference that is responsible for this message. So there is a channel between the router and a particular conference.The problem occurs because at the moment the conference is considered ended, the channel remains open (the conference can't close the channel because the conference only holds a receiver part of the channel). And in Go there is no way to check if the channel is alive or if there is someone listening on the channel. So this means that the router might have sent certain messages to the conference expecting that the conference will read them whereas, in reality, the conference has stopped listening on the channel by the moment the router gets to know that the conference is dead.
To sum up:
That's pretty much the problem that I tried to solve with this logic, i.e. create a wrapper around the channel where I could inform the sender that the receiver is not listening to the changes, so that if the sender tries to send the message to such a channel, it gets the message back and knows that there are no receivers anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the detailed explanation - I've also found Go is a bit light on actually explaining how you would write correct & non-racy code sticking to the given paradigms.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right! I've also spent some time trying to understand how to do things safely. In Rust, for instance, we always know if there is a sender and if there is a receiver, but in Go, an elegant implementation of this does not work because of garbage collection: the receivers are not deleted immediately after leaving the scope of a function, but only get removed when the GC kicks in which means that the lifetime of the objects is not strictly defined, hence the consequence of us not knowing when the other counterpart of a channel is dead.