Skip to content

Commit cce0cd4

Browse files
mariusaemeta-codesync[bot]
authored andcommitted
mesh: use reducers for spawns (#1440)
Summary: Pull Request resolved: #1440 Now that we have Nagle's algorithm (D83768514) for reducers, we can use them for spawning procs and hosts. ghstack-source-id: 314616749 exported-using-ghexport Reviewed By: shayne-fletcher Differential Revision: D83931455 fbshipit-source-id: d5e876cb093679f9b8c9228680d54ece49ef281c
1 parent 90c6202 commit cce0cd4

File tree

4 files changed

+38
-10
lines changed

4 files changed

+38
-10
lines changed

hyperactor/src/mailbox.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1244,12 +1244,29 @@ impl Mailbox {
12441244
)
12451245
}
12461246

1247+
/// Open a new port with an accumulator with default reduce options.
1248+
/// See [`open_accum_port_opts`] for more details.
1249+
pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1250+
where
1251+
A: Accumulator + Send + Sync + 'static,
1252+
A::Update: Message,
1253+
A::State: Message + Default + Clone,
1254+
{
1255+
self.open_accum_port_opts(accum, None)
1256+
}
1257+
12471258
/// Open a new port with an accumulator. This port accepts A::Update type
12481259
/// messages, accumulate them into A::State with the given accumulator.
12491260
/// The latest changed state can be received from the returned receiver as
12501261
/// a single A::State message. If there is no new update, the receiver will
12511262
/// not receive any message.
1252-
pub fn open_accum_port<A>(&self, accum: A) -> (PortHandle<A::Update>, PortReceiver<A::State>)
1263+
///
1264+
/// If provided, reducer options are applied to reduce operations.
1265+
pub fn open_accum_port_opts<A>(
1266+
&self,
1267+
accum: A,
1268+
reducer_opts: Option<ReducerOpts>,
1269+
) -> (PortHandle<A::Update>, PortReceiver<A::State>)
12531270
where
12541271
A: Accumulator + Send + Sync + 'static,
12551272
A::Update: Message,
@@ -1273,7 +1290,7 @@ impl Mailbox {
12731290
sender: UnboundedPortSender::Func(Arc::new(enqueue)),
12741291
bound: Arc::new(OnceLock::new()),
12751292
reducer_spec,
1276-
reducer_opts: None, // TODO: provide open_accum_port_opts
1293+
reducer_opts,
12771294
},
12781295
PortReceiver::new(receiver, port_id, /*coalesce=*/ true, self.clone()),
12791296
)

hyperactor_mesh/src/resource.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -485,12 +485,10 @@ impl<T: Eq + Clone + Named> Accumulator for RankedValues<T> {
485485
}
486486

487487
fn reducer_spec(&self) -> Option<ReducerSpec> {
488-
None
489-
// TODO: make this work. When it is enabled, the comm actor simply halts.
490-
// Some(ReducerSpec {
491-
// typehash: <RankedValuesReducer<T> as Named>::typehash(),
492-
// builder_params: None,
493-
// })
488+
Some(ReducerSpec {
489+
typehash: <RankedValuesReducer<T> as Named>::typehash(),
490+
builder_params: None,
491+
})
494492
}
495493
}
496494

hyperactor_mesh/src/v1/host_mesh.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
* LICENSE file in the root directory of this source tree.
77
*/
88

9+
use hyperactor::accum::ReducerOpts;
910
use hyperactor::channel::ChannelTransport;
1011
use hyperactor::clock::Clock;
1112
use hyperactor::clock::RealClock;
@@ -534,7 +535,13 @@ impl HostMeshRef {
534535
let mesh_name = Name::new(name);
535536
let mut procs = Vec::new();
536537
let num_ranks = self.region().num_ranks() * per_host.num_ranks();
537-
let (port, rx) = cx.mailbox().open_accum_port(RankedValues::default());
538+
let (port, rx) = cx.mailbox().open_accum_port_opts(
539+
RankedValues::default(),
540+
Some(ReducerOpts {
541+
max_update_interval: Some(Duration::from_millis(50)),
542+
}),
543+
);
544+
538545
// We CreateOrUpdate each proc, and then fence on getting statuses back.
539546
// This is currently necessary because otherwise there is a race between
540547
// the procs being created, and subsequent messages becoming unroutable

hyperactor_mesh/src/v1/proc_mesh.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use hyperactor::ActorRef;
1919
use hyperactor::Named;
2020
use hyperactor::ProcId;
2121
use hyperactor::RemoteMessage;
22+
use hyperactor::accum::ReducerOpts;
2223
use hyperactor::actor::Referable;
2324
use hyperactor::actor::remote::Remote;
2425
use hyperactor::channel;
@@ -609,7 +610,12 @@ impl ProcMeshRef {
609610
},
610611
)?;
611612

612-
let (port, rx) = cx.mailbox().open_accum_port(RankedValues::default());
613+
let (port, rx) = cx.mailbox().open_accum_port_opts(
614+
RankedValues::default(),
615+
Some(ReducerOpts {
616+
max_update_interval: Some(Duration::from_millis(50)),
617+
}),
618+
);
613619

614620
self.agent_mesh().cast(
615621
cx,

0 commit comments

Comments
 (0)