Skip to content

Commit 1c01ffb

Browse files
committed
[hyperactor] mesh: implement local proc bypasss
This change allows procs local to a host to bypass the host multiplexer. This should improve multicast delivery, as local comm actors do not need to traverse the same (frontend) bottleneck. Differential Revision: [D83996264](https://our.internmc.facebook.com/intern/diff/D83996264/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D83996264/)! ghstack-source-id: 314347091 Pull Request resolved: #1442
1 parent 6e87ff0 commit 1c01ffb

File tree

5 files changed

+271
-26
lines changed

5 files changed

+271
-26
lines changed

hyperactor/src/mailbox.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ pub type Data = Vec<u8>;
164164
Deserialize,
165165
Named,
166166
Clone,
167-
PartialEq
167+
PartialEq,
168+
Eq
168169
)]
169170
pub enum DeliveryError {
170171
/// The destination address is not reachable.

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ use crate::supervision::ActorSupervisionEvent;
3232
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
3333
pub struct Undeliverable<M: Message>(pub M);
3434

35+
impl<M: Message> Undeliverable<M> {
36+
/// Return the inner M-typed message.
37+
pub fn into_inner(self) -> M {
38+
self.0
39+
}
40+
}
41+
3542
// Port handle and receiver for undeliverable messages.
3643
pub(crate) fn new_undeliverable_port() -> (
3744
PortHandle<Undeliverable<MessageEnvelope>>,

hyperactor_mesh/src/bootstrap.rs

Lines changed: 73 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use std::future;
1515
use std::io;
1616
use std::io::Write;
1717
use std::os::unix::process::ExitStatusExt;
18+
use std::path::Path;
1819
use std::path::PathBuf;
1920
use std::process::Stdio;
2021
use std::sync::Arc;
@@ -34,6 +35,7 @@ use hyperactor::ProcId;
3435
use hyperactor::attrs::Attrs;
3536
use hyperactor::channel;
3637
use hyperactor::channel::ChannelAddr;
38+
use hyperactor::channel::ChannelError;
3739
use hyperactor::channel::ChannelTransport;
3840
use hyperactor::channel::Rx;
3941
use hyperactor::channel::Tx;
@@ -48,11 +50,14 @@ use hyperactor::host::HostError;
4850
use hyperactor::host::ProcHandle;
4951
use hyperactor::host::ProcManager;
5052
use hyperactor::host::TerminateSummary;
53+
use hyperactor::mailbox::IntoBoxedMailboxSender;
54+
use hyperactor::mailbox::MailboxClient;
5155
use hyperactor::mailbox::MailboxServer;
5256
use hyperactor::proc::Proc;
5357
use libc::c_int;
5458
use serde::Deserialize;
5559
use serde::Serialize;
60+
use tempfile::TempDir;
5661
use tokio::process::Child;
5762
use tokio::process::Command;
5863
use tokio::sync::oneshot;
@@ -65,6 +70,8 @@ use crate::v1;
6570
use crate::v1::host_mesh::mesh_agent::HostAgentMode;
6671
use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
6772

73+
mod mailbox;
74+
6875
declare_attrs! {
6976
/// If enabled (default), bootstrap child processes install
7077
/// `PR_SET_PDEATHSIG(SIGKILL)` so the kernel reaps them if the
@@ -213,6 +220,10 @@ pub enum Bootstrap {
213220
backend_addr: ChannelAddr,
214221
/// The callback address used to indicate successful spawning.
215222
callback_addr: ChannelAddr,
223+
/// Directory for storing proc socket files. Procs place their sockets
224+
/// in this directory, so that they can be looked up by other procs
225+
/// for direct transfer.
226+
socket_dir_path: PathBuf,
216227
/// Optional config snapshot (`hyperactor::config::Attrs`)
217228
/// captured by the parent. If present, the child installs it
218229
/// as the `Runtime` layer so the parent's effective config
@@ -325,6 +336,7 @@ impl Bootstrap {
325336
proc_id,
326337
backend_addr,
327338
callback_addr,
339+
socket_dir_path,
328340
config,
329341
} => {
330342
if let Some(attrs) = config {
@@ -344,15 +356,39 @@ impl Bootstrap {
344356
eprintln!("(bootstrap) PDEATHSIG disabled via config");
345357
}
346358

347-
let result =
348-
host::spawn_proc(proc_id, backend_addr, callback_addr, |proc| async move {
349-
ProcMeshAgent::boot_v1(proc).await
350-
})
351-
.await;
352-
match result {
353-
Ok(_proc) => halt().await,
354-
Err(e) => e.into(),
355-
}
359+
let (local_addr, name) = ok!(proc_id
360+
.as_direct()
361+
.ok_or_else(|| anyhow::anyhow!("invalid proc id type")));
362+
// TODO provide a direct way to construct these
363+
let serve_addr = format!("unix:{}", socket_dir_path.join(name).display());
364+
let serve_addr = serve_addr.parse().unwrap();
365+
366+
// The following is a modified host::spawn_proc to support direct
367+
// dialing between local procs: 1) we bind each proc to a deterministic
368+
// address in socket_dir_path; 2) we use LocalProcDialer to dial these
369+
// addresses for local procs.
370+
let proc_sender = mailbox::LocalProcDialer::new(
371+
local_addr.clone(),
372+
socket_dir_path,
373+
ok!(MailboxClient::dial(backend_addr)),
374+
);
375+
376+
let proc = Proc::new(proc_id.clone(), proc_sender.into_boxed());
377+
378+
let agent_handle = ok!(ProcMeshAgent::boot_v1(proc.clone())
379+
.await
380+
.map_err(|e| HostError::AgentSpawnFailure(proc_id, e)));
381+
382+
// Finally serve the proc on the same transport as the backend address,
383+
// and call back.
384+
let (proc_addr, proc_rx) = ok!(channel::serve(serve_addr));
385+
proc.clone().serve(proc_rx);
386+
ok!(ok!(channel::dial(callback_addr))
387+
.send((proc_addr, agent_handle.bind::<ProcMeshAgent>()))
388+
.await
389+
.map_err(ChannelError::from));
390+
391+
halt().await
356392
}
357393
Bootstrap::Host {
358394
addr,
@@ -370,7 +406,7 @@ impl Bootstrap {
370406
Some(command) => command,
371407
None => ok!(BootstrapCommand::current()),
372408
};
373-
let manager = BootstrapProcManager::new(command);
409+
let manager = BootstrapProcManager::new(command).unwrap();
374410
let (host, _handle) = ok!(Host::serve(manager, addr).await);
375411
let addr = host.addr().clone();
376412
let host_mesh_agent = ok!(host
@@ -1400,6 +1436,11 @@ pub struct BootstrapProcManager {
14001436
/// exclusively in the [`Drop`] impl to send `SIGKILL` without
14011437
/// needing async context.
14021438
pid_table: Arc<std::sync::Mutex<HashMap<ProcId, u32>>>,
1439+
1440+
/// Directory for storing proc socket files. Procs place their sockets
1441+
/// in this directory, so that they can be looked up by other procs
1442+
/// for direct transfer.
1443+
socket_dir: TempDir,
14031444
}
14041445

14051446
impl Drop for BootstrapProcManager {
@@ -1449,12 +1490,13 @@ impl BootstrapProcManager {
14491490
/// This is the general entry point when you want to manage procs
14501491
/// backed by a specific binary path (e.g. a bootstrap
14511492
/// trampoline).
1452-
pub(crate) fn new(command: BootstrapCommand) -> Self {
1453-
Self {
1493+
pub(crate) fn new(command: BootstrapCommand) -> Result<Self, io::Error> {
1494+
Ok(Self {
14541495
command,
14551496
children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
14561497
pid_table: Arc::new(std::sync::Mutex::new(HashMap::new())),
1457-
}
1498+
socket_dir: tempfile::tempdir()?,
1499+
})
14581500
}
14591501

14601502
/// The bootstrap command used to launch processes.
@@ -1626,6 +1668,7 @@ impl ProcManager for BootstrapProcManager {
16261668
proc_id: proc_id.clone(),
16271669
backend_addr,
16281670
callback_addr,
1671+
socket_dir_path: self.socket_dir.path().to_owned(),
16291672
config: Some(cfg),
16301673
};
16311674
let mut cmd = Command::new(&self.command.program);
@@ -2060,6 +2103,7 @@ mod tests {
20602103
proc_id: id!(foo[0]),
20612104
backend_addr: ChannelAddr::any(ChannelTransport::Tcp),
20622105
callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2106+
socket_dir_path: PathBuf::from("notexist"),
20632107
config: None,
20642108
},
20652109
];
@@ -2117,13 +2161,16 @@ mod tests {
21172161
attrs[MESH_TAIL_LOG_LINES] = 123;
21182162
attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;
21192163

2164+
let socket_dir = tempfile::tempdir().unwrap();
2165+
21202166
// Proc case
21212167
{
21222168
let original = Bootstrap::Proc {
21232169
proc_id: id!(foo[42]),
21242170
backend_addr: ChannelAddr::any(ChannelTransport::Unix),
21252171
callback_addr: ChannelAddr::any(ChannelTransport::Unix),
21262172
config: Some(attrs.clone()),
2173+
socket_dir_path: socket_dir.path().to_owned(),
21272174
};
21282175
let env_str = original.to_env_safe_string().expect("encode bootstrap");
21292176
let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
@@ -2163,14 +2210,13 @@ mod tests {
21632210
use std::process::Stdio;
21642211

21652212
use tokio::process::Command;
2166-
use tokio::time::Duration;
21672213

21682214
// Manager; program path is irrelevant for this test.
21692215
let command = BootstrapCommand {
21702216
program: PathBuf::from("/bin/true"),
21712217
..Default::default()
21722218
};
2173-
let manager = BootstrapProcManager::new(command);
2219+
let manager = BootstrapProcManager::new(command).unwrap();
21742220

21752221
// Spawn a long-running child process (sleep 30) with
21762222
// kill_on_drop(true).
@@ -2550,7 +2596,7 @@ mod tests {
25502596
program: PathBuf::from("/bin/true"),
25512597
..Default::default()
25522598
};
2553-
let manager = BootstrapProcManager::new(command);
2599+
let manager = BootstrapProcManager::new(command).unwrap();
25542600

25552601
// Spawn a fast-exiting child.
25562602
let mut cmd = Command::new("/bin/true");
@@ -2584,7 +2630,7 @@ mod tests {
25842630
program: PathBuf::from("/bin/sleep"),
25852631
..Default::default()
25862632
};
2587-
let manager = BootstrapProcManager::new(command);
2633+
let manager = BootstrapProcManager::new(command).unwrap();
25882634

25892635
// Spawn a process that will live long enough to kill.
25902636
let mut cmd = Command::new("/bin/sleep");
@@ -2701,7 +2747,8 @@ mod tests {
27012747
let manager = BootstrapProcManager::new(BootstrapCommand {
27022748
program: PathBuf::from("/bin/true"),
27032749
..Default::default()
2704-
});
2750+
})
2751+
.unwrap();
27052752
let unknown = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "nope".into());
27062753
assert!(manager.status(&unknown).await.is_none());
27072754
}
@@ -2711,7 +2758,8 @@ mod tests {
27112758
let manager = BootstrapProcManager::new(BootstrapCommand {
27122759
program: PathBuf::from("/bin/sleep"),
27132760
..Default::default()
2714-
});
2761+
})
2762+
.unwrap();
27152763

27162764
// Long-ish child so it's alive while we "steal" it.
27172765
let mut cmd = Command::new("/bin/sleep");
@@ -2750,7 +2798,8 @@ mod tests {
27502798
let manager = BootstrapProcManager::new(BootstrapCommand {
27512799
program: PathBuf::from("/bin/sleep"),
27522800
..Default::default()
2753-
});
2801+
})
2802+
.unwrap();
27542803

27552804
let mut cmd = Command::new("/bin/sleep");
27562805
cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
@@ -3125,7 +3174,7 @@ mod tests {
31253174
.unwrap();
31263175
let (instance, _handle) = root.instance("client").unwrap();
31273176

3128-
let mgr = BootstrapProcManager::new(BootstrapCommand::test());
3177+
let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
31293178
let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;
31303179
let handle = mgr
31313180
.spawn(proc_id.clone(), backend_addr.clone())
@@ -3181,7 +3230,7 @@ mod tests {
31813230
.unwrap();
31823231
let (instance, _handle) = root.instance("client").unwrap();
31833232

3184-
let mgr = BootstrapProcManager::new(BootstrapCommand::test());
3233+
let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
31853234

31863235
// Proc identity + host backend channel the child will dial.
31873236
let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;
@@ -3380,7 +3429,8 @@ mod tests {
33803429
let manager = BootstrapProcManager::new(BootstrapCommand {
33813430
program: std::path::PathBuf::from("/bin/true"), // unused in this test
33823431
..Default::default()
3383-
});
3432+
})
3433+
.unwrap();
33843434
manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
33853435

33863436
// Await terminal status and assert on exit code and stderr

0 commit comments

Comments
 (0)