Skip to content

Commit 778dab8

Browse files
conference: fix unsoundness in channel usage
1 parent c998449 commit 778dab8

File tree

4 files changed

+18
-17
lines changed

4 files changed

+18
-17
lines changed

pkg/common/channel.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import "sync/atomic"
66
// Unlike traditional Go channels, these allow the receiver to mark the channel as closed which would then fail
77
// to send any messages to the channel over `Send“.
88
func NewChannel[M any]() (Sender[M], Receiver[M]) {
9-
channel := make(chan M)
9+
channel := make(chan M, 128)
1010
closed := &atomic.Bool{}
1111
sender := Sender[M]{channel, closed}
1212
receiver := Receiver[M]{channel, closed}

pkg/conference/processor.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,19 @@ func (c *Conference) processMessages() {
2929
c.matrixMessages.Close()
3030

3131
// Let's read remaining messages from the channel (otherwise the caller will be
32-
// blocked in case of unbuffered channels).
33-
var message *MatrixMessage
34-
select {
35-
case msg := <-c.matrixMessages.Channel:
36-
*message = msg
37-
default:
38-
// Ok, no messages in the queue, nice.
32+
// blocked in case of unbuffered channels). We must read **all** pending messages.
33+
messages := make([]MatrixMessage, 0)
34+
for {
35+
msg, ok := <-c.matrixMessages.Channel
36+
if !ok {
37+
break
38+
}
39+
messages = append(messages, msg)
3940
}
4041

4142
// Send the information that we ended to the owner and pass the message
4243
// that we did not process (so that we don't drop it silently).
43-
c.endNotifier.Notify(message)
44+
c.endNotifier.Notify(messages)
4445
return
4546
}
4647
}

pkg/conference/start.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func StartConference(
4343
matrixMessages: receiver,
4444
endNotifier: conferenceEndNotifier,
4545
participants: make(map[ParticipantID]*Participant),
46-
peerMessages: make(chan common.Message[ParticipantID, peer.MessageContent]),
46+
peerMessages: make(chan common.Message[ParticipantID, peer.MessageContent], 128),
4747
logger: logrus.WithFields(logrus.Fields{"conf_id": confID}),
4848
}
4949

@@ -60,5 +60,5 @@ func StartConference(
6060

6161
type ConferenceEndNotifier interface {
6262
// Called when the conference ends.
63-
Notify(unread *MatrixMessage)
63+
Notify(unread []MatrixMessage)
6464
}

pkg/router.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func newRouter(matrix *signaling.MatrixClient, config conf.Config) chan<- Router
4444
matrix: matrix,
4545
conferenceSinks: make(map[string]*common.Sender[conf.MatrixMessage]),
4646
config: config,
47-
channel: make(chan RouterMessage),
47+
channel: make(chan RouterMessage, 128),
4848
}
4949

5050
// Start the main loop of the Router.
@@ -59,9 +59,9 @@ func newRouter(matrix *signaling.MatrixClient, config conf.Config) chan<- Router
5959
// Remove the conference that ended from the list.
6060
delete(router.conferenceSinks, msg.conferenceID)
6161
// Process the message that was not read by the conference.
62-
if msg.unread != nil {
63-
// TODO: We must handle this message to avoid glare on session end.
64-
// router.handleMatrixEvent(*msg.unread)
62+
if len(msg.unread) > 0 {
63+
// FIXME: We must handle these messages!
64+
logrus.Warnf("Unread messages: %v", len(msg.unread))
6565
}
6666
}
6767
}
@@ -162,7 +162,7 @@ type ConferenceEndedMessage struct {
162162
// The ID of the conference that has ended.
163163
conferenceID string
164164
// A message (or messages in future) that has not been processed (if any).
165-
unread *conf.MatrixMessage
165+
unread []conf.MatrixMessage
166166
}
167167

168168
// A simple wrapper around channel that contains the ID of the conference that sent the message.
@@ -180,7 +180,7 @@ func createConferenceEndNotifier(conferenceID string, channel chan<- RouterMessa
180180
}
181181

182182
// A function that a conference calls when it is ended.
183-
func (c *ConferenceEndNotifier) Notify(unread *conf.MatrixMessage) {
183+
func (c *ConferenceEndNotifier) Notify(unread []conf.MatrixMessage) {
184184
c.channel <- ConferenceEndedMessage{
185185
conferenceID: c.conferenceID,
186186
unread: unread,

0 commit comments

Comments
 (0)