Skip to content

Commit ab69b2c

Browse files
committed
Spawn commands into process groups.
1 parent 5032ca0 commit ab69b2c

File tree

5 files changed

+96
-63
lines changed

5 files changed

+96
-63
lines changed

Cargo.lock

Lines changed: 12 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ quickcheck = "1.0.3"
7979
quick_cache = "0.6.16"
8080
rand = "0.9"
8181
rayon = "1.10"
82+
rustix = { version = "1.1.2", features = ["process"] }
8283
reqwest = { version = "0.12", features = ["json"] }
8384
secp256k1 = { version = "0.31.0", features = ["global-context", "hashes", "rand", "serde"] }
8485
serde = { version = "1", features = ["derive", "rc"] }

test-utils/Cargo.toml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,26 @@ edition.workspace = true
55
rust-version.workspace = true
66

77
[features]
8-
ports = ["dep:bincode", "dep:tokio-util"]
9-
run-committee = ["dep:tokio-util"]
10-
verifier = ["dep:alloy", "dep:timeboost-utils"]
11-
maker = [
8+
default = [
9+
"dep:alloy",
10+
"dep:bincode",
1211
"dep:bytes",
1312
"dep:prost",
1413
"dep:quick_cache",
14+
"dep:tokio-util",
15+
"dep:tonic",
1516
"dep:timeboost-config",
1617
"dep:timeboost-proto",
1718
"dep:timeboost-utils",
18-
"dep:tonic"
19+
"ports"
1920
]
21+
ports = ["dep:bincode", "dep:tokio-util"]
2022

2123
[dependencies]
2224
anyhow = { workspace = true }
2325
clap = { workspace = true }
2426
futures = { workspace = true }
27+
rustix = { workspace = true }
2528
tokio = { workspace = true }
2629
tracing = { workspace = true }
2730
# optional
@@ -39,12 +42,10 @@ timeboost-utils = { path = "../timeboost-utils", optional = true }
3942
[[bin]]
4043
name = "block-maker"
4144
path = "src/binaries/block-maker.rs"
42-
required-features = ["maker"]
4345

4446
[[bin]]
4547
name = "block-verifier"
4648
path = "src/binaries/block-verifier.rs"
47-
required-features = ["verifier"]
4849

4950
[[bin]]
5051
name = "port-alloc"
@@ -58,4 +59,3 @@ path = "src/binaries/run.rs"
5859
[[bin]]
5960
name = "run-committee"
6061
path = "src/binaries/run-committee.rs"
61-
required-features = ["run-committee"]

test-utils/src/binaries/run-committee.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use std::{ffi::OsStr, path::PathBuf, time::Duration};
1+
use std::{ffi::OsStr, path::PathBuf};
22

33
use anyhow::{Result, bail};
44
use clap::Parser;
5-
use tokio::{fs::read_dir, process::Command, time::timeout};
5+
use tokio::{fs::read_dir, process::Command};
66
use tokio_util::task::TaskTracker;
77

88
#[derive(Parser, Debug)]
@@ -18,9 +18,6 @@ struct Args {
1818

1919
#[clap(long, short, default_value = "/tmp")]
2020
tmp: PathBuf,
21-
22-
#[clap(long)]
23-
max_duration: Option<u64>,
2421
}
2522

2623
#[tokio::main]
@@ -68,12 +65,7 @@ async fn main() -> Result<()> {
6865
}
6966

7067
tasks.close();
71-
72-
if let Some(d) = args.max_duration {
73-
let _ = timeout(Duration::from_secs(d), tasks.wait()).await;
74-
} else {
75-
tasks.wait().await
76-
}
68+
tasks.wait().await;
7769

7870
Ok(())
7971
}

test-utils/src/binaries/run.rs

Lines changed: 72 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
1+
use std::future::pending;
2+
use std::ops::{Deref, DerefMut};
3+
use std::time::Duration;
14
use std::{ffi::OsStr, process::ExitStatus};
25

36
use anyhow::{Result, anyhow, bail};
47
use clap::Parser;
8+
use futures::FutureExt;
9+
use rustix::process::{Pid, Signal, kill_process_group};
510
use tokio::select;
11+
use tokio::signal::unix::{SignalKind, signal};
12+
use tokio::time::sleep;
613
use tokio::{
714
process::{Child, Command},
815
task::JoinSet,
@@ -18,6 +25,10 @@ struct Args {
1825
#[clap(long, short)]
1926
with: Vec<String>,
2027

28+
/// Optional timeout in seconds.
29+
#[clap(long, short)]
30+
timeout: Option<u64>,
31+
2132
/// Main command to execute.
2233
main: Vec<String>,
2334
}
@@ -26,8 +37,16 @@ struct Args {
2637
async fn main() -> Result<()> {
2738
let args = Args::parse();
2839

40+
let mut term = signal(SignalKind::terminate())?;
41+
let mut intr = signal(SignalKind::interrupt())?;
42+
2943
for exe in &args.exec {
30-
let status = command(exe.split_whitespace())?.status().await?;
44+
let mut pg = ProcessGroup::spawn(exe.split_whitespace())?;
45+
let status = select! {
46+
s = pg.wait() => s?,
47+
_ = term.recv() => return Ok(()),
48+
_ = intr.recv() => return Ok(()),
49+
};
3150
if !status.success() {
3251
bail!("{exe:?} failed with {:?}", status.code());
3352
}
@@ -36,13 +55,19 @@ async fn main() -> Result<()> {
3655
let mut helpers = JoinSet::<Result<ExitStatus>>::new();
3756
for w in args.with {
3857
helpers.spawn(async move {
39-
let mut c = spawn_command(w.split_whitespace())?;
40-
let status = c.wait().await?;
58+
let mut pg = ProcessGroup::spawn(w.split_whitespace())?;
59+
let status = pg.wait().await?;
4160
Ok(status)
4261
});
4362
}
4463

45-
let mut main = spawn_command(args.main)?;
64+
let mut main = ProcessGroup::spawn(args.main)?;
65+
66+
let timeout = if let Some(d) = args.timeout {
67+
sleep(Duration::from_secs(d)).boxed()
68+
} else {
69+
pending().boxed()
70+
};
4671

4772
select! {
4873
status = main.wait() => {
@@ -55,32 +80,55 @@ async fn main() -> Result<()> {
5580
let status = status??;
5681
bail!("helper command exited before main with status {:?}", status)
5782
}
83+
_ = term.recv() => {}
84+
_ = intr.recv() => {}
85+
_ = timeout => eprintln!("timeout")
5886
}
5987

6088
Ok(())
6189
}
6290

63-
fn command<I, S>(it: I) -> Result<Command>
64-
where
65-
I: IntoIterator<Item = S>,
66-
S: AsRef<OsStr>,
67-
{
68-
let mut args = it.into_iter();
69-
let exe = args
70-
.next()
71-
.ok_or_else(|| anyhow!("invalid command-line args"))?;
72-
let mut cmd = Command::new(exe);
73-
for a in args {
74-
cmd.arg(a);
91+
/// Every command is spawned into its own, newly created process group.
92+
struct ProcessGroup(Child, Pid);
93+
94+
impl ProcessGroup {
95+
fn spawn<I, S>(it: I) -> Result<Self>
96+
where
97+
I: IntoIterator<Item = S>,
98+
S: AsRef<OsStr>,
99+
{
100+
let mut args = it.into_iter();
101+
let exe = args
102+
.next()
103+
.ok_or_else(|| anyhow!("invalid command-line args"))?;
104+
let mut cmd = Command::new(exe);
105+
for a in args {
106+
cmd.arg(a);
107+
}
108+
cmd.process_group(0);
109+
let child = cmd.spawn()?;
110+
let id = child.id().ok_or_else(|| anyhow!("child already exited"))?;
111+
let pid = Pid::from_raw(id.try_into()?).ok_or_else(|| anyhow!("invalid pid"))?;
112+
Ok(Self(child, pid))
113+
}
114+
}
115+
116+
impl Drop for ProcessGroup {
117+
fn drop(&mut self) {
118+
let _ = kill_process_group(self.1, Signal::KILL);
75119
}
76-
cmd.kill_on_drop(true);
77-
Ok(cmd)
78120
}
79121

80-
fn spawn_command<I, S>(it: I) -> Result<Child>
81-
where
82-
I: IntoIterator<Item = S>,
83-
S: AsRef<OsStr>,
84-
{
85-
command(it)?.spawn().map_err(|e| e.into())
122+
impl Deref for ProcessGroup {
123+
type Target = Child;
124+
125+
fn deref(&self) -> &Self::Target {
126+
&self.0
127+
}
128+
}
129+
130+
impl DerefMut for ProcessGroup {
131+
fn deref_mut(&mut self) -> &mut Self::Target {
132+
&mut self.0
133+
}
86134
}

0 commit comments

Comments
 (0)