Skip to content

Commit cab39cd

Browse files
authored
Add cancellation example to Cosmos (#457)
1 parent edd64a2 commit cab39cd

File tree

3 files changed

+63
-2
lines changed

3 files changed

+63
-2
lines changed

sdk/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ rustc_version = "0.4"
4040

4141
[dev-dependencies]
4242
env_logger = "0.8"
43-
tokio = { version = "1.0", features = ["default"] }
43+
tokio = { version = "1", features = ["default"] }
4444

4545
[features]
4646
default = ["enable_reqwest"]

sdk/cosmos/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@ bytes = "1.0"
3232

3333
[dev-dependencies]
3434
env_logger = "0.8"
35-
tokio = { version = "1.0", features = ["macros"] }
35+
tokio = { version = "1", features = ["macros"] }
3636
hyper = "0.14"
3737
hyper-rustls = "0.22"
3838
reqwest = "0.11.0"
39+
stop-token = { version = "0.6.1", features = ["tokio"] }
3940

4041
[features]
4142
test_e2e = []

sdk/cosmos/examples/cancellation.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
use azure_core::prelude::*;
2+
use azure_cosmos::prelude::*;
3+
use stop_token::prelude::*;
4+
use stop_token::StopSource;
5+
use tokio::time::{Duration, Instant};
6+
7+
#[tokio::main]
8+
async fn main() -> azure_cosmos::Result<()> {
9+
env_logger::init();
10+
// First we retrieve the account name and master key from environment variables, and
11+
// create an authorization token.
12+
let account = std::env::var("COSMOS_ACCOUNT").expect("Set env variable COSMOS_ACCOUNT first!");
13+
let master_key =
14+
std::env::var("COSMOS_MASTER_KEY").expect("Set env variable COSMOS_MASTER_KEY first!");
15+
let authorization_token = AuthorizationToken::primary_from_base64(&master_key)?;
16+
17+
// Create a new Cosmos client.
18+
let options = CosmosOptions::default();
19+
let client = CosmosClient::new(account.clone(), authorization_token.clone(), options);
20+
21+
// Create a new database, and time out if it takes more than 1 second.
22+
let options = CreateDatabaseOptions::new();
23+
let future = client.create_database(Context::new(), "my_database", options);
24+
let deadline = Instant::now() + Duration::from_secs(1);
25+
match future.until(deadline).await {
26+
Ok(Ok(r)) => println!("successful response: {:?}", r),
27+
Ok(Err(e)) => println!("request was made but failed: {:?}", e),
28+
Err(_) => println!("request timed out!"),
29+
};
30+
31+
// Create multiple new databases, and cancel them if they don't complete before
32+
// they're sent a stop signal.
33+
let source = StopSource::new();
34+
for _ in 1..10 {
35+
let client = client.clone();
36+
// Clone the stop token for each request.
37+
let stop = source.token();
38+
tokio::spawn(async move {
39+
let options = CreateDatabaseOptions::new();
40+
let future = client.create_database(Context::new(), "my_database", options);
41+
match future.until(stop).await {
42+
Ok(Ok(r)) => println!("successful response: {:?}", r),
43+
Ok(Err(e)) => println!("request was made but failed: {:?}", e),
44+
Err(_) => println!("request was cancelled!"),
45+
};
46+
});
47+
}
48+
49+
tokio::time::sleep(Duration::from_secs(5)).await;
50+
// This causes all cancel tokens to fire. Any request tied to a stop token created
51+
// from this source will be canceled.
52+
println!("cancelling all requests");
53+
drop(source);
54+
// Any request that has not yet completed will be canceled at this point
55+
56+
// Keep the program alive for a bit longer so the tasks get a chance to
57+
// print before exiting.
58+
tokio::time::sleep(Duration::from_millis(200)).await;
59+
Ok(())
60+
}

0 commit comments

Comments
 (0)