Skip to content

Commit 9e76b2d

Browse files
bmc-msftdemoray
andauthored
move storage queues to pipeline architecture (#851)
* move storage queues to pipeline architecture * fmt Co-authored-by: Brian Caswell <[email protected]>
1 parent 94ab068 commit 9e76b2d

File tree

65 files changed

+1853
-1774
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+1853
-1774
lines changed

sdk/storage/src/core/clients/storage_account_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl std::fmt::Debug for StorageCredentials {
5454
#[derive(Debug, Clone, Copy)]
5555
pub enum ServiceType {
5656
Blob,
57-
// Queue,
57+
Queue,
5858
// File,
5959
Table,
6060
}

sdk/storage_queues/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ url = "2.2"
2929

3030
[dev-dependencies]
3131
tokio = { version = "1.0", features = ["full"] }
32+
uuid = { version = "1.0", features = ["v4"] }
3233

3334
[features]
3435
default = ["enable_reqwest"]

sdk/storage_queues/examples/delete_message.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ async fn main() -> azure_core::Result<()> {
3030
.get_messages()
3131
.number_of_messages(2)
3232
.visibility_timeout(Duration::from_secs(5)) // the message will become visible again after 5 secs
33-
.execute()
33+
.into_future()
3434
.await?;
3535

3636
println!("get_response == {:#?}", get_response);
@@ -44,7 +44,7 @@ async fn main() -> azure_core::Result<()> {
4444
let delete_response = queue
4545
.pop_receipt_client(message_to_delete)
4646
.delete()
47-
.execute()
47+
.into_future()
4848
.await?;
4949

5050
println!("delete_response == {:#?}", delete_response);

sdk/storage_queues/examples/get_messages.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ async fn main() -> azure_core::Result<()> {
3030
.get_messages()
3131
.number_of_messages(2)
3232
.visibility_timeout(Duration::from_secs(5)) // the message will become visible again after 5 secs
33-
.execute()
33+
.into_future()
3434
.await?;
3535

3636
println!("response == {:#?}", response);
@@ -39,7 +39,7 @@ async fn main() -> azure_core::Result<()> {
3939
.get_messages()
4040
.number_of_messages(2)
4141
.visibility_timeout(Duration::from_secs(10)) // the message will become visible again after 10 secs
42-
.execute()
42+
.into_future()
4343
.await?;
4444
println!("get_messages_response == {:#?}", get_messages_response);
4545

@@ -50,8 +50,11 @@ async fn main() -> azure_core::Result<()> {
5050
let pop_receipt = queue.pop_receipt_client(message_to_update);
5151

5252
let response = pop_receipt
53-
.update(Duration::from_secs(4))
54-
.execute(format!("new body at {}", chrono::Utc::now()))
53+
.update(
54+
format!("new body at {}", chrono::Utc::now()),
55+
Duration::from_secs(4),
56+
)
57+
.into_future()
5558
.await?;
5659
println!("response == {:#?}", response);
5760
}

sdk/storage_queues/examples/list_queues.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@ async fn main() -> azure_core::Result<()> {
1919
let queue_service = storage_account.queue_service_client();
2020

2121
println!("getting service stats");
22-
let response = queue_service.get_queue_service_stats().execute().await?;
22+
let response = queue_service
23+
.get_queue_service_stats()
24+
.into_future()
25+
.await?;
2326
println!("get_queue_service_properties.response == {:#?}", response);
2427

2528
println!("getting service properties");
2629
let response = queue_service
2730
.get_queue_service_properties()
28-
.execute()
31+
.into_future()
2932
.await?;
3033
println!("get_queue_service_stats.response == {:#?}", response);
3134

@@ -35,17 +38,16 @@ async fn main() -> azure_core::Result<()> {
3538
.prefix("a")
3639
.include_metadata(true)
3740
.max_results(NonZeroU32::new(2u32).unwrap())
38-
.execute()
39-
.await?;
41+
.into_stream()
42+
.next()
43+
.await;
4044
println!("response == {:#?}", response);
4145

4246
println!("streaming queues");
43-
let mut stream = Box::pin(
44-
queue_service
45-
.list_queues()
46-
.max_results(NonZeroU32::new(3u32).unwrap())
47-
.stream(),
48-
);
47+
let mut stream = queue_service
48+
.list_queues()
49+
.max_results(NonZeroU32::new(3u32).unwrap())
50+
.into_stream();
4951

5052
while let Some(value) = stream.next().await {
5153
let value = value?;

sdk/storage_queues/examples/peek_messages.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ async fn main() -> azure_core::Result<()> {
2929
let response = queue
3030
.peek_messages()
3131
.number_of_messages(2)
32-
.execute()
32+
.into_future()
3333
.await?;
3434

3535
println!("response == {:#?}", response);

sdk/storage_queues/examples/put_message.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,8 @@ async fn main() -> azure_core::Result<()> {
2525

2626
trace!("putting message");
2727
let response = queue
28-
.put_message()
29-
.client_request_id("optional correlation token")
30-
.execute(format!("Azure SDK for Rust rocks! {}", chrono::Utc::now()))
28+
.put_message(format!("Azure SDK for Rust rocks! {}", chrono::Utc::now()))
29+
.into_future()
3130
.await?;
3231

3332
println!("response == {:#?}", response);

sdk/storage_queues/examples/queue_create.rs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,11 @@ async fn main() -> azure_core::Result<()> {
3737
.as_mut()
3838
.insert("created".into(), format!("{:?}", Utc::now()).into());
3939

40-
let response = queue.create().metadata(&metadata).execute().await?;
40+
let response = queue
41+
.create()
42+
.metadata(metadata.clone())
43+
.into_future()
44+
.await?;
4145
println!("response == {:#?}", response);
4246

4347
// let's add some more metadata
@@ -46,15 +50,15 @@ async fn main() -> azure_core::Result<()> {
4650

4751
println!("metadata == {:#?}", metadata);
4852

49-
let response = queue.set_metadata().execute(&metadata).await?;
53+
let response = queue.set_metadata(metadata).into_future().await?;
5054
println!("response == {:#?}", response);
5155

5256
// let's get back the metadata
53-
let response = queue.get_metadata().execute().await?;
57+
let response = queue.get_metadata().into_future().await?;
5458
println!("response == {:#?}", response);
5559

5660
// use two queue stored access policies
57-
let queue_stored_acess_policies = vec![
61+
let policies = vec![
5862
QueueStoredAccessPolicy::new(
5963
"first_sap_read_process",
6064
Utc::now() - Duration::hours(1),
@@ -70,22 +74,15 @@ async fn main() -> azure_core::Result<()> {
7074
.enable_all(),
7175
];
7276

73-
let response = queue
74-
.set_acl()
75-
.execute(&queue_stored_acess_policies)
76-
.await?;
77+
let response = queue.set_acl(policies).into_future().await?;
7778
println!("response == {:#?}", response);
7879

7980
// get the queue ACL
80-
let response = queue.get_acl().execute().await?;
81+
let response = queue.get_acl().into_future().await?;
8182
println!("response == {:#?}", response);
8283

8384
// now let's delete it
84-
let response = queue
85-
.delete()
86-
.client_request_id("myclientid")
87-
.execute()
88-
.await?;
85+
let response = queue.delete().into_future().await?;
8986
println!("response == {:#?}", response);
9087

9188
Ok(())

sdk/storage_queues/src/clients/pop_receipt_client.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use crate::prelude::*;
2-
use crate::requests::*;
1+
use crate::{operations::*, prelude::*};
2+
use azure_core::{Context, Request, Response};
33
use azure_storage::core::clients::StorageClient;
44
use std::sync::Arc;
55

@@ -39,6 +39,14 @@ impl PopReceiptClient {
3939
})
4040
}
4141

42+
pub(crate) async fn send(
43+
&self,
44+
context: &mut Context,
45+
request: &mut Request,
46+
) -> azure_core::Result<Response> {
47+
self.queue_client.send(context, request).await
48+
}
49+
4250
pub(crate) fn storage_client(&self) -> &StorageClient {
4351
self.queue_client.storage_client()
4452
}
@@ -56,16 +64,19 @@ impl PopReceiptClient {
5664
Ok(url)
5765
}
5866

59-
/// Deletes the message. The message must not have been
60-
/// made visible again or this call would fail.
67+
/// Deletes the message. The message must not have been made visible again
68+
/// or this call would fail.
6169
pub fn delete(&self) -> DeleteMessageBuilder {
62-
DeleteMessageBuilder::new(self)
70+
DeleteMessageBuilder::new(self.clone())
6371
}
6472

65-
/// Updates the message.
66-
/// The message must not have been
67-
/// made visible again or this call would fail.
68-
pub fn update(&self, visibility_timeout: impl Into<VisibilityTimeout>) -> UpdateMessageBuilder {
69-
UpdateMessageBuilder::new(self, visibility_timeout)
73+
/// Updates the message. The message must not have been made visible again
74+
/// or this call would fail.
75+
pub fn update(
76+
&self,
77+
body: impl Into<String>,
78+
visibility_timeout: impl Into<VisibilityTimeout>,
79+
) -> UpdateMessageBuilder {
80+
UpdateMessageBuilder::new(self.clone(), body.into(), visibility_timeout.into())
7081
}
7182
}

sdk/storage_queues/src/clients/queue_client.rs

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1-
use crate::requests::*;
2-
use azure_core::error::{ErrorKind, ResultExt};
3-
use azure_storage::core::clients::{AsStorageClient, StorageAccountClient, StorageClient};
4-
use std::fmt::Debug;
5-
use std::sync::Arc;
1+
use crate::{operations::*, QueueStoredAccessPolicy};
2+
use azure_core::{
3+
error::{ErrorKind, ResultExt},
4+
prelude::*,
5+
Context, Request, Response,
6+
};
7+
use azure_storage::core::clients::{
8+
AsStorageClient, ServiceType, StorageAccountClient, StorageClient,
9+
};
10+
use std::{fmt::Debug, sync::Arc};
611

712
pub trait AsQueueClient<QN: Into<String>> {
813
fn queue_client(&self, queue_name: QN) -> Arc<QueueClient>;
@@ -34,6 +39,16 @@ impl QueueClient {
3439
})
3540
}
3641

42+
pub(crate) async fn send(
43+
&self,
44+
context: &mut Context,
45+
request: &mut Request,
46+
) -> azure_core::Result<Response> {
47+
self.storage_client
48+
.send(context, request, ServiceType::Queue)
49+
.await
50+
}
51+
3752
pub(crate) fn storage_client(&self) -> &StorageClient {
3853
self.storage_client.as_ref()
3954
}
@@ -53,68 +68,77 @@ impl QueueClient {
5368

5469
/// Creates the queue.
5570
pub fn create(&self) -> CreateQueueBuilder {
56-
CreateQueueBuilder::new(self)
71+
CreateQueueBuilder::new(self.clone())
5772
}
5873

5974
/// Deletes the queue.
6075
pub fn delete(&self) -> DeleteQueueBuilder {
61-
DeleteQueueBuilder::new(self)
76+
DeleteQueueBuilder::new(self.clone())
6277
}
6378

64-
/// Sets or clears the queue metadata. The metadata
65-
/// will be passed to the `execute` function of the returned struct.
66-
pub fn set_metadata(&self) -> SetQueueMetadataBuilder {
67-
SetQueueMetadataBuilder::new(self)
79+
/// Sets or clears the queue metadata.
80+
///
81+
/// Keep in mind that keys present on Azure but not included in the passed
82+
/// metadata parameter will be deleted. If you want to keep the preexisting
83+
/// key-value pairs, retrieve them with GetMetadata first and then
84+
/// update/add to the received Metadata struct. Then pass the Metadata back
85+
/// to SetQueueMetadata. If you just want to clear the metadata, just pass
86+
/// an empty Metadata struct.
87+
pub fn set_metadata(&self, metadata: Metadata) -> SetQueueMetadataBuilder {
88+
SetQueueMetadataBuilder::new(self.clone(), metadata)
6889
}
6990

7091
/// Get the queue metadata.
92+
7193
pub fn get_metadata(&self) -> GetQueueMetadataBuilder {
72-
GetQueueMetadataBuilder::new(self)
94+
GetQueueMetadataBuilder::new(self.clone())
7395
}
7496

7597
/// Get the queue ACL. This call returns
7698
/// all the stored access policies associated
7799
/// to the current queue.
78100
pub fn get_acl(&self) -> GetQueueACLBuilder {
79-
GetQueueACLBuilder::new(self)
101+
GetQueueACLBuilder::new(self.clone())
80102
}
81103

82-
/// Set the queue ACL. You can call this function
83-
/// to change or remove already existing stored
84-
/// access policies by modifying the list returned
104+
/// Set the queue ACL. You can call this function to change or remove
105+
/// already existing stored access policies by modifying the list returned
85106
/// by `get_acl`.
86-
pub fn set_acl(&self) -> SetQueueACLBuilder {
87-
SetQueueACLBuilder::new(self)
107+
///
108+
/// While this SDK does not enforce any limit, keep in mind Azure supports a
109+
/// limited number of stored access policies for each queue. More info here
110+
/// [https://docs.microsoft.com/rest/api/storageservices/set-queue-acl#remarks](https://docs.microsoft.com/rest/api/storageservices/set-queue-acl#remarks).
111+
pub fn set_acl(&self, policies: Vec<QueueStoredAccessPolicy>) -> SetQueueACLBuilder {
112+
SetQueueACLBuilder::new(self.clone(), policies)
88113
}
89114

90115
/// Puts a message in the queue. The body will be passed
91116
/// to the `execute` function of the returned struct.
92-
pub fn put_message(&self) -> PutMessageBuilder {
93-
PutMessageBuilder::new(self)
117+
pub fn put_message<S: Into<String>>(&self, message: S) -> PutMessageBuilder {
118+
PutMessageBuilder::new(self.clone(), message.into())
94119
}
95120

96121
/// Peeks, without removing, one or more messages.
97122
pub fn peek_messages(&self) -> PeekMessagesBuilder {
98-
PeekMessagesBuilder::new(self)
123+
PeekMessagesBuilder::new(self.clone())
99124
}
100125

101126
/// Gets, shadowing them, one or more messages.
102127
pub fn get_messages(&self) -> GetMessagesBuilder {
103-
GetMessagesBuilder::new(self)
128+
GetMessagesBuilder::new(self.clone())
104129
}
105130

106131
/// Removes all messages from the queue.
107132
pub fn clear_messages(&self) -> ClearMessagesBuilder {
108-
ClearMessagesBuilder::new(self)
133+
ClearMessagesBuilder::new(self.clone())
109134
}
110135
}
111136

112137
#[cfg(test)]
113138
#[cfg(feature = "test_integration")]
114139
mod integration_tests {
115140
use super::*;
116-
use crate::core::prelude::*;
117-
use crate::queue::clients::AsQueueClient;
141+
use crate::{core::prelude::*, queue::clients::AsQueueClient};
118142

119143
fn get_emulator_client(queue_name: &str) -> Arc<QueueClient> {
120144
let storage_account = StorageAccountClient::new_emulator_default().storage_client();

0 commit comments

Comments
 (0)