Skip to content

Commit 5624ae3

Browse files
committed
perf(acton-reactive): optimize broadcast with pre-allocated Vec and increased inbox capacity
- Replace FuturesUnordered with Vec::with_capacity() + join_all() for broadcast to enable pre-allocation and reduce memory churn - Increase default actor_inbox_capacity from 255 to 512 for improved throughput in high-volume messaging scenarios
1 parent c7edc77 commit 5624ae3

File tree

2 files changed

+17
-20
lines changed

2 files changed

+17
-20
lines changed

acton-reactive/src/common/broker.rs

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::sync::Arc;
2121

2222
use acton_ern::Ern;
2323
use dashmap::DashMap;
24-
use futures::stream::{FuturesUnordered, StreamExt};
24+
use futures::future::join_all;
2525
use tracing::{instrument, trace};
2626

2727
use crate::actor::{ActorConfig, Idle, ManagedActor};
@@ -205,25 +205,22 @@ impl Broker {
205205
// BrokerRequestEnvelope contains Arc<dyn ActonMessage>, so cloning is cheap.
206206
let shared_envelope: BrokerRequestEnvelope = request.into();
207207

208-
// Use FuturesUnordered to execute sends in parallel rather than sequentially.
209-
// This allows the Tokio runtime to poll all futures concurrently.
210-
let mut futures: FuturesUnordered<_> = subscribers_set
211-
.value()
212-
.iter()
213-
.map(|(_, subscriber_handle)| {
214-
let handle = subscriber_handle.clone();
215-
let envelope_to_send = shared_envelope.clone();
216-
async move {
217-
trace!(subscriber = %handle.id(), message_type = ?message_type_id, "Sending broadcast");
218-
// Send the envelope to the subscriber's handle.
219-
// Ignore potential send errors (e.g., closed channel).
220-
handle.send(envelope_to_send).await;
221-
}
222-
})
223-
.collect();
208+
// Pre-allocate Vec with known capacity, then use join_all for parallel execution.
209+
// This avoids reallocations during iteration and executes all sends concurrently.
210+
let mut futures = Vec::with_capacity(num_subscribers);
211+
for (_, subscriber_handle) in subscribers_set.value() {
212+
let handle = subscriber_handle.clone();
213+
let envelope_to_send = shared_envelope.clone();
214+
futures.push(async move {
215+
trace!(subscriber = %handle.id(), message_type = ?message_type_id, "Sending broadcast");
216+
// Send the envelope to the subscriber's handle.
217+
// Ignore potential send errors (e.g., closed channel).
218+
handle.send(envelope_to_send).await;
219+
});
220+
}
224221

225-
// Drive all futures to completion in parallel.
226-
while futures.next().await.is_some() {}
222+
// Execute all sends concurrently and wait for completion.
223+
join_all(futures).await;
227224

228225
trace!(count = num_subscribers, message_type = ?message_type_id, "Broadcast sends completed");
229226
} else {

acton-reactive/src/common/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ impl Default for LimitsConfig {
118118
fn default() -> Self {
119119
Self {
120120
concurrent_handlers_high_water_mark: 100,
121-
actor_inbox_capacity: 255,
121+
actor_inbox_capacity: 512,
122122
dummy_channel_size: 1,
123123
}
124124
}

0 commit comments

Comments
 (0)