Skip to content

Commit 4bbc406

Browse files
committed
use join set instead of futuresunordered
1 parent bd3d738 commit 4bbc406

File tree

3 files changed

+7
-6
lines changed

3 files changed

+7
-6
lines changed

pulsebeam/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ fn main() {
2929
let rt = tokio::runtime::Builder::new_multi_thread()
3030
.enable_all()
3131
.worker_threads(workers)
32+
.disable_lifo_slot()
3233
.build()
3334
.unwrap();
3435
// let rt = tokio::runtime::LocalRuntime::new().unwrap();

pulsebeam/src/node.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ pub async fn run(
7777
let (gateway, gateway_join) = actor::spawn_default(gateway::GatewayActor::new(sockets.clone()));
7878
join_set.push(gateway_join.map(|_| ()).boxed());
7979

80-
let shard_count = 16 * workers;
80+
let shard_count = 2 * workers;
8181
let mut shard_handles = Vec::with_capacity(shard_count);
8282

8383
for i in 0..shard_count {

pulsebeam/src/shard.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::pin::Pin;
22

3-
use futures::{StreamExt, stream::FuturesUnordered};
43
use pulsebeam_runtime::{actor, sync::Arc};
4+
use tokio::task::JoinSet;
55

66
pub type ShardTask = Pin<Box<dyn futures::Future<Output = ()> + Send>>;
77

@@ -11,7 +11,7 @@ pub enum ShardMessage {
1111

1212
pub struct ShardActor {
1313
shard_id: usize,
14-
tasks: FuturesUnordered<ShardTask>,
14+
tasks: JoinSet<()>,
1515
}
1616

1717
pub struct ShardMessageSet;
@@ -48,7 +48,7 @@ impl actor::Actor<ShardMessageSet> for ShardActor {
4848
pulsebeam_runtime::actor_loop!(self, _ctx,
4949
pre_select: {},
5050
select: {
51-
Some(_) = self.tasks.next() => {
51+
Some(_) = self.tasks.join_next() => {
5252
// task completed, shard does nothing else
5353
}
5454
}
@@ -59,7 +59,7 @@ impl actor::Actor<ShardMessageSet> for ShardActor {
5959

6060
async fn on_msg(&mut self, _ctx: &mut actor::ActorContext<ShardMessageSet>, msg: ShardMessage) {
6161
let ShardMessage::AddTask(task) = msg;
62-
self.tasks.push(task);
62+
self.tasks.spawn(task);
6363
metrics::counter!("shard_task_count", "shard_id" => self.shard_id.to_string())
6464
.absolute(self.tasks.len() as u64);
6565
}
@@ -69,7 +69,7 @@ impl ShardActor {
6969
pub fn new(shard_id: usize) -> Self {
7070
Self {
7171
shard_id,
72-
tasks: FuturesUnordered::new(),
72+
tasks: JoinSet::new(),
7373
}
7474
}
7575
}

0 commit comments

Comments
 (0)