Skip to content

Commit 433dd83

Browse files
bmc-msftdemoray
andauthored
Address feedback from Storage Queue pipeline pr (#858)
* instead of passing around Bytes, use u8 slice * fix clippy * address feedback from Pipeline PR * remove message_id and pop_receipt direct exposure * address clippy Co-authored-by: Brian Caswell <[email protected]>
1 parent d4a27ab commit 433dd83

File tree

4 files changed

+98
-55
lines changed

4 files changed

+98
-55
lines changed

sdk/storage_queues/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ edition = "2021"
1515
[dependencies]
1616
azure_core = { path = "../core", version = "0.3", default-features=false }
1717
azure_storage = { path = "../storage", version = "0.4", default-features=false, features=["account"] }
18-
bytes = "1.0"
1918
chrono = { version = "0.4", features = ["serde"] }
2019
futures = "0.3"
2120
http = "0.2"

sdk/storage_queues/src/clients/pop_receipt_client.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,9 @@ impl PopReceiptClient {
6464
Ok(url)
6565
}
6666

67-
/// Deletes the message. The message must not have been made visible again
68-
/// or this call would fail.
69-
pub fn delete(&self) -> DeleteMessageBuilder {
70-
DeleteMessageBuilder::new(self.clone())
71-
}
72-
73-
/// Updates the message. The message must not have been made visible again
67+
/// Updates the message.
68+
///
69+
/// The message must not have been made visible again
7470
/// or this call would fail.
7571
pub fn update(
7672
&self,
@@ -79,4 +75,12 @@ impl PopReceiptClient {
7975
) -> UpdateMessageBuilder {
8076
UpdateMessageBuilder::new(self.clone(), body.into(), visibility_timeout.into())
8177
}
78+
79+
/// Deletes the message.
80+
///
81+
/// The message must not have been made visible again
82+
/// or this call would fail.
83+
pub fn delete(&self) -> DeleteMessageBuilder {
84+
DeleteMessageBuilder::new(self.clone())
85+
}
8286
}

sdk/storage_queues/src/clients/queue_client.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,18 @@ impl QueueClient {
9494
GetQueueMetadataBuilder::new(self.clone())
9595
}
9696

97-
/// Get the queue ACL. This call returns
98-
/// all the stored access policies associated
99-
/// to the current queue.
97+
/// Get the queue ACL.
98+
///
99+
/// This call returns all the stored access policies associated to the
100+
/// current queue.
100101
pub fn get_acl(&self) -> GetQueueACLBuilder {
101102
GetQueueACLBuilder::new(self.clone())
102103
}
103104

104-
/// Set the queue ACL. You can call this function to change or remove
105-
/// already existing stored access policies by modifying the list returned
106-
/// by `get_acl`.
105+
/// Set the queue ACL.
106+
///
107+
/// You can call this function to change or remove already existing stored
108+
/// access policies by modifying the list returned by `get_acl`.
107109
///
108110
/// While this SDK does not enforce any limit, keep in mind Azure supports a
109111
/// limited number of stored access policies for each queue. More info here
@@ -112,8 +114,10 @@ impl QueueClient {
112114
SetQueueACLBuilder::new(self.clone(), policies)
113115
}
114116

115-
/// Puts a message in the queue. The body will be passed
116-
/// to the `execute` function of the returned struct.
117+
/// Puts a message in the queue.
118+
///
119+
/// The body will be passed to the `execute` function of the returned
120+
/// struct.
117121
pub fn put_message<S: Into<String>>(&self, message: S) -> PutMessageBuilder {
118122
PutMessageBuilder::new(self.clone(), message.into())
119123
}

sdk/storage_queues/src/operations/get_messages.rs

Lines changed: 75 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ use crate::{clients::QueueClient, prelude::*, PopReceipt};
22
use azure_core::{
33
collect_pinned_stream,
44
error::{ErrorKind, ResultExt},
5-
headers::utc_date_from_rfc2822,
65
prelude::*,
76
Context, Response as AzureResponse,
87
};
98
use azure_storage::core::{headers::CommonStorageResponseHeaders, xml::read_xml};
109
use chrono::{DateTime, Utc};
10+
use serde::Deserialize;
1111
use std::convert::TryInto;
1212

1313
#[derive(Debug, Clone)]
@@ -25,7 +25,6 @@ impl GetMessagesBuilder {
2525
queue_client,
2626
number_of_messages: None,
2727
visibility_timeout: None,
28-
2928
timeout: None,
3029
context: Context::new(),
3130
}
@@ -78,68 +77,105 @@ pub struct GetMessagesResponse {
7877
pub messages: Vec<Message>,
7978
}
8079

81-
#[derive(Debug, Clone)]
80+
#[derive(Debug, Clone, Deserialize)]
8281
pub struct Message {
83-
pub pop_receipt: PopReceipt,
82+
#[serde(rename = "MessageId")]
83+
message_id: String,
84+
#[serde(rename = "PopReceipt")]
85+
pop_receipt: String,
86+
#[serde(rename = "InsertionTime", deserialize_with = "deserialize_utc")]
8487
pub insertion_time: DateTime<Utc>,
88+
#[serde(rename = "ExpirationTime", deserialize_with = "deserialize_utc")]
8589
pub expiration_time: DateTime<Utc>,
90+
#[serde(rename = "TimeNextVisible", deserialize_with = "deserialize_utc")]
8691
pub time_next_visible: DateTime<Utc>,
92+
#[serde(rename = "DequeueCount")]
8793
pub dequeue_count: u64,
94+
#[serde(rename = "MessageText")]
8895
pub message_text: String,
8996
}
9097

91-
impl From<Message> for PopReceipt {
92-
fn from(message: Message) -> Self {
93-
message.pop_receipt
98+
impl Message {
99+
pub fn pop_receipt(&self) -> PopReceipt {
100+
PopReceipt::new(self.message_id.clone(), self.pop_receipt.clone())
94101
}
95102
}
96103

97-
#[derive(Debug, Clone, Serialize, Deserialize)]
98-
struct MessageInternal {
99-
#[serde(rename = "MessageId")]
100-
pub message_id: String,
101-
#[serde(rename = "InsertionTime")]
102-
pub insertion_time: String,
103-
#[serde(rename = "ExpirationTime")]
104-
pub expiration_time: String,
105-
#[serde(rename = "PopReceipt")]
106-
pub pop_receipt: String,
107-
#[serde(rename = "TimeNextVisible")]
108-
pub time_next_visible: String,
109-
#[serde(rename = "DequeueCount")]
110-
pub dequeue_count: u64,
111-
#[serde(rename = "MessageText")]
112-
pub message_text: String,
104+
impl From<Message> for PopReceipt {
105+
fn from(message: Message) -> Self {
106+
PopReceipt::new(message.message_id, message.pop_receipt)
107+
}
113108
}
114109

115-
#[derive(Debug, Clone, Serialize, Deserialize)]
116-
struct MessagesInternal {
110+
#[derive(Debug, Clone, Deserialize)]
111+
struct MessageList {
117112
#[serde(rename = "QueueMessage")]
118-
pub messages: Option<Vec<MessageInternal>>,
113+
pub messages: Option<Vec<Message>>,
119114
}
120115

121116
impl GetMessagesResponse {
117+
fn parse_messages(body: &[u8]) -> azure_core::Result<Vec<Message>> {
118+
let response: MessageList = read_xml(body).map_kind(ErrorKind::DataConversion)?;
119+
Ok(response.messages.unwrap_or_default())
120+
}
121+
122122
async fn try_from(response: AzureResponse) -> azure_core::Result<Self> {
123123
let (_, headers, body) = response.deconstruct();
124124
let body = collect_pinned_stream(body).await?;
125125

126-
let response: MessagesInternal = read_xml(&body).map_kind(ErrorKind::DataConversion)?;
127-
128-
let mut messages = Vec::new();
129-
for message in response.messages.unwrap_or_default().into_iter() {
130-
messages.push(Message {
131-
pop_receipt: PopReceipt::new(message.message_id, message.pop_receipt),
132-
insertion_time: utc_date_from_rfc2822(&message.insertion_time)?,
133-
expiration_time: utc_date_from_rfc2822(&message.expiration_time)?,
134-
time_next_visible: utc_date_from_rfc2822(&message.time_next_visible)?,
135-
dequeue_count: message.dequeue_count,
136-
message_text: message.message_text,
137-
})
138-
}
126+
let messages = Self::parse_messages(&body)?;
139127

140128
Ok(GetMessagesResponse {
141129
common_storage_response_headers: (&headers).try_into()?,
142130
messages,
143131
})
144132
}
145133
}
134+
135+
fn deserialize_utc<'de, D>(deserializer: D) -> std::result::Result<DateTime<Utc>, D::Error>
136+
where
137+
D: serde::Deserializer<'de>,
138+
{
139+
let s = String::deserialize(deserializer)?;
140+
let date = DateTime::parse_from_rfc2822(&s).map_err(serde::de::Error::custom)?;
141+
Ok(DateTime::from_utc(date.naive_utc(), Utc))
142+
}
143+
144+
#[cfg(test)]
145+
mod tests {
146+
use super::*;
147+
148+
#[test]
149+
fn test_parse_messages() -> azure_core::Result<()> {
150+
let message = b"\xef\xbb\xbf\
151+
<?xml version=\"1.0\" encoding=\"utf-8\"?>\
152+
<QueueMessagesList><QueueMessage>\
153+
<MessageId>00000000-0000-0000-0000-000000000000</MessageId>\
154+
<InsertionTime>Mon, 27 Jun 2022 13:38:48 GMT</InsertionTime>\
155+
<ExpirationTime>Mon, 04 Jul 2022 13:38:48 GMT</ExpirationTime>\
156+
<PopReceipt>REDACTED1</PopReceipt>\
157+
<TimeNextVisible>Mon, 27 Jun 2022 13:38:53 GMT</TimeNextVisible>\
158+
<DequeueCount>1</DequeueCount>\
159+
<MessageText>test1</MessageText>\
160+
</QueueMessage>\
161+
<QueueMessage>\
162+
<MessageId>11111111-1111-1111-1111-111111111111</MessageId>\
163+
<InsertionTime>Mon, 27 Jun 2022 13:38:48 GMT</InsertionTime>\
164+
<ExpirationTime>Mon, 04 Jul 2022 13:38:48 GMT</ExpirationTime>\
165+
<PopReceipt>REDACTED2</PopReceipt>\
166+
<TimeNextVisible>Mon, 27 Jun 2022 13:38:53 GMT</TimeNextVisible>\
167+
<DequeueCount>1</DequeueCount>\
168+
<MessageText>test2</MessageText>\
169+
</QueueMessage>\
170+
</QueueMessagesList>\
171+
";
172+
173+
let messages = GetMessagesResponse::parse_messages(message)?;
174+
175+
assert_eq!(messages.len(), 2);
176+
assert_eq!(messages[0].message_text, "test1");
177+
assert_eq!(messages[1].message_text, "test2");
178+
179+
Ok(())
180+
}
181+
}

0 commit comments

Comments
 (0)