diff --git a/Cargo.lock b/Cargo.lock index f769de8d4e..20bc1a5abc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -479,6 +479,24 @@ dependencies = [ name = "azure_storage_common" version = "0.1.0" +[[package]] +name = "azure_storage_queue" +version = "0.1.0" +dependencies = [ + "azure_core", + "azure_core_test", + "azure_identity", + "futures", + "quick-xml", + "rand 0.9.2", + "reqwest", + "serde", + "serde_json", + "time", + "tokio", + "typespec_client_core", +] + [[package]] name = "azure_template" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index d5fae13115..2f8fb32082 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ members = [ "sdk/template/azure_template", "sdk/storage/azure_storage_common", "sdk/storage/azure_storage_blob", + "sdk/storage/azure_storage_queue", ] exclude = [ "eng/scripts", diff --git a/sdk/storage/.dict.txt b/sdk/storage/.dict.txt index 1c4831b113..00f331d958 100644 --- a/sdk/storage/.dict.txt +++ b/sdk/storage/.dict.txt @@ -24,3 +24,5 @@ testblob1 testblob2 testblob3 testblob4 +numofmessages +peekonly diff --git a/sdk/storage/azure_storage_blob/assets.json b/sdk/storage/azure_storage_blob/assets.json index 8443d02da4..7ae59a2b38 100644 --- a/sdk/storage/azure_storage_blob/assets.json +++ b/sdk/storage/azure_storage_blob/assets.json @@ -3,4 +3,4 @@ "AssetsRepoPrefixPath": "rust", "Tag": "rust/azure_storage_blob_873d66a4ea", "TagPrefix": "rust/azure_storage_blob" -} \ No newline at end of file +} diff --git a/sdk/storage/azure_storage_queue/CHANGELOG.md b/sdk/storage/azure_storage_queue/CHANGELOG.md new file mode 100644 index 0000000000..77d9ac51c2 --- /dev/null +++ b/sdk/storage/azure_storage_queue/CHANGELOG.md @@ -0,0 +1,7 @@ +# Release History + +## 0.1.0 (Unreleased) + +### Features Added + +* Initial supported release. diff --git a/sdk/storage/azure_storage_queue/Cargo.toml b/sdk/storage/azure_storage_queue/Cargo.toml new file mode 100644 index 0000000000..aa083e7c06 --- /dev/null +++ b/sdk/storage/azure_storage_queue/Cargo.toml @@ -0,0 +1,46 @@ +[package] +name = "azure_storage_queue" +version = "0.1.0" +description = "Microsoft Azure Queue client library for Rust" +readme = "README.md" +authors.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true +homepage = "https://github.com/azure/azure-sdk-for-rust" +documentation = "https://docs.rs/azure_storage_queue" +keywords = ["sdk", "azure", "storage", "queue", "queues"] +categories = ["api-bindings"] + +# Manual example definitions pointing to samples/ directory +[[example]] +name = "queue_client" +path = "samples/queue_client.rs" + +[[example]] +name = "queue_service_client" +path = "samples/queue_service_client.rs" + +[features] +default = ["reqwest"] +reqwest = ["dep:reqwest"] + +[dependencies] +azure_core = { workspace = true, features = ["xml"] } +quick-xml.workspace = true +rand.workspace = true +reqwest = { workspace = true, optional = true } +serde = { workspace = true } +serde_json = { workspace = true } +time = { workspace = true } +typespec_client_core = { workspace = true, features = ["derive"] } + +[lints] +workspace = true + +[dev-dependencies] +azure_core_test.workspace = true +azure_identity.workspace = true +futures.workspace = true +tokio = { workspace = true, features = ["macros"] } diff --git a/sdk/storage/azure_storage_queue/README.md b/sdk/storage/azure_storage_queue/README.md new file mode 100644 index 0000000000..3f4e349299 --- /dev/null +++ b/sdk/storage/azure_storage_queue/README.md @@ -0,0 +1,98 @@ +# Azure Queue client library for Rust + +Azure Queue Storage is a service for storing large numbers of messages. + +[Source code] | [Package (crates.io)] | [API reference documentation] | [REST API documentation] | [Product documentation] + +## Getting started + +**⚠️ Note: The `azure_storage_queue` crate is currently under active development and not all features may be implemented or work as intended. This crate is in beta and not suitable for Production environments. For any general feedback or usage issues, please open a GitHub issue [here](https://github.com/Azure/azure-sdk-for-rust/issues).** + +### Install the package + +Install the Azure Storage Queue client library for Rust with [cargo]: + +```sh +cargo add azure_storage_queue +``` + +### Prerequisites + +* You must have an [Azure subscription] and an [Azure storage account] to use this package. + +### Create a storage account + +If you wish to create a new storage account, you can use the +[Azure Portal], [Azure PowerShell], or [Azure CLI]: + +```sh +# Create a new resource group to hold the storage account - +# if using an existing resource group, skip this step +az group create --name my-resource-group --location westus2 + +# Create the storage account +az storage account create -n my-storage-account-name -g my-resource-group +``` + +#### Authenticate the client + +In order to interact with the Azure Queue service, you'll need to create an instance of a client, `QueueClient`. The [Azure Identity] library makes it easy to add Microsoft Entra ID support for authenticating Azure SDK clients with their corresponding Azure services: + +```rust no_run +use azure_storage_queue::{QueueClient, QueueClientOptions}; +use azure_identity::DeveloperToolsCredential; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Create a QueueClient that will authenticate through Microsoft Entra ID + let credential = DeveloperToolsCredential::new(None)?; + let queue_client = QueueClient::new( + "https://.blob.core.windows.net/", // endpoint + "queue-name", // queue name + credential, // credential + Some(QueueClientOptions::default()), // QueueClient options + )?; + Ok(()) +} +``` + +#### Permissions + +You may need to specify RBAC roles to access Queues via Microsoft Entra ID. Please see [Assign an Azure role for access to queue data] for more details. + +## Examples + + +You can find executable examples for all major SDK functions in: +- [queue_client.rs] +- [queue_service_client.rs] + +## Next steps + +### Provide feedback + +If you encounter bugs or have suggestions, [open an issue](https://github.com/Azure/azure-sdk-for-rust/issues). + +## Contributing + +This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit [https://cla.microsoft.com](https://cla.microsoft.com). + +When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You'll only need to do this once across all repos using our CLA. + +This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). For more information, see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. + + + +[Azure subscription]: https://azure.microsoft.com/free/ +[Azure storage account]: https://learn.microsoft.com/azure/storage/common/storage-account-overview +[Azure Portal]: https://learn.microsoft.com/azure/storage/common/storage-quickstart-create-account?tabs=azure-portal +[Azure PowerShell]: https://learn.microsoft.com/azure/storage/common/storage-quickstart-create-account?tabs=azure-powershell +[Azure CLI]: https://learn.microsoft.com/azure/storage/common/storage-quickstart-create-account?tabs=azure-cli +[cargo]: https://dev-doc.rust-lang.org/stable/cargo/commands/cargo.html +[Azure Identity]: https://github.com/Azure/azure-sdk-for-rust/tree/main/sdk/identity/azure_identity + + +[Source code]: https://github.com/Azure/azure-sdk-for-rust/tree/main/sdk/storage/azure_storage_queue +[REST API documentation]: https://learn.microsoft.com/rest/api/storageservices/blob-service-rest-api +[Product documentation]: https://learn.microsoft.com/azure/storage/blobs/storage-blobs-overview +[Assign an Azure role for access to queue data]: https://learn.microsoft.com/azure/storage/queues/assign-azure-role-data-access?tabs=portal diff --git a/sdk/storage/azure_storage_queue/assets.json b/sdk/storage/azure_storage_queue/assets.json new file mode 100644 index 0000000000..c2218341de --- /dev/null +++ b/sdk/storage/azure_storage_queue/assets.json @@ -0,0 +1,6 @@ +{ + "AssetsRepo": "Azure/azure-sdk-assets", + "AssetsRepoPrefixPath": "rust", + "Tag": "rust/azure_storage_queue_3ebef0dc87", + "TagPrefix": "rust/azure_storage_queue" +} diff --git a/sdk/storage/azure_storage_queue/samples/README.md b/sdk/storage/azure_storage_queue/samples/README.md new file mode 100644 index 0000000000..ca3d8bb033 --- /dev/null +++ b/sdk/storage/azure_storage_queue/samples/README.md @@ -0,0 +1,10 @@ +# Storage Queue Examples + +This directory contains a set of example for the use of the Storage Queue clients. + +## Setup + +The following environment variables need to be set: + +- AZURE_QUEUE_STORAGE_ACCOUNT_NAME= + diff --git a/sdk/storage/azure_storage_queue/samples/helpers/endpoint.rs b/sdk/storage/azure_storage_queue/samples/helpers/endpoint.rs new file mode 100644 index 0000000000..8f5f77eab3 --- /dev/null +++ b/sdk/storage/azure_storage_queue/samples/helpers/endpoint.rs @@ -0,0 +1,32 @@ +pub fn get_endpoint() -> String { + // Retrieve the storage account endpoint from environment variable. + let storage_account_name = std::env::var("AZURE_QUEUE_STORAGE_ACCOUNT_NAME"); + let storage_account_name = match storage_account_name { + Ok(url) => url, + Err(_) => { + eprintln!("Environment variable AZURE_QUEUE_STORAGE_ACCOUNT_NAME is not set"); + std::process::exit(1); + } + }; + + format!("https://{}.queue.core.windows.net/", storage_account_name) +} + +// This function is used only for the queue service client example, hence the `allow(dead_code)` attribute. +#[allow(dead_code)] +pub fn get_secondary_endpoint() -> String { + // Retrieve the storage account endpoint from environment variable. + let storage_account_name = std::env::var("AZURE_QUEUE_STORAGE_ACCOUNT_NAME"); + let storage_account_name = match storage_account_name { + Ok(url) => url, + Err(_) => { + eprintln!("Environment variable AZURE_QUEUE_STORAGE_ACCOUNT_NAME is not set"); + std::process::exit(1); + } + }; + + format!( + "https://{}-secondary.queue.core.windows.net/", + storage_account_name + ) +} diff --git a/sdk/storage/azure_storage_queue/samples/helpers/logs.rs b/sdk/storage/azure_storage_queue/samples/helpers/logs.rs new file mode 100644 index 0000000000..ea99c50bca --- /dev/null +++ b/sdk/storage/azure_storage_queue/samples/helpers/logs.rs @@ -0,0 +1,25 @@ +use azure_core::{error::Error, http::StatusCode}; + +/// Helper function to log operation results +pub fn log_operation_result(result: &Result, operation: &str) +where + T: std::fmt::Debug, +{ + match result { + Ok(response) => println!("Successfully {}: {:?}", operation, response), + Err(e) => match e.http_status() { + Some(StatusCode::NotFound) => println!("Unable to {}, resource not found", operation), + Some(StatusCode::Forbidden) => println!( + "Unable to {}, access forbidden - check credentials", + operation + ), + _ => { + eprintln!("Error during {}: {}", operation, e); + if let Some(status) = e.http_status() { + eprintln!("HTTP Status: {}", status); + } + eprintln!("Full Error: {:#?}", e); + } + }, + } +} diff --git a/sdk/storage/azure_storage_queue/samples/helpers/mod.rs b/sdk/storage/azure_storage_queue/samples/helpers/mod.rs new file mode 100644 index 0000000000..ef00b550b4 --- /dev/null +++ b/sdk/storage/azure_storage_queue/samples/helpers/mod.rs @@ -0,0 +1,3 @@ +pub mod endpoint; +pub mod logs; +pub mod random_queue_name; diff --git a/sdk/storage/azure_storage_queue/samples/helpers/random_queue_name.rs b/sdk/storage/azure_storage_queue/samples/helpers/random_queue_name.rs new file mode 100644 index 0000000000..18303effeb --- /dev/null +++ b/sdk/storage/azure_storage_queue/samples/helpers/random_queue_name.rs @@ -0,0 +1,7 @@ +/// Generates a random queue name with a suffix to ensure uniqueness. +pub fn get_random_queue_name() -> String { + use rand::Rng; + let mut rng = rand::rng(); + let random_suffix: u32 = rng.random_range(1000..9999); + format!("sdk-test-queue-{}", random_suffix) +} diff --git a/sdk/storage/azure_storage_queue/samples/queue_client.rs b/sdk/storage/azure_storage_queue/samples/queue_client.rs new file mode 100644 index 0000000000..30b46ac2d4 --- /dev/null +++ b/sdk/storage/azure_storage_queue/samples/queue_client.rs @@ -0,0 +1,230 @@ +use std::collections::HashMap; + +mod helpers; +use helpers::endpoint::get_endpoint; +use helpers::logs::log_operation_result; +use helpers::random_queue_name::get_random_queue_name; + +use azure_core::{ + http::{Response, XmlFormat}, + Error, +}; +use azure_identity::DeveloperToolsCredential; +use azure_storage_queue::{ + models::{ + QueueClientGetMetadataResultHeaders, QueueClientPeekMessagesOptions, + QueueClientReceiveMessagesOptions, QueueClientUpdateOptions, QueueMessage, SentMessage, + }, + QueueClient, +}; + +async fn send_message( + queue_client: &QueueClient, + message: &str, +) -> Result, Error> { + let queue_message = QueueMessage { + message_text: Some(message.to_owned()), + }; + + queue_client + .send_message(queue_message.try_into()?, None) + .await +} + +async fn send_and_delete_message( + queue_client: &QueueClient, + message: &str, +) -> Result<(), Box> { + let result = send_message(queue_client, message).await; + + if let Ok(response) = result { + let message = response.into_body().await?; + + if let (Some(message_id), Some(pop_receipt)) = (message.message_id, message.pop_receipt) { + println!( + "Deleting message with ID: {} and pop receipt: {}", + message_id, pop_receipt + ); + let delete_result = queue_client + .delete_message(&message_id, &pop_receipt, None) + .await; + log_operation_result(&delete_result, "delete_message"); + } + } + + Ok(()) +} + +async fn send_and_update_message( + queue_client: &QueueClient, + message: &str, +) -> Result<(), Box> { + let result = send_message(queue_client, message).await; + + if let Ok(response) = result { + let message = response.into_body().await?; + + if let (Some(message_id), Some(pop_receipt)) = (message.message_id, message.pop_receipt) { + println!( + "Updating message with ID: {} and pop receipt: {}", + message_id, pop_receipt + ); + let queue_message = QueueMessage { + message_text: Some("Updated message text from Rust".to_string()), + }; + let update_option = QueueClientUpdateOptions { + // Serialize the message text as bytes for the update + queue_message: Some(queue_message.try_into()?), + ..Default::default() + }; + let update_result = queue_client + .update_message(&message_id.clone(), &pop_receipt, 50, Some(update_option)) + .await; + log_operation_result(&update_result, "update_message"); + } + } + + Ok(()) +} + +async fn set_and_get_metadata( + queue_client: &QueueClient, +) -> Result<(), Box> { + let result = queue_client + .set_metadata( + HashMap::from([ + ("key1".to_string(), "value1".to_string()), + ("key2".to_string(), "value2".to_string()), + ]), + None, + ) + .await; + log_operation_result(&result, "set_metadata"); + + let result = queue_client.get_metadata(None).await; + log_operation_result(&result, "get_metadata"); + + let metadata = result.unwrap().metadata().unwrap_or_default(); + for (key, value) in metadata { + println!("Metadata - {}: {}", key, value); + } + + let result = queue_client.set_metadata(HashMap::new(), None).await; + log_operation_result(&result, "set_metadata_empty"); + + let result = queue_client.get_metadata(None).await; + log_operation_result(&result, "get_metadata_empty"); + + let metadata = result.unwrap().metadata().unwrap_or_default(); + for (key, value) in metadata { + println!("Metadata - {}: {}", key, value); + } + + Ok(()) +} + +async fn peek_and_receive_messages( + queue_client: &QueueClient, +) -> Result<(), Box> { + _ = send_message(queue_client, "Message 1 from Rust Queue SDK").await; + _ = send_message(queue_client, "Message 2 from Rust Queue SDK").await; + + let options = QueueClientPeekMessagesOptions { + number_of_messages: Some(5), + ..Default::default() + }; + + let result = queue_client.peek_messages(Some(options)).await; + log_operation_result(&result, "peek_messages"); + + if let Ok(response) = result { + let messages = response.into_body().await?; + if let Some(messages) = messages.items { + for msg in messages { + println!( + "Successfully peeked message ({}): {}", + msg.message_id.unwrap(), + msg.message_text.unwrap_or_default() + ); + } + } + } + + let options = QueueClientReceiveMessagesOptions { + number_of_messages: Some(5), + ..Default::default() + }; + + let result = queue_client.receive_messages(Some(options)).await; + log_operation_result(&result, "receive_messages"); + + if let Ok(response) = result { + let messages = response.into_body().await?; + if let Some(messages) = messages.items { + for msg in messages { + println!( + "Successfully received message ({}): {}", + msg.message_id.unwrap(), + msg.message_text.unwrap_or_default() + ); + } + } + } + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let credential = DeveloperToolsCredential::new(None)?; + + // Retrieve the storage account endpoint from environment variable. + let endpoint = get_endpoint(); + + let queue_name = get_random_queue_name(); + let queue_client = QueueClient::new(&endpoint, &queue_name, credential.clone(), None)?; + + // Create and manage queue + let result = queue_client.create(None).await; + log_operation_result(&result, "create"); + + let result = queue_client.exists().await; + log_operation_result(&result, "check_exists"); + + // Set and get queue metadata + set_and_get_metadata(&queue_client).await?; + + let result = send_message(&queue_client, "Example Message").await; + log_operation_result(&result, "send_message"); + + send_and_update_message( + &queue_client, + "Example message created from Rust, ready for update", + ) + .await?; + + // Clear messages + let result = queue_client.clear(None).await; + log_operation_result(&result, "clear"); + + // Send and process messages + send_and_delete_message( + &queue_client, + "Example message created from Rust, ready for deletion", + ) + .await?; + + // Peek and Receive messages + peek_and_receive_messages(&queue_client).await?; + + // Cleanup + let result = queue_client.delete(None).await; + log_operation_result(&result, "delete"); + + let non_existing_queue_client = + QueueClient::new(&endpoint, "non-existent-queue", credential.clone(), None)?; + let result = non_existing_queue_client.exists().await; + log_operation_result(&result, "check_non_existent"); + + Ok(()) +} diff --git a/sdk/storage/azure_storage_queue/samples/queue_service_client.rs b/sdk/storage/azure_storage_queue/samples/queue_service_client.rs new file mode 100644 index 0000000000..f11553c617 --- /dev/null +++ b/sdk/storage/azure_storage_queue/samples/queue_service_client.rs @@ -0,0 +1,151 @@ +use std::sync::Arc; + +use azure_storage_queue::models::{ + CorsRule, ListQueuesIncludeType, ListQueuesResponse, QueueServiceClientListQueuesOptions, + QueueServiceProperties, +}; + +mod helpers; +use helpers::endpoint::{get_endpoint, get_secondary_endpoint}; +use helpers::logs::log_operation_result; +use helpers::random_queue_name::get_random_queue_name; + +use azure_identity::DeveloperToolsCredential; +use azure_storage_queue::QueueServiceClient; + +use futures::StreamExt; + +async fn set_and_get_properties( + queue_client: &QueueServiceClient, +) -> Result<(), Box> { + // Set queue service properties + let properties = QueueServiceProperties { + cors: Some(vec![CorsRule { + allowed_origins: Some("https://example.com".to_string()), + allowed_methods: Some("GET,POST".to_string()), + max_age_in_seconds: Some(3600), + exposed_headers: Some("x-ms-meta-data".to_string()), + allowed_headers: Some("x-ms-meta-target".to_string()), + }]), + ..Default::default() + }; + let result = queue_client + .set_properties(properties.try_into()?, None) + .await; + log_operation_result(&result, "set_properties"); + + // Get queue service properties + let result = queue_client.get_properties(None).await; + log_operation_result(&result, "get_properties"); + + if let Ok(response) = result { + let properties = response.into_body().await?; + println!("Queue Service Properties:"); + println!("Logging: {:#?}", properties.logging); + println!("Hour Metrics: {:#?}", properties.hour_metrics); + println!("Minute Metrics: {:#?}", properties.minute_metrics); + + if let Some(cors_rules) = &properties.cors { + println!("CORS Rules ({} rules):", cors_rules.len()); + for (index, rule) in cors_rules.iter().enumerate() { + println!(" Rule {}:", index + 1); + println!(" Allowed Origins: {:?}", rule.allowed_origins); + println!(" Allowed Methods: {:?}", rule.allowed_methods); + println!(" Allowed Headers: {:?}", rule.allowed_headers); + println!(" Exposed Headers: {:?}", rule.exposed_headers); + println!(" Max Age in Seconds: {:?}", rule.max_age_in_seconds); + } + } else { + println!("CORS Rules: None"); + } + } else { + eprintln!("Failed to get queue service properties."); + } + + Ok(()) +} + +async fn list_queues(queue_client: &QueueServiceClient) -> Result<(), Box> { + let options = QueueServiceClientListQueuesOptions { + maxresults: Some(1), + include: Some(vec![ListQueuesIncludeType::Metadata]), // Include metadata in the response + ..Default::default() + }; + let result = queue_client.list_queues(Some(options)); + log_operation_result(&result, "list_queues_segment"); + + if let Ok(mut pager_response) = result { + while let Some(response_result) = pager_response.next().await { + println!("Processing next page of queues..."); + match response_result { + Ok(response) => { + let queue_list: ListQueuesResponse = response.into_body().await?; + for queue in queue_list.queue_items { + println!("Queue: {}", queue.name.unwrap_or_default()); + for (key, value) in queue.metadata.unwrap_or_default() { + println!(" Metadata - {}: {}", key, value); + } + } + } + Err(e) => { + eprintln!("Error getting queue page: {}", e); + } + } + } + } + + Ok(()) +} + +async fn get_statistics( + credential: Arc, +) -> Result<(), Box> { + let secondary_endpoint = get_secondary_endpoint(); + let secondary_queue_client = + QueueServiceClient::new(&secondary_endpoint, credential.clone(), None)?; + let result = secondary_queue_client.get_statistics(None).await; + log_operation_result(&result, "get_statistics"); + + if let Ok(response) = result { + let stats = response.into_body().await?; + let geo_replication = stats.geo_replication.as_ref().unwrap(); + println!( + "Geo-replication status: {}, Last sync time: {}", + geo_replication.status.as_ref().unwrap(), + geo_replication.last_sync_time.unwrap() + ); + } else { + eprintln!("Failed to get queue service statistics. Ensure the queue service is geo-replicated and the secondary endpoint is accessible."); + } + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let credential = DeveloperToolsCredential::new(None)?; + + // Retrieve the storage account endpoint from environment variable. + let endpoint = get_endpoint(); + + let queue_name = get_random_queue_name(); + let queue_client = QueueServiceClient::new(&endpoint, credential.clone(), None)?; + + // Create and manage queue + let result = queue_client.create_queue(&queue_name, None).await; + log_operation_result(&result, "create_queue"); + + set_and_get_properties(&queue_client).await?; + + // List queues + list_queues(&queue_client).await?; + + // Get statistics + get_statistics(credential.clone()).await?; + + // Cleanup + let result = queue_client.delete_queue(&queue_name, None).await; + log_operation_result(&result, "delete_queue"); + + Ok(()) +} diff --git a/sdk/storage/azure_storage_queue/src/clients/mod.rs b/sdk/storage/azure_storage_queue/src/clients/mod.rs new file mode 100644 index 0000000000..17417c6221 --- /dev/null +++ b/sdk/storage/azure_storage_queue/src/clients/mod.rs @@ -0,0 +1,4 @@ +pub mod queue_client; +pub mod queue_service_client; +pub use queue_client::QueueClient; +pub use queue_service_client::QueueServiceClient; diff --git a/sdk/storage/azure_storage_queue/src/clients/queue_client.rs b/sdk/storage/azure_storage_queue/src/clients/queue_client.rs new file mode 100644 index 0000000000..3c300e84af --- /dev/null +++ b/sdk/storage/azure_storage_queue/src/clients/queue_client.rs @@ -0,0 +1,353 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +use crate::generated::{ + clients::{QueueClient as GeneratedQueueClient, QueueClientOptions}, + models::*, +}; +use azure_core::{ + credentials::TokenCredential, + http::{NoFormat, RawResponse, RequestContent, Response, StatusCode, Url, XmlFormat}, + xml, Bytes, Result, +}; +use std::{collections::HashMap, sync::Arc}; + +/// A client to interact with a specific Azure storage queue, although that queue may not yet exist. +pub struct QueueClient { + pub(super) endpoint: Url, + pub(super) client: GeneratedQueueClient, +} + +impl QueueClient { + /// Returns the endpoint URL of the Azure storage account this client is associated with. + /// + /// # Returns + /// + /// A reference to the URL of the storage account. + pub fn endpoint(&self) -> &Url { + self.client.endpoint() + } + + /// Returns the name of the queue this client is associated with. + /// + /// # Returns + /// + /// A reference to the name of the queue. + pub fn queue_name(&self) -> &str { + &self.client.queue_name + } + + /// Creates a new QueueClient using Entra ID authentication. + /// + /// # Arguments + /// + /// * `endpoint` - The full URL of the Azure storage account, for example `https://.queue.core.windows.net/` + /// * `queue_name` - The name of the queue to interact with + /// * `credential` - An implementation of [`TokenCredential`] that can provide an Entra ID token for authentication + /// * `options` - Optional configuration for the client + /// + /// # Returns + /// + /// Returns a `Result` containing the new `QueueClient` if successful, or an error if the endpoint URL is invalid + pub fn new( + endpoint: &str, + queue_name: &str, + credential: Arc, + options: Option, + ) -> Result { + let options = options.unwrap_or_default(); + + let client = GeneratedQueueClient::new( + endpoint, + credential.clone(), + queue_name.to_string(), + Some(options), + )?; + Ok(Self { + endpoint: endpoint.parse()?, + client, + }) + } + + /// Creates a new queue under the given account. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request + /// + /// # Returns + /// + /// Returns a `Result` containing the response if successful + /// + /// # Errors + /// + /// Returns an error if the queue already exists or if the request fails + pub async fn create( + &self, + options: Option>, + ) -> Result> { + self.client.create(options).await + } + + /// Permanently deletes the specified queue. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request + /// + /// # Returns + /// + /// Returns a `Result` containing the response if successful + /// + /// # Errors + /// + /// Returns an error if the queue doesn't exist or if the request fails + pub async fn delete( + &self, + options: Option>, + ) -> Result> { + self.client.delete(options).await + } + + /// Checks if the queue exists. + /// + /// # Returns + /// + /// Returns a `Result` containing: + /// - `Ok(true)` if the queue exists + /// - `Ok(false)` if the queue does not exist + /// + /// # Errors + /// + /// Returns an error if the request fails for any reason other than a non-existent queue + pub async fn exists(&self) -> Result { + match self.get_metadata(None).await { + Ok(_) => Ok(true), + Err(e) if e.http_status() == Some(StatusCode::NotFound) => { + // If the queue does not exist, we return false. + Ok(false) + } + Err(e) => { + // Propagate other errors. + Err(e) + } + } + } + + /// Clears all messages in the specified queue. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request + /// + /// # Returns + /// + /// Returns a `Result` containing the response if successful + /// + /// # Errors + /// + /// Returns an error if the queue doesn't exist or if the request fails + pub async fn clear( + &self, + options: Option>, + ) -> Result> { + self.client.clear(options).await + } + + /// Sets the metadata for the specified queue. + /// + /// # Arguments + /// + /// * `metadata` - A HashMap containing the metadata key-value pairs to set for the queue. + /// This will replace all existing metadata on the queue. If an empty HashMap is provided, all + /// existing metadata will be deleted from the queue. + /// * `options` - Optional parameters for the request + /// + /// # Returns + /// + /// Returns a `Result` containing the response if successful + /// + /// # Errors + /// + /// Returns an error if the queue doesn't exist or if the request fails + pub async fn set_metadata( + &self, + metadata: HashMap, + options: Option>, + ) -> Result> { + self.client.set_metadata(metadata, options).await + } + + /// Retrieves the metadata of the specified queue. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request + /// + /// # Returns + /// + /// Returns a `Result` containing the queue's metadata if successful + /// + /// # Errors + /// + /// Returns an error if the queue doesn't exist or if the request fails + pub async fn get_metadata( + &self, + options: Option>, + ) -> Result> { + self.client.get_metadata(options).await + } + + /// Enqueues a message to the specified queue. + /// + /// # Arguments + /// + /// * `message` - The message text to be added to the queue + /// * `options` - Optional parameters for the enqueue operation, including visibility timeout and message time-to-live + /// + /// # Returns + /// + /// Returns a `Result` containing the enqueued message details if successful + /// + /// # Errors + /// + /// Returns an error if the queue doesn't exist, if no message was sent, or if the request fails + pub async fn send_message( + &self, + queue_message: RequestContent, + options: Option>, + ) -> Result> { + let response = self.client.send_message(queue_message, options).await?; + + Self::extract_first_message(response, |list: &ListOfSentMessage| { + list.items.clone().unwrap_or_default() + }) + .await + } + + /// Deletes a specific message from the queue. + /// + /// # Arguments + /// + /// * `message_id` - The ID of the message to delete + /// * `pop_receipt` - The pop receipt obtained when the message was retrieved + /// * `options` - Optional parameters for the delete operation + /// + /// # Returns + /// + /// Returns a `Result` containing the response if successful + /// + /// # Errors + /// + /// Returns an error if the message doesn't exist, the pop receipt is invalid, + /// or if the request fails + pub async fn delete_message( + &self, + message_id: &str, + pop_receipt: &str, + options: Option>, + ) -> Result> { + self.client + .delete_message(message_id, pop_receipt, options) + .await + } + + /// Updates a specific message in the queue. + /// + /// # Arguments + /// + /// * `message_id` - The ID of the message to update + /// * `pop_receipt` - The pop receipt obtained when the message was retrieved + /// * `visibility_timeout` - The new visibility timeout for the message, in seconds + /// * `options` - Optional parameters for the update operation + /// + /// # Returns + /// + /// Returns a `Result` containing the response if successful + /// + /// # Errors + /// + /// Returns an error if the message doesn't exist, the pop receipt is invalid, + /// or if the request fails + pub async fn update_message( + &self, + message_id: &str, + pop_receipt: &str, + visibility_timeout: i32, + options: Option>, + ) -> Result> { + self.client + .update(message_id, pop_receipt, visibility_timeout, options) + .await + } + + /// The Dequeue operation retrieves one or more messages from the front of the queue. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the dequeue operation. Use `number_of_messages` to specify + /// how many messages to retrieve (up to 32) and set the visibility timeout + /// + /// # Returns + /// + /// Returns a `Result` containing the dequeued messages if successful. The messages will be invisible + /// to other consumers for the duration specified in the visibility timeout + /// + /// # Errors + /// + /// Returns an error if the queue doesn't exist or if the request fails + pub async fn receive_messages( + &self, + options: Option>, + ) -> Result> { + self.client.receive_messages(options).await + } + + /// Peeks multiple messages from the front of the queue without removing them. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the peek operation. Use `number_of_messages` + /// to specify how many messages to peek (up to 32) + /// + /// # Returns + /// + /// Returns a `Result` containing the messages at the front of the queue if successful. + /// The messages remain visible to other consumers + /// + /// # Errors + /// + /// Returns an error if the queue doesn't exist or if the request fails + pub async fn peek_messages( + &self, + options: Option>, + ) -> Result> { + self.client.peek_messages(options).await + } + + /// Helper function to extract the first message from a list response and convert it to a single message response + async fn extract_first_message( + response: Response, + extract_fn: impl Fn(&T) -> Vec, + ) -> Result> + where + T: serde::de::DeserializeOwned, + U: serde::Serialize + Clone, + { + let status = response.status(); + let headers = response.headers().clone(); + let message_list = response.into_body().await?; + + let messages = extract_fn(&message_list); + let first_message = messages.into_iter().next().ok_or_else(|| { + azure_core::Error::message( + azure_core::error::ErrorKind::DataConversion, + "No messages found in the response", + ) + })?; + + let xml_body = xml::to_xml(&first_message)?; + let raw_response = RawResponse::from_bytes(status, headers, xml_body); + Ok(raw_response.into()) + } +} diff --git a/sdk/storage/azure_storage_queue/src/clients/queue_service_client.rs b/sdk/storage/azure_storage_queue/src/clients/queue_service_client.rs new file mode 100644 index 0000000000..36ce30657e --- /dev/null +++ b/sdk/storage/azure_storage_queue/src/clients/queue_service_client.rs @@ -0,0 +1,204 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +use crate::{ + clients::QueueClient, + generated::{ + clients::{QueueServiceClient as GeneratedQueueClient, QueueServiceClientOptions}, + models::*, + }, +}; +use azure_core::{ + credentials::TokenCredential, + http::{ + policies::{BearerTokenCredentialPolicy, Policy}, + Context, Method, NoFormat, PageIterator, RawResponse, Request, RequestContent, Response, + StatusCode, Url, XmlFormat, + }, + xml, Bytes, Result, +}; +use std::{collections::HashMap, sync::Arc}; + +/// A client to interact with a specific Azure storage queue, although that queue may not yet exist. +pub struct QueueServiceClient { + pub(super) endpoint: Url, + pub(super) client: GeneratedQueueClient, +} + +impl QueueServiceClient { + /// Returns the endpoint URL of the Azure storage account this client is associated with. + /// + /// # Returns + /// + /// A reference to the URL of the storage account. + pub fn endpoint(&self) -> &Url { + self.client.endpoint() + } + + /// Creates a new QueueServiceClient using Entra ID authentication. + /// + /// # Arguments + /// + /// * `endpoint` - The full URL of the Azure storage account, for example `https://.queue.core.windows.net/` + /// * `credential` - An implementation of [`TokenCredential`] that can provide an Entra ID token for authentication + /// * `options` - Optional configuration for the client + /// + /// # Returns + /// + /// Returns a `Result` containing the new `QueueServiceClient` if successful, or an error if the endpoint URL is invalid + pub fn new( + endpoint: &str, + credential: Arc, + options: Option, + ) -> Result { + let options = options.unwrap_or_default(); + + let client = GeneratedQueueClient::new(endpoint, credential.clone(), Some(options))?; + Ok(Self { + endpoint: endpoint.parse()?, + client, + }) + } + + /// Returns a new instance of QueueClient. + /// + /// # Arguments + /// + /// * `queue_name` - The name of the queue. + /// + /// # Returns + /// + /// Returns a `QueueClient` that can be used to interact with the specified queue. + pub fn queue_client(&self, queue_name: String) -> QueueClient { + QueueClient { + endpoint: self.endpoint.clone(), + client: self.client.get_queue_client(queue_name), + } + } + + /// Creates a new queue under the given account. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request + /// * `queue_name` - The name of the queue to create + /// + /// # Returns + /// + /// Returns a `Result` containing the response if successful + /// + /// # Errors + /// + /// Returns an error if the queue already exists or if the request fails + pub async fn create_queue( + &self, + queue_name: &str, + options: Option>, + ) -> Result> { + self.client + .get_queue_client(queue_name.to_string()) + .create(options) + .await + } + + /// Permanently deletes the specified queue. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request + /// * `queue_name` - The name of the queue to create + /// + /// # Returns + /// + /// Returns a `Result` containing the response if successful + /// + /// # Errors + /// + /// Returns an error if the queue doesn't exist or if the request fails + pub async fn delete_queue( + &self, + queue_name: &str, + options: Option>, + ) -> Result> { + self.client + .get_queue_client(queue_name.to_string()) + .delete(options) + .await + } + + /// Retrieves the properties of the queue service. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request + /// + /// # Returns + /// + /// Returns a `Result` containing the service properties response if successful + /// + /// # Note + /// + /// This returns properties for the entire service, not just a single queue. + pub async fn get_properties( + &self, + options: Option>, + ) -> Result> { + self.client.get_properties(options).await + } + + /// Sets the properties of the queue service. + /// + /// # Arguments + /// + /// * `storage_service_properties` - The properties to set for the queue service + /// * `content_type` - The content type of the request body, typically "application/xml" + /// * `options` - Optional parameters for the request + /// + /// # Returns + /// + /// Returns a `Result` containing the response if successful. + pub async fn set_properties( + &self, + queue_service_properties: RequestContent, + options: Option>, + ) -> Result> { + self.client + .set_properties(queue_service_properties, options) + .await + } + + /// Lists queues in the storage account, returning a segment of results. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request, such as prefix and max results + /// + /// # Returns + /// + /// Returns a `Result` containing a `PageIterator` for paginated results, or an error if the request fails. + /// + /// The `PageIterator` can be used to iterate through the results page by page. + pub fn list_queues( + &self, + options: Option>, + ) -> Result>> { + self.client.list_queues(options) + } + + /// Retrieves statistics related to replication for the Queue service. It is only available on the secondary location endpoint + /// when read-access geo-redundant replication is enabled for the storage account. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request. + /// + /// # Returns + /// + /// Returns a `Result` containing the service statistics response if successful. + pub async fn get_statistics( + &self, + options: Option>, + ) -> Result> { + self.client.get_statistics(options).await + } +} diff --git a/sdk/storage/azure_storage_queue/src/generated/clients/mod.rs b/sdk/storage/azure_storage_queue/src/generated/clients/mod.rs new file mode 100644 index 0000000000..23ea68f73d --- /dev/null +++ b/sdk/storage/azure_storage_queue/src/generated/clients/mod.rs @@ -0,0 +1,9 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) Rust Code Generator. DO NOT EDIT. + +mod queue_client; +mod queue_service_client; +pub use queue_client::*; +pub use queue_service_client::*; diff --git a/sdk/storage/azure_storage_queue/src/generated/clients/queue_client.rs b/sdk/storage/azure_storage_queue/src/generated/clients/queue_client.rs new file mode 100644 index 0000000000..7ff3eafe50 --- /dev/null +++ b/sdk/storage/azure_storage_queue/src/generated/clients/queue_client.rs @@ -0,0 +1,633 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) Rust Code Generator. DO NOT EDIT. + +use crate::generated::models::{ + ListOfPeekedMessage, ListOfReceivedMessage, ListOfSentMessage, ListOfSignedIdentifier, + QueueClientClearOptions, QueueClientCreateOptions, QueueClientDeleteMessageOptions, + QueueClientDeleteOptions, QueueClientGetAccessPolicyOptions, QueueClientGetMetadataOptions, + QueueClientGetMetadataResult, QueueClientPeekMessagesOptions, + QueueClientReceiveMessagesOptions, QueueClientSendMessageOptions, + QueueClientSetAccessPolicyOptions, QueueClientSetAccessPolicyResult, + QueueClientSetMetadataOptions, QueueClientUpdateOptions, QueueMessage, +}; +use azure_core::{ + credentials::TokenCredential, + error::{ErrorKind, HttpError}, + fmt::SafeDebug, + http::{ + policies::{BearerTokenCredentialPolicy, Policy}, + ClientOptions, Context, Method, NoFormat, Pipeline, Request, RequestContent, Response, Url, + XmlFormat, + }, + tracing, Error, Result, +}; +use std::{collections::HashMap, sync::Arc}; + +#[tracing::client] +pub struct QueueClient { + pub(crate) endpoint: Url, + pub(crate) pipeline: Pipeline, + pub(crate) queue_name: String, + pub(crate) version: String, +} + +/// Options used when creating a `QueueClient` +#[derive(Clone, SafeDebug)] +pub struct QueueClientOptions { + /// Allows customization of the client. + pub client_options: ClientOptions, + /// Specifies the version of the operation to use for this request. + pub version: String, +} + +impl QueueClient { + /// Creates a new QueueClient, using Entra ID authentication. + /// + /// # Arguments + /// + /// * `endpoint` - Service host + /// * `credential` - An implementation of [`TokenCredential`](azure_core::credentials::TokenCredential) that can provide an + /// Entra ID token to use when authenticating. + /// * `queue_name` - The name of the queue. + /// * `options` - Optional configuration for the client. + #[tracing::new("azure_storage_queue")] + pub fn new( + endpoint: &str, + credential: Arc, + queue_name: String, + options: Option, + ) -> Result { + let options = options.unwrap_or_default(); + let mut endpoint = Url::parse(endpoint)?; + if !endpoint.scheme().starts_with("http") { + return Err(azure_core::Error::message( + azure_core::error::ErrorKind::Other, + format!("{endpoint} must use http(s)"), + )); + } + endpoint.set_query(None); + let auth_policy: Arc = Arc::new(BearerTokenCredentialPolicy::new( + credential, + vec!["https://storage.azure.com/.default"], + )); + Ok(Self { + endpoint, + queue_name, + version: options.version, + pipeline: Pipeline::new( + option_env!("CARGO_PKG_NAME"), + option_env!("CARGO_PKG_VERSION"), + options.client_options, + Vec::default(), + vec![auth_policy], + ), + }) + } + + /// Returns the Url associated with this client. + pub fn endpoint(&self) -> &Url { + &self.endpoint + } + + /// The Clear operation deletes all messages from the specified queue. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request. + #[tracing::function("Storage.Queues.Queue.clear")] + pub async fn clear( + &self, + options: Option>, + ) -> Result> { + let options = options.unwrap_or_default(); + let ctx = Context::with_context(&options.method_options.context); + let mut url = self.endpoint.clone(); + let mut path = String::from("{queueName}/messages"); + path = path.replace("{queueName}", &self.queue_name); + url = url.join(&path)?; + let mut request = Request::new(url, Method::Delete); + request.insert_header("accept", "application/xml"); + if let Some(client_request_id) = options.client_request_id { + request.insert_header("x-ms-client-request-id", client_request_id); + } + request.insert_header("x-ms-version", &self.version); + let rsp = self.pipeline.send(&ctx, &mut request).await?; + if !rsp.status().is_success() { + let status = rsp.status(); + let http_error = HttpError::new(rsp).await; + let error_kind = ErrorKind::http_response( + status, + http_error.error_code().map(std::borrow::ToOwned::to_owned), + ); + return Err(Error::new(error_kind, http_error)); + } + Ok(rsp.into()) + } + + /// Creates a new queue under the specified account. If the queue with the same name already exists, the operation fails. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request. + #[tracing::function("Storage.Queues.Queue.create")] + pub async fn create( + &self, + options: Option>, + ) -> Result> { + let options = options.unwrap_or_default(); + let ctx = Context::with_context(&options.method_options.context); + let mut url = self.endpoint.clone(); + url = url.join(&self.queue_name)?; + if let Some(timeout) = options.timeout { + url.query_pairs_mut() + .append_pair("timeout", &timeout.to_string()); + } + let mut request = Request::new(url, Method::Put); + request.insert_header("accept", "application/xml"); + if let Some(client_request_id) = options.client_request_id { + request.insert_header("x-ms-client-request-id", client_request_id); + } + if let Some(metadata) = options.metadata { + for (k, v) in &metadata { + request.insert_header(format!("x-ms-meta-{k}"), v); + } + } + request.insert_header("x-ms-version", &self.version); + let rsp = self.pipeline.send(&ctx, &mut request).await?; + if !rsp.status().is_success() { + let status = rsp.status(); + let http_error = HttpError::new(rsp).await; + let error_kind = ErrorKind::http_response( + status, + http_error.error_code().map(std::borrow::ToOwned::to_owned), + ); + return Err(Error::new(error_kind, http_error)); + } + Ok(rsp.into()) + } + + /// operation permanently deletes the specified queue + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request. + #[tracing::function("Storage.Queues.Queue.delete")] + pub async fn delete( + &self, + options: Option>, + ) -> Result> { + let options = options.unwrap_or_default(); + let ctx = Context::with_context(&options.method_options.context); + let mut url = self.endpoint.clone(); + url = url.join(&self.queue_name)?; + if let Some(timeout) = options.timeout { + url.query_pairs_mut() + .append_pair("timeout", &timeout.to_string()); + } + let mut request = Request::new(url, Method::Delete); + request.insert_header("accept", "application/xml"); + if let Some(client_request_id) = options.client_request_id { + request.insert_header("x-ms-client-request-id", client_request_id); + } + if let Some(metadata) = options.metadata { + for (k, v) in &metadata { + request.insert_header(format!("x-ms-meta-{k}"), v); + } + } + request.insert_header("x-ms-version", &self.version); + let rsp = self.pipeline.send(&ctx, &mut request).await?; + if !rsp.status().is_success() { + let status = rsp.status(); + let http_error = HttpError::new(rsp).await; + let error_kind = ErrorKind::http_response( + status, + http_error.error_code().map(std::borrow::ToOwned::to_owned), + ); + return Err(Error::new(error_kind, http_error)); + } + Ok(rsp.into()) + } + + /// The Delete operation deletes the specified message. + /// + /// # Arguments + /// + /// * `message_id` - The id of the queue message. + /// * `pop_receipt` - Required. Specifies the valid pop receipt value returned from an earlier call to the Get Messages or + /// Update Message operation. + /// * `options` - Optional parameters for the request. + #[tracing::function("Storage.Queues.Queue.deleteMessage")] + pub async fn delete_message( + &self, + message_id: &str, + pop_receipt: &str, + options: Option>, + ) -> Result> { + let options = options.unwrap_or_default(); + let ctx = Context::with_context(&options.method_options.context); + let mut url = self.endpoint.clone(); + let mut path = String::from("{queueName}/messages/{messageId}"); + path = path.replace("{messageId}", message_id); + path = path.replace("{queueName}", &self.queue_name); + url = url.join(&path)?; + url.query_pairs_mut().append_pair("popReceipt", pop_receipt); + let mut request = Request::new(url, Method::Delete); + request.insert_header("accept", "application/xml"); + if let Some(client_request_id) = options.client_request_id { + request.insert_header("x-ms-client-request-id", client_request_id); + } + request.insert_header("x-ms-version", &self.version); + let rsp = self.pipeline.send(&ctx, &mut request).await?; + if !rsp.status().is_success() { + let status = rsp.status(); + let http_error = HttpError::new(rsp).await; + let error_kind = ErrorKind::http_response( + status, + http_error.error_code().map(std::borrow::ToOwned::to_owned), + ); + return Err(Error::new(error_kind, http_error)); + } + Ok(rsp.into()) + } + + /// gets the permissions for the specified queue. The permissions indicate whether queue data may be accessed publicly. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request. + #[tracing::function("Storage.Queues.Queue.getAccessPolicy")] + pub async fn get_access_policy( + &self, + options: Option>, + ) -> Result> { + let options = options.unwrap_or_default(); + let ctx = Context::with_context(&options.method_options.context); + let mut url = self.endpoint.clone(); + let mut path = String::from("{queueName}/"); + path = path.replace("{queueName}", &self.queue_name); + url = url.join(&path)?; + url.query_pairs_mut().append_pair("comp", "acl"); + if let Some(timeout) = options.timeout { + url.query_pairs_mut() + .append_pair("timeout", &timeout.to_string()); + } + let mut request = Request::new(url, Method::Get); + request.insert_header("accept", "application/xml"); + request.insert_header("content-type", "application/xml"); + if let Some(client_request_id) = options.client_request_id { + request.insert_header("x-ms-client-request-id", client_request_id); + } + request.insert_header("x-ms-version", &self.version); + let rsp = self.pipeline.send(&ctx, &mut request).await?; + if !rsp.status().is_success() { + let status = rsp.status(); + let http_error = HttpError::new(rsp).await; + let error_kind = ErrorKind::http_response( + status, + http_error.error_code().map(std::borrow::ToOwned::to_owned), + ); + return Err(Error::new(error_kind, http_error)); + } + Ok(rsp.into()) + } + + /// returns all user-defined metadata and system properties for the specified queue. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request. + #[tracing::function("Storage.Queues.Queue.getMetadata")] + pub async fn get_metadata( + &self, + options: Option>, + ) -> Result> { + let options = options.unwrap_or_default(); + let ctx = Context::with_context(&options.method_options.context); + let mut url = self.endpoint.clone(); + let mut path = String::from("{queueName}/"); + path = path.replace("{queueName}", &self.queue_name); + url = url.join(&path)?; + url.query_pairs_mut().append_pair("comp", "metadata"); + if let Some(timeout) = options.timeout { + url.query_pairs_mut() + .append_pair("timeout", &timeout.to_string()); + } + let mut request = Request::new(url, Method::Get); + request.insert_header("accept", "application/xml"); + if let Some(client_request_id) = options.client_request_id { + request.insert_header("x-ms-client-request-id", client_request_id); + } + request.insert_header("x-ms-version", &self.version); + let rsp = self.pipeline.send(&ctx, &mut request).await?; + if !rsp.status().is_success() { + let status = rsp.status(); + let http_error = HttpError::new(rsp).await; + let error_kind = ErrorKind::http_response( + status, + http_error.error_code().map(std::borrow::ToOwned::to_owned), + ); + return Err(Error::new(error_kind, http_error)); + } + Ok(rsp.into()) + } + + /// The Peek operation retrieves one or more messages from the front of the queue, + /// but does not alter the visibility of the message. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request. + #[tracing::function("Storage.Queues.Queue.peekMessages")] + pub async fn peek_messages( + &self, + options: Option>, + ) -> Result> { + let options = options.unwrap_or_default(); + let ctx = Context::with_context(&options.method_options.context); + let mut url = self.endpoint.clone(); + let mut path = String::from("{queueName}/messages"); + path = path.replace("{queueName}", &self.queue_name); + url = url.join(&path)?; + url.query_pairs_mut().append_pair("peekonly", "true"); + if let Some(number_of_messages) = options.number_of_messages { + url.query_pairs_mut() + .append_pair("numofmessages", &number_of_messages.to_string()); + } + let mut request = Request::new(url, Method::Get); + request.insert_header("accept", "application/xml"); + request.insert_header("content-type", "application/xml"); + if let Some(client_request_id) = options.client_request_id { + request.insert_header("x-ms-client-request-id", client_request_id); + } + request.insert_header("x-ms-version", &self.version); + let rsp = self.pipeline.send(&ctx, &mut request).await?; + if !rsp.status().is_success() { + let status = rsp.status(); + let http_error = HttpError::new(rsp).await; + let error_kind = ErrorKind::http_response( + status, + http_error.error_code().map(std::borrow::ToOwned::to_owned), + ); + return Err(Error::new(error_kind, http_error)); + } + Ok(rsp.into()) + } + + /// The Dequeue operation retrieves one or more messages from the front of the + /// queue. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request. + #[tracing::function("Storage.Queues.Queue.receiveMessages")] + pub async fn receive_messages( + &self, + options: Option>, + ) -> Result> { + let options = options.unwrap_or_default(); + let ctx = Context::with_context(&options.method_options.context); + let mut url = self.endpoint.clone(); + let mut path = String::from("{queueName}/messages"); + path = path.replace("{queueName}", &self.queue_name); + url = url.join(&path)?; + if let Some(number_of_messages) = options.number_of_messages { + url.query_pairs_mut() + .append_pair("numofmessages", &number_of_messages.to_string()); + } + if let Some(timeout) = options.timeout { + url.query_pairs_mut() + .append_pair("timeout", &timeout.to_string()); + } + if let Some(visibility_timeout) = options.visibility_timeout { + url.query_pairs_mut() + .append_pair("visibilityTimeout", &visibility_timeout.to_string()); + } + let mut request = Request::new(url, Method::Get); + request.insert_header("accept", "application/xml"); + request.insert_header("content-type", "application/xml"); + if let Some(client_request_id) = options.client_request_id { + request.insert_header("x-ms-client-request-id", client_request_id); + } + request.insert_header("x-ms-version", &self.version); + let rsp = self.pipeline.send(&ctx, &mut request).await?; + if !rsp.status().is_success() { + let status = rsp.status(); + let http_error = HttpError::new(rsp).await; + let error_kind = ErrorKind::http_response( + status, + http_error.error_code().map(std::borrow::ToOwned::to_owned), + ); + return Err(Error::new(error_kind, http_error)); + } + Ok(rsp.into()) + } + + /// The Enqueue operation adds a new message to the back of the message queue. A + /// visibility timeout can also be specified to make the message invisible until + /// the visibility timeout expires. A message must be in a format that can be + /// included in an XML request with UTF-8 encoding. The encoded message can be up + /// to 64 KB in size for versions 2011-08-18 and newer, or 8 KB in size for + /// previous versions. + /// + /// # Arguments + /// + /// * `queue_message` - A Message object which can be stored in a Queue + /// * `options` - Optional parameters for the request. + #[tracing::function("Storage.Queues.Queue.sendMessage")] + pub async fn send_message( + &self, + queue_message: RequestContent, + options: Option>, + ) -> Result> { + let options = options.unwrap_or_default(); + let ctx = Context::with_context(&options.method_options.context); + let mut url = self.endpoint.clone(); + let mut path = String::from("{queueName}/messages"); + path = path.replace("{queueName}", &self.queue_name); + url = url.join(&path)?; + if let Some(message_time_to_live) = options.message_time_to_live { + url.query_pairs_mut() + .append_pair("messageTtl", &message_time_to_live.to_string()); + } + if let Some(visibility_timeout) = options.visibility_timeout { + url.query_pairs_mut() + .append_pair("visibilityTimeout", &visibility_timeout.to_string()); + } + let mut request = Request::new(url, Method::Post); + request.insert_header("accept", "application/xml"); + request.insert_header("content-type", "application/xml"); + if let Some(client_request_id) = options.client_request_id { + request.insert_header("x-ms-client-request-id", client_request_id); + } + request.insert_header("x-ms-version", &self.version); + request.set_body(queue_message); + let rsp = self.pipeline.send(&ctx, &mut request).await?; + if !rsp.status().is_success() { + let status = rsp.status(); + let http_error = HttpError::new(rsp).await; + let error_kind = ErrorKind::http_response( + status, + http_error.error_code().map(std::borrow::ToOwned::to_owned), + ); + return Err(Error::new(error_kind, http_error)); + } + Ok(rsp.into()) + } + + /// sets the permissions for the specified queue. + /// + /// # Arguments + /// + /// * `queue_acl` - The access control list for the queue. + /// * `options` - Optional parameters for the request. + #[tracing::function("Storage.Queues.Queue.setAccessPolicy")] + pub async fn set_access_policy( + &self, + queue_acl: RequestContent, + options: Option>, + ) -> Result> { + let options = options.unwrap_or_default(); + let ctx = Context::with_context(&options.method_options.context); + let mut url = self.endpoint.clone(); + let mut path = String::from("{queueName}/"); + path = path.replace("{queueName}", &self.queue_name); + url = url.join(&path)?; + url.query_pairs_mut().append_pair("comp", "acl"); + if let Some(timeout) = options.timeout { + url.query_pairs_mut() + .append_pair("timeout", &timeout.to_string()); + } + let mut request = Request::new(url, Method::Put); + request.insert_header("accept", "application/xml"); + request.insert_header("content-type", "application/xml"); + if let Some(client_request_id) = options.client_request_id { + request.insert_header("x-ms-client-request-id", client_request_id); + } + request.insert_header("x-ms-version", &self.version); + request.set_body(queue_acl); + let rsp = self.pipeline.send(&ctx, &mut request).await?; + if !rsp.status().is_success() { + let status = rsp.status(); + let http_error = HttpError::new(rsp).await; + let error_kind = ErrorKind::http_response( + status, + http_error.error_code().map(std::borrow::ToOwned::to_owned), + ); + return Err(Error::new(error_kind, http_error)); + } + Ok(rsp.into()) + } + + /// operation sets one or more user-defined name-value pairs for the specified queue. + /// + /// # Arguments + /// + /// * `metadata` - The metadata headers. + /// * `options` - Optional parameters for the request. + #[tracing::function("Storage.Queues.Queue.setMetadata")] + pub async fn set_metadata( + &self, + metadata: HashMap, + options: Option>, + ) -> Result> { + let options = options.unwrap_or_default(); + let ctx = Context::with_context(&options.method_options.context); + let mut url = self.endpoint.clone(); + let mut path = String::from("{queueName}/"); + path = path.replace("{queueName}", &self.queue_name); + url = url.join(&path)?; + url.query_pairs_mut().append_pair("comp", "metadata"); + if let Some(timeout) = options.timeout { + url.query_pairs_mut() + .append_pair("timeout", &timeout.to_string()); + } + let mut request = Request::new(url, Method::Put); + request.insert_header("accept", "application/xml"); + if let Some(client_request_id) = options.client_request_id { + request.insert_header("x-ms-client-request-id", client_request_id); + } + for (k, v) in &metadata { + request.insert_header(format!("x-ms-meta-{k}"), v); + } + request.insert_header("x-ms-version", &self.version); + let rsp = self.pipeline.send(&ctx, &mut request).await?; + if !rsp.status().is_success() { + let status = rsp.status(); + let http_error = HttpError::new(rsp).await; + let error_kind = ErrorKind::http_response( + status, + http_error.error_code().map(std::borrow::ToOwned::to_owned), + ); + return Err(Error::new(error_kind, http_error)); + } + Ok(rsp.into()) + } + + /// The Update operation was introduced with version 2011-08-18 of the Queue + /// service API. The Update Message operation updates the visibility timeout of a + /// message. You can also use this operation to update the contents of a message. A + /// message must be in a format that can be included in an XML request with UTF-8 + /// encoding, and the encoded message can be up to 64KB in size. + /// + /// # Arguments + /// + /// * `message_id` - The id of the queue message. + /// * `pop_receipt` - Required. Specifies the valid pop receipt value returned from an earlier call to the Get Messages or + /// Update Message operation. + /// * `visibility_timeout` - Specifies the new visibility timeout value, in seconds, relative to server time. The default + /// value is 30 seconds. A specified value must be larger than or equal to 1 second, and cannot be larger than 7 days, or + /// larger than 2 hours on REST protocol versions prior to version 2011-08-18. The visibility timeout of a message can be + /// set to a value later than the expiry time. + /// * `options` - Optional parameters for the request. + #[tracing::function("Storage.Queues.Queue.update")] + pub async fn update( + &self, + message_id: &str, + pop_receipt: &str, + visibility_timeout: i32, + options: Option>, + ) -> Result> { + let options = options.unwrap_or_default(); + let ctx = Context::with_context(&options.method_options.context); + let mut url = self.endpoint.clone(); + let mut path = String::from("{queueName}/messages/{messageId}"); + path = path.replace("{messageId}", message_id); + path = path.replace("{queueName}", &self.queue_name); + url = url.join(&path)?; + url.query_pairs_mut().append_pair("popReceipt", pop_receipt); + url.query_pairs_mut() + .append_pair("visibilityTimeout", &visibility_timeout.to_string()); + let mut request = Request::new(url, Method::Put); + request.insert_header("accept", "application/xml"); + request.insert_header("content-type", "application/xml"); + if let Some(client_request_id) = options.client_request_id { + request.insert_header("x-ms-client-request-id", client_request_id); + } + request.insert_header("x-ms-version", &self.version); + if let Some(queue_message) = options.queue_message { + request.set_body(queue_message); + } + let rsp = self.pipeline.send(&ctx, &mut request).await?; + if !rsp.status().is_success() { + let status = rsp.status(); + let http_error = HttpError::new(rsp).await; + let error_kind = ErrorKind::http_response( + status, + http_error.error_code().map(std::borrow::ToOwned::to_owned), + ); + return Err(Error::new(error_kind, http_error)); + } + Ok(rsp.into()) + } +} + +impl Default for QueueClientOptions { + fn default() -> Self { + Self { + client_options: ClientOptions::default(), + version: String::from("2018-03-28"), + } + } +} diff --git a/sdk/storage/azure_storage_queue/src/generated/clients/queue_service_client.rs b/sdk/storage/azure_storage_queue/src/generated/clients/queue_service_client.rs new file mode 100644 index 0000000000..c9734cc1c1 --- /dev/null +++ b/sdk/storage/azure_storage_queue/src/generated/clients/queue_service_client.rs @@ -0,0 +1,329 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) Rust Code Generator. DO NOT EDIT. + +use crate::generated::{ + clients::QueueClient, + models::{ + ListQueuesResponse, QueueServiceClientGetPropertiesOptions, + QueueServiceClientGetStatisticsOptions, QueueServiceClientListQueuesOptions, + QueueServiceClientSetPropertiesOptions, QueueServiceProperties, QueueServiceStats, + }, +}; +use azure_core::{ + credentials::TokenCredential, + error::{ErrorKind, HttpError}, + fmt::SafeDebug, + http::{ + policies::{BearerTokenCredentialPolicy, Policy}, + ClientOptions, Context, Method, NoFormat, PageIterator, PagerResult, PagerState, Pipeline, + RawResponse, Request, RequestContent, Response, Url, XmlFormat, + }, + tracing, xml, Error, Result, +}; +use std::sync::Arc; + +#[tracing::client] +pub struct QueueServiceClient { + pub(crate) endpoint: Url, + pub(crate) pipeline: Pipeline, + pub(crate) version: String, +} + +/// Options used when creating a `QueueServiceClient` +#[derive(Clone, SafeDebug)] +pub struct QueueServiceClientOptions { + /// Allows customization of the client. + pub client_options: ClientOptions, + /// Specifies the version of the operation to use for this request. + pub version: String, +} + +impl QueueServiceClient { + /// Creates a new QueueServiceClient, using Entra ID authentication. + /// + /// # Arguments + /// + /// * `endpoint` - Service host + /// * `credential` - An implementation of [`TokenCredential`](azure_core::credentials::TokenCredential) that can provide an + /// Entra ID token to use when authenticating. + /// * `options` - Optional configuration for the client. + #[tracing::new("azure_storage_queue")] + pub fn new( + endpoint: &str, + credential: Arc, + options: Option, + ) -> Result { + let options = options.unwrap_or_default(); + let mut endpoint = Url::parse(endpoint)?; + if !endpoint.scheme().starts_with("http") { + return Err(azure_core::Error::message( + azure_core::error::ErrorKind::Other, + format!("{endpoint} must use http(s)"), + )); + } + endpoint.set_query(None); + let auth_policy: Arc = Arc::new(BearerTokenCredentialPolicy::new( + credential, + vec!["https://storage.azure.com/.default"], + )); + Ok(Self { + endpoint, + version: options.version, + pipeline: Pipeline::new( + option_env!("CARGO_PKG_NAME"), + option_env!("CARGO_PKG_VERSION"), + options.client_options, + Vec::default(), + vec![auth_policy], + ), + }) + } + + /// Returns the Url associated with this client. + pub fn endpoint(&self) -> &Url { + &self.endpoint + } + + /// Retrieves properties of a storage account's Queue service, including properties for Storage Analytics and CORS (Cross-Origin + /// Resource Sharing) rules. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request. + #[tracing::function("Storage.Queues.getProperties")] + pub async fn get_properties( + &self, + options: Option>, + ) -> Result> { + let options = options.unwrap_or_default(); + let ctx = Context::with_context(&options.method_options.context); + let mut url = self.endpoint.clone(); + url.query_pairs_mut() + .append_pair("comp", "properties") + .append_pair("restype", "service"); + if let Some(timeout) = options.timeout { + url.query_pairs_mut() + .append_pair("timeout", &timeout.to_string()); + } + let mut request = Request::new(url, Method::Get); + request.insert_header("accept", "application/xml"); + request.insert_header("content-type", "application/xml"); + if let Some(client_request_id) = options.client_request_id { + request.insert_header("x-ms-client-request-id", client_request_id); + } + request.insert_header("x-ms-version", &self.version); + let rsp = self.pipeline.send(&ctx, &mut request).await?; + if !rsp.status().is_success() { + let status = rsp.status(); + let http_error = HttpError::new(rsp).await; + let error_kind = ErrorKind::http_response( + status, + http_error.error_code().map(std::borrow::ToOwned::to_owned), + ); + return Err(Error::new(error_kind, http_error)); + } + Ok(rsp.into()) + } + + /// Returns a new instance of QueueClient. + /// + /// # Arguments + /// + /// * `queue_name` - The name of the queue. + #[tracing::subclient] + pub fn get_queue_client(&self, queue_name: String) -> QueueClient { + QueueClient { + endpoint: self.endpoint.clone(), + pipeline: self.pipeline.clone(), + queue_name, + version: self.version.clone(), + } + } + + /// Retrieves statistics related to replication for the Queue service. It is only available on the secondary location endpoint + /// when read-access geo-redundant replication is enabled for the storage account. + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request. + #[tracing::function("Storage.Queues.getStatistics")] + pub async fn get_statistics( + &self, + options: Option>, + ) -> Result> { + let options = options.unwrap_or_default(); + let ctx = Context::with_context(&options.method_options.context); + let mut url = self.endpoint.clone(); + url.query_pairs_mut() + .append_pair("comp", "stats") + .append_pair("restype", "service"); + if let Some(timeout) = options.timeout { + url.query_pairs_mut() + .append_pair("timeout", &timeout.to_string()); + } + let mut request = Request::new(url, Method::Get); + request.insert_header("accept", "application/xml"); + request.insert_header("content-type", "application/xml"); + if let Some(client_request_id) = options.client_request_id { + request.insert_header("x-ms-client-request-id", client_request_id); + } + request.insert_header("x-ms-version", &self.version); + let rsp = self.pipeline.send(&ctx, &mut request).await?; + if !rsp.status().is_success() { + let status = rsp.status(); + let http_error = HttpError::new(rsp).await; + let error_kind = ErrorKind::http_response( + status, + http_error.error_code().map(std::borrow::ToOwned::to_owned), + ); + return Err(Error::new(error_kind, http_error)); + } + Ok(rsp.into()) + } + + /// returns a list of the queues under the specified account + /// + /// # Arguments + /// + /// * `options` - Optional parameters for the request. + #[tracing::function("Storage.Queues.getQueues")] + pub fn list_queues( + &self, + options: Option>, + ) -> Result>> { + let options = options.unwrap_or_default().into_owned(); + let pipeline = self.pipeline.clone(); + let mut first_url = self.endpoint.clone(); + first_url.query_pairs_mut().append_pair("comp", "list"); + if let Some(include) = options.include { + first_url.query_pairs_mut().append_pair( + "include", + &include + .iter() + .map(|i| i.to_string()) + .collect::>() + .join(","), + ); + } + if let Some(marker) = options.marker { + first_url.query_pairs_mut().append_pair("marker", &marker); + } + if let Some(maxresults) = options.maxresults { + first_url + .query_pairs_mut() + .append_pair("maxresults", &maxresults.to_string()); + } + if let Some(prefix) = options.prefix { + first_url.query_pairs_mut().append_pair("prefix", &prefix); + } + if let Some(timeout) = options.timeout { + first_url + .query_pairs_mut() + .append_pair("timeout", &timeout.to_string()); + } + let version = self.version.clone(); + Ok(PageIterator::from_callback( + move |marker: PagerState| { + let mut url = first_url.clone(); + if let PagerState::More(marker) = marker { + if url.query_pairs().any(|(name, _)| name.eq("marker")) { + let mut new_url = url.clone(); + new_url + .query_pairs_mut() + .clear() + .extend_pairs(url.query_pairs().filter(|(name, _)| name.ne("marker"))); + url = new_url; + } + url.query_pairs_mut().append_pair("marker", &marker); + } + let mut request = Request::new(url, Method::Get); + request.insert_header("accept", "application/xml"); + request.insert_header("content-type", "application/xml"); + if let Some(client_request_id) = &options.client_request_id { + request.insert_header("x-ms-client-request-id", client_request_id); + } + request.insert_header("x-ms-version", &version); + let ctx = options.method_options.context.clone(); + let pipeline = pipeline.clone(); + async move { + let rsp: RawResponse = pipeline.send(&ctx, &mut request).await?; + if !rsp.status().is_success() { + let status = rsp.status(); + let http_error = HttpError::new(rsp).await; + let error_kind = ErrorKind::http_response( + status, + http_error.error_code().map(std::borrow::ToOwned::to_owned), + ); + return Err(Error::new(error_kind, http_error)); + } + let (status, headers, body) = rsp.deconstruct(); + let bytes = body.collect().await?; + let res: ListQueuesResponse = xml::read_xml(&bytes)?; + let rsp = RawResponse::from_bytes(status, headers, bytes).into(); + Ok(match res.next_marker { + Some(next_marker) if !next_marker.is_empty() => PagerResult::More { + response: rsp, + continuation: next_marker, + }, + _ => PagerResult::Done { response: rsp }, + }) + } + }, + )) + } + + /// Sets properties for a storage account's Queue service endpoint, including properties for Storage Analytics and CORS (Cross-Origin + /// Resource Sharing) rules + /// + /// # Arguments + /// + /// * `queue_service_properties` - The storage service properties to set. + /// * `options` - Optional parameters for the request. + #[tracing::function("Storage.Queues.setProperties")] + pub async fn set_properties( + &self, + queue_service_properties: RequestContent, + options: Option>, + ) -> Result> { + let options = options.unwrap_or_default(); + let ctx = Context::with_context(&options.method_options.context); + let mut url = self.endpoint.clone(); + url.query_pairs_mut() + .append_pair("comp", "properties") + .append_pair("restype", "service"); + if let Some(timeout) = options.timeout { + url.query_pairs_mut() + .append_pair("timeout", &timeout.to_string()); + } + let mut request = Request::new(url, Method::Put); + request.insert_header("accept", "application/xml"); + request.insert_header("content-type", "application/xml"); + if let Some(client_request_id) = options.client_request_id { + request.insert_header("x-ms-client-request-id", client_request_id); + } + request.insert_header("x-ms-version", &self.version); + request.set_body(queue_service_properties); + let rsp = self.pipeline.send(&ctx, &mut request).await?; + if !rsp.status().is_success() { + let status = rsp.status(); + let http_error = HttpError::new(rsp).await; + let error_kind = ErrorKind::http_response( + status, + http_error.error_code().map(std::borrow::ToOwned::to_owned), + ); + return Err(Error::new(error_kind, http_error)); + } + Ok(rsp.into()) + } +} + +impl Default for QueueServiceClientOptions { + fn default() -> Self { + Self { + client_options: ClientOptions::default(), + version: String::from("2018-03-28"), + } + } +} diff --git a/sdk/storage/azure_storage_queue/src/generated/mod.rs b/sdk/storage/azure_storage_queue/src/generated/mod.rs new file mode 100644 index 0000000000..c91d90d2bd --- /dev/null +++ b/sdk/storage/azure_storage_queue/src/generated/mod.rs @@ -0,0 +1,8 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) Rust Code Generator. DO NOT EDIT. + +pub mod clients; +pub mod models; +pub use clients::{QueueClient, QueueClientOptions, QueueServiceClient, QueueServiceClientOptions}; diff --git a/sdk/storage/azure_storage_queue/src/generated/models/enums.rs b/sdk/storage/azure_storage_queue/src/generated/models/enums.rs new file mode 100644 index 0000000000..ed347cef40 --- /dev/null +++ b/sdk/storage/azure_storage_queue/src/generated/models/enums.rs @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) Rust Code Generator. DO NOT EDIT. + +use azure_core::{create_enum, create_extensible_enum}; + +create_extensible_enum!( + #[doc = r#"The geo replication status."#] + GeoReplicationStatusType, + #[doc = r#"The geo replication is bootstrap."#] + (Bootstrap, "bootstrap"), + #[doc = r#"The geo replication is live."#] + (Live, "live"), + #[doc = r#"The geo replication is unavailable."#] + (Unavailable, "unavailable") +); + +create_enum!( + #[doc = r#"Include this parameter to specify that the queue's metadata be returned as part of the response body."#] + ListQueuesIncludeType, + #[doc = r#"Include metadata"#] + (Metadata, "metadata") +); diff --git a/sdk/storage/azure_storage_queue/src/generated/models/header_traits.rs b/sdk/storage/azure_storage_queue/src/generated/models/header_traits.rs new file mode 100644 index 0000000000..7a40c8fcc6 --- /dev/null +++ b/sdk/storage/azure_storage_queue/src/generated/models/header_traits.rs @@ -0,0 +1,93 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) Rust Code Generator. DO NOT EDIT. + +use super::{ + ListOfSignedIdentifier, QueueClientGetMetadataResult, QueueClientSetAccessPolicyResult, + QueueServiceStats, +}; +use azure_core::{ + http::{ + headers::{HeaderName, Headers}, + NoFormat, Response, XmlFormat, + }, + time::{parse_rfc7231, OffsetDateTime}, + Result, +}; +use std::collections::HashMap; + +const DATE: HeaderName = HeaderName::from_static("date"); +const META: &str = "x-ms-meta-"; + +/// Provides access to typed response headers for `QueueClient::get_access_policy()` +pub trait ListOfSignedIdentifierHeaders: private::Sealed { + fn date(&self) -> Result>; +} + +impl ListOfSignedIdentifierHeaders for Response { + /// UTC date/time value generated by the service that indicates the time at which the response was initiated + fn date(&self) -> Result> { + Headers::get_optional_with(self.headers(), &DATE, |h| parse_rfc7231(h.as_str())) + } +} + +/// Provides access to typed response headers for `QueueClient::get_metadata()` +pub trait QueueClientGetMetadataResultHeaders: private::Sealed { + fn metadata(&self) -> Result>; +} + +impl QueueClientGetMetadataResultHeaders for Response { + /// The metadata headers. + fn metadata(&self) -> Result> { + let mut values = HashMap::new(); + for h in self.headers().iter() { + let name = h.0.as_str(); + if name.len() > META.len() && name.starts_with(META) { + values.insert(name[META.len()..].to_owned(), h.1.as_str().to_owned()); + } + } + Ok(values) + } +} + +/// Provides access to typed response headers for `QueueClient::set_access_policy()` +pub trait QueueClientSetAccessPolicyResultHeaders: private::Sealed { + fn date(&self) -> Result>; +} + +impl QueueClientSetAccessPolicyResultHeaders + for Response +{ + /// UTC date/time value generated by the service that indicates the time at which the response was initiated + fn date(&self) -> Result> { + Headers::get_optional_with(self.headers(), &DATE, |h| parse_rfc7231(h.as_str())) + } +} + +/// Provides access to typed response headers for `QueueServiceClient::get_statistics()` +pub trait QueueServiceStatsHeaders: private::Sealed { + fn date(&self) -> Result>; +} + +impl QueueServiceStatsHeaders for Response { + /// UTC date/time value generated by the service that indicates the time at which the response was initiated + fn date(&self) -> Result> { + Headers::get_optional_with(self.headers(), &DATE, |h| parse_rfc7231(h.as_str())) + } +} + +mod private { + use super::{ + ListOfSignedIdentifier, QueueClientGetMetadataResult, QueueClientSetAccessPolicyResult, + QueueServiceStats, + }; + use azure_core::http::{NoFormat, Response, XmlFormat}; + + pub trait Sealed {} + + impl Sealed for Response {} + impl Sealed for Response {} + impl Sealed for Response {} + impl Sealed for Response {} +} diff --git a/sdk/storage/azure_storage_queue/src/generated/models/method_options.rs b/sdk/storage/azure_storage_queue/src/generated/models/method_options.rs new file mode 100644 index 0000000000..02a260642a --- /dev/null +++ b/sdk/storage/azure_storage_queue/src/generated/models/method_options.rs @@ -0,0 +1,276 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) Rust Code Generator. DO NOT EDIT. + +use super::{ListQueuesIncludeType, QueueMessage}; +use azure_core::{ + fmt::SafeDebug, + http::{ClientMethodOptions, RequestContent, XmlFormat}, +}; +use std::collections::HashMap; + +/// Options to be passed to `QueueClient::clear()` +#[derive(Clone, Default, SafeDebug)] +pub struct QueueClientClearOptions<'a> { + /// An opaque, globally-unique, client-generated string identifier for the request. + pub client_request_id: Option, + + /// Allows customization of the method call. + pub method_options: ClientMethodOptions<'a>, +} + +/// Options to be passed to `QueueClient::create()` +#[derive(Clone, Default, SafeDebug)] +pub struct QueueClientCreateOptions<'a> { + /// An opaque, globally-unique, client-generated string identifier for the request. + pub client_request_id: Option, + + /// The metadata headers. + pub metadata: Option>, + + /// Allows customization of the method call. + pub method_options: ClientMethodOptions<'a>, + + /// The timeout parameter is expressed in seconds. For more information, see [Setting Timeouts for Queue Service Operations.](https://learn.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-queue-service-operations) + pub timeout: Option, +} + +/// Options to be passed to `QueueClient::delete()` +#[derive(Clone, Default, SafeDebug)] +pub struct QueueClientDeleteOptions<'a> { + /// An opaque, globally-unique, client-generated string identifier for the request. + pub client_request_id: Option, + + /// The metadata headers. + pub metadata: Option>, + + /// Allows customization of the method call. + pub method_options: ClientMethodOptions<'a>, + + /// The timeout parameter is expressed in seconds. For more information, see [Setting Timeouts for Queue Service Operations.](https://learn.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-queue-service-operations) + pub timeout: Option, +} + +/// Options to be passed to `QueueClient::delete_message()` +#[derive(Clone, Default, SafeDebug)] +pub struct QueueClientDeleteMessageOptions<'a> { + /// An opaque, globally-unique, client-generated string identifier for the request. + pub client_request_id: Option, + + /// Allows customization of the method call. + pub method_options: ClientMethodOptions<'a>, +} + +/// Options to be passed to `QueueClient::get_access_policy()` +#[derive(Clone, Default, SafeDebug)] +pub struct QueueClientGetAccessPolicyOptions<'a> { + /// An opaque, globally-unique, client-generated string identifier for the request. + pub client_request_id: Option, + + /// Allows customization of the method call. + pub method_options: ClientMethodOptions<'a>, + + /// The timeout parameter is expressed in seconds. For more information, see [Setting Timeouts for Queue Service Operations.](https://learn.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-queue-service-operations) + pub timeout: Option, +} + +/// Options to be passed to `QueueClient::get_metadata()` +#[derive(Clone, Default, SafeDebug)] +pub struct QueueClientGetMetadataOptions<'a> { + /// An opaque, globally-unique, client-generated string identifier for the request. + pub client_request_id: Option, + + /// Allows customization of the method call. + pub method_options: ClientMethodOptions<'a>, + + /// The timeout parameter is expressed in seconds. For more information, see [Setting Timeouts for Queue Service Operations.](https://learn.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-queue-service-operations) + pub timeout: Option, +} + +/// Options to be passed to `QueueClient::peek_messages()` +#[derive(Clone, Default, SafeDebug)] +pub struct QueueClientPeekMessagesOptions<'a> { + /// An opaque, globally-unique, client-generated string identifier for the request. + pub client_request_id: Option, + + /// Allows customization of the method call. + pub method_options: ClientMethodOptions<'a>, + + /// Optional. A nonzero integer value that specifies the number of messages to + /// retrieve from the queue, up to a maximum of 32. If fewer are visible, the + /// visible messages are returned. By default, a single message is retrieved from + /// the queue with this operation. + pub number_of_messages: Option, +} + +/// Options to be passed to `QueueClient::receive_messages()` +#[derive(Clone, Default, SafeDebug)] +pub struct QueueClientReceiveMessagesOptions<'a> { + /// An opaque, globally-unique, client-generated string identifier for the request. + pub client_request_id: Option, + + /// Allows customization of the method call. + pub method_options: ClientMethodOptions<'a>, + + /// Optional. A nonzero integer value that specifies the number of messages to + /// retrieve from the queue, up to a maximum of 32. If fewer are visible, the + /// visible messages are returned. By default, a single message is retrieved from + /// the queue with this operation. + pub number_of_messages: Option, + + /// The timeout parameter is expressed in seconds. For more information, see [Setting Timeouts for Queue Service Operations.](https://learn.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-queue-service-operations) + pub timeout: Option, + + /// Specifies the new visibility timeout value, in seconds, relative to server time. The default value is 30 seconds. A specified + /// value must be larger than or equal to 1 second, and cannot be larger than 7 days, or larger than 2 hours on REST protocol + /// versions prior to version 2011-08-18. The visibility timeout of a message can be set to a value later than the expiry + /// time. + pub visibility_timeout: Option, +} + +/// Options to be passed to `QueueClient::send_message()` +#[derive(Clone, Default, SafeDebug)] +pub struct QueueClientSendMessageOptions<'a> { + /// An opaque, globally-unique, client-generated string identifier for the request. + pub client_request_id: Option, + + /// Optional. Specifies the time-to-live interval for the message, in seconds. + /// Prior to version 2017-07-29, the maximum time-to-live allowed is 7 days. For + /// version 2017-07-29 or later, the maximum time-to-live can be any positive + /// number, as well as -1 indicating that the message does not expire. If this + /// parameter is omitted, the default time-to-live is 7 days. + pub message_time_to_live: Option, + + /// Allows customization of the method call. + pub method_options: ClientMethodOptions<'a>, + + /// Specifies the new visibility timeout value, in seconds, relative to server time. The default value is 30 seconds. A specified + /// value must be larger than or equal to 1 second, and cannot be larger than 7 days, or larger than 2 hours on REST protocol + /// versions prior to version 2011-08-18. The visibility timeout of a message can be set to a value later than the expiry + /// time. + pub visibility_timeout: Option, +} + +/// Options to be passed to `QueueClient::set_access_policy()` +#[derive(Clone, Default, SafeDebug)] +pub struct QueueClientSetAccessPolicyOptions<'a> { + /// An opaque, globally-unique, client-generated string identifier for the request. + pub client_request_id: Option, + + /// Allows customization of the method call. + pub method_options: ClientMethodOptions<'a>, + + /// The timeout parameter is expressed in seconds. For more information, see [Setting Timeouts for Queue Service Operations.](https://learn.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-queue-service-operations) + pub timeout: Option, +} + +/// Options to be passed to `QueueClient::set_metadata()` +#[derive(Clone, Default, SafeDebug)] +pub struct QueueClientSetMetadataOptions<'a> { + /// An opaque, globally-unique, client-generated string identifier for the request. + pub client_request_id: Option, + + /// Allows customization of the method call. + pub method_options: ClientMethodOptions<'a>, + + /// The timeout parameter is expressed in seconds. For more information, see [Setting Timeouts for Queue Service Operations.](https://learn.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-queue-service-operations) + pub timeout: Option, +} + +/// Options to be passed to `QueueClient::update()` +#[derive(Clone, Default, SafeDebug)] +pub struct QueueClientUpdateOptions<'a> { + /// An opaque, globally-unique, client-generated string identifier for the request. + pub client_request_id: Option, + + /// Allows customization of the method call. + pub method_options: ClientMethodOptions<'a>, + + /// A Message object which can be stored in a Queue + pub queue_message: Option>, +} + +/// Options to be passed to `QueueServiceClient::get_properties()` +#[derive(Clone, Default, SafeDebug)] +pub struct QueueServiceClientGetPropertiesOptions<'a> { + /// An opaque, globally-unique, client-generated string identifier for the request. + pub client_request_id: Option, + + /// Allows customization of the method call. + pub method_options: ClientMethodOptions<'a>, + + /// The timeout parameter is expressed in seconds. For more information, see [Setting Timeouts for Queue Service Operations.](https://learn.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-queue-service-operations) + pub timeout: Option, +} + +/// Options to be passed to `QueueServiceClient::get_statistics()` +#[derive(Clone, Default, SafeDebug)] +pub struct QueueServiceClientGetStatisticsOptions<'a> { + /// An opaque, globally-unique, client-generated string identifier for the request. + pub client_request_id: Option, + + /// Allows customization of the method call. + pub method_options: ClientMethodOptions<'a>, + + /// The timeout parameter is expressed in seconds. For more information, see [Setting Timeouts for Queue Service Operations.](https://learn.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-queue-service-operations) + pub timeout: Option, +} + +/// Options to be passed to `QueueServiceClient::list_queues()` +#[derive(Clone, Default, SafeDebug)] +pub struct QueueServiceClientListQueuesOptions<'a> { + /// An opaque, globally-unique, client-generated string identifier for the request. + pub client_request_id: Option, + + /// Include this parameter to specify that the queue's metadata be returned as part of the response body. + pub include: Option>, + + /// A string value that identifies the portion of the list of queues to be returned with the next listing operation. The operation + /// returns the NextMarker value within the response body if the listing operation did not return all queues remaining to + /// be listed with the current page. The NextMarker value can be used as the value for the marker parameter in a subsequent + /// call to request the next page of list items. The marker value is opaque to the client. + pub marker: Option, + + /// Specifies the maximum number of queues to return. If the request does not specify maxresults, or specifies a value greater + /// than 5000, the server will return up to 5000 items. + pub maxresults: Option, + + /// Allows customization of the method call. + pub method_options: ClientMethodOptions<'a>, + + /// Filters the results to return only queues whose name begins with the specified prefix. + pub prefix: Option, + + /// The timeout parameter is expressed in seconds. For more information, see [Setting Timeouts for Queue Service Operations.](https://learn.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-queue-service-operations) + pub timeout: Option, +} + +impl QueueServiceClientListQueuesOptions<'_> { + pub fn into_owned(self) -> QueueServiceClientListQueuesOptions<'static> { + QueueServiceClientListQueuesOptions { + client_request_id: self.client_request_id, + include: self.include, + marker: self.marker, + maxresults: self.maxresults, + method_options: ClientMethodOptions { + context: self.method_options.context.into_owned(), + }, + prefix: self.prefix, + timeout: self.timeout, + } + } +} + +/// Options to be passed to `QueueServiceClient::set_properties()` +#[derive(Clone, Default, SafeDebug)] +pub struct QueueServiceClientSetPropertiesOptions<'a> { + /// An opaque, globally-unique, client-generated string identifier for the request. + pub client_request_id: Option, + + /// Allows customization of the method call. + pub method_options: ClientMethodOptions<'a>, + + /// The timeout parameter is expressed in seconds. For more information, see [Setting Timeouts for Queue Service Operations.](https://learn.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-queue-service-operations) + pub timeout: Option, +} diff --git a/sdk/storage/azure_storage_queue/src/generated/models/mod.rs b/sdk/storage/azure_storage_queue/src/generated/models/mod.rs new file mode 100644 index 0000000000..a7a0c14596 --- /dev/null +++ b/sdk/storage/azure_storage_queue/src/generated/models/mod.rs @@ -0,0 +1,15 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) Rust Code Generator. DO NOT EDIT. + +mod enums; +mod header_traits; +mod method_options; +mod models_impl; +mod pub_models; +pub use enums::*; +pub use header_traits::*; +pub use method_options::*; +pub use pub_models::*; +pub(crate) mod xml_helpers; diff --git a/sdk/storage/azure_storage_queue/src/generated/models/models_impl.rs b/sdk/storage/azure_storage_queue/src/generated/models/models_impl.rs new file mode 100644 index 0000000000..297cce959e --- /dev/null +++ b/sdk/storage/azure_storage_queue/src/generated/models/models_impl.rs @@ -0,0 +1,32 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) Rust Code Generator. DO NOT EDIT. + +use super::{ListOfSignedIdentifier, QueueMessage, QueueServiceProperties}; +use azure_core::{ + http::{RequestContent, XmlFormat}, + xml::to_xml, + Result, +}; + +impl TryFrom for RequestContent { + type Error = azure_core::Error; + fn try_from(value: ListOfSignedIdentifier) -> Result { + RequestContent::try_from(to_xml(&value)?) + } +} + +impl TryFrom for RequestContent { + type Error = azure_core::Error; + fn try_from(value: QueueMessage) -> Result { + RequestContent::try_from(to_xml(&value)?) + } +} + +impl TryFrom for RequestContent { + type Error = azure_core::Error; + fn try_from(value: QueueServiceProperties) -> Result { + RequestContent::try_from(to_xml(&value)?) + } +} diff --git a/sdk/storage/azure_storage_queue/src/generated/models/pub_models.rs b/sdk/storage/azure_storage_queue/src/generated/models/pub_models.rs new file mode 100644 index 0000000000..e50e2c38d8 --- /dev/null +++ b/sdk/storage/azure_storage_queue/src/generated/models/pub_models.rs @@ -0,0 +1,419 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) Rust Code Generator. DO NOT EDIT. + +use super::{ + xml_helpers::{CorsCorsRule, Queue_itemsQueue}, + GeoReplicationStatusType, +}; +use azure_core::{fmt::SafeDebug, time::OffsetDateTime}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Represents an access policy. +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +pub struct AccessPolicy { + /// The date-time the policy expires. + #[serde( + default, + rename = "Expiry", + skip_serializing_if = "Option::is_none", + with = "azure_core::time::rfc3339::option" + )] + pub expiry: Option, + + /// The permissions for acl the policy. + #[serde(rename = "Permission", skip_serializing_if = "Option::is_none")] + pub permission: Option, + + /// The date-time the policy is active. + #[serde( + default, + rename = "Start", + skip_serializing_if = "Option::is_none", + with = "azure_core::time::rfc3339::option" + )] + pub start: Option, +} + +/// CORS is an HTTP feature that enables a web application running under one domain to access resources in another domain. +/// Web browsers implement a security restriction known as same-origin policy that prevents a web page from calling APIs in +/// a different domain; CORS provides a secure way to allow one domain (the origin domain) to call APIs in another domain +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +pub struct CorsRule { + /// The allowed headers. + #[serde(rename = "AllowedHeaders", skip_serializing_if = "Option::is_none")] + pub allowed_headers: Option, + + /// The allowed methods. + #[serde(rename = "AllowedMethods", skip_serializing_if = "Option::is_none")] + pub allowed_methods: Option, + + /// The allowed origins. + #[serde(rename = "AllowedOrigins", skip_serializing_if = "Option::is_none")] + pub allowed_origins: Option, + + /// The exposed headers. + #[serde(rename = "ExposedHeaders", skip_serializing_if = "Option::is_none")] + pub exposed_headers: Option, + + /// The maximum age in seconds. + #[serde(rename = "MaxAgeInSeconds", skip_serializing_if = "Option::is_none")] + pub max_age_in_seconds: Option, +} + +/// Geo-Replication information for the Secondary Storage Service +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +#[non_exhaustive] +pub struct GeoReplication { + /// A GMT date/time value, to the second. All primary writes preceding this value are guaranteed to be available for read + /// operations at the secondary. Primary writes after this point in time may or may not be available for reads. + #[serde( + default, + rename = "LastSyncTime", + skip_serializing_if = "Option::is_none", + with = "azure_core::time::rfc7231::option" + )] + pub last_sync_time: Option, + + /// The status of the secondary location + #[serde(rename = "Status", skip_serializing_if = "Option::is_none")] + pub status: Option, +} + +/// List wrapper for PeekedMessageItem array +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +#[non_exhaustive] +#[serde(rename = "QueueMessagesList")] +pub struct ListOfPeekedMessage { + /// The list of peeked messages. + #[serde(rename = "QueueMessage", skip_serializing_if = "Option::is_none")] + pub items: Option>, +} + +/// List wrapper for DequeuedMessageItem array +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +#[non_exhaustive] +#[serde(rename = "QueueMessagesList")] +pub struct ListOfReceivedMessage { + /// The list of dequeued messages. + #[serde(rename = "QueueMessage", skip_serializing_if = "Option::is_none")] + pub items: Option>, +} + +/// List wrapper for EnqueuedMessage array +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +#[non_exhaustive] +#[serde(rename = "QueueMessagesList")] +pub struct ListOfSentMessage { + /// The list of enqueued messages. + #[serde(rename = "QueueMessage", skip_serializing_if = "Option::is_none")] + pub items: Option>, +} + +/// Represents an array of signed identifiers +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +#[serde(rename = "SignedIdentifiers")] +pub struct ListOfSignedIdentifier { + /// The list of signed identifiers. + #[serde(rename = "SignedIdentifier", skip_serializing_if = "Option::is_none")] + pub items: Option>, +} + +/// The list queue segment response +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +#[non_exhaustive] +#[serde(rename = "EnumerationResults")] +pub struct ListQueuesResponse { + /// The marker of the queues. + #[serde(rename = "Marker", skip_serializing_if = "Option::is_none")] + pub marker: Option, + + /// The max results of the queues. + #[serde(rename = "MaxResults", skip_serializing_if = "Option::is_none")] + pub max_results: Option, + + /// The next marker of the queues. + #[serde(rename = "NextMarker", skip_serializing_if = "Option::is_none")] + pub next_marker: Option, + + /// The prefix of the queues. + #[serde(rename = "Prefix", skip_serializing_if = "Option::is_none")] + pub prefix: Option, + + /// The queue segment. + #[serde( + default, + deserialize_with = "Queue_itemsQueue::unwrap", + rename = "Queues", + serialize_with = "Queue_itemsQueue::wrap" + )] + pub queue_items: Vec, + + /// The service endpoint. + #[serde(rename = "@ServiceEndpoint", skip_serializing_if = "Option::is_none")] + pub service_endpoint: Option, +} + +/// Azure Analytics Logging settings. +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +pub struct Logging { + /// Whether delete operation is logged. + #[serde(rename = "Delete", skip_serializing_if = "Option::is_none")] + pub delete: Option, + + /// Whether read operation is logged. + #[serde(rename = "Read", skip_serializing_if = "Option::is_none")] + pub read: Option, + + /// The retention policy of the logs. + #[serde(rename = "RetentionPolicy", skip_serializing_if = "Option::is_none")] + pub retention_policy: Option, + + /// The version of the logging properties. + #[serde(rename = "Version", skip_serializing_if = "Option::is_none")] + pub version: Option, + + /// Whether write operation is logged. + #[serde(rename = "Write", skip_serializing_if = "Option::is_none")] + pub write: Option, +} + +/// The metrics properties. +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +pub struct Metrics { + /// Whether it is enabled. + #[serde(rename = "Enabled", skip_serializing_if = "Option::is_none")] + pub enabled: Option, + + /// Whether to include API in the metrics. + #[serde(rename = "IncludeAPIs", skip_serializing_if = "Option::is_none")] + pub include_apis: Option, + + /// The retention policy of the metrics. + #[serde(rename = "RetentionPolicy", skip_serializing_if = "Option::is_none")] + pub retention_policy: Option, + + /// The version of the metrics properties. + #[serde(rename = "Version", skip_serializing_if = "Option::is_none")] + pub version: Option, +} + +/// The object returned in the QueueMessageList array when calling Peek Messages on +/// a Queue +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +#[non_exhaustive] +pub struct PeekedMessage { + /// The number of times the message has been dequeued. + #[serde(rename = "DequeueCount", skip_serializing_if = "Option::is_none")] + pub dequeue_count: Option, + + /// The time that the Message will expire and be automatically deleted. + #[serde( + default, + rename = "ExpirationTime", + skip_serializing_if = "Option::is_none", + with = "azure_core::time::rfc7231::option" + )] + pub expiration_time: Option, + + /// The time the Message was inserted into the Queue. + #[serde( + default, + rename = "InsertionTime", + skip_serializing_if = "Option::is_none", + with = "azure_core::time::rfc7231::option" + )] + pub insertion_time: Option, + + /// The Id of the Message. + #[serde(rename = "MessageId", skip_serializing_if = "Option::is_none")] + pub message_id: Option, + + /// The content of the Message. + #[serde(rename = "MessageText", skip_serializing_if = "Option::is_none")] + pub message_text: Option, +} + +/// Contains results for `QueueClient::get_metadata()` +#[derive(SafeDebug)] +pub struct QueueClientGetMetadataResult; + +/// Contains results for `QueueClient::set_access_policy()` +#[derive(SafeDebug)] +pub struct QueueClientSetAccessPolicyResult; + +/// An Azure Storage Queue. +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +#[non_exhaustive] +#[serde(rename = "Queue")] +pub struct QueueItem { + /// The metadata of the container. + #[serde(rename = "Metadata", skip_serializing_if = "Option::is_none")] + pub metadata: Option>, + + /// The name of the queue. + #[serde(rename = "Name", skip_serializing_if = "Option::is_none")] + pub name: Option, +} + +/// A Message object which can be stored in a Queue +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +pub struct QueueMessage { + /// The content of the message + #[serde(rename = "MessageText", skip_serializing_if = "Option::is_none")] + pub message_text: Option, +} + +/// The service properties. +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +#[serde(rename = "StorageServiceProperties")] +pub struct QueueServiceProperties { + /// The CORS properties. + #[serde( + default, + deserialize_with = "CorsCorsRule::unwrap", + rename = "Cors", + serialize_with = "CorsCorsRule::wrap", + skip_serializing_if = "Option::is_none" + )] + pub cors: Option>, + + /// The hour metrics properties. + #[serde(rename = "HourMetrics", skip_serializing_if = "Option::is_none")] + pub hour_metrics: Option, + + /// The logging properties. + #[serde(rename = "Logging", skip_serializing_if = "Option::is_none")] + pub logging: Option, + + /// The minute metrics properties. + #[serde(rename = "MinuteMetrics", skip_serializing_if = "Option::is_none")] + pub minute_metrics: Option, +} + +/// Stats for the storage service. +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +#[non_exhaustive] +pub struct QueueServiceStats { + /// The geo replication stats. + #[serde(rename = "GeoReplication", skip_serializing_if = "Option::is_none")] + pub geo_replication: Option, +} + +/// The object returned in the QueueMessageList array when calling Get Messages on +/// a Queue. +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +#[non_exhaustive] +pub struct ReceivedMessage { + /// The number of times the message has been dequeued. + #[serde(rename = "DequeueCount", skip_serializing_if = "Option::is_none")] + pub dequeue_count: Option, + + /// The time that the Message will expire and be automatically deleted. + #[serde( + default, + rename = "ExpirationTime", + skip_serializing_if = "Option::is_none", + with = "azure_core::time::rfc7231::option" + )] + pub expiration_time: Option, + + /// The time the Message was inserted into the Queue. + #[serde( + default, + rename = "InsertionTime", + skip_serializing_if = "Option::is_none", + with = "azure_core::time::rfc7231::option" + )] + pub insertion_time: Option, + + /// The Id of the Message. + #[serde(rename = "MessageId", skip_serializing_if = "Option::is_none")] + pub message_id: Option, + + /// The content of the message + #[serde(rename = "MessageText", skip_serializing_if = "Option::is_none")] + pub message_text: Option, + + /// This value is required to delete the Message. If deletion fails using this + /// PopReceipt then the message has been dequeued by another client. + #[serde(rename = "PopReceipt", skip_serializing_if = "Option::is_none")] + pub pop_receipt: Option, + + /// The time that the message will again become visible in the Queue. + #[serde( + default, + rename = "TimeNextVisible", + skip_serializing_if = "Option::is_none", + with = "azure_core::time::rfc7231::option" + )] + pub time_next_visible: Option, +} + +/// The retention policy. +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +pub struct RetentionPolicy { + /// The number of days to retain the logs. + #[serde(rename = "Days", skip_serializing_if = "Option::is_none")] + pub days: Option, + + /// Whether to enable the retention policy. + #[serde(rename = "Enabled", skip_serializing_if = "Option::is_none")] + pub enabled: Option, +} + +/// The object returned in the QueueMessageList array when calling Put Message on a +/// Queue +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +#[non_exhaustive] +pub struct SentMessage { + /// The time that the Message will expire and be automatically deleted. + #[serde( + default, + rename = "ExpirationTime", + skip_serializing_if = "Option::is_none", + with = "azure_core::time::rfc7231::option" + )] + pub expiration_time: Option, + + /// The time the Message was inserted into the Queue. + #[serde( + default, + rename = "InsertionTime", + skip_serializing_if = "Option::is_none", + with = "azure_core::time::rfc7231::option" + )] + pub insertion_time: Option, + + /// The Id of the Message. + #[serde(rename = "MessageId", skip_serializing_if = "Option::is_none")] + pub message_id: Option, + + /// This value is required to delete the Message. If deletion fails using this + /// PopReceipt then the message has been dequeued by another client. + #[serde(rename = "PopReceipt", skip_serializing_if = "Option::is_none")] + pub pop_receipt: Option, + + /// The time that the message will again become visible in the Queue. + #[serde( + default, + rename = "TimeNextVisible", + skip_serializing_if = "Option::is_none", + with = "azure_core::time::rfc7231::option" + )] + pub time_next_visible: Option, +} + +/// The signed identifier. +#[derive(Clone, Default, Deserialize, SafeDebug, Serialize)] +pub struct SignedIdentifier { + /// The access policy for the signed identifier. + #[serde(rename = "AccessPolicy", skip_serializing_if = "Option::is_none")] + pub access_policy: Option, + + /// The unique ID for the signed identifier. + #[serde(rename = "Id", skip_serializing_if = "Option::is_none")] + pub id: Option, +} diff --git a/sdk/storage/azure_storage_queue/src/generated/models/xml_helpers.rs b/sdk/storage/azure_storage_queue/src/generated/models/xml_helpers.rs new file mode 100644 index 0000000000..c172925e26 --- /dev/null +++ b/sdk/storage/azure_storage_queue/src/generated/models/xml_helpers.rs @@ -0,0 +1,62 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) Rust Code Generator. DO NOT EDIT. + +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] + +use super::{CorsRule, QueueItem}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +#[derive(Deserialize, Serialize)] +#[serde(rename = "Cors")] +pub(crate) struct CorsCorsRule { + #[serde(default)] + CorsRule: Option>, +} + +impl CorsCorsRule { + pub fn unwrap<'de, D>(deserializer: D) -> Result>, D::Error> + where + D: Deserializer<'de>, + { + Ok(CorsCorsRule::deserialize(deserializer)?.CorsRule) + } + + pub fn wrap(to_serialize: &Option>, serializer: S) -> Result + where + S: Serializer, + { + CorsCorsRule { + CorsRule: to_serialize.to_owned(), + } + .serialize(serializer) + } +} + +#[derive(Deserialize, Serialize)] +#[serde(rename = "Queues")] +pub(crate) struct Queue_itemsQueue { + #[serde(default)] + Queue: Vec, +} + +impl Queue_itemsQueue { + pub fn unwrap<'de, D>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + Ok(Queue_itemsQueue::deserialize(deserializer)?.Queue) + } + + pub fn wrap(to_serialize: &Vec, serializer: S) -> Result + where + S: Serializer, + { + Queue_itemsQueue { + Queue: to_serialize.to_owned(), + } + .serialize(serializer) + } +} diff --git a/sdk/storage/azure_storage_queue/src/lib.rs b/sdk/storage/azure_storage_queue/src/lib.rs new file mode 100644 index 0000000000..fa3b1ebd26 --- /dev/null +++ b/sdk/storage/azure_storage_queue/src/lib.rs @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) Rust Code Generator. + +#![doc = include_str!("../README.md")] +#![allow(dead_code)] +#![allow(unused_imports)] + +#[expect(deprecated, reason = "requires emitter update")] +mod generated; + +pub mod models { + pub use crate::generated::models::*; +} + +mod clients; +pub use crate::clients::QueueClient; +pub use crate::clients::QueueServiceClient; +pub use crate::generated::clients::{QueueClientOptions, QueueServiceClientOptions}; diff --git a/sdk/storage/azure_storage_queue/tests/queue_client.rs b/sdk/storage/azure_storage_queue/tests/queue_client.rs new file mode 100644 index 0000000000..b1539bbcc3 --- /dev/null +++ b/sdk/storage/azure_storage_queue/tests/queue_client.rs @@ -0,0 +1,531 @@ +use azure_core::http::{ + RequestContent, {ClientOptions, Response}, +}; +use azure_core::Result; +use azure_core_test::{recorded, Recording, TestContext}; +use azure_storage_queue::{ + models::{ + QueueClientPeekMessagesOptions, QueueClientReceiveMessagesOptions, + QueueClientUpdateOptions, QueueMessage, + }, + QueueClient, QueueClientOptions, +}; + +use std::collections::HashMap; + +/// Creates a new queue under the given account. +#[recorded::test] +async fn test_create_queue(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_client = get_queue_client(recording, "test-create-queue").await?; + + let response = queue_client.create(None).await?; + let test_result = async { + assert_successful_response(&response); + Ok::<(), azure_core::Error>(()) + } + .await; + + // Clean up by deleting the queue - this always executes + queue_client.delete(None).await.unwrap(); + + test_result?; + + Ok(()) +} + +/// Sends a message to the specified queue. +#[recorded::test] +async fn test_send_message(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_client = get_queue_client(recording, "test-send-message").await?; + queue_client.create(None).await?; + let queue_message = QueueMessage { + message_text: Some("send_message".to_string()), + }; + + let test_result = async { + let response = queue_client + .send_message(queue_message.try_into()?, None) + .await?; + + assert!( + response.status() == 201, + "Expected status code 201, got {}", + response.status(), + ); + Ok::<(), azure_core::Error>(()) + } + .await; + + // Clean up by deleting the queue - this always executes + queue_client.delete(None).await.unwrap(); + + test_result?; + + Ok(()) +} + +/// Tests the deletion of a queue in Azure Storage Queue service. +#[recorded::test] +async fn test_delete_queue(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_client = get_queue_client(recording, "test-delete-queue").await?; + + queue_client.create(None).await?; + + let response = queue_client.delete(None).await?; + + assert!( + response.status() == 204, + "Expected status code 204, got {}", + response.status(), + ); + Ok(()) +} + +/// Checks if a queue exists in the Azure Storage Queue service. +#[recorded::test] +async fn test_queue_exists(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_client = get_queue_client(recording, "test-queue-exists").await?; + queue_client.create(None).await?; + + let test_result = async { + // Check if the queue exists + let exists_response = queue_client.exists().await?; + assert!(exists_response, "Queue should exist"); + + Ok::<(), azure_core::Error>(()) + } + .await; + + queue_client.delete(None).await?; + + // Check a non-existent queue + let non_existent_exists_response = queue_client.exists().await?; + assert!(!non_existent_exists_response, "Queue should not exist"); + + // Return the test result + test_result?; + + Ok(()) +} + +/// Sets metadata for a queue in Azure Storage Queue service. +#[recorded::test] +async fn test_set_metadata(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_client = get_queue_client(recording, "test-queue-metadata").await?; + queue_client.create(None).await?; + + let test_result = async { + // Set metadata for the queue + + let response = queue_client + .set_metadata( + HashMap::from([ + ("key1".to_string(), "value1".to_string()), + ("key2".to_string(), "value2".to_string()), + ]), + None, + ) + .await?; + + assert_successful_response(&response); + + Ok::<(), azure_core::Error>(()) + } + .await; + + queue_client.delete(None).await?; + + // Return the test result + test_result?; + Ok(()) +} + +/// Clears all messages from a queue in Azure Storage Queue service. +#[recorded::test] +async fn test_clear_messages(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_client = get_queue_client(recording, "test-clear-messages").await?; + queue_client.create(None).await?; + + // Run the test logic and ensure cleanup always happens + let test_result = async { + // Clear messages from the queue + let response = queue_client.clear(None).await?; + assert_successful_response(&response); + + Ok::<(), azure_core::Error>(()) + } + .await; + + // Clean up by deleting the queue - this always executes + queue_client.delete(None).await.unwrap(); + + // Return the test result + test_result?; + Ok(()) +} + +#[recorded::test] +async fn test_delete_message(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_client = get_queue_client(recording, "test-delete-message").await?; + queue_client.create(None).await?; + + // Run the test logic and ensure cleanup always happens + let test_result = async { + // Send a message to the queue + // Note: The message ID and pop receipt are required for deletion, so we need to capture them. + let sent_message_response = queue_client + .send_message( + QueueMessage { + message_text: Some( + "Example message created from Rust, ready for deletion".to_string(), + ), + } + .try_into()?, + None, + ) + .await?; + + let send_message = sent_message_response.into_body().await?; + + let delete_response = queue_client + .delete_message( + &send_message.message_id.unwrap(), + &send_message.pop_receipt.unwrap(), + None, + ) + .await?; + assert_successful_response(&delete_response); + Ok::<(), azure_core::Error>(()) + } + .await; + + // Clean up by deleting the queue - this always executes + queue_client.delete(None).await.unwrap(); + + // Return the test result + test_result?; + Ok(()) +} + +#[recorded::test] +async fn test_update_message(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_client = get_queue_client(recording, "test-update-message").await?; + queue_client.create(None).await?; + + // Run the test logic and ensure cleanup always happens + let test_result = async { + // Send a message to the queue + let send_message_response = queue_client + .send_message( + QueueMessage { + message_text: Some( + "Example message created from Rust, ready for update".to_string(), + ), + } + .try_into()?, + None, + ) + .await?; + + let sent_message = send_message_response.into_body().await?; + + // Update the message in the queue + let option = Some(QueueClientUpdateOptions { + queue_message: Some(RequestContent::from( + quick_xml::se::to_string(&QueueMessage { + message_text: Some("Updated message text from Rust".to_string()), + }) + .unwrap() + .into_bytes(), + )), + ..Default::default() + }); + + // Update the message in the queue + let update_response = queue_client + .update_message( + &sent_message.message_id.unwrap(), + &sent_message.pop_receipt.unwrap(), + 10, + option, + ) + .await?; + assert!( + update_response.status().is_success(), + "Expected successful status code, got {}", + update_response.status(), + ); + Ok::<(), azure_core::Error>(()) + } + .await; + + // Clean up by deleting the queue - this always executes + queue_client.delete(None).await.unwrap(); + + // Return the test result + test_result?; + Ok(()) +} + +/// Attempts to peek messages from an empty queue in Azure Storage Queue service. +#[recorded::test] +async fn test_peek_messages_empty(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_client = get_queue_client(recording, "test-peek-messages-empty").await?; + queue_client.create(None).await?; + + // Run the test logic and ensure cleanup always happens + let test_result = async { + let response = queue_client.peek_messages(None).await?; + assert_successful_response(&response); + + let messages = response.into_body().await?; + + assert!( + messages.items.is_none(), + "Expected to receive no messages, but got Some" + ); + + Ok::<(), azure_core::Error>(()) + } + .await; + + // Clean up by deleting the queue - this always executes + queue_client.delete(None).await.unwrap(); + + test_result +} + +/// Receives all messages from a queue in Azure Storage Queue service. +#[recorded::test] +async fn test_peek_messages(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_client = get_queue_client(recording, "test-peek-messages").await?; + let test_messages = ["Message 1", "Message 2"]; + + // Setup test queue with messages + setup_test_queue_with_messages(&queue_client, &test_messages).await?; + + // Run the test logic and ensure cleanup always happens + let test_result = async { + let options = Some(QueueClientPeekMessagesOptions { + number_of_messages: Some(10), + ..Default::default() + }); + + peek_and_assert( + &queue_client, + &test_messages, + test_messages.len(), + options.clone(), + ) + .await?; + + // The messages should not have been dequeued, so we can peek again + // and expect to receive both messages this time. + peek_and_assert( + &queue_client, + &test_messages, + test_messages.len(), + options.clone(), + ) + .await?; + + Ok(()) + } + .await; + + // Clean up by deleting the queue - this always executes + queue_client.delete(None).await.unwrap(); + + test_result +} + +/// Attempts to receive messages from an empty queue in Azure Storage Queue service. +#[recorded::test] +async fn test_receive_messages_empty(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_client = get_queue_client(recording, "test-receive-messages-empty").await?; + queue_client.create(None).await?; + + // Run the test logic and ensure cleanup always happens + let test_result = async { + let response = queue_client.receive_messages(None).await?; + assert_successful_response(&response); + + let messages = response.into_body().await?; + + assert!( + messages.items.is_none(), + "Expected to dequeue no messages, but got Some" + ); + + Ok::<(), azure_core::Error>(()) + } + .await; + + // Clean up by deleting the queue - this always executes + queue_client.delete(None).await.unwrap(); + + test_result +} + +/// Dequeues all messages from a queue in Azure Storage Queue service. +#[recorded::test] +async fn test_receive_messages(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_client = get_queue_client(recording, "test-receive-messages").await?; + let test_messages = ["Message 1", "Message 2"]; + + // Setup test queue with messages + setup_test_queue_with_messages(&queue_client, &test_messages).await?; + + // Run the test logic and ensure cleanup always happens + let test_result = async { + let options = Some(QueueClientReceiveMessagesOptions { + number_of_messages: Some(10), + ..Default::default() + }); + + let response = queue_client.receive_messages(options).await?; + assert_successful_response(&response); + + let messages = response.into_body().await?; + let messages = messages.items.unwrap(); + + assert_eq!( + messages.len(), + test_messages.len(), + "Expected to dequeue {} messages, got {}", + test_messages.len(), + messages.len() + ); + + // Verify messages are received in order + for (i, message) in messages.iter().enumerate() { + assert_message_text(message.message_text.clone(), test_messages[i], i); + } + + Ok::<(), azure_core::Error>(()) + } + .await; + + // Clean up by deleting the queue - this always executes + queue_client.delete(None).await.unwrap(); + + test_result +} + +/// Returns an instance of a QueueClient. +/// +/// # Arguments +/// +/// * `recording` - A reference to a Recording instance. +pub async fn get_queue_client(recording: &Recording, queue_name: &str) -> Result { + let (options, endpoint) = recorded_test_setup(recording); + let queue_client_options = QueueClientOptions { + client_options: options.clone(), + ..Default::default() + }; + let queue_client = QueueClient::new( + &endpoint, + queue_name, + recording.credential(), + Option::Some(queue_client_options), + )?; + + Ok(queue_client) +} + +/// Takes in a Recording instance and returns an instrumented options bag and endpoint. +/// +/// # Arguments +/// +/// * `recording` - A reference to a Recording instance. +fn recorded_test_setup(recording: &Recording) -> (ClientOptions, String) { + let mut client_options = ClientOptions::default(); + recording.instrument(&mut client_options); + let endpoint = format!( + "https://{}.queue.core.windows.net/", + recording + .var("AZURE_QUEUE_STORAGE_ACCOUNT_NAME", None) + .as_str() + ); + + (client_options, endpoint) +} + +/// Helper function to set up a test queue with messages +async fn setup_test_queue_with_messages( + queue_client: &QueueClient, + messages: &[&str], +) -> Result<()> { + queue_client.create(None).await?; + for message in messages { + let queue_message = QueueMessage { + message_text: Some(message.to_string()), + }; + queue_client + .send_message(queue_message.try_into()?, None) + .await?; + } + Ok(()) +} + +/// Helper function to verify a successful response +fn assert_successful_response(response: &Response) { + assert!( + response.status().is_success(), + "Expected successful status code, got {}", + response.status() + ); +} + +/// Helper function to verify message contents +fn assert_message_text(actual: Option, expected: &str, message_index: usize) { + let actual = actual.unwrap(); + assert!( + actual == expected, + "Message at index {} has wrong text. Expected '{}', got '{}'", + message_index, + expected, + actual + ); +} + +async fn peek_and_assert<'a>( + queue_client: &QueueClient, + expected_messages: &[&str], + count: usize, + options: Option>, +) -> Result<()> { + // Peek the messages in the queue + let response = queue_client.peek_messages(options).await?; + assert_successful_response(&response); + + let messages = response.into_body().await?; + let messages = messages.items.unwrap(); + + assert_eq!( + messages.len(), + count, + "Expected to receive exactly {} messages, got {}", + count, + messages.len() + ); + + // Assert each message matches the expected text + for (i, message) in messages.iter().enumerate() { + assert_message_text(message.message_text.clone(), expected_messages[i], i); + } + + Ok(()) +} diff --git a/sdk/storage/azure_storage_queue/tests/queue_service_client.rs b/sdk/storage/azure_storage_queue/tests/queue_service_client.rs new file mode 100644 index 0000000000..698ca80c99 --- /dev/null +++ b/sdk/storage/azure_storage_queue/tests/queue_service_client.rs @@ -0,0 +1,251 @@ +use azure_core::http::{ + RequestContent, {ClientOptions, Response}, +}; +use azure_core::Result; +use azure_core_test::{recorded, Recording, TestContext}; +use azure_storage_queue::{ + models::{GeoReplicationStatusType, QueueServiceClientListQueuesOptions}, + QueueServiceClient, QueueServiceClientOptions, +}; +use futures::StreamExt; + +use std::option::Option; + +/// Creates a new queue under the given account. +#[recorded::test] +async fn test_create_queue(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_service_client = get_queue_service_client(recording).await?; + + let response = queue_service_client + .create_queue("test-service-create-queue", None) + .await?; + let test_result = async { + assert_successful_response(&response); + Ok::<(), azure_core::Error>(()) + } + .await; + + // Clean up by deleting the queue - this always executes + queue_service_client + .delete_queue("test-service-create-queue", None) + .await + .unwrap(); + + test_result?; + + Ok(()) +} + +/// Tests the deletion of a queue in Azure Storage Queue service. +#[recorded::test] +async fn test_delete_queue(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_service_client = get_queue_service_client(recording).await?; + + let queue_name = "test-service-delete-queue"; + + queue_service_client.create_queue(queue_name, None).await?; + + let response = queue_service_client.delete_queue(queue_name, None).await?; + + assert!( + response.status() == 204, + "Expected status code 204, got {}", + response.status(), + ); + Ok(()) +} + +/// Retrieves the properties of a storage account's Queue service. +#[recorded::test] +async fn test_get_queue_properties(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_service_client = get_queue_service_client(recording).await?; + + let response = queue_service_client.get_properties(None).await.unwrap(); + + assert!( + response.status() == 200, + "Expected status code 200, got {}", + response.status(), + ); + + Ok(()) +} + +/// Retrieves the properties of a storage account's Queue service. +#[recorded::test] +async fn test_set_queue_properties(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_service_client = get_queue_service_client(recording).await?; + + let properties = queue_service_client + .get_properties(None) + .await? + .into_body() + .await?; + let properties_xml = quick_xml::se::to_string(&properties).unwrap(); + let properties_bytes = properties_xml.into_bytes(); + + let response = queue_service_client + .set_properties(RequestContent::from(properties_bytes), None) + .await + .unwrap(); + + assert!( + response.status() == 202, + "Expected status code 202, got {}", + response.status(), + ); + + Ok(()) +} + +/// Lists all queues in the storage account, ensuring that at least one queue is present. +#[recorded::test] +pub async fn test_list_queues(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_service_client = get_queue_service_client(recording).await?; + + // Create a queue to ensure we have at least one queue to list + let queue_name = "test-service-list-queues"; + queue_service_client.create_queue(queue_name, None).await?; + + let options = QueueServiceClientListQueuesOptions { + maxresults: Some(1), + ..Default::default() + }; + + let mut page_iterator = queue_service_client.list_queues(Some(options))?; + let mut all_queue_names = Vec::new(); + + // Iterate through all pages + while let Some(page) = page_iterator.next().await { + let response = page?; + let queue_list = response.into_body().await?; + + //Collect queue names from this page + for queue_item in &queue_list.queue_items { + if let Some(queue_name_found) = &queue_item.name { + all_queue_names.push(queue_name_found.clone()); + } + } + } + + // Assert that our test queue is in the list + assert!( + all_queue_names.contains(&queue_name.to_string()), + "Expected queue '{}' to be found in the list of queues: {:?}", + queue_name, + all_queue_names + ); + + // Clean up by deleting the created queue + queue_service_client.delete_queue(queue_name, None).await?; + + Ok(()) +} + +/// Gets statistics for the Queue service, ensuring that the service is available and returns a successful response. +#[recorded::test] +pub async fn test_get_queue_statistics(ctx: TestContext) -> Result<()> { + let recording = ctx.recording(); + let queue_service_client = get_queue_service_client_secondary(recording).await?; + + let response = queue_service_client.get_statistics(None).await?; + assert!( + response.status() == 200, + "Expected status code 200, got {}", + response.status(), + ); + let stats = response.into_body().await?; + let geo_replication = stats.geo_replication.as_ref().unwrap(); + assert!( + geo_replication.status.as_ref().unwrap() == &GeoReplicationStatusType::Live, + "Geo-replication status should be Live" + ); + // assert that last_sync_time is greater than Fri, 1 Jun 2025 00:00:00 GMT + assert!( + geo_replication.last_sync_time.unwrap() + > time::OffsetDateTime::from_unix_timestamp(1748728800).unwrap(), + "Last sync time should be after 2025-06-01T00:00:00Z" + ); + + Ok(()) +} + +/// Returns an instance of a QueueServiceClient. +/// +/// # Arguments +/// +/// * `recording` - A reference to a Recording instance. +pub async fn get_queue_service_client(recording: &Recording) -> Result { + let (options, endpoint, _) = recorded_test_setup(recording); + let queue_client_options = QueueServiceClientOptions { + client_options: options.clone(), + ..Default::default() + }; + let queue_client = QueueServiceClient::new( + &endpoint, + recording.credential(), + Option::Some(queue_client_options), + )?; + + Ok(queue_client) +} + +/// Returns an instance of a QueueServiceClient on the secondary endpoint. +/// +/// # Arguments +/// +/// * `recording` - A reference to a Recording instance. +pub async fn get_queue_service_client_secondary( + recording: &Recording, +) -> Result { + let (options, _, endpoint) = recorded_test_setup(recording); + let queue_client_options = QueueServiceClientOptions { + client_options: options.clone(), + ..Default::default() + }; + let queue_client = QueueServiceClient::new( + &endpoint, + recording.credential(), + Option::Some(queue_client_options), + )?; + + Ok(queue_client) +} + +/// Takes in a Recording instance and returns an instrumented options bag and endpoint. +/// +/// # Arguments +/// +/// * `recording` - A reference to a Recording instance. +fn recorded_test_setup(recording: &Recording) -> (ClientOptions, String, String) { + let mut client_options = ClientOptions::default(); + recording.instrument(&mut client_options); + let endpoint = format!( + "https://{}.queue.core.windows.net/", + recording + .var("AZURE_QUEUE_STORAGE_ACCOUNT_NAME", None) + .as_str() + ); + let secondary_endpoint = format!( + "https://{}-secondary.queue.core.windows.net/", + recording + .var("AZURE_QUEUE_STORAGE_ACCOUNT_NAME", None) + .as_str() + ); + + (client_options, endpoint, secondary_endpoint) +} + +/// Helper function to verify a successful response +fn assert_successful_response(response: &Response) { + assert!( + response.status().is_success(), + "Expected successful status code, got {}", + response.status() + ); +} diff --git a/sdk/storage/azure_storage_queue/tsp-location.yaml b/sdk/storage/azure_storage_queue/tsp-location.yaml new file mode 100755 index 0000000000..4d20a16050 --- /dev/null +++ b/sdk/storage/azure_storage_queue/tsp-location.yaml @@ -0,0 +1,4 @@ +directory: specification/storage/Microsoft.QueueStorage +commit: 7d60037de342e7897ca3248aa98a39d3036ee214 +repo: Azure/azure-rest-api-specs +additionalDirectories: