Skip to content

Commit 1160d50

Browse files
authored
Fixing the naive implementation of sleep and adding a tokio based (#1523)
1 parent c999ef3 commit 1160d50

File tree

5 files changed

+60
-9
lines changed

5 files changed

+60
-9
lines changed

sdk/core/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ rustc_version = "0.4"
4949

5050
[dev-dependencies]
5151
env_logger = "0.10"
52-
tokio = { version = "1.0", features = ["default"] }
52+
tokio = { version = "1.0", features = ["default", "macros", "rt", "time"] }
5353
thiserror = "1.0"
5454

5555
[features]
@@ -62,7 +62,8 @@ hmac_openssl = ["dep:openssl"]
6262
test_e2e = []
6363
azurite_workaround = []
6464
xml = ["quick-xml"]
65-
tokio-fs = ["tokio/fs", "tokio/io-util"]
65+
tokio-fs = ["tokio/fs", "tokio/sync", "tokio/io-util"]
66+
tokio-sleep = ["tokio"]
6667

6768
[package.metadata.docs.rs]
6869
features = ["xml", "tokio-fs", "enable_reqwest", "enable_reqwest_gzip", "enable_reqwest_rustls", "hmac_rust", "hmac_openssl", "xml"]

sdk/core/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ use uuid::Uuid;
4545
#[cfg(feature = "xml")]
4646
pub mod xml;
4747

48-
#[cfg(feature = "tokio")]
4948
pub mod tokio;
5049

5150
pub mod base64;

sdk/core/src/sleep/mod.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#[cfg(not(feature = "tokio-sleep"))]
2+
mod thread;
3+
4+
#[cfg(not(feature = "tokio-sleep"))]
5+
pub use self::thread::{sleep, Sleep};
6+
7+
#[cfg(feature = "tokio-sleep")]
8+
pub use tokio::time::{sleep, Sleep};
9+
10+
// Unit tests
11+
#[cfg(test)]
12+
mod tests {
13+
14+
/// Basic test that launches 10k futures and waits for them to complete
15+
/// Has a high chance of failing if there is a race condition in sleep method
16+
/// Runs quickly otherwise
17+
#[cfg(not(feature = "tokio-sleep"))]
18+
#[tokio::test]
19+
async fn test_timeout() {
20+
use super::*;
21+
use std::time::Duration;
22+
use tokio::task::JoinSet;
23+
24+
let mut join_set = JoinSet::default();
25+
let total = 10000;
26+
for _i in 0..total {
27+
join_set.spawn(async move {
28+
sleep(Duration::from_millis(10)).await;
29+
});
30+
}
31+
32+
loop {
33+
let res =
34+
tokio::time::timeout(std::time::Duration::from_secs(10), join_set.join_next())
35+
.await;
36+
assert!(res.is_ok());
37+
if let Ok(None) = res {
38+
break;
39+
}
40+
}
41+
}
42+
}

sdk/core/src/sleep.rs renamed to sdk/core/src/sleep/thread.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,47 @@
11
use futures::Future;
22
use std::pin::Pin;
3+
use std::sync::atomic::{AtomicBool, Ordering};
4+
use std::sync::Arc;
35
use std::task::{Context, Poll};
46
use std::thread;
57
use std::time::Duration;
68

9+
/// Creates a future that resolves after a specified duration of time.
10+
/// Uses a simple thread based implementation for sleep. A more efficient
11+
/// implementation is available by using the `tokio-sleep` crate feature.
712
pub fn sleep(duration: Duration) -> Sleep {
813
Sleep {
9-
thread: None,
14+
signal: None,
1015
duration,
1116
}
1217
}
1318

1419
#[derive(Debug)]
1520
pub struct Sleep {
16-
thread: Option<thread::JoinHandle<()>>,
21+
signal: Option<Arc<AtomicBool>>,
1722
duration: Duration,
1823
}
1924

2025
impl Future for Sleep {
2126
type Output = ();
2227

2328
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
24-
if let Some(thread) = &self.thread {
25-
if thread.is_finished() {
29+
if let Some(signal) = &self.signal {
30+
if signal.load(Ordering::Acquire) {
2631
Poll::Ready(())
2732
} else {
2833
Poll::Pending
2934
}
3035
} else {
36+
let signal = Arc::new(AtomicBool::new(false));
3137
let waker = cx.waker().clone();
3238
let duration = self.duration;
33-
self.get_mut().thread = Some(thread::spawn(move || {
39+
self.get_mut().signal = Some(signal.clone());
40+
thread::spawn(move || {
3441
thread::sleep(duration);
42+
signal.store(true, Ordering::Release);
3543
waker.wake();
36-
}));
44+
});
3745
Poll::Pending
3846
}
3947
}

sdk/core/src/tokio/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1+
#[cfg(feature = "tokio-fs")]
12
pub mod fs;

0 commit comments

Comments
 (0)