1515
1616#[ cfg( any( feature = "vllm" , feature = "sglang" ) ) ]
1717use std:: { future:: Future , pin:: Pin } ;
18- use std:: { io:: Read , sync:: Arc } ;
18+ use std:: { io:: Read , sync:: Arc , time :: Duration } ;
1919
2020use anyhow:: Context ;
2121use dynamo_llm:: {
2222 backend:: ExecutionContext , engines:: StreamingEngine , kv_router:: publisher:: KvMetricsPublisher ,
2323 LocalModel ,
2424} ;
25- use dynamo_runtime:: { protocols:: Endpoint , DistributedRuntime } ;
25+ use dynamo_runtime:: { protocols:: Endpoint , CancellationToken , DistributedRuntime } ;
2626
2727mod flags;
2828pub use flags:: Flags ;
@@ -32,11 +32,7 @@ mod net;
3232mod opt;
3333pub use dynamo_llm:: request_template:: RequestTemplate ;
3434pub use opt:: { Input , Output } ;
35-
36- /// How we identify a namespace/component/endpoint URL.
37- /// Technically the '://' is not part of the scheme but it eliminates several string
38- /// concatenations.
39- const ENDPOINT_SCHEME : & str = "dyn://" ;
35+ mod subprocess;
4036
4137/// When `in=text` the user doesn't need to know the model name, and doesn't need to provide it on
4238/// the command line. Hence it's optional, and defaults to this.
@@ -45,6 +41,8 @@ const INVISIBLE_MODEL_NAME: &str = "dynamo-run";
4541/// The component name for the KV publisher, if used
4642const KV_PUBLISHER_COMPONENT : & str = "kvpublisher" ;
4743
44+ const CHILD_STOP_TIMEOUT : Duration = Duration :: from_secs ( 2 ) ;
45+
4846/// How we identify a python string endpoint
4947#[ cfg( feature = "python" ) ]
5048const PYTHON_STR_SCHEME : & str = "pystr:" ;
@@ -97,6 +95,8 @@ pub async fn run(
9795 // If output is an endpoint we are ingress and don't have a local model, but making an
9896 // empty one cleans up the code.
9997 Output :: Endpoint ( _) => Default :: default ( ) ,
98+
99+ // All other output types have a local model
100100 _ => {
101101 match & maybe_path {
102102 Some ( model_path) => {
@@ -143,7 +143,6 @@ pub async fn run(
143143 _ => None ,
144144 } ;
145145
146- #[ cfg( any( feature = "vllm" , feature = "sglang" ) ) ]
147146 let mut extra: Option < Pin < Box < dyn Future < Output = ( ) > + Send > > > = None ; // vllm and sglang sub-process
148147
149148 let template = if let Some ( path) = flags. request_template . as_ref ( ) {
@@ -184,8 +183,42 @@ pub async fn run(
184183 engine : dynamo_engine_mistralrs:: make_engine ( local_model. path ( ) ) . await ?,
185184 model : Box :: new ( local_model) ,
186185 } ,
187- # [ cfg ( feature = "sglang" ) ]
186+
188187 Output :: SgLang => {
188+ if !local_model. path ( ) . is_dir ( ) {
189+ // TODO Does sglang support GGUF? Can we make it work?
190+ anyhow:: bail!( "`--model-path should point at a HuggingFace repo checkout" ) ;
191+ }
192+ let ( py_script, mut child) = match subprocess:: start (
193+ subprocess:: sglang:: PY ,
194+ local_model. path ( ) ,
195+ flags. tensor_parallel_size ,
196+ if flags. base_gpu_id == 0 {
197+ None
198+ } else {
199+ Some ( flags. base_gpu_id )
200+ } ,
201+ flags. extra_engine_args . as_deref ( ) ,
202+ )
203+ . await
204+ {
205+ Ok ( x) => x,
206+ Err ( err) => {
207+ anyhow:: bail!( "Failed starting sglang sub-process: {err}" ) ;
208+ }
209+ } ;
210+ let cancel_token = cancel_token. clone ( ) ;
211+
212+ // Sub-process cleanup
213+ extra = Some ( Box :: pin ( async move {
214+ stopper ( cancel_token, child, py_script) . await ;
215+ } ) ) ;
216+ let endpoint: Endpoint = subprocess:: ENDPOINT . parse ( ) ?;
217+ EngineConfig :: Dynamic ( endpoint)
218+ }
219+
220+ #[ cfg( feature = "sglang" ) ]
221+ Output :: SgLangLegacy => {
189222 if !local_model. path ( ) . is_dir ( ) {
190223 anyhow:: bail!( "`--model-path should point at a HuggingFace repo checkout" ) ;
191224 }
@@ -295,7 +328,7 @@ pub async fn run(
295328 }
296329
297330 #[ cfg( feature = "vllm" ) ]
298- Output :: Vllm | Output :: Vllm0_8 => {
331+ Output :: Vllm0_8 => {
299332 if flags. base_gpu_id != 0 {
300333 anyhow:: bail!( "vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead." ) ;
301334 }
@@ -318,6 +351,35 @@ pub async fn run(
318351 }
319352 }
320353
354+ // No feature flag because it uses a sub-process, it's very cheap to include
355+ Output :: Vllm => {
356+ if flags. base_gpu_id != 0 {
357+ anyhow:: bail!( "vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead." ) ;
358+ }
359+ let ( py_script, mut child) = match subprocess:: start (
360+ subprocess:: vllm:: PY ,
361+ local_model. path ( ) ,
362+ flags. tensor_parallel_size ,
363+ None , // base_gpu_id. vllm uses CUDA_VISIBLE_DEVICES instead
364+ flags. extra_engine_args . as_deref ( ) ,
365+ )
366+ . await
367+ {
368+ Ok ( x) => x,
369+ Err ( err) => {
370+ anyhow:: bail!( "Failed starting vllm sub-process: {err}" ) ;
371+ }
372+ } ;
373+ let cancel_token = cancel_token. clone ( ) ;
374+
375+ // Sub-process cleanup
376+ extra = Some ( Box :: pin ( async move {
377+ stopper ( cancel_token, child, py_script) . await ;
378+ } ) ) ;
379+ let endpoint: Endpoint = subprocess:: ENDPOINT . parse ( ) ?;
380+ EngineConfig :: Dynamic ( endpoint)
381+ }
382+
321383 #[ cfg( feature = "llamacpp" ) ]
322384 Output :: LlamaCpp => {
323385 if !local_model. path ( ) . is_file ( ) {
@@ -394,11 +456,50 @@ pub async fn run(
394456 }
395457 }
396458
397- #[ cfg( any( feature = "vllm" , feature = "sglang" ) ) ]
398459 // Allow engines to ask main thread to wait on an extra future.
460+ // We use this to stop the vllm and sglang sub-process
399461 if let Some ( extra) = extra {
400462 extra. await ;
401463 }
402464
403465 Ok ( ( ) )
404466}
467+
468+ /// Wait for cancel_token to be cancelled, then stop the child as gracefully as possible.
469+ /// Keeps the TempPath alive until the child is stopped.
470+ async fn stopper (
471+ cancel_token : CancellationToken ,
472+ mut child : tokio:: process:: Child ,
473+ py_script : tempfile:: TempPath ,
474+ ) {
475+ cancel_token. cancelled ( ) . await ;
476+
477+ // Ask subprocess to stop gracefully
478+ if let Some ( pid) = child. id ( ) {
479+ unsafe { libc:: kill ( pid as i32 , libc:: SIGTERM ) } ;
480+ }
481+
482+ tokio:: select! {
483+ exit = child. wait( ) => {
484+ tracing:: trace!( "vllm sub-process graceful exit" ) ;
485+ match exit {
486+ Ok ( exit_status) if exit_status. success( ) => { }
487+ Ok ( exit_status) => {
488+ // This is nearly always 15 (SIGTERM)
489+ tracing:: trace!( "vllm sub-process non-0 exit: {exit_status}" ) ;
490+ }
491+ Err ( err) => {
492+ tracing:: warn!( "vllm sub-process error getting exit status: {err}" ) ;
493+ }
494+ }
495+ }
496+ _ = tokio:: time:: sleep( CHILD_STOP_TIMEOUT ) => {
497+ // It didn't stop in time, kill it
498+ child. kill( ) . await . expect( "Failed killing vllm subprocess" ) ;
499+ let _ = child. wait( ) . await ;
500+ }
501+ }
502+ // This temporary file contains the python script running the engine. It deletes on drop.
503+ // Keep it alive until the engine has stopped.
504+ drop ( py_script) ;
505+ }
0 commit comments