Skip to content

Commit d33efb4

Browse files
committed
Remove use of subprocess crate
It has potential reliability issues and does not appear to be actively maintained
1 parent 02b637c commit d33efb4

File tree

3 files changed

+58
-77
lines changed

3 files changed

+58
-77
lines changed

Cargo.lock

Lines changed: 1 addition & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

launcher/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ description = "Text Generation Launcher"
88
[dependencies]
99
clap = { version = "4.4.12", features = ["derive", "env"] }
1010
ctrlc = { version = "3.4.2", features = ["termination"] }
11-
subprocess = "0.2.9"
11+
nix = "0.27.1"
1212
tracing = "0.1.40"
1313
tracing-subscriber = { version = "0.3.18", features = ["json"] }
1414

launcher/src/main.rs

Lines changed: 56 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use clap::Parser;
2+
use nix::sys::signal::{self, Signal};
3+
use nix::unistd::Pid;
24
use std::env;
35
use std::io::{BufRead, BufReader, ErrorKind, Write};
46
use std::path::Path;
5-
use std::process::ExitCode;
7+
use std::process::{Command, ExitCode, Stdio};
68
use std::sync::atomic::{AtomicBool, Ordering};
79
use std::sync::mpsc::TryRecvError;
810
use std::sync::Arc;
@@ -13,7 +15,7 @@ use std::time::{Duration, Instant};
1315
use std::{fs, io};
1416
use std::env::VarError;
1517
use std::ffi::OsString;
16-
use subprocess::{Popen, PopenConfig, PopenError, Redirection};
18+
use std::os::unix::process::CommandExt;
1719
use tracing::info;
1820

1921
// In most cases this gives the best performance for inferencing
@@ -191,8 +193,7 @@ fn main() -> ExitCode {
191193
Err(TryRecvError::Empty) => {
192194
sleep(Duration::from_millis(100));
193195
}
194-
Ok(ShardStatus::Failed((rank, err))) => {
195-
tracing::error!("Shard {rank} failed to start:\n{err}");
196+
Ok(ShardStatus::Failed) => {
196197
shutdown_shards(shutdown, shutdown_receiver);
197198
return ExitCode::FAILURE;
198199
}
@@ -214,7 +215,6 @@ fn main() -> ExitCode {
214215
// Start webserver
215216
info!("Starting Router");
216217
let mut argv = vec![
217-
"text-generation-router".to_string(),
218218
"--max-concurrent-requests".to_string(),
219219
args.max_concurrent_requests.to_string(),
220220
"--max-sequence-length".to_string(),
@@ -271,27 +271,20 @@ fn main() -> ExitCode {
271271
argv.push("--default-include-stop-seqs".into());
272272
}
273273

274-
let mut webserver = match Popen::create(
275-
&argv,
276-
PopenConfig {
277-
stdout: Redirection::Pipe,
278-
stderr: Redirection::Pipe,
279-
// Needed for the shutdown procedure
280-
setpgid: true,
281-
// env: Some(vec![("RUST_BACKTRACE".into(), "1".into())]),
282-
..Default::default()
283-
},
284-
) {
274+
let mut webserver = match Command::new("text-generation-router")
275+
.args(argv)
276+
.stdout(Stdio::piped())
277+
.stderr(Stdio::piped())
278+
.process_group(0)
279+
.spawn()
280+
{
285281
Ok(p) => p,
286282
Err(err) => {
287-
tracing::error!("Failed to start webserver: {err}");
288-
if let PopenError::IoError(err) = err {
289-
if err.kind() == io::ErrorKind::NotFound {
290-
tracing::error!("text-generation-router not found in PATH");
291-
tracing::error!("Please install it with `make install-router`")
292-
}
283+
if err.kind() == ErrorKind::NotFound {
284+
tracing::error!("text-generation-router not found in PATH");
285+
tracing::error!("Please install it with `make install-router`")
293286
} else {
294-
tracing::error!("{err}");
287+
tracing::error!("Failed to start webserver: {err}");
295288
}
296289

297290
shutdown_shards(shutdown, &shutdown_receiver);
@@ -318,28 +311,25 @@ fn main() -> ExitCode {
318311
let mut exit_code = ExitCode::SUCCESS;
319312

320313
while running.load(Ordering::SeqCst) {
321-
if let Ok(ShardStatus::Failed((rank, err))) = status_receiver.try_recv() {
322-
tracing::error!("Shard {rank} failed: {err}");
314+
if let Ok(ShardStatus::Failed) = status_receiver.try_recv() {
323315
exit_code = ExitCode::FAILURE;
324316
break;
325317
};
326318

327-
match webserver.poll() {
319+
match webserver.try_wait().expect("Error polling status of router process") {
328320
Some(_) => {
329321
tracing::error!("Webserver Crashed");
330322
shutdown_shards(shutdown, &shutdown_receiver);
331323
return ExitCode::FAILURE;
332-
}
333-
None => {
334-
sleep(Duration::from_millis(100));
335-
}
324+
},
325+
None => sleep(Duration::from_millis(100)),
336326
};
337327
}
338328

339329
// Graceful termination
340-
webserver.terminate().unwrap();
330+
signal::kill(Pid::from_raw(webserver.id() as i32), Signal::SIGTERM).unwrap();
341331
info!("Waiting for router to gracefully shutdown");
342-
webserver.wait_timeout(Duration::from_secs(120)).unwrap();
332+
webserver.wait().unwrap();
343333
info!("Router terminated");
344334
shutdown_shards(shutdown, &shutdown_receiver);
345335

@@ -392,7 +382,7 @@ fn find_num_shards(num_shard: Option<usize>) -> usize {
392382
#[derive(Debug)]
393383
enum ShardStatus {
394384
Ready,
395-
Failed((usize, String)),
385+
Failed,
396386
}
397387

398388
#[allow(clippy::too_many_arguments)]
@@ -421,11 +411,12 @@ fn shard_manager(
421411
let uds_string = format!("{uds_path}-{rank}");
422412
let uds = Path::new(&uds_string);
423413
// Clean previous runs
424-
fs::remove_file(uds).unwrap_or_default();
414+
if uds.exists() {
415+
fs::remove_file(uds).unwrap_or_default();
416+
}
425417

426418
// Process args
427419
let mut shard_argv = vec![
428-
"text-generation-server".to_string(),
429420
"serve".to_string(),
430421
model_name,
431422
deployment_framework,
@@ -517,30 +508,24 @@ fn shard_manager(
517508

518509
// Start process
519510
info!("Starting shard {rank}");
520-
let mut p = match Popen::create(
521-
&shard_argv,
522-
PopenConfig {
523-
stdout: Redirection::Pipe,
524-
stderr: Redirection::Pipe,
525-
// Needed for the shutdown procedure
526-
setpgid: true,
527-
// NCCL env vars
528-
env: Some(env),
529-
..Default::default()
530-
},
531-
) {
511+
let mut p = match Command::new("text-generation-server")
512+
.args(shard_argv)
513+
.envs(env)
514+
.stdout(Stdio::piped())
515+
.stderr(Stdio::piped())
516+
.process_group(0)
517+
.spawn()
518+
{
532519
Ok(p) => p,
533520
Err(err) => {
534-
if let PopenError::IoError(ref err) = err {
535-
if err.kind() == io::ErrorKind::NotFound {
536-
tracing::error!("text-generation-server not found in PATH");
537-
tracing::error!("Please install it with `make install-server`")
538-
}
521+
if err.kind() == ErrorKind::NotFound {
522+
tracing::error!("text-generation-server not found in PATH");
523+
tracing::error!("Please install it with `make install-server`")
524+
} else {
525+
tracing::error!("Shard {rank} failed to start:\n{err}");
539526
}
540-
status_sender
541-
.send(ShardStatus::Failed((rank, err.to_string())))
542-
.unwrap();
543-
return;
527+
status_sender.send(ShardStatus::Failed).unwrap();
528+
return
544529
}
545530
};
546531

@@ -563,20 +548,26 @@ fn shard_manager(
563548
let mut wait_time = Instant::now();
564549
loop {
565550
// Process exited
566-
if let Some(status) = p.poll() {
567-
// Ensure we finish propagating any final stdout/stderr from the shard
568-
stdout_thread.join().unwrap_or_default();
569-
io::stdout().flush().unwrap_or_default();
570-
stderr_thread.join().unwrap_or_default();
571-
io::stderr().flush().unwrap_or_default();
572-
status_sender.send(ShardStatus::Failed((rank, format!("{status:?}")))).unwrap();
551+
if let Some(status) = p.try_wait()
552+
.expect(&*format!("Error polling status of shard {rank}")) {
553+
if *shutdown.lock().unwrap() {
554+
info!("Shard {rank} terminated");
555+
} else {
556+
tracing::error!("Shard {rank} failed: {status:?}");
557+
// Ensure we finish propagating any final stdout/stderr from the shard
558+
stdout_thread.join().unwrap_or_default();
559+
io::stdout().flush().unwrap_or_default();
560+
stderr_thread.join().unwrap_or_default();
561+
io::stderr().flush().unwrap_or_default();
562+
status_sender.send(ShardStatus::Failed).unwrap();
563+
}
573564
return
574565
}
575566

576567
// We received a shutdown signal
577568
if *shutdown.lock().unwrap() {
578-
p.terminate().unwrap();
579-
let _ = p.wait_timeout(Duration::from_secs(90));
569+
p.kill().unwrap();
570+
let _ = p.wait().unwrap();
580571
info!("Shard {rank} terminated");
581572
return
582573
}

0 commit comments

Comments
 (0)