Skip to content

Commit 8586a66

Browse files
authored
Move storage blobs to pipeline architecture (#843)
1 parent a4a9480 commit 8586a66

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+648
-771
lines changed

sdk/storage/src/account/operations/find_blobs_by_tags.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
use crate::core::prelude::*;
2-
use crate::xml::read_xml;
1+
use crate::{core::clients::ServiceType, core::prelude::*, xml::read_xml};
32
use azure_core::headers::{date_from_headers, request_id_from_headers};
43
use azure_core::prelude::*;
54
use azure_core::{collect_pinned_stream, RequestId, Response as HttpResponse};
@@ -57,8 +56,7 @@ impl FindBlobsByTagsBuilder {
5756
let response = self
5857
.client
5958
.storage_account_client()
60-
.pipeline()
61-
.send(&mut self.context, &mut request)
59+
.send(&mut self.context, &mut request, ServiceType::Blob)
6260
.await?;
6361

6462
ListBlobsByTagsResponse::try_from(response).await

sdk/storage/src/core/clients/storage_account_client.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ use crate::{
77
AccountSharedAccessSignatureBuilder, ClientAccountSharedAccessSignature,
88
},
99
};
10-
use azure_core::auth::TokenCredential;
11-
use azure_core::error::{Error, ErrorKind, ResultExt};
12-
use azure_core::{headers, Request};
13-
use azure_core::{headers::*, Pipeline};
14-
use azure_core::{ClientOptions, HttpClient};
10+
use azure_core::{
11+
auth::TokenCredential,
12+
error::{Error, ErrorKind, ResultExt},
13+
headers::*,
14+
ClientOptions, Context, HttpClient, Pipeline, Request, Response,
15+
};
1516
use bytes::Bytes;
1617
use http::method::Method;
1718
use std::sync::Arc;
@@ -446,8 +447,8 @@ impl StorageAccountClient {
446447
None => request.insert_header(CONTENT_LENGTH, "0"),
447448
};
448449

449-
request.insert_header(headers::MS_DATE, time);
450-
request.insert_header(headers::VERSION, AZURE_VERSION);
450+
request.insert_header(MS_DATE, time);
451+
request.insert_header(VERSION, AZURE_VERSION);
451452

452453
// We sign the request only if it is not already signed (with the signature of an
453454
// SAS token for example)
@@ -496,6 +497,17 @@ impl StorageAccountClient {
496497
&self.pipeline
497498
}
498499

500+
pub async fn send(
501+
&self,
502+
context: &mut Context,
503+
request: &mut Request,
504+
service_type: ServiceType,
505+
) -> azure_core::Result<Response> {
506+
self.pipeline
507+
.send(context.insert(service_type), request)
508+
.await
509+
}
510+
499511
/// Prepares' an `azure_core::Request`.
500512
pub(crate) fn blob_storage_request(&self, http_method: http::Method) -> Request {
501513
Request::new(self.blob_storage_url().clone(), http_method)

sdk/storage/src/core/clients/storage_client.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use crate::core::clients::{ServiceType, StorageAccountClient};
22
use crate::operations::*;
3-
use azure_core::error::{Error, ErrorKind};
4-
use azure_core::Request;
3+
use azure_core::{
4+
error::{Error, ErrorKind},
5+
Context, Request, Response,
6+
};
57
use bytes::Bytes;
68
use http::method::Method;
79
use std::sync::Arc;
@@ -94,4 +96,15 @@ impl StorageClient {
9496
self.storage_account_client
9597
.prepare_request(url, method, ServiceType::Blob, request_body)
9698
}
99+
100+
pub async fn send(
101+
&self,
102+
context: &mut Context,
103+
request: &mut Request,
104+
service_type: ServiceType,
105+
) -> azure_core::Result<Response> {
106+
self.storage_account_client
107+
.send(context, request, service_type)
108+
.await
109+
}
97110
}

sdk/storage_blobs/examples/connection_string.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use azure_storage::core::prelude::*;
22
use azure_storage_blobs::prelude::*;
3-
use futures::stream::StreamExt;
3+
use futures::StreamExt;
44
use std::{num::NonZeroU32, time::Duration};
55

66
#[tokio::main]
@@ -20,10 +20,10 @@ async fn main() -> azure_core::Result<()> {
2020
let container_client = storage_client.as_container_client(&container_name);
2121
let blob_service = storage_client.as_blob_service_client();
2222

23-
let mut stream = Box::pin(blob_service.list_containers().stream());
23+
let mut stream = blob_service.list_containers().into_stream();
2424
while let Some(result) = stream.next().await {
25-
let container = result?;
26-
for container in container.incomplete_vector.as_ref() {
25+
let result = result?;
26+
for container in result.containers {
2727
if container.name == container_name {
2828
panic!("The specified container must not exists!");
2929
}
@@ -52,12 +52,10 @@ async fn main() -> azure_core::Result<()> {
5252

5353
let max_results = NonZeroU32::new(3).unwrap();
5454

55-
let mut stream = Box::pin(
56-
container_client
57-
.list_blobs()
58-
.max_results(max_results)
59-
.stream(),
60-
);
55+
let mut stream = container_client
56+
.list_blobs()
57+
.max_results(max_results)
58+
.into_stream();
6159

6260
let mut cnt: i32 = 0;
6361
while let Some(value) = stream.next().await {

sdk/storage_blobs/examples/container_00.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,30 +23,26 @@ async fn main() -> azure_core::Result<()> {
2323
let container_client = storage_client.as_container_client(container_name);
2424

2525
let max_results = NonZeroU32::new(3).unwrap();
26-
let mut iv = Box::pin(
27-
blob_service_client
28-
.list_containers()
29-
.max_results(max_results)
30-
.stream(),
31-
);
26+
let mut iv = blob_service_client
27+
.list_containers()
28+
.max_results(max_results)
29+
.into_stream();
3230

3331
let mut count = 0;
3432
while let Some(result) = iv.next().await {
35-
let container = result?;
36-
count += container.incomplete_vector.len();
37-
for container in container.incomplete_vector.iter() {
33+
let page = result?;
34+
count += page.containers.len();
35+
for container in page.containers.iter() {
3836
println!("\t{}", container.name);
3937
}
4038
}
4139

4240
println!("List containers returned {} containers.", count);
4341

44-
let mut stream = Box::pin(
45-
container_client
46-
.list_blobs()
47-
.max_results(max_results)
48-
.stream(),
49-
);
42+
let mut stream = container_client
43+
.list_blobs()
44+
.max_results(max_results)
45+
.into_stream();
5046

5147
let mut count = 0;
5248
while let Some(result) = stream.next().await {

sdk/storage_blobs/examples/container_and_blob.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,13 @@ async fn main() -> azure_core::Result<()> {
6464
println!("{:?}", res);
6565

6666
// only get the first set of blobs in the list
67-
let res = Box::pin(
68-
container_client
69-
.list_blobs()
70-
.include_metadata(true)
71-
.stream(),
72-
)
73-
.next()
74-
.await
75-
.expect("stream failed")?;
67+
let res = container_client
68+
.list_blobs()
69+
.include_metadata(true)
70+
.into_stream()
71+
.next()
72+
.await
73+
.expect("stream failed")?;
7674
println!("{:?}", res);
7775

7876
Ok(())

sdk/storage_blobs/examples/count_blobs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ async fn main() -> azure_core::Result<()> {
1919
.as_container_client(&container);
2020

2121
let mut count: usize = 0;
22-
let mut list_blobs = Box::pin(container_client.list_blobs().stream());
22+
let mut list_blobs = container_client.list_blobs().into_stream();
2323
while let Some(list_blobs_response) = list_blobs.next().await {
2424
let list_blobs_response = list_blobs_response?;
2525
count += list_blobs_response.blobs.blobs.len();

sdk/storage_blobs/examples/device_code_flow.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ async fn main() -> azure_core::Result<()> {
8080

8181
// now we enumerate the containers in the
8282
// specified storage account.
83-
let containers = Box::pin(blob_service_client.list_containers().stream())
83+
let containers = blob_service_client
84+
.list_containers()
85+
.into_stream()
8486
.next()
8587
.await
8688
.expect("stream failed")?;

sdk/storage_blobs/examples/emulator_00.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,13 @@ async fn main() -> azure_core::Result<()> {
1818
.await?;
1919
println!("{:?}", res);
2020

21-
let res = Box::pin(
22-
container_client
23-
.list_blobs()
24-
.include_metadata(true)
25-
.stream(),
26-
)
27-
.next()
28-
.await
29-
.expect("stream failed")?;
21+
let res = container_client
22+
.list_blobs()
23+
.include_metadata(true)
24+
.into_stream()
25+
.next()
26+
.await
27+
.expect("stream failed")?;
3028
println!("{:?}", res);
3129

3230
Ok(())

sdk/storage_blobs/examples/list_blobs_00.rs

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@ async fn main() -> azure_core::Result<()> {
2222
let blob_service = storage_client.as_blob_service_client();
2323
let container_client = storage_client.as_container_client(&container_name);
2424

25-
let iv = Box::pin(blob_service.list_containers().stream())
25+
let page = blob_service
26+
.list_containers()
27+
.into_stream()
2628
.next()
2729
.await
2830
.expect("stream failed")?;
2931

30-
if iv
31-
.incomplete_vector
32+
if page
33+
.containers
3234
.iter()
3335
.any(|item| item.name == container_name)
3436
{
@@ -55,27 +57,23 @@ async fn main() -> azure_core::Result<()> {
5557
println!("\tAdded blob {}", i);
5658
}
5759

58-
let iv = Box::pin(
59-
container_client
60-
.list_blobs()
61-
.max_results(NonZeroU32::new(3u32).unwrap())
62-
.stream(),
63-
)
64-
.next()
65-
.await
66-
.expect("stream failed")?;
60+
let page = container_client
61+
.list_blobs()
62+
.max_results(NonZeroU32::new(3u32).unwrap())
63+
.into_stream()
64+
.next()
65+
.await
66+
.expect("stream failed")?;
6767

68-
println!("List blob returned {} blobs.", iv.blobs.blobs.len());
69-
for cont in iv.blobs.blobs.iter() {
68+
println!("List blob returned {} blobs.", page.blobs.blobs.len());
69+
for cont in page.blobs.blobs.iter() {
7070
println!("\t{}\t{} bytes", cont.name, cont.properties.content_length);
7171
}
7272

73-
let mut stream = Box::pin(
74-
container_client
75-
.list_blobs()
76-
.max_results(NonZeroU32::new(3u32).unwrap())
77-
.stream(),
78-
);
73+
let mut stream = container_client
74+
.list_blobs()
75+
.max_results(NonZeroU32::new(3u32).unwrap())
76+
.into_stream();
7977

8078
let mut cnt: i32 = 0;
8179
while let Some(value) = stream.next().await {

0 commit comments

Comments
 (0)