From 22cca5d82f07d919e8034527bb63dc08af810b8e Mon Sep 17 00:00:00 2001 From: Marius Eriksen Date: Mon, 6 Oct 2025 07:56:00 -0700 Subject: [PATCH] [hyperactor] mesh: use reducers for spawns Now that we have Nagle's algorithm (D83768514) for reducers, we can use them for spawning procs and hosts. Differential Revision: [D83931455](https://our.internmc.facebook.com/intern/diff/D83931455/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D83931455/)! [ghstack-poisoned] --- hyperactor/src/mailbox.rs | 21 +++++++++++++++++++-- hyperactor_mesh/src/resource.rs | 10 ++++------ hyperactor_mesh/src/v1/host_mesh.rs | 9 ++++++++- hyperactor_mesh/src/v1/proc_mesh.rs | 8 +++++++- 4 files changed, 38 insertions(+), 10 deletions(-) diff --git a/hyperactor/src/mailbox.rs b/hyperactor/src/mailbox.rs index e4baf7d01..c420d2cff 100644 --- a/hyperactor/src/mailbox.rs +++ b/hyperactor/src/mailbox.rs @@ -1244,12 +1244,29 @@ impl Mailbox { ) } + /// Open a new port with an accumulator with default reduce options. + /// See [`open_accum_port_opts`] for more details. + pub fn open_accum_port(&self, accum: A) -> (PortHandle, PortReceiver) + where + A: Accumulator + Send + Sync + 'static, + A::Update: Message, + A::State: Message + Default + Clone, + { + self.open_accum_port_opts(accum, None) + } + /// Open a new port with an accumulator. This port accepts A::Update type /// messages, accumulate them into A::State with the given accumulator. /// The latest changed state can be received from the returned receiver as /// a single A::State message. If there is no new update, the receiver will /// not receive any message. - pub fn open_accum_port(&self, accum: A) -> (PortHandle, PortReceiver) + /// + /// If provided, reducer options are applied to reduce operations. + pub fn open_accum_port_opts( + &self, + accum: A, + reducer_opts: Option, + ) -> (PortHandle, PortReceiver) where A: Accumulator + Send + Sync + 'static, A::Update: Message, @@ -1273,7 +1290,7 @@ impl Mailbox { sender: UnboundedPortSender::Func(Arc::new(enqueue)), bound: Arc::new(OnceLock::new()), reducer_spec, - reducer_opts: None, // TODO: provide open_accum_port_opts + reducer_opts, }, PortReceiver::new(receiver, port_id, /*coalesce=*/ true, self.clone()), ) diff --git a/hyperactor_mesh/src/resource.rs b/hyperactor_mesh/src/resource.rs index c7c251b91..e1559ad0b 100644 --- a/hyperactor_mesh/src/resource.rs +++ b/hyperactor_mesh/src/resource.rs @@ -485,12 +485,10 @@ impl Accumulator for RankedValues { } fn reducer_spec(&self) -> Option { - None - // TODO: make this work. When it is enabled, the comm actor simply halts. - // Some(ReducerSpec { - // typehash: as Named>::typehash(), - // builder_params: None, - // }) + Some(ReducerSpec { + typehash: as Named>::typehash(), + builder_params: None, + }) } } diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index a7026a791..7fa20ab0d 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -6,6 +6,7 @@ * LICENSE file in the root directory of this source tree. */ +use hyperactor::accum::ReducerOpts; use hyperactor::channel::ChannelTransport; use hyperactor::clock::Clock; use hyperactor::clock::RealClock; @@ -546,7 +547,13 @@ impl HostMeshRef { let mesh_name = Name::new(name); let mut procs = Vec::new(); let num_ranks = self.region().num_ranks() * per_host.num_ranks(); - let (port, rx) = cx.mailbox().open_accum_port(RankedValues::default()); + let (port, rx) = cx.mailbox().open_accum_port_opts( + RankedValues::default(), + Some(ReducerOpts { + max_update_interval: Some(Duration::from_millis(50)), + }), + ); + // We CreateOrUpdate each proc, and then fence on getting statuses back. // This is currently necessary because otherwise there is a race between // the procs being created, and subsequent messages becoming unroutable diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index 23311cb50..953ca7b5d 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -19,6 +19,7 @@ use hyperactor::ActorRef; use hyperactor::Named; use hyperactor::ProcId; use hyperactor::RemoteMessage; +use hyperactor::accum::ReducerOpts; use hyperactor::actor::RemoteActor; use hyperactor::actor::remote::Remote; use hyperactor::channel; @@ -609,7 +610,12 @@ impl ProcMeshRef { }, )?; - let (port, rx) = cx.mailbox().open_accum_port(RankedValues::default()); + let (port, rx) = cx.mailbox().open_accum_port_opts( + RankedValues::default(), + Some(ReducerOpts { + max_update_interval: Some(Duration::from_millis(50)), + }), + ); self.agent_mesh().cast( cx,