@@ -31,18 +31,13 @@ use winapi;
3131use winapi:: { HANDLE , INVALID_HANDLE_VALUE , LPVOID } ;
3232use kernel32;
3333
34+ // some debug bump macros to better track what's going on in case of errors
3435lazy_static ! {
3536 static ref CURRENT_PROCESS_ID : winapi:: ULONG = unsafe { kernel32:: GetCurrentProcessId ( ) } ;
3637 static ref CURRENT_PROCESS_HANDLE : intptr_t = unsafe { kernel32:: GetCurrentProcess ( ) as intptr_t } ;
3738
38- static ref DD_ENABLED : bool = match env:: var_os( "DD" ) {
39- Some ( _) => true ,
40- None => false ,
41- } ;
42- static ref DD2_ENABLED : bool = match env:: var_os( "DD2" ) {
43- Some ( _) => true ,
44- None => false ,
45- } ;
39+ static ref DD_ENABLED : bool = env:: var_os( "IPC_CHANNEL_WIN_DEBUG_DUMP" ) . is_some( ) ;
40+ static ref DD2_ENABLED : bool = env:: var_os( "IPC_CHANNEL_WIN_DEBUG_MORE_DUMP" ) . is_some( ) ;
4641}
4742
4843macro_rules! dd { ( $( $rest: tt) * ) => { if * DD_ENABLED { println!( $( $rest) * ) ; } } }
@@ -111,8 +106,8 @@ impl OsIpcOutOfBandMessage {
111106 }
112107
113108 fn needs_to_be_sent ( & self ) -> bool {
114- self . channel_handles . len ( ) > 0 ||
115- self . shmem_handles . len ( ) > 0 ||
109+ ! self . channel_handles . is_empty ( ) ||
110+ ! self . shmem_handles . is_empty ( ) ||
116111 self . big_data_receiver_handle != 0
117112 }
118113
@@ -378,7 +373,8 @@ impl MessageReader {
378373
379374 // if the remote end closed...
380375 if err != winapi:: ERROR_SUCCESS {
381- panic ! ( "[$ {:?}:{:?}] *** notify_completion: need to handle error! {}" , self . iocp, self . handle, err) ;
376+ // This should never happen
377+ panic ! ( "[$ {:?}:{:?}] *** notify_completion: unhandled error reported! {}" , self . iocp, self . handle, err) ;
382378 }
383379
384380 unsafe {
@@ -399,17 +395,18 @@ impl MessageReader {
399395 }
400396
401397 dd2 ! ( "[$ {:?}:{:?}] start_read ov {:?}" , self . iocp, self . handle, self . ov_ptr( ) ) ;
402- let mut bytes_read: u32 = 0 ;
403398
404- // if the buffer is full, add more space
405399 let buf_len = self . read_buf . len ( ) ;
406400 let mut buf_cap = self . read_buf . capacity ( ) ;
401+ let mut bytes_read: u32 = 0 ;
402+
403+ // if the buffer is full, add more capacity
407404 if buf_cap == buf_len {
408- let more =
409- if buf_cap == 0 { READ_BUFFER_SIZE }
410- else if buf_cap < READ_BUFFER_MAX_GROWTH { buf_cap }
411- else { READ_BUFFER_MAX_GROWTH } ;
412- self . read_buf . reserve ( more ) ;
405+ self . read_buf . reserve ( match buf_cap {
406+ 0 => READ_BUFFER_SIZE ,
407+ 1 ... READ_BUFFER_MAX_GROWTH => buf_cap,
408+ _ => READ_BUFFER_MAX_GROWTH
409+ } ) ;
413410 buf_cap = self . read_buf . capacity ( ) ;
414411 }
415412
@@ -467,8 +464,8 @@ impl MessageReader {
467464
468465 // Err(false) -> something really failed
469466 // Err(true) -> no message
470- // XXX This is dumb, we should return
471- // Result<Option<(...)>,WinError>
467+ // FIXME This is dumb, we should probably make this return Result<Option<(...)>,WinError>
468+ // so that we can pass through the error that's lost in map_err below
472469 fn get_message ( & mut self ) -> Result < ( Vec < u8 > , Vec < OsOpaqueIpcChannel > , Vec < OsIpcSharedMemory > ) , bool > {
473470 let message_lengths = self . message_length ( ) ;
474471 if message_lengths. is_none ( ) {
@@ -480,7 +477,6 @@ impl MessageReader {
480477
481478 // remove this message's bytes from read_buf, or just take read_buf
482479 // if it contains exactly one message
483- //dd!("[$ {:?}:{:?}] rb {:?}", self.iocp, self.handle, self.read_buf);
484480 let msg_buf = if self . read_buf . len ( ) == bytes_needed {
485481 mem:: replace ( & mut self . read_buf , Vec :: with_capacity ( READ_BUFFER_SIZE ) )
486482 } else {
@@ -525,7 +521,6 @@ impl MessageReader {
525521
526522 dd ! ( "[$ {:?}:{:?}] get_message success -> {} bytes, {} channels, {} shmems" ,
527523 self . iocp, self . handle, buf_data. len( ) , channels. len( ) , shmems. len( ) ) ;
528- //dd!("[$ {:?}:{:?}] bd {:?}", self.iocp, self.handle, buf_data);
529524 Ok ( ( buf_data, channels, shmems) )
530525 }
531526}
@@ -635,7 +630,7 @@ impl OsIpcReceiver {
635630 // cancel any outstanding IO request
636631 reader. cancel_io ( ) ;
637632 // this is only okay if we have nothing in the read buf
638- Ok ( reader. read_buf . len ( ) == 0 )
633+ Ok ( reader. read_buf . is_empty ( ) )
639634 }
640635
641636 pub fn consume ( & self ) -> OsIpcReceiver {
@@ -733,7 +728,7 @@ impl OsIpcReceiver {
733728 iocp,
734729 * self . handle as winapi:: ULONG_PTR ,
735730 0 ) ;
736- if ret == ptr :: null_mut ( ) {
731+ if ret. is_null ( ) {
737732 return Err ( WinError :: last ( "CreateIoCompletionPort" ) ) ;
738733 }
739734
@@ -825,7 +820,6 @@ unsafe fn write_buf(handle: HANDLE, bytes: &[u8]) -> Result<(),WinError> {
825820 return Ok ( ( ) ) ;
826821 }
827822 let mut nwritten: u32 = 0 ;
828- //dd!("[c {:?}] writing: {:?}", handle, bytes);
829823 while nwritten < ntowrite {
830824 let mut nwrote: u32 = 0 ;
831825 if kernel32:: WriteFile ( handle,
@@ -839,7 +833,7 @@ unsafe fn write_buf(handle: HANDLE, bytes: &[u8]) -> Result<(),WinError> {
839833 }
840834 nwritten += nwrote;
841835 ntowrite -= nwrote;
842- //dd !("[c {:?}] ... wrote {} bytes, total {}/{} err {}", handle, nwrote, nwritten, bytes.len(), GetLastError());
836+ dd2 ! ( "[c {:?}] ... wrote {} bytes, total {}/{} err {}" , handle, nwrote, nwritten, bytes. len( ) , GetLastError ( ) ) ;
843837 }
844838
845839 Ok ( ( ) )
@@ -908,7 +902,7 @@ impl OsIpcSender {
908902 let raw_handle = kernel32:: OpenProcess ( winapi:: PROCESS_DUP_HANDLE ,
909903 winapi:: FALSE ,
910904 server_pid as winapi:: DWORD ) ;
911- if raw_handle == ptr :: null_mut ( ) {
905+ if raw_handle. is_null ( ) {
912906 return Err ( WinError :: last ( "OpenProcess" ) ) ;
913907 }
914908
@@ -1055,7 +1049,7 @@ impl OsIpcReceiverSet {
10551049 ptr:: null_mut ( ) ,
10561050 0 as winapi:: ULONG_PTR ,
10571051 0 ) ;
1058- if iocp == ptr :: null_mut ( ) {
1052+ if iocp. is_null ( ) {
10591053 return Err ( WinError :: last ( "CreateIoCompletionPort" ) ) ;
10601054 }
10611055
@@ -1070,9 +1064,18 @@ impl OsIpcReceiverSet {
10701064 // use this to identify the receiver
10711065 let receiver_handle = * receiver. handle ;
10721066
1073- // XXX we'll need a mutex here... at least while we loop through
1074- // receivers to find a matching handle when we get a IOCP
10751067 try!( receiver. add_to_iocp ( * self . iocp ) ) ;
1068+
1069+ // FIXME we *may* need a mutex to protect self.receivers --
1070+ // one thread could be adding something to this Set while
1071+ // another is calling select(); the add() could cause the
1072+ // receivers array to reallocate while we're doing stuff with
1073+ // it in select(). That would mean an add() would block while
1074+ // a select() is blocking.
1075+ //
1076+ // A better option would be to have a mutex around a
1077+ // self.receivers_to_add array, and have select drain those
1078+ // and append to self.receivers whenever it's called.
10761079 self . receivers . push ( receiver) ;
10771080
10781081 dd ! ( "[# {:?}] ReceiverSet add {:?}" , * self . iocp, receiver_handle) ;
@@ -1081,7 +1084,7 @@ impl OsIpcReceiverSet {
10811084 }
10821085
10831086 pub fn select ( & mut self ) -> Result < Vec < OsIpcSelectionResult > , WinError > {
1084- assert ! ( self . receivers. len ( ) > 0 , "selecting with no objects?" ) ;
1087+ assert ! ( ! self . receivers. is_empty ( ) , "selecting with no objects?" ) ;
10851088 dd ! ( "[# {:?}] select() with {} receivers" , * self . iocp, self . receivers. len( ) ) ;
10861089
10871090 unsafe {
@@ -1107,7 +1110,7 @@ impl OsIpcReceiverSet {
11071110 } ) ;
11081111
11091112 // if we had prematurely closed elements, just process them first
1110- if selection_results. len ( ) > 0 {
1113+ if ! selection_results. is_empty ( ) {
11111114 return Ok ( selection_results) ;
11121115 }
11131116
@@ -1130,7 +1133,7 @@ impl OsIpcReceiverSet {
11301133 // function call itself failed or timed out.
11311134 // Otherwise, the async IO operation failed, and
11321135 // we want to hand io_err to notify_completion below.
1133- if ov_ptr == ptr :: null_mut ( ) {
1136+ if ov_ptr. is_null ( ) {
11341137 return Err ( WinError :: last ( "GetQueuedCompletionStatus" ) ) ;
11351138 }
11361139
@@ -1192,7 +1195,7 @@ impl OsIpcReceiverSet {
11921195
11931196 // if we didn't dequeue at least one complete message -- we need to loop through GetQueuedCS again;
11941197 // otherwise we're done.
1195- if selection_results. len ( ) > 0 {
1198+ if ! selection_results. is_empty ( ) {
11961199 break ;
11971200 }
11981201 }
@@ -1302,7 +1305,7 @@ impl OsIpcSharedMemory {
13021305 winapi:: FILE_MAP_ALL_ACCESS ,
13031306 0 , 0 , 0 )
13041307 } ;
1305- if address == ptr :: null_mut ( ) {
1308+ if address. is_null ( ) {
13061309 return Err ( WinError :: last ( "MapViewOfFile" ) ) ;
13071310 }
13081311
0 commit comments