15
15
*/
16
16
17
17
use std:: {
18
- convert:: TryFrom ,
19
18
env,
20
19
io:: Read ,
21
20
os:: unix:: { fs:: FileTypeExt , net:: UnixListener } ,
@@ -25,6 +24,7 @@ use std::{
25
24
atomic:: { AtomicBool , Ordering } ,
26
25
Arc ,
27
26
} ,
27
+ task:: { ready, Poll } ,
28
28
} ;
29
29
30
30
use async_trait:: async_trait;
@@ -36,7 +36,7 @@ use containerd_shim_protos::{
36
36
ttrpc:: r#async:: Server ,
37
37
types:: introspection:: { self , RuntimeInfo } ,
38
38
} ;
39
- use futures:: StreamExt ;
39
+ use futures:: stream :: { poll_fn , BoxStream , SelectAll , StreamExt } ;
40
40
use libc:: { SIGCHLD , SIGINT , SIGPIPE , SIGTERM } ;
41
41
use log:: { debug, error, info, warn} ;
42
42
use nix:: {
@@ -48,7 +48,6 @@ use nix::{
48
48
unistd:: Pid ,
49
49
} ;
50
50
use oci_spec:: runtime:: Features ;
51
- use signal_hook_tokio:: Signals ;
52
51
use tokio:: { io:: AsyncWriteExt , process:: Command , sync:: Notify } ;
53
52
use which:: which;
54
53
@@ -388,13 +387,76 @@ fn signal_server_started() {
388
387
}
389
388
}
390
389
390
+ #[ cfg( unix) ]
391
+ fn signal_stream ( kind : i32 ) -> std:: io:: Result < BoxStream < ' static , i32 > > {
392
+ use tokio:: signal:: unix:: { signal, SignalKind } ;
393
+ let kind = SignalKind :: from_raw ( kind) ;
394
+ signal ( kind) . map ( |mut sig| {
395
+ // The object returned by `signal` is not a `Stream`.
396
+ // The `poll_fn` function constructs a `Stream` based on a polling function.
397
+ // We need to create a `Stream` so that we can use the `SelectAll` stream "merge"
398
+ // all the signal streams.
399
+ poll_fn ( move |cx| {
400
+ ready ! ( sig. poll_recv( cx) ) ;
401
+ Poll :: Ready ( Some ( kind. as_raw_value ( ) ) )
402
+ } )
403
+ . boxed ( )
404
+ } )
405
+ }
406
+
407
+ #[ cfg( windows) ]
408
+ fn signal_stream ( kind : i32 ) -> std:: io:: Result < BoxStream < ' static , i32 > > {
409
+ use tokio:: signal:: windows:: ctrl_c;
410
+
411
+ // Windows doesn't have similar signal like SIGCHLD
412
+ // We could implement something if required but for now
413
+ // just implement support for SIGINT
414
+ if kind != SIGINT {
415
+ return Err ( std:: io:: Error :: new (
416
+ std:: io:: ErrorKind :: Other ,
417
+ format ! ( "Invalid signal {kind}" ) ,
418
+ ) ) ;
419
+ }
420
+
421
+ ctrl_c ( ) . map ( |mut sig| {
422
+ // The object returned by `signal` is not a `Stream`.
423
+ // The `poll_fn` function constructs a `Stream` based on a polling function.
424
+ // We need to create a `Stream` so that we can use the `SelectAll` stream "merge"
425
+ // all the signal streams.
426
+ poll_fn ( move |cx| {
427
+ ready ! ( sig. poll_recv( cx) ) ;
428
+ Poll :: Ready ( Some ( kind) )
429
+ } )
430
+ . boxed ( )
431
+ } )
432
+ }
433
+
434
+ type Signals = SelectAll < BoxStream < ' static , i32 > > ;
435
+
391
436
#[ cfg_attr( feature = "tracing" , tracing:: instrument( skip_all, level = "info" ) ) ]
392
437
fn setup_signals_tokio ( config : & Config ) -> Signals {
393
- if config. no_reaper {
394
- Signals :: new ( [ SIGTERM , SIGINT , SIGPIPE ] ) . expect ( "new signal failed" )
438
+ #[ cfg( unix) ]
439
+ let signals: & [ i32 ] = if config. no_reaper {
440
+ & [ SIGTERM , SIGINT , SIGPIPE ]
395
441
} else {
396
- Signals :: new ( [ SIGTERM , SIGINT , SIGPIPE , SIGCHLD ] ) . expect ( "new signal failed" )
397
- }
442
+ & [ SIGTERM , SIGINT , SIGPIPE , SIGCHLD ]
443
+ } ;
444
+
445
+ // Windows doesn't have similar signal like SIGCHLD
446
+ // We could implement something if required but for now
447
+ // just listen for SIGINT
448
+ // Note: see comment at the counterpart in synchronous/mod.rs for details.
449
+ #[ cfg( windows) ]
450
+ let signals: & [ i32 ] = & [ SIGINT ] ;
451
+
452
+ let signals: Vec < _ > = signals
453
+ . iter ( )
454
+ . copied ( )
455
+ . map ( signal_stream)
456
+ . collect :: < std:: io:: Result < _ > > ( )
457
+ . expect ( "signal setup failed" ) ;
458
+
459
+ SelectAll :: from_iter ( signals)
398
460
}
399
461
400
462
#[ cfg_attr( feature = "tracing" , tracing:: instrument( skip_all, level = "info" ) ) ]
@@ -408,14 +470,7 @@ async fn handle_signals(signals: Signals) {
408
470
}
409
471
SIGCHLD => loop {
410
472
// Note: see comment at the counterpart in synchronous/mod.rs for details.
411
- match asyncify ( move || {
412
- Ok ( wait:: waitpid (
413
- Some ( Pid :: from_raw ( -1 ) ) ,
414
- Some ( WaitPidFlag :: WNOHANG ) ,
415
- ) ?)
416
- } )
417
- . await
418
- {
473
+ match wait:: waitpid ( Some ( Pid :: from_raw ( -1 ) ) , Some ( WaitPidFlag :: WNOHANG ) ) {
419
474
Ok ( WaitStatus :: Exited ( pid, status) ) => {
420
475
monitor_notify_by_pid ( pid. as_raw ( ) , status)
421
476
. await
@@ -431,7 +486,7 @@ async fn handle_signals(signals: Signals) {
431
486
Ok ( WaitStatus :: StillAlive ) => {
432
487
break ;
433
488
}
434
- Err ( Error :: Nix ( Errno :: ECHILD ) ) => {
489
+ Err ( Errno :: ECHILD ) => {
435
490
break ;
436
491
}
437
492
Err ( e) => {
0 commit comments