Skip to content

Commit 3345512

Browse files
committed
This commit contains the following changes:
* Regenerated client based on new TSP * Switching set and get metadata to use the generated client. * Added get_statistics * Updated samples to show how to pass realistic parameters.
1 parent 3739f2c commit 3345512

21 files changed

+965
-920
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/storage/azure_storage_queue/examples/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,5 @@ This directory contains a set of example for the use of the Storage Queue client
66

77
The following environment variables need to be set:
88

9-
- AZURE_QUEUE_STORAGE_ACCOUNT=https://<storage_account_name>.queue.core.windows.net/ - needs to include "https://" and trailing '/'
9+
- AZURE_QUEUE_STORAGE_ACCOUNT_NAME=<storage_account_name>
1010

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,32 @@
11
pub fn get_endpoint() -> String {
22
// Retrieve the storage account endpoint from environment variable.
3-
let endpoint = std::env::var("AZURE_QUEUE_STORAGE_ACCOUNT");
4-
let endpoint = match endpoint {
3+
let storage_account_name = std::env::var("AZURE_QUEUE_STORAGE_ACCOUNT_NAME");
4+
let storage_account_name = match storage_account_name {
55
Ok(url) => url,
66
Err(_) => {
7-
eprintln!("Environment variable AZURE_QUEUE_STORAGE_ACCOUNT is not set");
7+
eprintln!("Environment variable AZURE_QUEUE_STORAGE_ACCOUNT_NAME is not set");
88
std::process::exit(1);
99
}
1010
};
1111

12-
// Validate endpoint format
13-
if !endpoint.ends_with("/") || !endpoint.starts_with("https://") {
14-
eprintln!("Endpoint must start with 'https://' and end with '/'");
15-
std::process::exit(1);
16-
}
17-
endpoint
12+
format!("https://{}.queue.core.windows.net/", storage_account_name)
13+
}
14+
15+
// This function is used only for the queue service client example, hence the `allow(dead_code)` attribute.
16+
#[allow(dead_code)]
17+
pub fn get_secondary_endpoint() -> String {
18+
// Retrieve the storage account endpoint from environment variable.
19+
let storage_account_name = std::env::var("AZURE_QUEUE_STORAGE_ACCOUNT_NAME");
20+
let storage_account_name = match storage_account_name {
21+
Ok(url) => url,
22+
Err(_) => {
23+
eprintln!("Environment variable AZURE_QUEUE_STORAGE_ACCOUNT_NAME is not set");
24+
std::process::exit(1);
25+
}
26+
};
27+
28+
format!(
29+
"https://{}-secondary.queue.core.windows.net/",
30+
storage_account_name
31+
)
1832
}

sdk/storage/azure_storage_queue/examples/queue_client.rs

Lines changed: 87 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,13 @@ use helpers::endpoint::get_endpoint;
55
use helpers::logs::log_operation_result;
66
use helpers::random_queue_name::get_random_queue_name;
77

8-
use azure_core::http::RequestContent;
98
use azure_identity::DefaultAzureCredential;
109
use azure_storage_queue::{
1110
clients::QueueClient,
1211
models::{
13-
QueueMessage, QueueMessageIdOperationGroupClientUpdateOptions,
14-
QueueMessagesOperationGroupClientDequeueOptions,
15-
QueueMessagesOperationGroupClientPeekOptions,
12+
ListOfSignedIdentifier, QueueClientDequeueOptions, QueueClientGetMetadataResultHeaders,
13+
QueueClientPeekOptions, QueueClientSetMetadataOptions, QueueClientUpdateOptions,
14+
QueueMessage,
1615
},
1716
};
1817

@@ -59,12 +58,12 @@ async fn send_and_update_message(
5958
"Updating message with ID: {} and pop receipt: {}",
6059
message_id, pop_receipt
6160
);
62-
let message_xml_string = quick_xml::se::to_string(&QueueMessage {
61+
let queue_message = QueueMessage {
6362
message_text: Some("Updated message text from Rust".to_string()),
64-
});
65-
let update_option = QueueMessageIdOperationGroupClientUpdateOptions {
63+
};
64+
let update_option = QueueClientUpdateOptions {
6665
// Serialize the message text as bytes for the update
67-
queue_message: Some(RequestContent::from(message_xml_string?.into_bytes())),
66+
queue_message: Some(queue_message.try_into()?),
6867
..Default::default()
6968
};
7069
let update_result = queue_client
@@ -78,20 +77,86 @@ async fn send_and_update_message(
7877
Ok(())
7978
}
8079

81-
async fn get_and_set_access_policies(
80+
async fn set_and_get_access_policies(
8281
queue_client: &QueueClient,
8382
) -> Result<(), Box<dyn std::error::Error>> {
83+
// .checked_add(std::time::Duration::from_secs(3600)) // 1 hour from now
84+
// .ok_or("Failed to calculate expiry time")?;
85+
// let acl = ListOfSignedIdentifier {
86+
// items: Some(vec![SignedIdentifier {
87+
// id: Some("policy1".to_string()),
88+
// access_policy: Some(AccessPolicy {
89+
// start: Some(OffsetDateTime::now_utc()),
90+
// expiry: Some(expiry_time.into()),
91+
// permission: Some("raup".to_string()),
92+
// }),
93+
// }]),
94+
// };
95+
96+
// let acl_xml = quick_xml::se::to_string(&acl);
97+
// println!("Access Policy XML: {}", acl_xml?);
98+
99+
// let acl_xml = "<SignedIdentifiers>
100+
// <SignedIdentifier>
101+
// <AccessPolicy>
102+
// <Expiry>2025-06-27T15:02:39.351158345Z</Expiry>
103+
// <Permission>raup</Permission>
104+
// <Start>2025-06-26T14:02:39.351160525Z</Start>
105+
// </AccessPolicy>
106+
// <Id>MTIzNDU2Nzg5MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTI=</Id>
107+
// </SignedIdentifier>
108+
// </SignedIdentifiers>";
109+
110+
// let result = queue_client
111+
// .set_access_policy(acl_xml.try_into()?, None)
112+
// .await;
113+
// TODO: Fix set and get access policies
114+
let acl = ListOfSignedIdentifier {
115+
..Default::default()
116+
};
117+
let result = queue_client.set_access_policy(acl.try_into()?, None).await;
118+
log_operation_result(&result, "set_access_policy");
119+
84120
let result = queue_client.get_access_policy(None).await;
85121
log_operation_result(&result, "get_access_policy");
86-
87122
let properties = result.unwrap().into_body().await?;
88-
let properties_xml = quick_xml::se::to_string(&properties)?;
89-
let properties_bytes = properties_xml.into_bytes();
123+
if let Some(policies) = properties.items {
124+
for policy in policies {
125+
println!(
126+
"Access Policy - Id: {}, Start: {:?}, Expiry: {:?}, Permissions: {}",
127+
&policy.id.unwrap_or_default(),
128+
policy.access_policy.clone().unwrap().start.unwrap(),
129+
policy.access_policy.clone().unwrap().expiry.unwrap(),
130+
policy.access_policy.clone().unwrap().permission.unwrap()
131+
);
132+
}
133+
} else {
134+
println!("No access policies found.");
135+
}
90136

91-
let result = queue_client
92-
.set_access_policy(RequestContent::from(properties_bytes), None)
93-
.await;
94-
log_operation_result(&result, "set_access_policy");
137+
Ok(())
138+
}
139+
140+
async fn set_and_get_metadata(
141+
queue_client: &QueueClient,
142+
) -> Result<(), Box<dyn std::error::Error>> {
143+
let metadata_options = Some(QueueClientSetMetadataOptions {
144+
metadata: Some(HashMap::from([
145+
("key1".to_string(), "value1".to_string()),
146+
("key2".to_string(), "value2".to_string()),
147+
])),
148+
..Default::default()
149+
});
150+
let result = queue_client.set_metadata(metadata_options).await;
151+
log_operation_result(&result, "set_metadata");
152+
153+
let result = queue_client.get_metadata(None).await;
154+
log_operation_result(&result, "get_metadata");
155+
156+
let metadata = result.unwrap().metadata().unwrap_or_default();
157+
for (key, value) in metadata {
158+
println!("Metadata - {}: {}", key, value);
159+
}
95160

96161
Ok(())
97162
}
@@ -106,7 +171,7 @@ async fn peek_and_receive_messages(
106171
.enqueue_message("Message 2 from Rust Queue SDK", None)
107172
.await;
108173

109-
let options = QueueMessagesOperationGroupClientPeekOptions {
174+
let options = QueueClientPeekOptions {
110175
number_of_messages: Some(5),
111176
..Default::default()
112177
};
@@ -127,7 +192,7 @@ async fn peek_and_receive_messages(
127192
}
128193
}
129194

130-
let options = QueueMessagesOperationGroupClientDequeueOptions {
195+
let options = QueueClientDequeueOptions {
131196
number_of_messages: Some(5),
132197
..Default::default()
133198
};
@@ -161,10 +226,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
161226
let queue_name = get_random_queue_name();
162227
let queue_client = QueueClient::new(&endpoint, &queue_name, credential.clone(), None)?;
163228

164-
// Get queue service properties
165-
let result = queue_client.get_properties(None).await;
166-
log_operation_result(&result, "get_properties");
167-
168229
// Create and manage queue
169230
let result = queue_client.create(None).await;
170231
log_operation_result(&result, "create");
@@ -175,10 +236,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
175236
let result = queue_client.create_if_not_exists(None).await;
176237
log_operation_result(&result, "create_if_not_exists");
177238

178-
// Set queue metadata
179-
let metadata = HashMap::from([("key1", "value1"), ("key2", "value2")]);
180-
let result = queue_client.set_metadata(Some(metadata)).await;
181-
log_operation_result(&result, "set_metadata");
239+
// Set and get queue metadata
240+
set_and_get_metadata(&queue_client).await?;
182241

183242
let result = queue_client.enqueue_message("Example Message", None).await;
184243
log_operation_result(&result, "enqueue_message");
@@ -203,7 +262,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
203262
// Receive messages
204263
peek_and_receive_messages(&queue_client).await?;
205264

206-
get_and_set_access_policies(&queue_client).await?;
265+
// Set and get access policies
266+
set_and_get_access_policies(&queue_client).await?;
207267

208268
// Cleanup
209269
let result = queue_client.delete(None).await;

sdk/storage/azure_storage_queue/examples/queue_service_client.rs

Lines changed: 80 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,77 @@
1-
mod helpers;
1+
use std::sync::Arc;
2+
3+
use azure_storage_queue::models::{
4+
CorsRule, ListQueuesSegmentResponse, QueueServiceClientListQueuesSegmentOptions,
5+
StorageServiceProperties,
6+
};
27

3-
use azure_storage_queue::models::ListQueuesSegmentResponse;
4-
use helpers::endpoint::get_endpoint;
8+
mod helpers;
9+
use helpers::endpoint::{get_endpoint, get_secondary_endpoint};
510
use helpers::logs::log_operation_result;
611
use helpers::random_queue_name::get_random_queue_name;
712

8-
use azure_core::http::RequestContent;
913
use azure_identity::DefaultAzureCredential;
1014
use azure_storage_queue::clients::QueueServiceClient;
1115

1216
use futures::StreamExt;
1317

14-
async fn get_and_set_properties(
18+
async fn set_and_get_properties(
1519
queue_client: &QueueServiceClient,
1620
) -> Result<(), Box<dyn std::error::Error>> {
17-
// Get queue service properties
18-
let result = queue_client.get_properties(None).await;
19-
log_operation_result(&result, "get_properties");
20-
2121
// Set queue service properties
22-
let properties = queue_client.get_properties(None).await?.into_body().await?;
23-
let properties_xml = quick_xml::se::to_string(&properties)?;
24-
let properties_bytes = properties_xml.into_bytes();
25-
22+
let properties = StorageServiceProperties {
23+
cors: Some(vec![CorsRule {
24+
allowed_origins: Some("https://example.com".to_string()),
25+
allowed_methods: Some("GET,POST".to_string()),
26+
max_age_in_seconds: Some(3600),
27+
exposed_headers: Some("x-ms-meta-data".to_string()),
28+
allowed_headers: Some("x-ms-meta-target".to_string()),
29+
}]),
30+
..Default::default()
31+
};
2632
let result = queue_client
27-
.set_properties(RequestContent::from(properties_bytes), None)
33+
.set_properties(properties.try_into()?, None)
2834
.await;
2935
log_operation_result(&result, "set_properties");
3036

37+
// Get queue service properties
38+
let result = queue_client.get_properties(None).await;
39+
log_operation_result(&result, "get_properties");
40+
41+
if let Ok(response) = result {
42+
let properties: StorageServiceProperties = response.into_body().await?;
43+
println!("Queue Service Properties:");
44+
println!("Logging: {:#?}", properties.logging);
45+
println!("Hour Metrics: {:#?}", properties.hour_metrics);
46+
println!("Minute Metrics: {:#?}", properties.minute_metrics);
47+
48+
if let Some(cors_rules) = &properties.cors {
49+
println!("CORS Rules ({} rules):", cors_rules.len());
50+
for (index, rule) in cors_rules.iter().enumerate() {
51+
println!(" Rule {}:", index + 1);
52+
println!(" Allowed Origins: {:?}", rule.allowed_origins);
53+
println!(" Allowed Methods: {:?}", rule.allowed_methods);
54+
println!(" Allowed Headers: {:?}", rule.allowed_headers);
55+
println!(" Exposed Headers: {:?}", rule.exposed_headers);
56+
println!(" Max Age in Seconds: {:?}", rule.max_age_in_seconds);
57+
}
58+
} else {
59+
println!("CORS Rules: None");
60+
}
61+
} else {
62+
eprintln!("Failed to get queue service properties.");
63+
}
64+
3165
Ok(())
3266
}
3367

3468
async fn list_queues_segment(
3569
queue_client: &QueueServiceClient,
3670
) -> Result<(), Box<dyn std::error::Error>> {
37-
let options =
38-
azure_storage_queue::models::QueueServiceOperationGroupClientListQueuesSegmentOptions {
39-
maxresults: Some(1),
40-
..Default::default()
41-
};
71+
let options = QueueServiceClientListQueuesSegmentOptions {
72+
maxresults: Some(1),
73+
..Default::default()
74+
};
4275
let result = queue_client.list_queues_segment(Some(options));
4376
log_operation_result(&result, "list_queues_segment");
4477

@@ -62,6 +95,30 @@ async fn list_queues_segment(
6295
Ok(())
6396
}
6497

98+
async fn get_statistics(
99+
credential: Arc<DefaultAzureCredential>,
100+
) -> Result<(), Box<dyn std::error::Error>> {
101+
let secondary_endpoint = get_secondary_endpoint();
102+
let secondary_queue_client =
103+
QueueServiceClient::new(&secondary_endpoint, credential.clone(), None)?;
104+
let result = secondary_queue_client.get_statistics(None).await;
105+
log_operation_result(&result, "get_statistics");
106+
107+
if let Ok(response) = result {
108+
let stats = response.into_body().await?;
109+
let geo_replication = stats.geo_replication.as_ref().unwrap();
110+
println!(
111+
"Geo-replication status: {}, Last sync time: {}",
112+
geo_replication.status.as_ref().unwrap(),
113+
geo_replication.last_sync_time.unwrap()
114+
);
115+
} else {
116+
eprintln!("Failed to get queue service statistics. Ensure the queue service is geo-replicated and the secondary endpoint is accessible.");
117+
}
118+
119+
Ok(())
120+
}
121+
65122
#[tokio::main]
66123
async fn main() -> Result<(), Box<dyn std::error::Error>> {
67124
let credential = DefaultAzureCredential::new()?;
@@ -76,11 +133,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
76133
let result = queue_client.create_queue(&queue_name, None).await;
77134
log_operation_result(&result, "create_queue");
78135

79-
get_and_set_properties(&queue_client).await?;
136+
set_and_get_properties(&queue_client).await?;
80137

81138
// List queues
82139
list_queues_segment(&queue_client).await?;
83140

141+
// Get statistics
142+
get_statistics(credential.clone()).await?;
143+
84144
// Cleanup
85145
let result = queue_client.delete_queue(&queue_name, None).await;
86146
log_operation_result(&result, "delete_queue");

0 commit comments

Comments
 (0)