Skip to content

Commit 9c40736

Browse files
authored
Merge pull request #135 from Kazurin-775/master
Improve handling of socket errors in the async client
2 parents 1786480 + 4792009 commit 9c40736

File tree

2 files changed

+20
-4
lines changed

2 files changed

+20
-4
lines changed

src/asynchronous/client.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl Client {
5454
let notify2 = notify.clone();
5555

5656
// Request sender
57-
tokio::spawn(async move {
57+
let request_sender = tokio::spawn(async move {
5858
let mut stream_id: u32 = 1;
5959

6060
while let Some((body, resp_tx)) = rx.recv().await {
@@ -93,14 +93,14 @@ impl Client {
9393
// Response receiver
9494
tokio::spawn(async move {
9595
loop {
96-
let req_map = req_map.clone();
9796
tokio::select! {
9897
_ = notify2.notified() => {
9998
break;
10099
}
101100
res = receive(&mut reader) => {
102101
match res {
103102
Ok((header, body)) => {
103+
let req_map = req_map.clone();
104104
tokio::spawn(async move {
105105
let resp_tx2;
106106
{
@@ -135,7 +135,23 @@ impl Client {
135135
});
136136
}
137137
Err(e) => {
138-
trace!("error {:?}", e);
138+
debug!("Connection closed by the ttRPC server: {}", e);
139+
140+
// Abort the request sender task to prevent incoming RPC requests
141+
// from being processed.
142+
request_sender.abort();
143+
let _ = request_sender.await;
144+
145+
// Take all items out of `req_map`.
146+
let mut map = std::mem::take(&mut *req_map.lock().unwrap());
147+
// Terminate outstanding RPC requests with the error.
148+
for (_stream_id, resp_tx) in map.drain() {
149+
if let Err(_e) = resp_tx.send(Err(e.clone())).await {
150+
warn!("Failed to terminate pending RPC: \
151+
the request has returned");
152+
}
153+
}
154+
139155
break;
140156
}
141157
}

src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::result;
1919
use thiserror::Error;
2020

2121
/// The error type for ttrpc.
22-
#[derive(Error, Debug)]
22+
#[derive(Error, Debug, Clone)]
2323
pub enum Error {
2424
#[error("socket err: {0}")]
2525
Socket(String),

0 commit comments

Comments
 (0)