Skip to content

Commit 4792009

Browse files
committed
Improve handling of socket errors in the async client
Previously, socket errors in `ttrpc::r#async::Client` were not properly handled. If such errors were occurred during an RPC, the resources associated with the RPC will become leaked, causing the RPC to hang forever. This commit tries to properly deliver socket errors to all pending RPC callers, as well as stop any further RPC requests from being processed. Signed-off-by: Yu Chen <[email protected]>
1 parent e5a5373 commit 4792009

File tree

2 files changed

+20
-4
lines changed

2 files changed

+20
-4
lines changed

src/asynchronous/client.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl Client {
5454
let notify2 = notify.clone();
5555

5656
// Request sender
57-
tokio::spawn(async move {
57+
let request_sender = tokio::spawn(async move {
5858
let mut stream_id: u32 = 1;
5959

6060
while let Some((body, resp_tx)) = rx.recv().await {
@@ -93,14 +93,14 @@ impl Client {
9393
// Response receiver
9494
tokio::spawn(async move {
9595
loop {
96-
let req_map = req_map.clone();
9796
tokio::select! {
9897
_ = notify2.notified() => {
9998
break;
10099
}
101100
res = receive(&mut reader) => {
102101
match res {
103102
Ok((header, body)) => {
103+
let req_map = req_map.clone();
104104
tokio::spawn(async move {
105105
let resp_tx2;
106106
{
@@ -135,7 +135,23 @@ impl Client {
135135
});
136136
}
137137
Err(e) => {
138-
trace!("error {:?}", e);
138+
debug!("Connection closed by the ttRPC server: {}", e);
139+
140+
// Abort the request sender task to prevent incoming RPC requests
141+
// from being processed.
142+
request_sender.abort();
143+
let _ = request_sender.await;
144+
145+
// Take all items out of `req_map`.
146+
let mut map = std::mem::take(&mut *req_map.lock().unwrap());
147+
// Terminate outstanding RPC requests with the error.
148+
for (_stream_id, resp_tx) in map.drain() {
149+
if let Err(_e) = resp_tx.send(Err(e.clone())).await {
150+
warn!("Failed to terminate pending RPC: \
151+
the request has returned");
152+
}
153+
}
154+
139155
break;
140156
}
141157
}

src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::result;
1919
use thiserror::Error;
2020

2121
/// The error type for ttrpc.
22-
#[derive(Error, Debug)]
22+
#[derive(Error, Debug, Clone)]
2323
pub enum Error {
2424
#[error("socket err: {0}")]
2525
Socket(String),

0 commit comments

Comments
 (0)