Skip to content

Commit 62600f3

Browse files
authored
Add Poller<T> for long-running operations (LROs) (#2759)
* Add Poller<T> for long-running operations (LROs) Resolves #2756 and resolves #2482 * Complete implementation of Poller * Add more examples of Poller to core Also adds a `DEFAULT_POLICY` to Key Vault certificates similar to most other Azure SDK languages. This significantly simplifies examples. * Resolve PR feedback * Move poller methods to convenience client Redefined as extension methods to avoid overwriting the generated code until the emitter has added support for Poller. * Rename DEFAULT_POLICY to DEFAULT_CERTIFICATE_POLICY * Inline default CertificatePolicy creation in every sample
1 parent 5127760 commit 62600f3

File tree

20 files changed

+1740
-292
lines changed

20 files changed

+1740
-292
lines changed

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/core/azure_core/CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,23 @@
66

77
- Added `get_async_runtime()` and `set_async_runtime()` to allow customers to replace the asynchronous runtime used by the Azure SDK.
88
- Added `PageIterator::continuation_token()` and `PageIterator::with_continuation_token()` to support reconstructing a `PageIterator` in another process or on another machine to continue paging.
9+
- Added `Poller<T>` for long-running operations (LROs).
10+
- Added `Request::set_method()` to allow changing the HTTP method of a request.
11+
- Added `StatusMonitor` for long-running operations.
912

1013
### Breaking Changes
1114

15+
- Added `http::PollerOptions` parameter to `http::poller::get_retry_after`.
16+
- Implemented `FromStr` where `FromStr::Err = Infallible` for `PollerStatus` instead of `From<&str>`.
1217
- Minimum supported Rust version (MSRV) is now 1.85.
1318
- `azure_core::http::Pipeline::new` now takes an `azure_core::http::ClientOptions` which is defined in `azure_core`, but convertible to `typespec_client_core::http::ClientOptions`.
1419
- Moved `process::Executor` to `azure_identity`.
1520
- Removed `Pipeline::replace_policy`.
21+
- Removed unused `location` and `body` modules from `http::poller`.
1622
- Renamed `azure_core::date` to `azure_core::time` and added `azure_core::time::Duration` as the standard "duration" type for the SDK.
23+
- Renamed `http::poller::body_content` to `http::poller::body`.
1724
- Renamed `PagerResult::More { next }` to `continuation`.
25+
- Renamed `PollerStatus::Other` to `PollerStatus::UnknownValue` following [guidelines](https://azure.github.io/azure-sdk/rust_introduction.html#rust-enum-extensible).
1826
- Renamed `TelemetryOptions` to `UserAgentOptions`.
1927
- Renamed `TelemetryPolicy` to `UserAgentPolicy`.
2028

sdk/core/azure_core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ rustc_version.workspace = true
3939
[dev-dependencies]
4040
azure_core_test.workspace = true
4141
azure_identity.workspace = true
42+
azure_security_keyvault_certificates.path = "../../keyvault/azure_security_keyvault_certificates"
4243
azure_security_keyvault_secrets.path = "../../keyvault/azure_security_keyvault_secrets"
4344
criterion.workspace = true
4445
thiserror.workspace = true

sdk/core/azure_core/README.md

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
180180

181181
### Consuming service methods returning `Pager<T>`
182182

183-
If a service call returns multiple values in pages, it would return `Result<Pager<T>>` as a result. You can iterate all items from all pages.
183+
If a service call returns multiple values in pages, it should return `Result<Pager<T>>` as a result. You can iterate all items from all pages.
184184

185185
```rust no_run
186186
use azure_identity::DefaultAzureCredential;
@@ -246,6 +246,120 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
246246
}
247247
```
248248

249+
### Consuming service methods returning `Poller<T>`
250+
251+
If a service call may take a while to process, it should return `Result<Poller<T>>` as a result, representing a long-running operation (LRO).
252+
The `Poller<T>` implements `futures::Stream` so you can asynchronously iterate over each status monitor update:
253+
254+
```rust no_run
255+
use azure_identity::DefaultAzureCredential;
256+
use azure_security_keyvault_certificates::{
257+
CertificateClient, CertificateClientExt,
258+
models::{CreateCertificateParameters, CertificatePolicy, X509CertificateProperties, IssuerParameters},
259+
};
260+
use futures::stream::TryStreamExt as _;
261+
262+
#[tokio::main]
263+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
264+
let credential = DefaultAzureCredential::new()?;
265+
let client = CertificateClient::new(
266+
"https://your-key-vault-name.vault.azure.net/",
267+
credential.clone(),
268+
None,
269+
)?;
270+
271+
// Create a self-signed certificate.
272+
let policy = CertificatePolicy {
273+
x509_certificate_properties: Some(X509CertificateProperties {
274+
subject: Some("CN=DefaultPolicy".into()),
275+
..Default::default()
276+
}),
277+
issuer_parameters: Some(IssuerParameters {
278+
name: Some("Self".into()),
279+
..Default::default()
280+
}),
281+
..Default::default()
282+
};
283+
let body = CreateCertificateParameters {
284+
certificate_policy: Some(policy),
285+
..Default::default()
286+
};
287+
288+
// Wait for the certificate operation to complete.
289+
// The Poller implements futures::Stream and automatically waits between polls.
290+
let mut poller = client.begin_create_certificate("certificate-name", body.try_into()?, None)?;
291+
while let Some(operation) = poller.try_next().await? {
292+
let operation = operation.into_body().await?;
293+
match operation.status.as_deref().unwrap_or("unknown") {
294+
"inProgress" => continue,
295+
"completed" => {
296+
let target = operation.target.ok_or("expected target")?;
297+
println!("Created certificate {}", target);
298+
break;
299+
},
300+
status => Err(format!("operation terminated with status {status}"))?,
301+
}
302+
}
303+
304+
Ok(())
305+
}
306+
```
307+
308+
If you just want to wait until the `Poller<T>` is complete and get the last status monitor, you can await `wait()`:
309+
310+
```rust no_run
311+
use azure_identity::DefaultAzureCredential;
312+
use azure_security_keyvault_certificates::{
313+
CertificateClient, CertificateClientExt,
314+
models::{CreateCertificateParameters, CertificatePolicy, X509CertificateProperties, IssuerParameters},
315+
};
316+
317+
#[tokio::main]
318+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
319+
let credential = DefaultAzureCredential::new()?;
320+
let client = CertificateClient::new(
321+
"https://your-key-vault-name.vault.azure.net/",
322+
credential.clone(),
323+
None,
324+
)?;
325+
326+
// Create a self-signed certificate.
327+
let policy = CertificatePolicy {
328+
x509_certificate_properties: Some(X509CertificateProperties {
329+
subject: Some("CN=DefaultPolicy".into()),
330+
..Default::default()
331+
}),
332+
issuer_parameters: Some(IssuerParameters {
333+
name: Some("Self".into()),
334+
..Default::default()
335+
}),
336+
..Default::default()
337+
};
338+
let body = CreateCertificateParameters {
339+
certificate_policy: Some(policy),
340+
..Default::default()
341+
};
342+
343+
// Wait for the certificate operation to complete and get the last status monitor.
344+
let operation = client
345+
.begin_create_certificate("certificate-name", body.try_into()?, None)?
346+
.wait()
347+
.await?
348+
// Deserialize the CertificateOperation:
349+
.into_body()
350+
.await?;
351+
352+
if matches!(operation.status, Some(status) if status == "completed") {
353+
let target = operation.target.ok_or("expected target")?;
354+
println!("Created certificate {}", target);
355+
}
356+
357+
Ok(())
358+
}
359+
```
360+
361+
Awaiting `wait()` will only fail if the HTTP status code does not indicate successfully fetching the status monitor.
362+
249363
### Replacing the async runtime
250364

251365
Internally, the Azure SDK uses either the `tokio` async runtime (with the `tokio` feature), or it implements asynchronous functionality using functions in the `std` namespace.

sdk/core/azure_core/src/http/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@ pub use models::*;
1616
pub use options::*;
1717
pub use pager::*;
1818
pub use pipeline::*;
19+
pub use poller::{Poller, PollerStatus};
1920
pub use request::{Body, Request, RequestContent};
2021
pub use response::{RawResponse, Response};
2122

2223
pub use typespec_client_core::http::response;
2324
pub use typespec_client_core::http::{
24-
new_http_client, AppendToUrlQuery, Context, Format, HttpClient, JsonFormat, Method, NoFormat,
25-
StatusCode, Url,
25+
new_http_client, AppendToUrlQuery, Context, DeserializeWith, Format, HttpClient, JsonFormat,
26+
Method, NoFormat, StatusCode, Url,
2627
};
2728

2829
#[cfg(feature = "xml")]

sdk/core/azure_core/src/http/pager.rs

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33

4-
use crate::http::{headers::HeaderName, response::Response};
4+
use crate::http::{headers::HeaderName, response::Response, DeserializeWith, Format, JsonFormat};
55
use async_trait::async_trait;
66
use futures::{stream::unfold, FutureExt, Stream};
77
use std::{
@@ -12,8 +12,6 @@ use std::{
1212
sync::{Arc, Mutex},
1313
task,
1414
};
15-
use typespec::Error;
16-
use typespec_client_core::http::{DeserializeWith, Format, JsonFormat};
1715

1816
/// The result of fetching a single page from a [`Pager`], whether there are more pages or paging is done.
1917
pub enum PagerResult<P, C: AsRef<str>> {
@@ -85,10 +83,10 @@ where
8583
pub type Pager<P, F = JsonFormat> = ItemIterator<Response<P, F>>;
8684

8785
#[cfg(not(target_arch = "wasm32"))]
88-
type BoxedStream<P> = Box<dyn Stream<Item = Result<P, Error>> + Send>;
86+
type BoxedStream<P> = Box<dyn Stream<Item = crate::Result<P>> + Send>;
8987

9088
#[cfg(target_arch = "wasm32")]
91-
type BoxedStream<P> = Box<dyn Stream<Item = Result<P, Error>>>;
89+
type BoxedStream<P> = Box<dyn Stream<Item = crate::Result<P>>>;
9290

9391
/// Iterates over a collection of items or individual pages of items from a service.
9492
///
@@ -214,23 +212,23 @@ impl<P: Page> ItemIterator<P> {
214212
// This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds.
215213
#[cfg(not(target_arch = "wasm32"))] C: AsRef<str> + Send + 'static,
216214
#[cfg(not(target_arch = "wasm32"))] F: Fn(Option<C>) -> Fut + Send + 'static,
217-
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = Result<PagerResult<P, C>, typespec::Error>> + Send + 'static,
215+
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + Send + 'static,
218216
#[cfg(target_arch = "wasm32")] C: AsRef<str> + 'static,
219217
#[cfg(target_arch = "wasm32")] F: Fn(Option<C>) -> Fut + 'static,
220-
#[cfg(target_arch = "wasm32")] Fut: Future<Output = Result<PagerResult<P, C>, typespec::Error>> + 'static,
218+
#[cfg(target_arch = "wasm32")] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + 'static,
221219
>(
222220
make_request: F,
223221
) -> Self {
224222
Self::from_stream(iter_from_callback(make_request, || None, |_| {}))
225223
}
226224

227-
/// Creates a [`ItemIterator<P>`] from a raw stream of [`Result<P>`](typespec::Result<P>) values.
225+
/// Creates a [`ItemIterator<P>`] from a raw stream of [`Result<P>`](crate::Result<P>) values.
228226
///
229227
/// This constructor is used when you are implementing a completely custom stream and want to use it as a pager.
230228
pub fn from_stream<
231229
// This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds.
232-
#[cfg(not(target_arch = "wasm32"))] S: Stream<Item = Result<P, Error>> + Send + 'static,
233-
#[cfg(target_arch = "wasm32")] S: Stream<Item = Result<P, Error>> + 'static,
230+
#[cfg(not(target_arch = "wasm32"))] S: Stream<Item = crate::Result<P>> + Send + 'static,
231+
#[cfg(target_arch = "wasm32")] S: Stream<Item = crate::Result<P>> + 'static,
234232
>(
235233
stream: S,
236234
) -> Self {
@@ -254,7 +252,7 @@ impl<P: Page> ItemIterator<P> {
254252
}
255253

256254
impl<P: Page> futures::Stream for ItemIterator<P> {
257-
type Item = Result<P::Item, Error>;
255+
type Item = crate::Result<P::Item>;
258256

259257
fn poll_next(
260258
self: Pin<&mut Self>,
@@ -398,10 +396,10 @@ impl<P> PageIterator<P> {
398396
// This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds.
399397
#[cfg(not(target_arch = "wasm32"))] C: AsRef<str> + FromStr + Send + 'static,
400398
#[cfg(not(target_arch = "wasm32"))] F: Fn(Option<C>) -> Fut + Send + 'static,
401-
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = Result<PagerResult<P, C>, typespec::Error>> + Send + 'static,
399+
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + Send + 'static,
402400
#[cfg(target_arch = "wasm32")] C: AsRef<str> + FromStr + 'static,
403401
#[cfg(target_arch = "wasm32")] F: Fn(Option<C>) -> Fut + 'static,
404-
#[cfg(target_arch = "wasm32")] Fut: Future<Output = Result<PagerResult<P, C>, typespec::Error>> + 'static,
402+
#[cfg(target_arch = "wasm32")] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + 'static,
405403
>(
406404
make_request: F,
407405
) -> Self
@@ -441,8 +439,8 @@ impl<P> PageIterator<P> {
441439
/// This constructor is used when you are implementing a completely custom stream and want to use it as a pager.
442440
pub fn from_stream<
443441
// This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds.
444-
#[cfg(not(target_arch = "wasm32"))] S: Stream<Item = Result<P, Error>> + Send + 'static,
445-
#[cfg(target_arch = "wasm32")] S: Stream<Item = Result<P, Error>> + 'static,
442+
#[cfg(not(target_arch = "wasm32"))] S: Stream<Item = crate::Result<P>> + Send + 'static,
443+
#[cfg(target_arch = "wasm32")] S: Stream<Item = crate::Result<P>> + 'static,
446444
>(
447445
stream: S,
448446
) -> Self {
@@ -509,7 +507,7 @@ impl<P> PageIterator<P> {
509507
}
510508

511509
impl<P> futures::Stream for PageIterator<P> {
512-
type Item = Result<P, Error>;
510+
type Item = crate::Result<P>;
513511

514512
fn poll_next(
515513
self: Pin<&mut Self>,
@@ -537,19 +535,19 @@ fn iter_from_callback<
537535
// This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds.
538536
#[cfg(not(target_arch = "wasm32"))] C: AsRef<str> + Send + 'static,
539537
#[cfg(not(target_arch = "wasm32"))] F: Fn(Option<C>) -> Fut + Send + 'static,
540-
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = Result<PagerResult<P, C>, typespec::Error>> + Send + 'static,
538+
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + Send + 'static,
541539
#[cfg(not(target_arch = "wasm32"))] G: Fn() -> Option<C> + Send + 'static,
542540
#[cfg(not(target_arch = "wasm32"))] S: Fn(Option<&str>) + Send + 'static,
543541
#[cfg(target_arch = "wasm32")] C: AsRef<str> + 'static,
544542
#[cfg(target_arch = "wasm32")] F: Fn(Option<C>) -> Fut + 'static,
545-
#[cfg(target_arch = "wasm32")] Fut: Future<Output = Result<PagerResult<P, C>, typespec::Error>> + 'static,
543+
#[cfg(target_arch = "wasm32")] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + 'static,
546544
#[cfg(target_arch = "wasm32")] G: Fn() -> Option<C> + 'static,
547545
#[cfg(target_arch = "wasm32")] S: Fn(Option<&str>) + 'static,
548546
>(
549547
make_request: F,
550548
get_next: G,
551549
set_next: S,
552-
) -> impl Stream<Item = Result<P, Error>> + 'static {
550+
) -> impl Stream<Item = crate::Result<P>> + 'static {
553551
unfold(
554552
// We flow the `make_request` callback, 'get_next', and `set_next` through the state value so that we can avoid cloning.
555553
(State::Init, make_request, get_next, set_next),

0 commit comments

Comments
 (0)