@@ -54,11 +54,19 @@ impl<S: Stream + Unpin> Stream for NotifyOnEos<S> {
54
54
55
55
struct WorkerService {
56
56
worker_req_tx : mpsc:: UnboundedSender < WorkerRequestMsg > ,
57
+ cancel : CancellationToken ,
57
58
}
58
59
59
60
impl WorkerService {
60
- fn new ( worker_req_tx : mpsc:: UnboundedSender < WorkerRequestMsg > ) -> Self {
61
- Self { worker_req_tx }
61
+ fn new ( worker_req_tx : mpsc:: UnboundedSender < WorkerRequestMsg > ) -> ( Self , CancellationToken ) {
62
+ let cancel = CancellationToken :: new ( ) ;
63
+ (
64
+ Self {
65
+ worker_req_tx,
66
+ cancel : cancel. clone ( ) ,
67
+ } ,
68
+ cancel,
69
+ )
62
70
}
63
71
}
64
72
@@ -73,7 +81,7 @@ impl Service<Request<Body>> for WorkerService {
73
81
74
82
fn call ( & mut self , req : Request < Body > ) -> Self :: Future {
75
83
// create a response in a future.
76
- let cancel = CancellationToken :: new ( ) ;
84
+ let cancel = self . cancel . child_token ( ) ;
77
85
let worker_req_tx = self . worker_req_tx . clone ( ) ;
78
86
let fut = async move {
79
87
let ( res_tx, res_rx) = oneshot:: channel :: < Result < Response < Body > , hyper:: Error > > ( ) ;
@@ -83,7 +91,7 @@ impl Service<Request<Body>> for WorkerService {
83
91
let msg = WorkerRequestMsg {
84
92
req,
85
93
res_tx,
86
- conn_watch : Some ( ob_conn_watch_rx) ,
94
+ conn_watch : Some ( ob_conn_watch_rx. clone ( ) ) ,
87
95
} ;
88
96
89
97
worker_req_tx. send ( msg) ?;
@@ -235,15 +243,21 @@ impl Server {
235
243
match msg {
236
244
Ok ( ( conn, _) ) => {
237
245
tokio:: task:: spawn( async move {
238
- let service = WorkerService :: new( main_worker_req_tx) ;
246
+ let ( service, cancel) = WorkerService :: new( main_worker_req_tx) ;
247
+ let _guard = cancel. drop_guard( ) ;
248
+
239
249
let conn_fut = Http :: new( )
240
250
. serve_connection( conn, service) ;
241
251
242
252
if let Err ( e) = conn_fut. await {
243
253
// Most common cause for these errors are
244
254
// when the client closes the connection
245
255
// before we could send a response
246
- error!( "client connection error ({:?})" , e) ;
256
+ if e. is_incomplete_message( ) {
257
+ debug!( "connection reset ({:?})" , e) ;
258
+ } else {
259
+ error!( "client connection error ({:?})" , e) ;
260
+ }
247
261
}
248
262
} ) ;
249
263
}
0 commit comments