1
1
//!
2
2
3
- use std:: io:: { Read , Write } ;
3
+ use std:: io:: { self , Read , Write } ;
4
4
use std:: net:: TcpStream ;
5
5
use crossbeam_channel:: { Sender , Receiver } ;
6
6
@@ -13,12 +13,22 @@ use logging_core::Logger;
13
13
14
14
use crate :: logging:: { CommunicationEvent , CommunicationSetup , MessageEvent , StateEvent } ;
15
15
16
+ fn tcp_panic ( context : & ' static str , cause : io:: Error ) -> ! {
17
+ // NOTE: some downstream crates sniff out "timely communication error:" from
18
+ // the panic message. Avoid removing or rewording this message if possible.
19
+ // It'd be nice to instead use `panic_any` here with a structured error
20
+ // type, but the panic message for `panic_any` is no good (Box<dyn Any>).
21
+ panic ! ( "timely communication error: {}: {}" , context, cause)
22
+ }
23
+
16
24
/// Repeatedly reads from a TcpStream and carves out messages.
17
25
///
18
26
/// The intended communication pattern is a sequence of (header, message)^* for valid
19
27
/// messages, followed by a header for a zero length message indicating the end of stream.
20
- /// If the stream ends without being shut down, the receive thread panics in an attempt to
21
- /// take down the computation and cause the failures to cascade.
28
+ ///
29
+ /// If the stream ends without being shut down, or if reading from the stream fails, the
30
+ /// receive thread panics with a message that starts with "timely communication error:"
31
+ /// in an attempt to take down the computation and cause the failures to cascade.
22
32
pub fn recv_loop (
23
33
mut reader : TcpStream ,
24
34
targets : Vec < Receiver < MergeQueue > > ,
@@ -56,15 +66,16 @@ pub fn recv_loop(
56
66
57
67
// Attempt to read some more bytes into self.buffer.
58
68
let read = match reader. read ( & mut buffer. empty ( ) ) {
69
+ Err ( x) => tcp_panic ( "reading data" , x) ,
70
+ Ok ( n) if n == 0 => {
71
+ tcp_panic (
72
+ "reading data" ,
73
+ std:: io:: Error :: new ( std:: io:: ErrorKind :: UnexpectedEof , "socket closed" ) ,
74
+ ) ;
75
+ }
59
76
Ok ( n) => n,
60
- Err ( x) => {
61
- // We don't expect this, as socket closure results in Ok(0) reads.
62
- println ! ( "Error: {:?}" , x) ;
63
- 0
64
- } ,
65
77
} ;
66
78
67
- assert ! ( read > 0 ) ;
68
79
buffer. make_valid ( read) ;
69
80
70
81
// Consume complete messages from the front of self.buffer.
@@ -89,7 +100,7 @@ pub fn recv_loop(
89
100
panic ! ( "Clean shutdown followed by data." ) ;
90
101
}
91
102
buffer. ensure_capacity ( 1 ) ;
92
- if reader. read ( & mut buffer. empty ( ) ) . expect ( "read failure" ) > 0 {
103
+ if reader. read ( & mut buffer. empty ( ) ) . unwrap_or_else ( |e| tcp_panic ( "reading EOF" , e ) ) > 0 {
93
104
panic ! ( "Clean shutdown followed by data." ) ;
94
105
}
95
106
}
@@ -111,6 +122,10 @@ pub fn recv_loop(
111
122
///
112
123
/// The intended communication pattern is a sequence of (header, message)^* for valid
113
124
/// messages, followed by a header for a zero length message indicating the end of stream.
125
+ ///
126
+ /// If writing to the stream fails, the send thread panics with a message that starts with
127
+ /// "timely communication error:" in an attempt to take down the computation and cause the
128
+ /// failures to cascade.
114
129
pub fn send_loop (
115
130
// TODO: Maybe we don't need BufWriter with consolidation in writes.
116
131
writer : TcpStream ,
@@ -148,7 +163,7 @@ pub fn send_loop(
148
163
// still be a signal incoming.
149
164
//
150
165
// We could get awoken by more data, a channel closing, or spuriously perhaps.
151
- writer. flush ( ) . expect ( "Failed to flush writer." ) ;
166
+ writer. flush ( ) . unwrap_or_else ( |e| tcp_panic ( "flushing writer" , e ) ) ;
152
167
sources. retain ( |source| !source. is_complete ( ) ) ;
153
168
if !sources. is_empty ( ) {
154
169
std:: thread:: park ( ) ;
@@ -167,7 +182,7 @@ pub fn send_loop(
167
182
}
168
183
} ) ;
169
184
170
- writer. write_all ( & bytes[ ..] ) . expect ( "Write failure in send_loop." ) ;
185
+ writer. write_all ( & bytes[ ..] ) . unwrap_or_else ( |e| tcp_panic ( "writing data" , e ) ) ;
171
186
}
172
187
}
173
188
}
@@ -182,9 +197,9 @@ pub fn send_loop(
182
197
length : 0 ,
183
198
seqno : 0 ,
184
199
} ;
185
- header. write_to ( & mut writer) . expect ( "Failed to write header!" ) ;
186
- writer. flush ( ) . expect ( "Failed to flush writer." ) ;
187
- writer. get_mut ( ) . shutdown ( :: std:: net:: Shutdown :: Write ) . expect ( "Write shutdown failed" ) ;
200
+ header. write_to ( & mut writer) . unwrap_or_else ( |e| tcp_panic ( "writing data" , e ) ) ;
201
+ writer. flush ( ) . unwrap_or_else ( |e| tcp_panic ( "flushing writer" , e ) ) ;
202
+ writer. get_mut ( ) . shutdown ( :: std:: net:: Shutdown :: Write ) . unwrap_or_else ( |e| tcp_panic ( "shutting down writer" , e ) ) ;
188
203
logger. as_mut ( ) . map ( |logger| logger. log ( MessageEvent { is_send : true , header } ) ) ;
189
204
190
205
// Log the send thread's end.
0 commit comments