Skip to content

Commit 673a0aa

Browse files
channel: improve documentation and API surface
1 parent bcdfa49 commit 673a0aa

File tree

4 files changed

+41
-19
lines changed

4 files changed

+41
-19
lines changed

pkg/common/channel.go

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,34 @@ package common
22

33
import "sync/atomic"
44

5+
// In Go, unbounded channel means something different than what it means in Rust.
6+
// I.e. unlike Rust, "unbounded" in Go means that the channel has **no buffer**,
7+
// meaning that each attempt to send will block the channel until the receiver
8+
// reads it. Majority of primitives here in `waterfall` are designed under assumption
9+
// that sending is not blocking.
10+
const UnboundedChannelSize = 128
11+
512
// Creates a new channel, returns two counterparts of it where one can only send and another can only receive.
613
// Unlike traditional Go channels, these allow the receiver to mark the channel as closed which would then fail
714
// to send any messages to the channel over `Send“.
815
func NewChannel[M any]() (Sender[M], Receiver[M]) {
9-
channel := make(chan M, 128)
16+
channel := make(chan M, UnboundedChannelSize)
1017
closed := &atomic.Bool{}
1118
sender := Sender[M]{channel, closed}
1219
receiver := Receiver[M]{channel, closed}
1320
return sender, receiver
1421
}
1522

23+
// Sender counterpart of the channel.
1624
type Sender[M any] struct {
17-
channel chan<- M
25+
// The channel itself.
26+
channel chan<- M
27+
// Atomic variable that indicates whether the channel is closed.
1828
receiverClosed *atomic.Bool
1929
}
2030

31+
// Tries to send a message if the channel is not closed.
32+
// Returns the message back if the channel is closed.
2133
func (s *Sender[M]) Send(message M) *M {
2234
if !s.receiverClosed.Load() {
2335
s.channel <- message
@@ -27,11 +39,32 @@ func (s *Sender[M]) Send(message M) *M {
2739
}
2840
}
2941

42+
// The receiver counterpart of the channel.
3043
type Receiver[M any] struct {
31-
Channel <-chan M
44+
// The channel itself. It's public, so that we can combine it in `select` statements.
45+
Channel <-chan M
46+
// Atomic variable that indicates whether the channel is closed.
3247
receiverClosed *atomic.Bool
3348
}
3449

35-
func (r *Receiver[M]) Close() {
50+
// Marks the channel as closed, which means that no messages could be sent via this channel.
51+
// Any attempt to send a message would result in an error. This is similar to closing the
52+
// channel except that we don't close the underlying channel (since in Go receivers can't
53+
// close the channel).
54+
//
55+
// This function reads (in a non-blocking way) all pending messages until blocking. Otherwise,
56+
// they will stay forver in a channel and get lost.
57+
func (r *Receiver[M]) Close() []M {
3658
r.receiverClosed.Store(true)
59+
60+
messages := make([]M, 0)
61+
for {
62+
msg, ok := <-r.Channel
63+
if !ok {
64+
break
65+
}
66+
messages = append(messages, msg)
67+
}
68+
69+
return messages
3770
}

pkg/conference/processor.go

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,22 +26,11 @@ func (c *Conference) processMessages() {
2626
if len(c.participants) == 0 {
2727
c.logger.Info("No more participants, stopping the conference")
2828
// Close the channel so that the sender can't push any messages.
29-
c.matrixMessages.Close()
30-
31-
// Let's read remaining messages from the channel (otherwise the caller will be
32-
// blocked in case of unbuffered channels). We must read **all** pending messages.
33-
messages := make([]MatrixMessage, 0)
34-
for {
35-
msg, ok := <-c.matrixMessages.Channel
36-
if !ok {
37-
break
38-
}
39-
messages = append(messages, msg)
40-
}
29+
unreadMessages := c.matrixMessages.Close()
4130

4231
// Send the information that we ended to the owner and pass the message
4332
// that we did not process (so that we don't drop it silently).
44-
c.endNotifier.Notify(messages)
33+
c.endNotifier.Notify(unreadMessages)
4534
return
4635
}
4736
}

pkg/conference/start.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func StartConference(
4343
matrixMessages: receiver,
4444
endNotifier: conferenceEndNotifier,
4545
participants: make(map[ParticipantID]*Participant),
46-
peerMessages: make(chan common.Message[ParticipantID, peer.MessageContent], 128),
46+
peerMessages: make(chan common.Message[ParticipantID, peer.MessageContent], common.UnboundedChannelSize),
4747
logger: logrus.WithFields(logrus.Fields{"conf_id": confID}),
4848
}
4949

pkg/router.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func newRouter(matrix *signaling.MatrixClient, config conf.Config) chan<- Router
4545
matrix: matrix,
4646
conferenceSinks: make(map[string]*common.Sender[conf.MatrixMessage]),
4747
config: config,
48-
channel: make(chan RouterMessage, 128),
48+
channel: make(chan RouterMessage, common.UnboundedChannelSize),
4949
}
5050

5151
// Start the main loop of the Router.

0 commit comments

Comments
 (0)