diff --git a/hyperactor/src/mailbox.rs b/hyperactor/src/mailbox.rs index c2756401b..1de42b0bb 100644 --- a/hyperactor/src/mailbox.rs +++ b/hyperactor/src/mailbox.rs @@ -1244,12 +1244,29 @@ impl Mailbox { ) } + /// Open a new port with an accumulator with default reduce options. + /// See [`open_accum_port_opts`] for more details. + pub fn open_accum_port(&self, accum: A) -> (PortHandle, PortReceiver) + where + A: Accumulator + Send + Sync + 'static, + A::Update: Message, + A::State: Message + Default + Clone, + { + self.open_accum_port_opts(accum, None) + } + /// Open a new port with an accumulator. This port accepts A::Update type /// messages, accumulate them into A::State with the given accumulator. /// The latest changed state can be received from the returned receiver as /// a single A::State message. If there is no new update, the receiver will /// not receive any message. - pub fn open_accum_port(&self, accum: A) -> (PortHandle, PortReceiver) + /// + /// If provided, reducer options are applied to reduce operations. + pub fn open_accum_port_opts( + &self, + accum: A, + reducer_opts: Option, + ) -> (PortHandle, PortReceiver) where A: Accumulator + Send + Sync + 'static, A::Update: Message, @@ -1273,7 +1290,7 @@ impl Mailbox { sender: UnboundedPortSender::Func(Arc::new(enqueue)), bound: Arc::new(OnceLock::new()), reducer_spec, - reducer_opts: None, // TODO: provide open_accum_port_opts + reducer_opts, }, PortReceiver::new(receiver, port_id, /*coalesce=*/ true, self.clone()), ) diff --git a/hyperactor_mesh/src/resource.rs b/hyperactor_mesh/src/resource.rs index c7c251b91..e1559ad0b 100644 --- a/hyperactor_mesh/src/resource.rs +++ b/hyperactor_mesh/src/resource.rs @@ -485,12 +485,10 @@ impl Accumulator for RankedValues { } fn reducer_spec(&self) -> Option { - None - // TODO: make this work. When it is enabled, the comm actor simply halts. - // Some(ReducerSpec { - // typehash: as Named>::typehash(), - // builder_params: None, - // }) + Some(ReducerSpec { + typehash: as Named>::typehash(), + builder_params: None, + }) } } diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index 661297d4c..9cff5aeae 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -6,6 +6,7 @@ * LICENSE file in the root directory of this source tree. */ +use hyperactor::accum::ReducerOpts; use hyperactor::channel::ChannelTransport; use hyperactor::clock::Clock; use hyperactor::clock::RealClock; @@ -534,7 +535,13 @@ impl HostMeshRef { let mesh_name = Name::new(name); let mut procs = Vec::new(); let num_ranks = self.region().num_ranks() * per_host.num_ranks(); - let (port, rx) = cx.mailbox().open_accum_port(RankedValues::default()); + let (port, rx) = cx.mailbox().open_accum_port_opts( + RankedValues::default(), + Some(ReducerOpts { + max_update_interval: Some(Duration::from_millis(50)), + }), + ); + // We CreateOrUpdate each proc, and then fence on getting statuses back. // This is currently necessary because otherwise there is a race between // the procs being created, and subsequent messages becoming unroutable diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index e98f6209c..d5bece7f6 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -19,6 +19,7 @@ use hyperactor::ActorRef; use hyperactor::Named; use hyperactor::ProcId; use hyperactor::RemoteMessage; +use hyperactor::accum::ReducerOpts; use hyperactor::actor::Referable; use hyperactor::actor::remote::Remote; use hyperactor::channel; @@ -609,7 +610,12 @@ impl ProcMeshRef { }, )?; - let (port, rx) = cx.mailbox().open_accum_port(RankedValues::default()); + let (port, rx) = cx.mailbox().open_accum_port_opts( + RankedValues::default(), + Some(ReducerOpts { + max_update_interval: Some(Duration::from_millis(50)), + }), + ); self.agent_mesh().cast( cx,