Skip to content

Commit d3b837c

Browse files
committed
only poll when necessary
1 parent b45e65e commit d3b837c

File tree

3 files changed

+36
-20
lines changed

3 files changed

+36
-20
lines changed

pulsebeam/src/participant/actor.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1+
use std::pin::Pin;
12
use std::time::Duration;
23
use std::{collections::HashMap, sync::Arc};
34

45
use pulsebeam_runtime::actor::ActorKind;
56
use pulsebeam_runtime::prelude::*;
67
use pulsebeam_runtime::{actor, mailbox, net};
78
use str0m::{Rtc, RtcError, error::SdpError};
8-
use tokio::time::Instant;
9+
use tokio::time::{Instant, Sleep};
910
use tokio_metrics::TaskMonitor;
1011

1112
use crate::participant::core::{CoreEvent, ParticipantCore};
@@ -81,16 +82,19 @@ impl actor::Actor<ParticipantMessageSet> for ParticipantActor {
8182
.await;
8283
let mut stats_interval = tokio::time::interval(Duration::from_millis(200));
8384
let mut current_deadline = Instant::now() + Duration::from_secs(1);
85+
let mut new_deadline = Some(current_deadline);
86+
8487
let rtc_timer = tokio::time::sleep_until(current_deadline);
8588
tokio::pin!(rtc_timer);
8689

8790
loop {
88-
let Some(new_deadline) = self.core.poll_rtc() else {
89-
break;
90-
};
91-
if new_deadline != current_deadline {
92-
rtc_timer.as_mut().reset(new_deadline);
93-
current_deadline = new_deadline;
91+
match new_deadline {
92+
Some(new_deadline) if new_deadline != current_deadline => {
93+
rtc_timer.as_mut().reset(new_deadline);
94+
current_deadline = new_deadline;
95+
}
96+
None => break,
97+
_ => {}
9498
}
9599

96100
let events: Vec<_> = self.core.drain_events().collect();
@@ -99,7 +103,6 @@ impl actor::Actor<ParticipantMessageSet> for ParticipantActor {
99103
}
100104

101105
tokio::select! {
102-
biased;
103106
res = ctx.sys_rx.recv() => {
104107
match res {
105108
Some(msg) => match msg {
@@ -115,15 +118,22 @@ impl actor::Actor<ParticipantMessageSet> for ParticipantActor {
115118
Ok(_) = self.egress.writable(), if !self.core.batcher.is_empty() => {
116119
self.core.batcher.flush(&self.egress);
117120
},
118-
Some(batch) = gateway_rx.recv() => self.core.handle_udp_packet_batch(batch),
119121
Some((meta, pkt)) = self.core.downstream.next() => {
120122
self.core.handle_forward_rtp(meta, pkt);
123+
while let Some((mid, pkt)) = self.core.downstream.next().now_or_never().flatten() {
124+
self.core.handle_forward_rtp(mid, pkt);
125+
}
126+
127+
new_deadline = self.core.poll_rtc();
128+
},
129+
Some(batch) = gateway_rx.recv() => {
130+
new_deadline = self.core.handle_udp_packet_batch(batch);
121131
},
122132
now = stats_interval.tick() => {
123133
self.core.poll_stats(now);
124134
}
125135
_ = &mut rtc_timer => {
126-
self.core.handle_timeout();
136+
new_deadline = self.core.handle_timeout();
127137
},
128138
}
129139
}

pulsebeam/src/participant/core.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ impl ParticipantCore {
6767
self.events.drain(..)
6868
}
6969

70-
pub fn handle_udp_packet_batch(&mut self, batch: net::RecvPacketBatch) {
70+
pub fn handle_udp_packet_batch(&mut self, batch: net::RecvPacketBatch) -> Option<Instant> {
71+
let mut last_deadline = None;
7172
for pkt in batch.into_iter() {
7273
if let Ok(contents) = (*pkt).try_into() {
7374
let recv = str0m::net::Receive {
@@ -79,15 +80,18 @@ impl ParticipantCore {
7980
let _ = self
8081
.rtc
8182
.handle_input(Input::Receive(Instant::now().into(), recv));
82-
self.poll_rtc();
83+
last_deadline = self.poll_rtc();
8384
} else {
8485
tracing::warn!(src = %batch.src, "Dropping malformed UDP packet");
8586
}
8687
}
88+
89+
last_deadline
8790
}
8891

89-
pub fn handle_timeout(&mut self) {
92+
pub fn handle_timeout(&mut self) -> Option<Instant> {
9093
let _ = self.rtc.handle_input(Input::Timeout(Instant::now().into()));
94+
self.poll_rtc()
9195
}
9296

9397
pub fn handle_available_tracks(

pulsebeam/src/participant/downstream/mod.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,18 +65,20 @@ impl DownstreamAllocator {
6565
pub fn handle_keyframe_request(&mut self, req: KeyframeRequest) {
6666
self.video.handle_keyframe_request(req);
6767
}
68+
69+
pub fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<(Mid, RtpPacket)>> {
70+
if let Poll::Ready(item) = self.audio.poll_next(cx) {
71+
return Poll::Ready(item);
72+
}
73+
74+
self.video.poll_next(cx)
75+
}
6876
}
6977

7078
impl Stream for DownstreamAllocator {
7179
type Item = (Mid, RtpPacket);
7280

7381
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
74-
let this = self.get_mut();
75-
76-
if let Poll::Ready(item) = this.audio.poll_next(cx) {
77-
return Poll::Ready(item);
78-
}
79-
80-
this.video.poll_next(cx)
82+
self.get_mut().poll_next_unpin(cx)
8183
}
8284
}

0 commit comments

Comments
 (0)