Skip to content

Commit 16e3de3

Browse files
mariusaemeta-codesync[bot]
authored andcommitted
mesh: concurrent spawns, aggregated statuses (#1394)
Summary: Pull Request resolved: #1394 This adds concurrent spawning for procs, and introduces a principled cast/accumulate status aggregation. The latter lets us efficiently determine the status of a mesh, and apply whatever policy makes sense in context (e.g., early exit). We will use GetRankStatus also to apply liveness checks for actors, as we can support this efficiently. This change also introduces an RLE-encoded data structure, RankedValues, which works well for low-cardinality contexts (e.g., statuses); but to use it more generally for value meshes will require that the merge operation be optimized. ghstack-source-id: 313885569 exported-using-ghexport Reviewed By: shayne-fletcher Differential Revision: D83678884 fbshipit-source-id: 185847b978c2bfc68e2a0f004ee37354ff56ddae
1 parent d417d18 commit 16e3de3

File tree

13 files changed

+664
-106
lines changed

13 files changed

+664
-106
lines changed

hyperactor/src/message.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,15 @@ impl<M> Unbound<M> {
136136
pub fn new(message: M, bindings: Bindings) -> Self {
137137
Self { message, bindings }
138138
}
139+
140+
/// Use the provided function to update values inside bindings in the same
141+
/// order as they were pushed into bindings.
142+
pub fn visit_mut<T: Serialize + DeserializeOwned + Named>(
143+
&mut self,
144+
f: impl FnMut(&mut T) -> anyhow::Result<()>,
145+
) -> anyhow::Result<()> {
146+
self.bindings.visit_mut(f)
147+
}
139148
}
140149

141150
impl<M: Bind> Unbound<M> {

hyperactor_mesh/src/bootstrap.rs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use std::future;
1515
use std::io;
1616
use std::io::Write;
1717
use std::os::unix::process::ExitStatusExt;
18+
use std::path::PathBuf;
1819
use std::process::Stdio;
1920
use std::sync::Arc;
2021
use std::sync::Mutex;
@@ -220,7 +221,9 @@ pub enum Bootstrap {
220221
Host {
221222
/// The address on which to serve the host.
222223
addr: ChannelAddr,
223-
224+
/// If specified, use the provided command instead of
225+
/// [`BootstrapCommand::current`].
226+
command: Option<BootstrapCommand>,
224227
/// Config snapshot for the child.
225228
config: Option<Attrs>,
226229
},
@@ -345,7 +348,11 @@ impl Bootstrap {
345348
Err(e) => e.into(),
346349
}
347350
}
348-
Bootstrap::Host { addr, config } => {
351+
Bootstrap::Host {
352+
addr,
353+
command,
354+
config,
355+
} => {
349356
if config.is_some() {
350357
tracing::debug!(
351358
"bootstrap: Host received config snapshot (carried, not applied)"
@@ -354,7 +361,10 @@ impl Bootstrap {
354361
tracing::debug!("bootstrap: no config snapshot provided (Host)");
355362
}
356363

357-
let command = ok!(BootstrapCommand::current());
364+
let command = match command {
365+
Some(command) => command,
366+
None => ok!(BootstrapCommand::current()),
367+
};
358368
let manager = BootstrapProcManager::new(command);
359369
let (host, _handle) = ok!(Host::serve(manager, addr).await);
360370
let addr = host.addr().clone();
@@ -1303,7 +1313,7 @@ impl hyperactor::host::ProcHandle for BootstrapProcHandle {
13031313
/// A specification of the command used to bootstrap procs.
13041314
#[derive(Debug, Named, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
13051315
pub struct BootstrapCommand {
1306-
pub program: std::path::PathBuf,
1316+
pub program: PathBuf,
13071317
pub arg0: Option<String>,
13081318
pub args: Vec<String>,
13091319
pub env: HashMap<String, String>,
@@ -1341,6 +1351,18 @@ impl BootstrapCommand {
13411351
}
13421352
}
13431353

1354+
impl<T: Into<PathBuf>> From<T> for BootstrapCommand {
1355+
/// Creates a bootstrap command from the provided path.
1356+
fn from(s: T) -> Self {
1357+
Self {
1358+
program: s.into(),
1359+
arg0: None,
1360+
args: vec![],
1361+
env: HashMap::new(),
1362+
}
1363+
}
1364+
}
1365+
13441366
/// A process manager for launching and supervising **bootstrap
13451367
/// processes** (via the [`bootstrap`] entry point).
13461368
///
@@ -1956,7 +1978,7 @@ fn debug_sink() -> &'static Mutex<DebugSink> {
19561978
}
19571979

19581980
/// If true, send `Debug` messages to stderr.
1959-
const DEBUG_TO_STDERR: bool = true;
1981+
const DEBUG_TO_STDERR: bool = false;
19601982

19611983
/// A bootstrap specific debug writer. If the file /tmp/monarch-bootstrap-debug.log
19621984
/// exists, then the writer's destination is that file; otherwise it discards all writes.
@@ -2059,6 +2081,7 @@ mod tests {
20592081
fn test_bootstrap_mode_env_string_none_config_host() {
20602082
let value = Bootstrap::Host {
20612083
addr: ChannelAddr::any(ChannelTransport::Unix),
2084+
command: None,
20622085
config: None,
20632086
};
20642087

@@ -2113,6 +2136,7 @@ mod tests {
21132136
{
21142137
let original = Bootstrap::Host {
21152138
addr: ChannelAddr::any(ChannelTransport::Unix),
2139+
command: None,
21162140
config: Some(attrs.clone()),
21172141
};
21182142
let env_str = original.to_env_safe_string().expect("encode bootstrap");

hyperactor_mesh/src/comm.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
use crate::comm::multicast::CAST_ORIGINATING_SENDER;
1010
use crate::reference::ActorMeshId;
11+
use crate::resource;
1112
pub mod multicast;
1213

1314
use std::cmp::Ordering;
@@ -279,6 +280,13 @@ impl CommActor {
279280
if deliver_here {
280281
let rank_on_root_mesh = mode.self_rank(cx.self_id())?;
281282
let cast_rank = message.relative_rank(rank_on_root_mesh)?;
283+
// Replace ranks with self ranks.
284+
message
285+
.data_mut()
286+
.visit_mut::<resource::Rank>(|resource::Rank(rank)| {
287+
*rank = Some(cast_rank);
288+
Ok(())
289+
})?;
282290
let cast_shape = message.shape();
283291
let point = cast_shape
284292
.extent()

hyperactor_mesh/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
#![feature(assert_matches)]
1212
#![feature(exit_status_error)]
1313
#![feature(impl_trait_in_bindings)]
14+
#![feature(get_disjoint_mut_helpers)]
15+
#![feature(exact_size_is_empty)]
1416

1517
pub mod actor_mesh;
1618
pub mod alloc;

hyperactor_mesh/src/proc_mesh/mesh_agent.rs

Lines changed: 75 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -164,21 +164,29 @@ impl State {
164164
}
165165
}
166166

167+
/// Actor state used for v1 API.
168+
#[derive(Debug)]
169+
struct ActorInstanceState {
170+
create_rank: usize,
171+
spawn: Result<ActorId, anyhow::Error>,
172+
}
173+
167174
/// A mesh agent is responsible for managing procs in a [`ProcMesh`].
168175
#[derive(Debug)]
169176
#[hyperactor::export(
170177
handlers=[
171178
MeshAgentMessage,
172-
resource::CreateOrUpdate<ActorSpec>,
179+
resource::CreateOrUpdate<ActorSpec> { cast = true },
173180
resource::GetState<ActorState> { cast = true },
181+
resource::GetRankStatus { cast = true },
174182
]
175183
)]
176184
pub struct ProcMeshAgent {
177185
proc: Proc,
178186
remote: Remote,
179187
state: State,
180188
/// Actors created and tracked through the resource behavior.
181-
created: HashMap<Name, Result<ActorId, anyhow::Error>>,
189+
actor_states: HashMap<Name, ActorInstanceState>,
182190
/// If true, and supervisor is None, record supervision events to be reported
183191
/// to owning actors later.
184192
record_supervision_events: bool,
@@ -203,7 +211,7 @@ impl ProcMeshAgent {
203211
proc: proc.clone(),
204212
remote: Remote::collect(),
205213
state: State::UnconfiguredV0 { sender },
206-
created: HashMap::new(),
214+
actor_states: HashMap::new(),
207215
record_supervision_events: false,
208216
supervision_events: HashMap::new(),
209217
};
@@ -216,7 +224,7 @@ impl ProcMeshAgent {
216224
proc: proc.clone(),
217225
remote: Remote::collect(),
218226
state: State::V1,
219-
created: HashMap::new(),
227+
actor_states: HashMap::new(),
220228
record_supervision_events: true,
221229
supervision_events: HashMap::new(),
222230
};
@@ -442,10 +450,10 @@ pub struct ActorState {
442450
impl Handler<resource::CreateOrUpdate<ActorSpec>> for ProcMeshAgent {
443451
async fn handle(
444452
&mut self,
445-
cx: &Context<Self>,
453+
_cx: &Context<Self>,
446454
create_or_update: resource::CreateOrUpdate<ActorSpec>,
447455
) -> anyhow::Result<()> {
448-
if self.created.contains_key(&create_or_update.name) {
456+
if self.actor_states.contains_key(&create_or_update.name) {
449457
// There is no update.
450458
return Ok(());
451459
}
@@ -454,19 +462,63 @@ impl Handler<resource::CreateOrUpdate<ActorSpec>> for ProcMeshAgent {
454462
actor_type,
455463
params_data,
456464
} = create_or_update.spec;
457-
self.created.insert(
465+
self.actor_states.insert(
458466
create_or_update.name.clone(),
459-
self.remote
460-
.gspawn(
461-
&self.proc,
462-
&actor_type,
463-
&create_or_update.name.to_string(),
464-
params_data,
465-
)
466-
.await,
467+
ActorInstanceState {
468+
create_rank: create_or_update.rank.unwrap(),
469+
spawn: self
470+
.remote
471+
.gspawn(
472+
&self.proc,
473+
&actor_type,
474+
&create_or_update.name.to_string(),
475+
params_data,
476+
)
477+
.await,
478+
},
467479
);
468480

469-
create_or_update.reply.send(cx, true)?;
481+
Ok(())
482+
}
483+
}
484+
485+
#[async_trait]
486+
impl Handler<resource::GetRankStatus> for ProcMeshAgent {
487+
async fn handle(
488+
&mut self,
489+
cx: &Context<Self>,
490+
get_rank_status: resource::GetRankStatus,
491+
) -> anyhow::Result<()> {
492+
let (rank, status) = match self.actor_states.get(&get_rank_status.name) {
493+
Some(ActorInstanceState {
494+
spawn: Ok(actor_id),
495+
create_rank,
496+
}) => {
497+
let supervision_events = self
498+
.supervision_events
499+
.get(actor_id)
500+
.map_or_else(Vec::new, |a| a.clone());
501+
(
502+
*create_rank,
503+
if supervision_events.is_empty() {
504+
resource::Status::Running
505+
} else {
506+
resource::Status::Failed(format!(
507+
"because of supervision events: {:?}",
508+
supervision_events
509+
))
510+
},
511+
)
512+
}
513+
Some(ActorInstanceState {
514+
spawn: Err(e),
515+
create_rank,
516+
}) => (*create_rank, resource::Status::Failed(e.to_string())),
517+
// TODO: represent unknown rank
518+
None => (usize::MAX, resource::Status::NotExist),
519+
};
520+
521+
get_rank_status.reply.send(cx, (rank, status).into())?;
470522
Ok(())
471523
}
472524
}
@@ -478,12 +530,11 @@ impl Handler<resource::GetState<ActorState>> for ProcMeshAgent {
478530
cx: &Context<Self>,
479531
get_state: resource::GetState<ActorState>,
480532
) -> anyhow::Result<()> {
481-
let rank = self
482-
.state
483-
.rank()
484-
.ok_or_else(|| anyhow::anyhow!("tried to get status of unconfigured proc"))?;
485-
let state = match self.created.get(&get_state.name) {
486-
Some(Ok(actor_id)) => {
533+
let state = match self.actor_states.get(&get_state.name) {
534+
Some(ActorInstanceState {
535+
create_rank,
536+
spawn: Ok(actor_id),
537+
}) => {
487538
let supervision_events = self
488539
.supervision_events
489540
.get(actor_id)
@@ -501,12 +552,12 @@ impl Handler<resource::GetState<ActorState>> for ProcMeshAgent {
501552
status,
502553
state: Some(ActorState {
503554
actor_id: actor_id.clone(),
504-
create_rank: rank,
555+
create_rank: *create_rank,
505556
supervision_events,
506557
}),
507558
}
508559
}
509-
Some(Err(e)) => resource::State {
560+
Some(ActorInstanceState { spawn: Err(e), .. }) => resource::State {
510561
name: get_state.name.clone(),
511562
status: resource::Status::Failed(e.to_string()),
512563
state: None,

0 commit comments

Comments
 (0)