@@ -31,8 +31,6 @@ use log::{
31
31
use time:: { format_description:: well_known:: Rfc3339 , OffsetDateTime } ;
32
32
33
33
use crate :: error:: Error ;
34
- #[ cfg( windows) ]
35
- use crate :: sys:: windows:: NamedPipeLogger ;
36
34
37
35
pub const LOG_ENV : & str = "RUST_LOG" ;
38
36
@@ -41,22 +39,92 @@ pub struct FifoLogger {
41
39
}
42
40
43
41
impl FifoLogger {
44
- #[ allow( dead_code) ]
45
- pub fn new ( ) -> Result < FifoLogger , io:: Error > {
46
- Self :: with_path ( "log" )
42
+ pub fn new ( _namespace : & str , _id : & str ) -> io:: Result < FifoLogger > {
43
+ #[ cfg( unix) ]
44
+ let logger = Self :: with_path ( "log" ) ?;
45
+
46
+ #[ cfg( windows) ]
47
+ let logger = {
48
+ let pipe_name = format ! ( r"\\.\pipe\containerd-shim-{_namespace}-{_id}-log" ) ;
49
+ Self :: with_named_pipe ( & pipe_name) ?
50
+ } ;
51
+
52
+ Ok ( logger)
47
53
}
48
54
49
55
#[ allow( dead_code) ]
50
- pub fn with_path < P : AsRef < Path > > ( path : P ) -> Result < FifoLogger , io:: Error > {
56
+ pub fn with_path ( path : impl AsRef < Path > ) -> io:: Result < FifoLogger > {
51
57
let f = OpenOptions :: new ( )
52
58
. write ( true )
53
59
. read ( false )
54
60
. create ( false )
55
61
. open ( path) ?;
56
62
57
- Ok ( FifoLogger {
58
- file : Mutex :: new ( f) ,
59
- } )
63
+ Ok ( FifoLogger :: with_file ( f) )
64
+ }
65
+
66
+ pub fn with_file ( file : File ) -> FifoLogger {
67
+ let file = Mutex :: new ( file) ;
68
+ FifoLogger { file }
69
+ }
70
+
71
+ #[ cfg( windows) ]
72
+ pub fn with_named_pipe ( name : & str ) -> io:: Result < FifoLogger > {
73
+ // Containerd on windows expects the log to be a named pipe in the format of \\.\pipe\containerd-<namespace>-<id>-log
74
+ // There is an assumption that there is always only one client connected which is containerd.
75
+ // If there is a restart of containerd then logs during that time period will be lost.
76
+ //
77
+ // https://github.com/containerd/containerd/blob/v1.7.0/runtime/v2/shim_windows.go#L77
78
+ // https://github.com/microsoft/hcsshim/blob/5871d0c4436f131c377655a3eb09fc9b5065f11d/cmd/containerd-shim-runhcs-v1/serve.go#L132-L137
79
+
80
+ use std:: os:: windows:: io:: { AsRawHandle , BorrowedHandle } ;
81
+
82
+ use mio:: { windows:: NamedPipe , Events , Interest , Poll , Token } ;
83
+
84
+ let mut pipe_server = NamedPipe :: new ( name) ?;
85
+
86
+ let file = unsafe { BorrowedHandle :: borrow_raw ( pipe_server. as_raw_handle ( ) ) }
87
+ . try_clone_to_owned ( ) ?;
88
+ let file = File :: from ( file) ;
89
+
90
+ let poll = Poll :: new ( ) ?;
91
+ poll. registry ( ) . register (
92
+ & mut pipe_server,
93
+ Token ( 0 ) ,
94
+ Interest :: READABLE | Interest :: WRITABLE ,
95
+ ) ?;
96
+
97
+ std:: thread:: spawn ( move || {
98
+ let pipe_server = pipe_server;
99
+ let mut poll = poll;
100
+ let mut events = Events :: with_capacity ( 128 ) ;
101
+ let _ = pipe_server. connect ( ) ;
102
+ loop {
103
+ poll. poll ( & mut events, None ) . unwrap ( ) ;
104
+
105
+ for event in events. iter ( ) {
106
+ if event. is_writable ( ) {
107
+ match pipe_server. connect ( ) {
108
+ Ok ( ( ) ) => { }
109
+ Err ( e) if e. kind ( ) == io:: ErrorKind :: Interrupted => {
110
+ // this would block just keep processing
111
+ }
112
+ Err ( e) if e. kind ( ) == std:: io:: ErrorKind :: WouldBlock => {
113
+ // this would block just keep processing
114
+ }
115
+ Err ( e) => {
116
+ panic ! ( "Error connecting to client: {}" , e) ;
117
+ }
118
+ } ;
119
+ }
120
+ if event. is_readable ( ) {
121
+ pipe_server. disconnect ( ) . unwrap ( ) ;
122
+ }
123
+ }
124
+ }
125
+ } ) ;
126
+
127
+ Ok ( FifoLogger :: with_file ( file) )
60
128
}
61
129
}
62
130
@@ -116,29 +184,12 @@ impl log::Log for FifoLogger {
116
184
117
185
fn flush ( & self ) {
118
186
// The logger server may have temporarily shutdown, ignore the error instead of panic.
119
- let _ = self . file . lock ( ) . unwrap ( ) . sync_all ( ) ;
187
+ let _ = self . file . lock ( ) . unwrap ( ) . flush ( ) ;
120
188
}
121
189
}
122
190
123
- pub fn init (
124
- debug : bool ,
125
- default_log_level : & str ,
126
- _namespace : & str ,
127
- _id : & str ,
128
- ) -> Result < ( ) , Error > {
129
- #[ cfg( unix) ]
130
- let logger = FifoLogger :: new ( ) . map_err ( io_error ! ( e, "failed to init logger" ) ) ?;
131
-
132
- // Containerd on windows expects the log to be a named pipe in the format of \\.\pipe\containerd-<namespace>-<id>-log
133
- // There is an assumption that there is always only one client connected which is containerd.
134
- // If there is a restart of containerd then logs during that time period will be lost.
135
- //
136
- // https://github.com/containerd/containerd/blob/v1.7.0/runtime/v2/shim_windows.go#L77
137
- // https://github.com/microsoft/hcsshim/blob/5871d0c4436f131c377655a3eb09fc9b5065f11d/cmd/containerd-shim-runhcs-v1/serve.go#L132-L137
138
- #[ cfg( windows) ]
139
- let logger =
140
- NamedPipeLogger :: new ( _namespace, _id) . map_err ( io_error ! ( e, "failed to init logger" ) ) ?;
141
-
191
+ pub fn init ( debug : bool , default_log_level : & str , namespace : & str , id : & str ) -> Result < ( ) , Error > {
192
+ let logger = FifoLogger :: new ( namespace, id) . map_err ( io_error ! ( e, "failed to init logger" ) ) ?;
142
193
configure_logging_level ( debug, default_log_level) ;
143
194
log:: set_boxed_logger ( Box :: new ( logger) ) ?;
144
195
Ok ( ( ) )
@@ -291,3 +342,130 @@ mod tests {
291
342
assert ! ( contents. contains( "level=error key=\" 1\" b=\" 2\" msg=\" structured!\" " ) ) ;
292
343
}
293
344
}
345
+
346
+ #[ cfg( all( windows, test) ) ]
347
+ mod windows_tests {
348
+ use std:: {
349
+ fs:: OpenOptions ,
350
+ io:: Read ,
351
+ os:: windows:: {
352
+ fs:: OpenOptionsExt ,
353
+ io:: { FromRawHandle , IntoRawHandle } ,
354
+ prelude:: AsRawHandle ,
355
+ } ,
356
+ time:: Duration ,
357
+ } ;
358
+
359
+ use log:: { Log , Record } ;
360
+ use mio:: { windows:: NamedPipe , Events , Interest , Poll , Token } ;
361
+ use windows_sys:: Win32 :: {
362
+ Foundation :: ERROR_PIPE_NOT_CONNECTED , Storage :: FileSystem :: FILE_FLAG_OVERLAPPED ,
363
+ } ;
364
+
365
+ use super :: * ;
366
+
367
+ #[ test]
368
+ fn test_namedpipe_log_can_write_before_client_connected ( ) {
369
+ let ns = "test" . to_string ( ) ;
370
+ let id = "notconnected" . to_string ( ) ;
371
+ let logger = FifoLogger :: new ( & ns, & id) . unwrap ( ) ;
372
+
373
+ // test can write before a reader is connected (should succeed but the messages will be dropped)
374
+ log:: set_max_level ( log:: LevelFilter :: Info ) ;
375
+ let record = Record :: builder ( )
376
+ . level ( log:: Level :: Info )
377
+ . line ( Some ( 1 ) )
378
+ . file ( Some ( "sample file" ) )
379
+ . args ( format_args ! ( "hello" ) )
380
+ . build ( ) ;
381
+ logger. log ( & record) ;
382
+ logger. flush ( ) ;
383
+ }
384
+
385
+ #[ test]
386
+ fn test_namedpipe_log ( ) {
387
+ use std:: fs:: File ;
388
+
389
+ let ns = "test" . to_string ( ) ;
390
+ let id = "clients" . to_string ( ) ;
391
+ let pipe_name = format ! ( "\\ \\ .\\ pipe\\ containerd-shim-{}-{}-log" , ns, id) ;
392
+
393
+ let logger = FifoLogger :: new ( & ns, & id) . unwrap ( ) ;
394
+ let mut client = create_client ( pipe_name. as_str ( ) ) ;
395
+
396
+ log:: set_max_level ( log:: LevelFilter :: Info ) ;
397
+ let kvs: & [ ( & str , i32 ) ] = & [ ( "key" , 1 ) , ( "b" , 2 ) ] ;
398
+ let record = Record :: builder ( )
399
+ . level ( log:: Level :: Info )
400
+ . line ( Some ( 1 ) )
401
+ . key_values ( & kvs)
402
+ . args ( format_args ! ( "hello" ) )
403
+ . build ( ) ;
404
+ logger. log ( & record) ;
405
+ logger. flush ( ) ;
406
+
407
+ let buf = read_message ( & mut client, 73 ) ;
408
+ let message = std:: str:: from_utf8 ( & buf) . unwrap ( ) ;
409
+ assert ! ( message. starts_with( "time=\" " ) , "message was: {:?}" , message) ;
410
+ assert ! (
411
+ message. contains( "level=info key=\" 1\" b=\" 2\" msg=\" hello\" \n " ) ,
412
+ "message was: {:?}" ,
413
+ message
414
+ ) ;
415
+
416
+ // test that we can reconnect after a reader disconnects
417
+ // we need to get the raw handle and drop that as well to force full disconnect
418
+ // and give a few milliseconds for the disconnect to happen
419
+ println ! ( "dropping client" ) ;
420
+ let handle = client. as_raw_handle ( ) ;
421
+ drop ( client) ;
422
+ let f = unsafe { File :: from_raw_handle ( handle) } ;
423
+ drop ( f) ;
424
+ std:: thread:: sleep ( Duration :: from_millis ( 100 ) ) ;
425
+
426
+ let mut client2 = create_client ( pipe_name. as_str ( ) ) ;
427
+ logger. log ( & record) ;
428
+ logger. flush ( ) ;
429
+
430
+ read_message ( & mut client2, 51 ) ;
431
+ }
432
+
433
+ fn read_message ( client : & mut NamedPipe , length : usize ) -> Vec < u8 > {
434
+ let mut poll = Poll :: new ( ) . unwrap ( ) ;
435
+ poll. registry ( )
436
+ . register ( client, Token ( 1 ) , Interest :: READABLE )
437
+ . unwrap ( ) ;
438
+ let mut events = Events :: with_capacity ( 128 ) ;
439
+ let mut buf = vec ! [ 0 ; length] ;
440
+ loop {
441
+ poll. poll ( & mut events, Some ( Duration :: from_millis ( 10 ) ) )
442
+ . unwrap ( ) ;
443
+ match client. read ( & mut buf) {
444
+ Ok ( 0 ) => {
445
+ panic ! ( "Read no bytes from pipe" )
446
+ }
447
+ Ok ( _) => {
448
+ break ;
449
+ }
450
+ Err ( e) if e. raw_os_error ( ) == Some ( ERROR_PIPE_NOT_CONNECTED as i32 ) => {
451
+ panic ! ( "not connected to the pipe" ) ;
452
+ }
453
+ Err ( e) if e. kind ( ) == std:: io:: ErrorKind :: WouldBlock => {
454
+ continue ;
455
+ }
456
+ Err ( e) => panic ! ( "Error reading from pipe: {}" , e) ,
457
+ }
458
+ }
459
+ buf. to_vec ( )
460
+ }
461
+
462
+ fn create_client ( pipe_name : & str ) -> mio:: windows:: NamedPipe {
463
+ let mut opts = OpenOptions :: new ( ) ;
464
+ opts. read ( true )
465
+ . write ( true )
466
+ . custom_flags ( FILE_FLAG_OVERLAPPED ) ;
467
+ let file = opts. open ( pipe_name) . unwrap ( ) ;
468
+
469
+ unsafe { NamedPipe :: from_raw_handle ( file. into_raw_handle ( ) ) }
470
+ }
471
+ }
0 commit comments