Skip to content

Commit 4bebaa0

Browse files
authored
Merge pull request #51 from Tim-Zhang/support-timeout-for-async
Async: Add support for timeout in ttrpc request
2 parents 97be17a + 1d87615 commit 4bebaa0

File tree

2 files changed

+18
-5
lines changed

2 files changed

+18
-5
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ byteorder = "1.3.2"
2020
thiserror = "1.0"
2121

2222
async-trait = { version = "0.1.31", optional = true }
23-
tokio = { version = "0.2", features = ["rt-threaded", "sync", "uds", "stream", "macros", "io-util"], optional = true }
23+
tokio = { version = "0.2", features = ["rt-threaded", "sync", "uds", "stream", "macros", "io-util", "time"], optional = true }
2424
futures = { version = "0.3", optional = true }
2525
tokio-vsock = { version = "0.2.1", optional = true }
2626

src/asynchronous/client.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use protobuf::{CodedInputStream, CodedOutputStream, Message};
88
use std::collections::HashMap;
99
use std::os::unix::io::RawFd;
1010
use std::sync::{Arc, Mutex};
11+
use std::time::Duration;
1112

1213
use crate::common::MESSAGE_TYPE_RESPONSE;
1314
use crate::error::{Error, Result};
@@ -154,10 +155,22 @@ impl Client {
154155
.await
155156
.map_err(|e| Error::Others(format!("Send packet to sender error {:?}", e)))?;
156157

157-
let result = rx
158-
.recv()
159-
.await
160-
.ok_or_else(|| Error::Others("Recive packet from recver error".to_string()))?;
158+
let result: Result<Vec<u8>> = if req.timeout_nano == 0 {
159+
rx.recv()
160+
.await
161+
.ok_or_else(|| Error::Others("Recive packet from recver error".to_string()))?
162+
} else {
163+
let timeout = tokio::time::delay_for(Duration::from_nanos(req.timeout_nano as u64));
164+
165+
tokio::select! {
166+
result = rx.recv() => {
167+
result.ok_or_else(|| Error::Others("Recive packet from recver error".to_string()))?
168+
}
169+
_ = timeout => {
170+
return Err(Error::Others("Recive packet from recver error".to_string()));
171+
}
172+
}
173+
};
161174

162175
let buf = result?;
163176
let mut s = CodedInputStream::from_bytes(&buf);

0 commit comments

Comments
 (0)