Skip to content

Commit eaabfe6

Browse files
committed
use futures-channel for mailbox
1 parent ca83671 commit eaabfe6

File tree

10 files changed

+78
-83
lines changed

10 files changed

+78
-83
lines changed

Makefile

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,38 @@
11
TARGET_DIR = target/profiling
22
BINARY = $(TARGET_DIR)/pulsebeam
33

4-
# Rust and perf settings
4+
# Rust settings
55
CARGO = cargo
66
RUSTFLAGS = -C force-frame-pointers=yes
7-
PERF = perf
8-
PERF_RECORD_FLAGS = -F 4999 --call-graph dwarf
97

10-
# Default: Build and capture perf data
8+
# Default: Build and capture flamegraph
119
.PHONY: all
12-
all: build profile
10+
all: build flamegraph
1311

1412
# Build with profiling profile
1513
.PHONY: build
1614
build:
1715
$(CARGO) build --profile profiling
1816

19-
# Capture perf data
20-
.PHONY: profile
21-
profile: $(BINARY)
22-
sudo $(PERF) record $(PERF_RECORD_FLAGS) -- $(BINARY)
17+
# Capture flamegraph
18+
.PHONY: flamegraph
19+
flamegraph: build
20+
taskset -c 2-5 cargo flamegraph --profile profiling -p pulsebeam --bin pulsebeam
2321

2422
brew-deps:
2523
brew install git-cliff axodotdev/tap/cargo-dist
2624

2725
cargo-deps:
2826
cargo install cargo-smart-release --features allow-emoji
27+
cargo install flamegraph
2928

3029
release:
3130
cargo smart-release --execute
3231

33-
# Clean build artifacts and perf data
32+
# Clean build artifacts and flamegraph data
3433
.PHONY: clean
3534
clean:
3635
$(CARGO) clean
3736
rm -f perf.data
37+
rm -f flamegraph.svg
38+

pulsebeam-runtime/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ tokio = { workspace = true }
1919
tracing = { workspace = true }
2020
rand = { workspace = true }
2121
bytes = { workspace = true }
22+
futures.workspace = true
2223
tokio-rustls = "0.26.2"
2324
rustls = "0.23.31"
2425
turmoil = "0.6.6"
25-
futures.workspace = true
2626
quinn-udp = "0.5.14"
2727
socket2 = "0.6.0"
2828

pulsebeam-runtime/src/actor.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,9 @@ impl<A: Actor> ActorHandle<A> {
214214
/// Returns an error if the actor's mailbox is closed.
215215
#[inline]
216216
pub async fn send_high(
217-
&self,
217+
&mut self,
218218
message: A::HighPriorityMsg,
219-
) -> Result<(), mailbox::SendError<A::HighPriorityMsg>> {
219+
) -> Result<(), mailbox::SendError> {
220220
self.hi_tx.send(message).await
221221
}
222222

@@ -225,7 +225,7 @@ impl<A: Actor> ActorHandle<A> {
225225
/// Returns an error if the mailbox is full or closed.
226226
#[inline]
227227
pub fn try_send_high(
228-
&self,
228+
&mut self,
229229
message: A::HighPriorityMsg,
230230
) -> Result<(), mailbox::TrySendError<A::HighPriorityMsg>> {
231231
self.hi_tx.try_send(message)
@@ -235,10 +235,7 @@ impl<A: Actor> ActorHandle<A> {
235235
///
236236
/// Returns an error if the actor's mailbox is closed.
237237
#[inline]
238-
pub async fn send_low(
239-
&self,
240-
message: A::LowPriorityMsg,
241-
) -> Result<(), mailbox::SendError<A::LowPriorityMsg>> {
238+
pub async fn send_low(&mut self, message: A::LowPriorityMsg) -> Result<(), mailbox::SendError> {
242239
self.lo_tx.send(message).await
243240
}
244241

@@ -247,13 +244,13 @@ impl<A: Actor> ActorHandle<A> {
247244
/// Returns an error if the mailbox is full or closed.
248245
#[inline]
249246
pub fn try_send_low(
250-
&self,
247+
&mut self,
251248
message: A::LowPriorityMsg,
252249
) -> Result<(), mailbox::TrySendError<A::LowPriorityMsg>> {
253250
self.lo_tx.try_send(message)
254251
}
255252

256-
pub async fn get_state(&self) -> Result<A::ObservableState, ActorError> {
253+
pub async fn get_state(&mut self) -> Result<A::ObservableState, ActorError> {
257254
let (tx, rx) = tokio::sync::oneshot::channel();
258255
let msg = SystemMsg::GetState(tx);
259256

@@ -267,7 +264,7 @@ impl<A: Actor> ActorHandle<A> {
267264
rx.await.map_err(|_| ActorError::SystemError)
268265
}
269266

270-
pub async fn terminate(&self) -> Result<(), ActorError> {
267+
pub async fn terminate(&mut self) -> Result<(), ActorError> {
271268
self.sys_tx
272269
.send(SystemMsg::Terminate)
273270
.await

pulsebeam-runtime/src/mailbox.rs

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
1+
use futures::channel::mpsc;
2+
use futures::{SinkExt, StreamExt};
13
use std::error::Error;
24
use std::fmt;
3-
use tokio::sync::mpsc;
45

56
/// An error returned when sending on a closed mailbox.
67
///
78
/// This error is returned by the asynchronous `send` method. It contains
89
/// the message that could not be sent.
910
#[derive(Debug, PartialEq, Eq)]
10-
pub struct SendError<T>(pub T);
11+
pub enum SendError {
12+
Closed,
13+
}
1114

12-
impl<T> fmt::Display for SendError<T> {
15+
impl fmt::Display for SendError {
1316
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1417
write!(f, "sending on a closed mailbox")
1518
}
1619
}
1720

18-
impl<T: fmt::Debug> Error for SendError<T> {}
19-
2021
/// An error returned from the `try_send` method on a `Sender`.
2122
#[derive(Debug, PartialEq, Eq)]
2223
pub enum TrySendError<T> {
@@ -37,14 +38,6 @@ impl<T> fmt::Display for TrySendError<T> {
3738

3839
impl<T: fmt::Debug> Error for TrySendError<T> {}
3940

40-
// For convenience, we can convert a SendError into a TrySendError,
41-
// as a 'Closed' state is a subset of 'TrySend' failures.
42-
impl<T> From<SendError<T>> for TrySendError<T> {
43-
fn from(err: SendError<T>) -> Self {
44-
TrySendError::Closed(err.0)
45-
}
46-
}
47-
4841
/// A handle to send messages to an actor's mailbox.
4942
pub struct Sender<T> {
5043
sender: mpsc::Sender<T>,
@@ -62,22 +55,25 @@ impl<T> Sender<T> {
6255
/// Sends a message asynchronously, waiting if the mailbox is full.
6356
///
6457
/// Returns a `SendError` only if the receiving actor has terminated.
65-
pub async fn send(&self, message: T) -> Result<(), SendError<T>> {
66-
self.sender.send(message).await.map_err(|e| SendError(e.0))
58+
pub async fn send(&mut self, message: T) -> Result<(), SendError> {
59+
self.sender
60+
.send(message)
61+
.await
62+
.map_err(|_e| SendError::Closed)
6763
}
6864

6965
/// Attempts to immediately send a message.
7066
///
7167
/// Returns a `TrySendError` if the mailbox is full or if the
7268
/// receiving actor has terminated.
73-
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
74-
use mpsc::error::TrySendError as TokioTrySendError;
75-
self.sender.try_send(message).map_err(|e| match e {
76-
TokioTrySendError::Full(m) => {
69+
pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
70+
self.sender.try_send(message).map_err(|e| {
71+
if e.is_disconnected() {
72+
TrySendError::Closed(e.into_inner())
73+
} else {
7774
tracing::info!("try_send dropped a packet due to full queue");
78-
TrySendError::Full(m)
75+
TrySendError::Full(e.into_inner())
7976
}
80-
TokioTrySendError::Closed(m) => TrySendError::Closed(m),
8177
})
8278
}
8379
}
@@ -90,7 +86,7 @@ pub struct Receiver<T> {
9086
impl<T> Receiver<T> {
9187
/// Receives the next message from the mailbox.
9288
pub async fn recv(&mut self) -> Option<T> {
93-
self.receiver.recv().await
89+
self.receiver.next().await
9490
}
9591
}
9692

@@ -107,7 +103,7 @@ mod tests {
107103

108104
#[tokio::test]
109105
async fn send_and_recv_single_message() {
110-
let (sender, mut mailbox) = mailbox::new(10);
106+
let (mut sender, mut mailbox) = mailbox::new(10);
111107
let message = "hello".to_string();
112108

113109
sender.send(message.clone()).await.unwrap();
@@ -118,7 +114,7 @@ mod tests {
118114

119115
#[tokio::test]
120116
async fn recv_returns_none_when_all_senders_are_dropped() {
121-
let (sender, mut mailbox) = mailbox::new::<i32>(10);
117+
let (mut sender, mut mailbox) = mailbox::new::<i32>(10);
122118
sender.send(1).await.unwrap();
123119
sender.send(2).await.unwrap();
124120

@@ -135,7 +131,7 @@ mod tests {
135131

136132
#[tokio::test]
137133
async fn async_send_fails_when_receiver_is_dropped() {
138-
let (sender, mailbox) = mailbox::new::<String>(10);
134+
let (mut sender, mailbox) = mailbox::new::<String>(10);
139135

140136
// Drop the mailbox immediately
141137
drop(mailbox);
@@ -144,27 +140,27 @@ mod tests {
144140
assert!(result.is_err());
145141

146142
// Check that the error is our custom SendError and we can get the message back
147-
if let Err(mailbox::SendError(msg)) = result {
148-
assert_eq!(msg, "should fail");
143+
if let Err(mailbox::SendError::Closed) = result {
149144
} else {
150145
panic!("Expected a SendError");
151146
}
152147
}
153148

154149
#[test]
155150
fn try_send_success_on_capacity() {
156-
let (sender, _mailbox) = mailbox::new::<i32>(1);
151+
let (mut sender, _mailbox) = mailbox::new::<i32>(1);
157152
let result = sender.try_send(123);
158153
assert!(result.is_ok());
159154
}
160155

161156
#[test]
162157
fn try_send_fails_when_full() {
163158
// Create a mailbox with a buffer of 1
164-
let (sender, _mailbox) = mailbox::new::<i32>(1);
159+
let (mut sender, _mailbox) = mailbox::new::<i32>(1);
165160

166161
// The first send succeeds and fills the buffer
167162
sender.try_send(1).unwrap();
163+
sender.try_send(1).unwrap();
168164

169165
// The second send should fail because the buffer is full
170166
let result = sender.try_send(2);
@@ -179,7 +175,7 @@ mod tests {
179175

180176
#[test]
181177
fn try_send_fails_when_receiver_is_dropped() {
182-
let (sender, mailbox) = mailbox::new::<i32>(1);
178+
let (mut sender, mailbox) = mailbox::new::<i32>(1);
183179

184180
// Drop the receiver
185181
drop(mailbox);
@@ -196,14 +192,14 @@ mod tests {
196192

197193
#[tokio::test]
198194
async fn async_send_waits_when_full() {
199-
let (sender, mut mailbox) = mailbox::new::<i32>(1);
195+
let (mut sender, mut mailbox) = mailbox::new::<i32>(1);
200196

201197
// Fill the buffer
202198
sender.send(1).await.unwrap();
203199

204200
// This send should wait. We spawn it in a separate task.
205201
let send_task = tokio::spawn({
206-
let sender = sender.clone();
202+
let mut sender = sender.clone();
207203
async move {
208204
sender.send(2).await.unwrap();
209205
}
@@ -227,8 +223,8 @@ mod tests {
227223

228224
#[tokio::test]
229225
async fn cloned_sender_works_and_channel_stays_open() {
230-
let (sender1, mut mailbox) = mailbox::new::<i32>(10);
231-
let sender2 = sender1.clone();
226+
let (mut sender1, mut mailbox) = mailbox::new::<i32>(10);
227+
let mut sender2 = sender1.clone();
232228

233229
sender1.send(1).await.unwrap();
234230
sender2.send(2).await.unwrap();

pulsebeam/src/controller.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl actor::Actor for ControllerActor {
8383
}
8484

8585
ControllerMessage::RemoveParticipant(room_id, participant_id) => {
86-
if let Some(room_handle) = self.rooms.get(&room_id) {
86+
if let Some(room_handle) = self.rooms.get_mut(&room_id) {
8787
// if the room has exited, the participants have already cleaned up too.
8888
let _ = room_handle
8989
.send_high(room::RoomMessage::RemoveParticipant(participant_id))
@@ -141,7 +141,7 @@ impl ControllerActor {
141141
.accept_offer(offer)
142142
.map_err(ControllerError::OfferRejected)?;
143143

144-
let room_handle = self.get_or_create_room(room_id);
144+
let mut room_handle = self.get_or_create_room(room_id);
145145

146146
// TODO: probably retry? Or, let the client to retry instead?
147147
// Each room will always have a graceful timeout before closing.

pulsebeam/src/gateway.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ impl GatewayActor {
127127

128128
tracing::trace!("received {count} packets from socket");
129129
for packet in self.recv_batch.drain(..) {
130-
let participant_handle = if let Some(participant_handle) = self.mapping.get(&packet.src)
130+
let mut participant_handle = if let Some(participant_handle) =
131+
self.mapping.get(&packet.src)
131132
{
132133
tracing::trace!(
133134
"found connection from mapping: {} -> {:?}",

pulsebeam/src/participant.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -487,12 +487,12 @@ impl ParticipantActor {
487487
}
488488
}
489489
Event::MediaData(e) => {
490-
if let Some(track) = self.published_video_tracks.get(&e.mid) {
490+
if let Some(track) = self.published_video_tracks.get_mut(&e.mid) {
491491
tracing::debug!("forwarded media: participant -> track, {track:?}");
492492
let _ = track
493493
.send_low(track::TrackDataMessage::ForwardMedia(Arc::new(e)))
494494
.await;
495-
} else if let Some(track) = self.published_audio_tracks.get(&e.mid) {
495+
} else if let Some(track) = self.published_audio_tracks.get_mut(&e.mid) {
496496
let _ = track
497497
.send_low(track::TrackDataMessage::ForwardMedia(Arc::new(e)))
498498
.await;
@@ -515,7 +515,7 @@ impl ParticipantActor {
515515
return;
516516
};
517517

518-
let Some(track) = self.available_video_tracks.get(&track_id.internal) else {
518+
let Some(track) = self.available_video_tracks.get_mut(&track_id.internal) else {
519519
return;
520520
};
521521

0 commit comments

Comments
 (0)