Skip to content

Commit 7f381af

Browse files
committed
Remove futures crate
Squashed commit of the following: commit eb8cabbf7aa3aa09cffdf8d5d5e0d1e56994cacd Author: Lukas Herman <[email protected]> Date: Fri Dec 12 15:41:19 2025 -0500 compilable commit 8207e71 Author: Lukas Herman <[email protected]> Date: Fri Dec 12 14:39:21 2025 -0500 init futures removal
1 parent 4bbc406 commit 7f381af

File tree

21 files changed

+159
-471
lines changed

21 files changed

+159
-471
lines changed

Cargo.lock

Lines changed: 3 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ serde = { version = "1", features = ["derive"]}
3232
thiserror = "2.0"
3333
tokio = { version = "1", features = ["full"] }
3434
tracing = { version = "0.1", features = ["release_max_level_debug"] }
35-
futures = "0.3.31"
35+
futures-lite = "2.6.1"
3636
rand = "0.9"
3737
bytes = "1.10.1"
3838
str0m = { git = "https://github.com/PulseBeamDev/str0m.git", branch = "patch/main", default-features = false, features = ["openssl", "vendored", "sha1"] }

pulsebeam-agent/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ ffmpeg-static = ["ffmpeg-next/build"]
1919
ffmpeg-dynamic = []
2020

2121
[dependencies]
22-
futures = { workspace = true }
22+
futures-lite = { workspace = true }
2323
thiserror = { workspace = true }
2424
tracing = { workspace = true }
2525
str0m = { workspace = true }

pulsebeam-runtime/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ tokio = { workspace = true }
2525
tracing = { workspace = true }
2626
rand = { workspace = true }
2727
bytes = { workspace = true }
28-
futures.workspace = true
28+
futures-lite = { workspace = true }
2929
turmoil = "0.6.6"
3030
quinn-udp = "0.5.14"
3131
socket2 = "0.6.0"

pulsebeam-runtime/benches/sync.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use criterion::{Criterion, criterion_group, criterion_main};
2-
use futures::{StreamExt, future::join_all};
32
use futures_concurrency::stream::Merge;
3+
use futures_lite::StreamExt;
44
use rand::seq::index;
55
use std::time::{Duration, Instant};
6-
use tokio::runtime::Runtime;
76
use tokio::sync::mpsc;
87
use tokio::task;
8+
use tokio::{runtime::Runtime, task::JoinSet};
99

1010
// Use the specified import path for the SPMC channel implementation.
1111
use pulsebeam_runtime::sync::spmc::{RecvError, Sender, channel};
@@ -111,7 +111,7 @@ async fn run_interactive_room_mesh_mpsc_fanout_test() {
111111
}
112112

113113
let simulation_start = Instant::now();
114-
join_all(publisher_tasks).await;
114+
JoinSet::from_iter(publisher_tasks).join_all().await;
115115
let total_send_duration = simulation_start.elapsed();
116116

117117
let all_latencies = aggregate_latencies(subscriber_tasks).await;
@@ -210,7 +210,7 @@ async fn run_interactive_room_mesh_spawn_test() {
210210
}
211211

212212
let simulation_start = Instant::now();
213-
join_all(publisher_tasks).await;
213+
JoinSet::from_iter(publisher_tasks).join_all().await;
214214
let total_send_duration = simulation_start.elapsed();
215215

216216
let all_latencies = aggregate_latencies(subscriber_tasks).await;
@@ -298,7 +298,7 @@ async fn run_interactive_room_mesh_futures_unordered_test() {
298298
}
299299

300300
let simulation_start = Instant::now();
301-
join_all(publisher_tasks).await;
301+
JoinSet::from_iter(publisher_tasks).join_all().await;
302302
let total_send_duration = simulation_start.elapsed();
303303

304304
let all_latencies = aggregate_latencies(subscriber_tasks).await;
@@ -416,12 +416,11 @@ async fn create_publisher_load_mpsc(tx: mpsc::Sender<(usize, Instant)>, num_pack
416416
}
417417

418418
async fn aggregate_latencies(handles: Vec<task::JoinHandle<Vec<Duration>>>) -> Vec<Duration> {
419-
let results = join_all(handles).await;
419+
let results = JoinSet::from_iter(handles).join_all().await;
420+
let results = results.iter().flatten();
420421
let mut all_latencies = Vec::new();
421422
for result in results {
422-
if let Ok(subscriber_latencies) = result {
423-
all_latencies.extend(subscriber_latencies);
424-
}
423+
all_latencies.extend(result);
425424
}
426425
all_latencies
427426
}

pulsebeam-runtime/src/actor.rs

Lines changed: 12 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use crate::actor_loop;
2-
use futures::FutureExt;
2+
use futures_lite::FutureExt;
33
use std::any::Any;
44
use std::cell::RefCell;
55
use std::collections::HashMap;
66
use std::fmt::{Debug, Display, Formatter};
7+
use std::future::Future;
78
use std::hash::Hash;
89
use std::panic::AssertUnwindSafe;
910
use std::pin::Pin;
@@ -25,7 +26,7 @@ tokio::task_local! {
2526
}
2627

2728
pub struct JoinHandle<M: MessageSet> {
28-
inner: Pin<Box<dyn futures::Future<Output = (M::Meta, ActorStatus)> + Send>>,
29+
inner: Pin<Box<dyn Future<Output = (M::Meta, ActorStatus)> + Send>>,
2930
abort_handle: tokio::task::AbortHandle,
3031
}
3132

@@ -41,7 +42,7 @@ impl<M: MessageSet> Drop for JoinHandle<M> {
4142
}
4243
}
4344

44-
impl<M: MessageSet> futures::Future for JoinHandle<M> {
45+
impl<M: MessageSet> Future for JoinHandle<M> {
4546
type Output = (M::Meta, ActorStatus);
4647

4748
fn poll(
@@ -117,7 +118,7 @@ pub trait Actor<M: MessageSet>: Sized + Send + 'static {
117118
fn run(
118119
&mut self,
119120
ctx: &mut ActorContext<M>,
120-
) -> impl futures::Future<Output = Result<(), ActorError>> + Send {
121+
) -> impl Future<Output = Result<(), ActorError>> + Send {
121122
async {
122123
actor_loop!(self, ctx);
123124
Ok(())
@@ -128,15 +129,15 @@ pub trait Actor<M: MessageSet>: Sized + Send + 'static {
128129
&mut self,
129130
_ctx: &mut ActorContext<M>,
130131
_msg: SystemMsg<M::ObservableState>,
131-
) -> impl futures::Future<Output = ()> + Send {
132+
) -> impl Future<Output = ()> + Send {
132133
async move {}
133134
}
134135

135136
fn on_msg(
136137
&mut self,
137138
_ctx: &mut ActorContext<M>,
138139
_msg: M::Msg,
139-
) -> impl futures::Future<Output = ()> + Send {
140+
) -> impl Future<Output = ()> + Send {
140141
async move {}
141142
}
142143
}
@@ -254,40 +255,12 @@ where
254255
let runnable = tracing::Instrument::instrument(runnable, span);
255256

256257
let actor_id = handle.meta.clone();
257-
let fut = runnable.map(move |status| (actor_id, status));
258-
259-
(handle, fut)
260-
}
261-
262-
pub fn spawn<A, M>(a: A, config: RunnerConfig) -> (ActorHandle<M>, JoinHandle<M>)
263-
where
264-
M: MessageSet,
265-
A: Actor<M>,
266-
{
267-
let (handle, runnable) = prepare(a, config);
268-
let join = tokio::spawn(runnable);
269-
let abort_handle = join.abort_handle();
270-
271-
let actor_id = handle.meta.clone();
272-
let join = join
273-
.map(|res| res.unwrap_or((actor_id, ActorStatus::ShutDown)))
274-
.boxed();
275-
276-
(
277-
handle,
278-
JoinHandle {
279-
inner: join,
280-
abort_handle,
281-
},
282-
)
283-
}
258+
let runnable = async move {
259+
let status = runnable.await;
260+
(actor_id, status)
261+
};
284262

285-
pub fn spawn_default<A, M>(a: A) -> (ActorHandle<M>, JoinHandle<M>)
286-
where
287-
M: MessageSet,
288-
A: Actor<M>,
289-
{
290-
spawn(a, RunnerConfig::default())
263+
(handle, runnable)
291264
}
292265

293266
async fn run<A, M>(mut a: A, mut ctx: ActorContext<M>) -> ActorStatus

pulsebeam-runtime/src/prelude.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
pub use crate::actor::{Actor, ActorHandle};
22
pub use crate::rand::{RngCore, SeedableRng};
3-
pub use futures::{FutureExt, StreamExt};
3+
pub use futures_lite::{FutureExt, StreamExt};
44
pub use once_cell::sync::Lazy;
55
pub use tokio_metrics::TaskMonitor;

pulsebeam-runtime/src/sync/spmc.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
use crossbeam_utils::CachePadded;
22
use event_listener::{Event, EventListener};
3+
use futures_lite::Stream;
34
use parking_lot::RwLock;
45
use std::future::Future;
56
use std::pin::Pin;
67
use std::sync::Arc;
78
use std::sync::atomic::{AtomicU64, Ordering};
8-
use std::task::{Context, Poll};
9+
use std::task::{Context, Poll, ready};
910

1011
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1112
pub enum RecvError {
1213
Lagged(u64),
1314
Closed,
1415
}
1516

17+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18+
pub enum StreamRecvError {
19+
Lagged(u64),
20+
}
21+
1622
#[derive(Debug)]
1723
struct Slot<T> {
1824
seq: u64,
@@ -171,6 +177,21 @@ impl<T: Clone> Receiver<T> {
171177
}
172178
}
173179

180+
impl<T: Clone> Stream for Receiver<T> {
181+
type Item = Result<T, StreamRecvError>;
182+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
183+
let this = self.get_mut();
184+
185+
let res = match ready!(this.poll_recv(cx)) {
186+
Ok(item) => Some(Ok(item)),
187+
Err(RecvError::Lagged(n)) => Some(Err(StreamRecvError::Lagged(n))),
188+
Err(RecvError::Closed) => None,
189+
};
190+
191+
Poll::Ready(res)
192+
}
193+
}
194+
174195
impl<T: Clone> Clone for Receiver<T> {
175196
fn clone(&self) -> Self {
176197
Self {

pulsebeam/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ license.workspace = true
1111

1212
[dependencies]
1313
anyhow = { workspace = true }
14-
futures = { workspace = true }
14+
futures-lite = { workspace = true }
1515
serde = { workspace = true }
1616
thiserror = { workspace = true }
1717
tokio = { workspace = true }

pulsebeam/src/controller.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@ use crate::{
44
entity::{ParticipantId, RoomId},
55
node, room,
66
};
7-
use futures::stream::FuturesUnordered;
8-
use pulsebeam_runtime::actor::{self, ActorKind};
7+
use pulsebeam_runtime::actor::{self, ActorKind, ActorStatus};
98
use pulsebeam_runtime::prelude::*;
109
use str0m::{Candidate, RtcConfig, RtcError, change::SdpOffer, error::SdpError};
11-
use tokio::sync::oneshot;
10+
use tokio::{sync::oneshot, task::JoinSet};
1211

1312
#[derive(thiserror::Error, Debug)]
1413
pub enum ControllerError {
@@ -54,7 +53,7 @@ pub struct ControllerActor {
5453
local_addrs: Vec<SocketAddr>,
5554

5655
rooms: HashMap<Arc<RoomId>, room::RoomHandle>,
57-
room_tasks: FuturesUnordered<actor::JoinHandle<room::RoomMessageSet>>,
56+
room_tasks: JoinSet<(Arc<RoomId>, ActorStatus)>,
5857
}
5958

6059
impl actor::Actor<ControllerMessageSet> for ControllerActor {
@@ -79,7 +78,7 @@ impl actor::Actor<ControllerMessageSet> for ControllerActor {
7978
) -> Result<(), actor::ActorError> {
8079
pulsebeam_runtime::actor_loop!(self, ctx, pre_select:{} ,
8180
select: {
82-
Some((room_id, _)) = self.room_tasks.next() => {
81+
Some(Ok((room_id, _))) = self.room_tasks.join_next() => {
8382
self.rooms.remove(&room_id);
8483
}
8584
}
@@ -203,11 +202,11 @@ impl ControllerActor {
203202
} else {
204203
tracing::info!("create_room: {}", room_id);
205204
let room_actor = room::RoomActor::new(self.node_ctx.clone(), room_id.clone());
206-
let (room_handle, room_join) = actor::spawn(
205+
let (room_handle, room_task) = actor::prepare(
207206
room_actor,
208207
actor::RunnerConfig::default().with_mailbox_cap(1024),
209208
);
210-
self.room_tasks.push(room_join);
209+
self.room_tasks.spawn(room_task);
211210

212211
self.rooms.insert(room_id.clone(), room_handle.clone());
213212

@@ -227,7 +226,7 @@ impl ControllerActor {
227226
node_ctx: system_ctx,
228227
local_addrs,
229228
rooms: HashMap::new(),
230-
room_tasks: FuturesUnordered::new(),
229+
room_tasks: JoinSet::new(),
231230
}
232231
}
233232
}

0 commit comments

Comments
 (0)