Skip to content

Commit fcd8dd2

Browse files
committed
[hyperactor] mesh: use reducers for spawns
Now that we have Nagle's algorithm (D83768514) for reducers, we can use them for spawning procs and hosts. Differential Revision: [D83931455](https://our.internmc.facebook.com/intern/diff/D83931455/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D83931455/)! ghstack-source-id: 314156943 Pull Request resolved: #1440
1 parent 3b5571e commit fcd8dd2

File tree

4 files changed

+38
-10
lines changed

4 files changed

+38
-10
lines changed

hyperactor/src/mailbox.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1244,12 +1244,29 @@ impl Mailbox {
12441244
)
12451245
}
12461246

1247+
/// Open a new port with an accumulator with default reduce options.
1248+
/// See [`open_accum_port_opts`] for more details.
1249+
pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1250+
where
1251+
A: Accumulator + Send + Sync + 'static,
1252+
A::Update: Message,
1253+
A::State: Message + Default + Clone,
1254+
{
1255+
self.open_accum_port_opts(accum, None)
1256+
}
1257+
12471258
/// Open a new port with an accumulator. This port accepts A::Update type
12481259
/// messages, accumulate them into A::State with the given accumulator.
12491260
/// The latest changed state can be received from the returned receiver as
12501261
/// a single A::State message. If there is no new update, the receiver will
12511262
/// not receive any message.
1252-
pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1263+
///
1264+
/// If provided, reducer options are applied to reduce operations.
1265+
pub fn open_accum_port_opts<A>(
1266+
&self,
1267+
accum: A,
1268+
reducer_opts: Option<ReducerOpts>,
1269+
) -> (PortHandle<A::Update>, PortReceiver<A::State>)
12531270
where
12541271
A: Accumulator + Send + Sync + 'static,
12551272
A::Update: Message,
@@ -1273,7 +1290,7 @@ impl Mailbox {
12731290
sender: UnboundedPortSender::Func(Arc::new(enqueue)),
12741291
bound: Arc::new(OnceLock::new()),
12751292
reducer_spec,
1276-
reducer_opts: None, // TODO: provide open_accum_port_opts
1293+
reducer_opts,
12771294
},
12781295
PortReceiver::new(receiver, port_id, /*coalesce=*/ true, self.clone()),
12791296
)

hyperactor_mesh/src/resource.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -485,12 +485,10 @@ impl<T: Eq + Clone + Named> Accumulator for RankedValues<T> {
485485
}
486486

487487
fn reducer_spec(&self) -> Option<ReducerSpec> {
488-
None
489-
// TODO: make this work. When it is enabled, the comm actor simply halts.
490-
// Some(ReducerSpec {
491-
// typehash: <RankedValuesReducer<T> as Named>::typehash(),
492-
// builder_params: None,
493-
// })
488+
Some(ReducerSpec {
489+
typehash: <RankedValuesReducer<T> as Named>::typehash(),
490+
builder_params: None,
491+
})
494492
}
495493
}
496494

hyperactor_mesh/src/v1/host_mesh.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
* LICENSE file in the root directory of this source tree.
77
*/
88

9+
use hyperactor::accum::ReducerOpts;
910
use hyperactor::channel::ChannelTransport;
1011
use hyperactor::clock::Clock;
1112
use hyperactor::clock::RealClock;
@@ -546,7 +547,13 @@ impl HostMeshRef {
546547
let mesh_name = Name::new(name);
547548
let mut procs = Vec::new();
548549
let num_ranks = self.region().num_ranks() * per_host.num_ranks();
549-
let (port, rx) = cx.mailbox().open_accum_port(RankedValues::default());
550+
let (port, rx) = cx.mailbox().open_accum_port_opts(
551+
RankedValues::default(),
552+
Some(ReducerOpts {
553+
max_update_interval: Some(Duration::from_millis(50)),
554+
}),
555+
);
556+
550557
// We CreateOrUpdate each proc, and then fence on getting statuses back.
551558
// This is currently necessary because otherwise there is a race between
552559
// the procs being created, and subsequent messages becoming unroutable

hyperactor_mesh/src/v1/proc_mesh.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use hyperactor::ActorRef;
1919
use hyperactor::Named;
2020
use hyperactor::ProcId;
2121
use hyperactor::RemoteMessage;
22+
use hyperactor::accum::ReducerOpts;
2223
use hyperactor::actor::RemoteActor;
2324
use hyperactor::actor::remote::Remote;
2425
use hyperactor::channel;
@@ -609,7 +610,12 @@ impl ProcMeshRef {
609610
},
610611
)?;
611612

612-
let (port, rx) = cx.mailbox().open_accum_port(RankedValues::default());
613+
let (port, rx) = cx.mailbox().open_accum_port_opts(
614+
RankedValues::default(),
615+
Some(ReducerOpts {
616+
max_update_interval: Some(Duration::from_millis(50)),
617+
}),
618+
);
613619

614620
self.agent_mesh().cast(
615621
cx,

0 commit comments

Comments
 (0)