Skip to content

Commit a2a742c

Browse files
committed
[hyperactor] mesh: implement local proc bypass
Pull Request resolved: #1442 This change allows procs local to a host to bypass the host multiplexer. This should improve multicast delivery, as local comm actors do not need to traverse the same (frontend) bottleneck. ghstack-source-id: 314642007 @exported-using-ghexport Differential Revision: [D83996264](https://our.internmc.facebook.com/intern/diff/D83996264/)
1 parent 76bea8b commit a2a742c

File tree

6 files changed

+283
-28
lines changed

6 files changed

+283
-28
lines changed

hyperactor/src/mailbox.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ pub type Data = Vec<u8>;
164164
Deserialize,
165165
Named,
166166
Clone,
167-
PartialEq
167+
PartialEq,
168+
Eq
168169
)]
169170
pub enum DeliveryError {
170171
/// The destination address is not reachable.

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ use crate::supervision::ActorSupervisionEvent;
3232
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
3333
pub struct Undeliverable<M: Message>(pub M);
3434

35+
impl<M: Message> Undeliverable<M> {
36+
/// Return the inner M-typed message.
37+
pub fn into_inner(self) -> M {
38+
self.0
39+
}
40+
}
41+
3542
// Port handle and receiver for undeliverable messages.
3643
pub(crate) fn new_undeliverable_port() -> (
3744
PortHandle<Undeliverable<MessageEnvelope>>,

hyperactor_mesh/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ serde_bytes = "0.11"
7575
serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "raw_value", "unbounded_depth"] }
7676
serde_multipart = { version = "0.0.0", path = "../serde_multipart" }
7777
strum = { version = "0.27.1", features = ["derive"] }
78+
tempfile = "3.22"
7879
thiserror = "2.0.12"
7980
tokio = { version = "1.47.1", features = ["full", "test-util", "tracing"] }
8081
tokio-stream = { version = "0.1.17", features = ["fs", "io-util", "net", "signal", "sync", "time"] }

hyperactor_mesh/src/bootstrap.rs

Lines changed: 76 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use std::future;
1515
use std::io;
1616
use std::io::Write;
1717
use std::os::unix::process::ExitStatusExt;
18+
use std::path::Path;
1819
use std::path::PathBuf;
1920
use std::process::Stdio;
2021
use std::sync::Arc;
@@ -34,6 +35,7 @@ use hyperactor::ProcId;
3435
use hyperactor::attrs::Attrs;
3536
use hyperactor::channel;
3637
use hyperactor::channel::ChannelAddr;
38+
use hyperactor::channel::ChannelError;
3739
use hyperactor::channel::ChannelTransport;
3840
use hyperactor::channel::Rx;
3941
use hyperactor::channel::Tx;
@@ -48,10 +50,13 @@ use hyperactor::host::HostError;
4850
use hyperactor::host::ProcHandle;
4951
use hyperactor::host::ProcManager;
5052
use hyperactor::host::TerminateSummary;
53+
use hyperactor::mailbox::IntoBoxedMailboxSender;
54+
use hyperactor::mailbox::MailboxClient;
5155
use hyperactor::mailbox::MailboxServer;
5256
use hyperactor::proc::Proc;
5357
use serde::Deserialize;
5458
use serde::Serialize;
59+
use tempfile::TempDir;
5560
use tokio::process::Child;
5661
use tokio::process::Command;
5762
use tokio::sync::oneshot;
@@ -64,6 +69,8 @@ use crate::v1;
6469
use crate::v1::host_mesh::mesh_agent::HostAgentMode;
6570
use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
6671

72+
mod mailbox;
73+
6774
declare_attrs! {
6875
/// If enabled (default), bootstrap child processes install
6976
/// `PR_SET_PDEATHSIG(SIGKILL)` so the kernel reaps them if the
@@ -212,6 +219,10 @@ pub enum Bootstrap {
212219
backend_addr: ChannelAddr,
213220
/// The callback address used to indicate successful spawning.
214221
callback_addr: ChannelAddr,
222+
/// Directory for storing proc socket files. Procs place their sockets
223+
/// in this directory, so that they can be looked up by other procs
224+
/// for direct transfer.
225+
socket_dir_path: PathBuf,
215226
/// Optional config snapshot (`hyperactor::config::Attrs`)
216227
/// captured by the parent. If present, the child installs it
217228
/// as the `Runtime` layer so the parent's effective config
@@ -324,6 +335,7 @@ impl Bootstrap {
324335
proc_id,
325336
backend_addr,
326337
callback_addr,
338+
socket_dir_path,
327339
config,
328340
} => {
329341
if let Some(attrs) = config {
@@ -343,15 +355,39 @@ impl Bootstrap {
343355
eprintln!("(bootstrap) PDEATHSIG disabled via config");
344356
}
345357

346-
let result =
347-
host::spawn_proc(proc_id, backend_addr, callback_addr, |proc| async move {
348-
ProcMeshAgent::boot_v1(proc).await
349-
})
350-
.await;
351-
match result {
352-
Ok(_proc) => halt().await,
353-
Err(e) => e.into(),
354-
}
358+
let (local_addr, name) = ok!(proc_id
359+
.as_direct()
360+
.ok_or_else(|| anyhow::anyhow!("invalid proc id type: {}", proc_id)));
361+
// TODO provide a direct way to construct these
362+
let serve_addr = format!("unix:{}", socket_dir_path.join(name).display());
363+
let serve_addr = serve_addr.parse().unwrap();
364+
365+
// The following is a modified host::spawn_proc to support direct
366+
// dialing between local procs: 1) we bind each proc to a deterministic
367+
// address in socket_dir_path; 2) we use LocalProcDialer to dial these
368+
// addresses for local procs.
369+
let proc_sender = mailbox::LocalProcDialer::new(
370+
local_addr.clone(),
371+
socket_dir_path,
372+
ok!(MailboxClient::dial(backend_addr)),
373+
);
374+
375+
let proc = Proc::new(proc_id.clone(), proc_sender.into_boxed());
376+
377+
let agent_handle = ok!(ProcMeshAgent::boot_v1(proc.clone())
378+
.await
379+
.map_err(|e| HostError::AgentSpawnFailure(proc_id, e)));
380+
381+
// Finally serve the proc on its deterministic unix socket address under
382+
// socket_dir_path (so local procs can dial it directly), and call back.
383+
let (proc_addr, proc_rx) = ok!(channel::serve(serve_addr));
384+
proc.clone().serve(proc_rx);
385+
ok!(ok!(channel::dial(callback_addr))
386+
.send((proc_addr, agent_handle.bind::<ProcMeshAgent>()))
387+
.await
388+
.map_err(ChannelError::from));
389+
390+
halt().await
355391
}
356392
Bootstrap::Host {
357393
addr,
@@ -369,7 +405,7 @@ impl Bootstrap {
369405
Some(command) => command,
370406
None => ok!(BootstrapCommand::current()),
371407
};
372-
let manager = BootstrapProcManager::new(command);
408+
let manager = BootstrapProcManager::new(command).unwrap();
373409
let (host, _handle) = ok!(Host::serve(manager, addr).await);
374410
let addr = host.addr().clone();
375411
let host_mesh_agent = ok!(host
@@ -1402,6 +1438,11 @@ pub struct BootstrapProcManager {
14021438
/// exclusively in the [`Drop`] impl to send `SIGKILL` without
14031439
/// needing async context.
14041440
pid_table: Arc<std::sync::Mutex<HashMap<ProcId, u32>>>,
1441+
1442+
/// Directory for storing proc socket files. Procs place their sockets
1443+
/// in this directory, so that they can be looked up by other procs
1444+
/// for direct transfer.
1445+
socket_dir: TempDir,
14051446
}
14061447

14071448
impl Drop for BootstrapProcManager {
@@ -1451,12 +1492,13 @@ impl BootstrapProcManager {
14511492
/// This is the general entry point when you want to manage procs
14521493
/// backed by a specific binary path (e.g. a bootstrap
14531494
/// trampoline).
1454-
pub(crate) fn new(command: BootstrapCommand) -> Self {
1455-
Self {
1495+
pub(crate) fn new(command: BootstrapCommand) -> Result<Self, io::Error> {
1496+
Ok(Self {
14561497
command,
14571498
children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
14581499
pid_table: Arc::new(std::sync::Mutex::new(HashMap::new())),
1459-
}
1500+
socket_dir: tempfile::tempdir()?,
1501+
})
14601502
}
14611503

14621504
/// The bootstrap command used to launch processes.
@@ -1628,6 +1670,7 @@ impl ProcManager for BootstrapProcManager {
16281670
proc_id: proc_id.clone(),
16291671
backend_addr,
16301672
callback_addr,
1673+
socket_dir_path: self.socket_dir.path().to_owned(),
16311674
config: Some(cfg),
16321675
};
16331676
let mut cmd = Command::new(&self.command.program);
@@ -2062,6 +2105,7 @@ mod tests {
20622105
proc_id: id!(foo[0]),
20632106
backend_addr: ChannelAddr::any(ChannelTransport::Tcp),
20642107
callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2108+
socket_dir_path: PathBuf::from("notexist"),
20652109
config: None,
20662110
},
20672111
];
@@ -2119,13 +2163,16 @@ mod tests {
21192163
attrs[MESH_TAIL_LOG_LINES] = 123;
21202164
attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;
21212165

2166+
let socket_dir = tempfile::tempdir().unwrap();
2167+
21222168
// Proc case
21232169
{
21242170
let original = Bootstrap::Proc {
21252171
proc_id: id!(foo[42]),
21262172
backend_addr: ChannelAddr::any(ChannelTransport::Unix),
21272173
callback_addr: ChannelAddr::any(ChannelTransport::Unix),
21282174
config: Some(attrs.clone()),
2175+
socket_dir_path: socket_dir.path().to_owned(),
21292176
};
21302177
let env_str = original.to_env_safe_string().expect("encode bootstrap");
21312178
let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
@@ -2165,14 +2212,13 @@ mod tests {
21652212
use std::process::Stdio;
21662213

21672214
use tokio::process::Command;
2168-
use tokio::time::Duration;
21692215

21702216
// Manager; program path is irrelevant for this test.
21712217
let command = BootstrapCommand {
21722218
program: PathBuf::from("/bin/true"),
21732219
..Default::default()
21742220
};
2175-
let manager = BootstrapProcManager::new(command);
2221+
let manager = BootstrapProcManager::new(command).unwrap();
21762222

21772223
// Spawn a long-running child process (sleep 30) with
21782224
// kill_on_drop(true).
@@ -2552,7 +2598,7 @@ mod tests {
25522598
program: PathBuf::from("/bin/true"),
25532599
..Default::default()
25542600
};
2555-
let manager = BootstrapProcManager::new(command);
2601+
let manager = BootstrapProcManager::new(command).unwrap();
25562602

25572603
// Spawn a fast-exiting child.
25582604
let mut cmd = Command::new("true");
@@ -2586,7 +2632,7 @@ mod tests {
25862632
program: PathBuf::from("/bin/sleep"),
25872633
..Default::default()
25882634
};
2589-
let manager = BootstrapProcManager::new(command);
2635+
let manager = BootstrapProcManager::new(command).unwrap();
25902636

25912637
// Spawn a process that will live long enough to kill.
25922638
let mut cmd = Command::new("/bin/sleep");
@@ -2703,7 +2749,8 @@ mod tests {
27032749
let manager = BootstrapProcManager::new(BootstrapCommand {
27042750
program: PathBuf::from("/bin/true"),
27052751
..Default::default()
2706-
});
2752+
})
2753+
.unwrap();
27072754
let unknown = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "nope".into());
27082755
assert!(manager.status(&unknown).await.is_none());
27092756
}
@@ -2713,7 +2760,8 @@ mod tests {
27132760
let manager = BootstrapProcManager::new(BootstrapCommand {
27142761
program: PathBuf::from("/bin/sleep"),
27152762
..Default::default()
2716-
});
2763+
})
2764+
.unwrap();
27172765

27182766
// Long-ish child so it's alive while we "steal" it.
27192767
let mut cmd = Command::new("/bin/sleep");
@@ -2752,7 +2800,8 @@ mod tests {
27522800
let manager = BootstrapProcManager::new(BootstrapCommand {
27532801
program: PathBuf::from("/bin/sleep"),
27542802
..Default::default()
2755-
});
2803+
})
2804+
.unwrap();
27562805

27572806
let mut cmd = Command::new("/bin/sleep");
27582807
cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
@@ -3105,8 +3154,6 @@ mod tests {
31053154
instance: &hyperactor::Instance<()>,
31063155
_tag: &str,
31073156
) -> (ProcId, ChannelAddr) {
3108-
let proc_id = id!(bootstrap_child[0]);
3109-
31103157
// Serve a Unix channel as the "backend_addr" and hook it into
31113158
// this test proc.
31123159
let (backend_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap();
@@ -3116,6 +3163,9 @@ mod tests {
31163163
// router.
31173164
instance.proc().clone().serve(rx);
31183165

3166+
// We return an arbitrary (but unbound!) unix direct proc id here;
3167+
// it is okay, as we're not testing connectivity.
3168+
let proc_id = ProcId::Direct(ChannelTransport::Unix.any(), "test".to_string());
31193169
(proc_id, backend_addr)
31203170
}
31213171

@@ -3127,7 +3177,7 @@ mod tests {
31273177
.unwrap();
31283178
let (instance, _handle) = root.instance("client").unwrap();
31293179

3130-
let mgr = BootstrapProcManager::new(BootstrapCommand::test());
3180+
let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
31313181
let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;
31323182
let handle = mgr
31333183
.spawn(proc_id.clone(), backend_addr.clone())
@@ -3183,7 +3233,7 @@ mod tests {
31833233
.unwrap();
31843234
let (instance, _handle) = root.instance("client").unwrap();
31853235

3186-
let mgr = BootstrapProcManager::new(BootstrapCommand::test());
3236+
let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
31873237

31883238
// Proc identity + host backend channel the child will dial.
31893239
let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;
@@ -3382,7 +3432,8 @@ mod tests {
33823432
let manager = BootstrapProcManager::new(BootstrapCommand {
33833433
program: std::path::PathBuf::from("/bin/true"), // unused in this test
33843434
..Default::default()
3385-
});
3435+
})
3436+
.unwrap();
33863437
manager.spawn_exit_monitor(proc_id.clone(), handle.clone());
33873438

33883439
// Await terminal status and assert on exit code and stderr

0 commit comments

Comments
 (0)