Skip to content

Commit 88d7a6b

Browse files
committed
use rt mpsc channel
1 parent 7fd3a4b commit 88d7a6b

File tree

4 files changed

+17
-14
lines changed

4 files changed

+17
-14
lines changed

pulsebeam-runtime/src/sync/mpsc.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ struct Slot<T> {
2525

2626
#[derive(Debug)]
2727
struct Ring<T> {
28+
// This is mspc, but we expect very low contention on the producers.
29+
// Mutex is generally cheaper than RWLock. So, no reason to pay
30+
// RWLock overhead.
2831
slots: Vec<Mutex<Slot<T>>>, // per-slot mutex
2932
mask: usize,
3033
head: AtomicU64, // next free sequence number

pulsebeam/src/gateway/actor.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
use crate::gateway::demux::Demuxer;
22
use pulsebeam_runtime::actor::{ActorKind, ActorStatus, RunnerConfig};
33
use pulsebeam_runtime::prelude::*;
4-
use pulsebeam_runtime::{actor, mailbox, net};
4+
use pulsebeam_runtime::{actor, net};
55
use std::{io, sync::Arc};
66
use tokio::task::JoinSet;
77

88
#[derive(Clone)]
99
pub enum GatewayControlMessage {
10-
AddParticipant(String, mailbox::Sender<net::RecvPacketBatch>),
10+
AddParticipant(
11+
String,
12+
pulsebeam_runtime::sync::mpsc::Sender<net::RecvPacketBatch>,
13+
),
1114
RemoveParticipant(String),
1215
}
1316

@@ -154,7 +157,8 @@ impl GatewayWorkerActor {
154157
}
155158

156159
async fn read_socket(&mut self) -> io::Result<()> {
157-
const COOP_BUDGET: usize = 128;
160+
// ~1,000 yields per second = ~99% CPU for other work
161+
const COOP_BUDGET: usize = 256;
158162
let mut spent_budget: usize = 0;
159163

160164
loop {
@@ -171,7 +175,7 @@ impl GatewayWorkerActor {
171175
};
172176

173177
let src = batch.src;
174-
if !self.demuxer.demux(&mut self.socket, batch).await {
178+
if !self.demuxer.demux(&mut self.socket, batch) {
175179
// In case there's a malicious actor, close immediately as there's no
176180
// associated participant.
177181
self.socket.close_peer(&src);

pulsebeam/src/gateway/demux.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1+
use pulsebeam_runtime::net;
12
use pulsebeam_runtime::net::UnifiedSocketReader;
2-
use pulsebeam_runtime::{mailbox, net};
33

44
use crate::gateway::ice;
55
use std::collections::HashMap;
66
use std::net::SocketAddr;
77

8-
pub type ParticipantHandle = mailbox::Sender<net::RecvPacketBatch>;
8+
pub type ParticipantHandle = pulsebeam_runtime::sync::mpsc::Sender<net::RecvPacketBatch>;
99

1010
/// A UDP demuxer that maps packets to participants based on source address and STUN ufrag.
1111
///
@@ -57,11 +57,7 @@ impl Demuxer {
5757

5858
/// Routes a packet to the correct participant.
5959
/// Returns `true` if sent, `false` if dropped
60-
pub async fn demux(
61-
&mut self,
62-
socket: &mut UnifiedSocketReader,
63-
batch: net::RecvPacketBatch,
64-
) -> bool {
60+
pub fn demux(&mut self, socket: &mut UnifiedSocketReader, batch: net::RecvPacketBatch) -> bool {
6561
let src = batch.src;
6662

6763
let handle = if let Some(h) = self.addr_map.get_mut(&src) {
@@ -83,7 +79,7 @@ impl Demuxer {
8379
return false;
8480
};
8581

86-
if let Err(_) = handle.send(batch).await {
82+
if let Err(_) = handle.try_send(batch) {
8783
// Handle is closed! Clean up everything related to this participant.
8884
if let Some(ufrag) = self.addr_to_ufrag.get(&src).cloned() {
8985
tracing::info!("Participant handle closed, cleaning up ufrag: {:?}", ufrag);

pulsebeam/src/participant/actor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl actor::Actor<ParticipantMessageSet> for ParticipantActor {
7070
ctx: &mut actor::ActorContext<ParticipantMessageSet>,
7171
) -> Result<(), actor::ActorError> {
7272
let ufrag = self.core.rtc.direct_api().local_ice_credentials().ufrag;
73-
let (gateway_tx, mut gateway_rx) = mailbox::new(64);
73+
let (gateway_tx, mut gateway_rx) = pulsebeam_runtime::sync::mpsc::channel(64);
7474
let room_handle = self.room_handle.clone();
7575

7676
let _ = self
@@ -159,7 +159,7 @@ impl actor::Actor<ParticipantMessageSet> for ParticipantActor {
159159
},
160160

161161
// Priority 3: Ingress Work
162-
Some(batch) = gateway_rx.recv() => {
162+
Ok(batch) = gateway_rx.recv() => {
163163
maybe_deadline = self.core.handle_udp_packet_batch(batch);
164164
},
165165

0 commit comments

Comments
 (0)