Skip to content

Commit 6a95acf

Browse files
authored
Add response headers to PeekLockResponse. (#1282)
1 parent abab2ba commit 6a95acf

File tree

2 files changed

+58
-3
lines changed

2 files changed

+58
-3
lines changed

sdk/messaging_servicebus/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@ edition = "2021"
1616

1717
[dependencies]
1818
azure_core = { path = "../core", version = "0.12" }
19-
time = "0.3.10"
19+
time = { version = "0.3.10", features = ["serde-well-known"] }
2020
log = "0.4"
2121
url = "2.2"
2222
hmac = "0.12"
2323
sha2 = "0.10"
2424
ring = "0.16"
2525
bytes = "1.0"
26-
26+
serde = "1.0"
27+
serde_json = "1.0"
2728
[dev-dependencies]
2829
futures = "0.3"
2930
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }

sdk/messaging_servicebus/src/service_bus/mod.rs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use azure_core::{
22
base64, error::Error, headers, CollectedResponse, HttpClient, Method, Request, StatusCode, Url,
33
};
44
use ring::hmac;
5+
use serde::Deserialize;
6+
use std::str::FromStr;
57
use std::time::Duration;
68
use std::{ops::Add, sync::Arc};
79
use time::OffsetDateTime;
@@ -168,14 +170,19 @@ async fn peek_lock_message2(
168170
let res = http_client.execute_request(&req).await?;
169171

170172
let status = res.status();
171-
let lock_location = res
173+
let headers = res.headers().clone();
174+
let broker_properties = res
172175
.headers()
176+
.get_optional_as(&headers::HeaderName::from("brokerproperties"))?;
177+
let lock_location = headers
173178
.get_optional_string(&headers::LOCATION)
174179
.unwrap_or_default();
175180
let body = res.into_body().collect_string().await?;
176181

177182
Ok(PeekLockResponse {
178183
body,
184+
headers,
185+
broker_properties,
179186
lock_location,
180187
status,
181188
http_client: http_client.clone(),
@@ -187,6 +194,8 @@ async fn peek_lock_message2(
187194
/// `PeekLockResponse` object that is returned by `peek_lock_message2`
188195
pub struct PeekLockResponse {
189196
body: String,
197+
headers: headers::Headers,
198+
broker_properties: Option<BrokerProperties>,
190199
lock_location: String,
191200
status: StatusCode,
192201
http_client: Arc<dyn HttpClient>,
@@ -200,6 +209,17 @@ impl PeekLockResponse {
200209
self.body.clone()
201210
}
202211

212+
/// Get the broker properties from the message in the lock
213+
#[must_use]
214+
pub fn broker_properties(&self) -> Option<BrokerProperties> {
215+
self.broker_properties.clone()
216+
}
217+
218+
/// Get custom message headers from the message in the lock
219+
pub fn custom_properties<T: TryFrom<headers::Headers>>(&self) -> Result<T, T::Error> {
220+
self.headers.clone().try_into()
221+
}
222+
203223
/// Get the status of the peek
204224
pub fn status(&self) -> &StatusCode {
205225
&self.status
@@ -255,3 +275,37 @@ impl PeekLockResponse {
255275
Ok(())
256276
}
257277
}
278+
279+
/// `BrokerProperties` object decoded from the message headers
280+
#[derive(Clone, Debug, Deserialize)]
281+
#[serde(rename_all = "PascalCase")]
282+
pub struct BrokerProperties {
283+
pub delivery_count: i32,
284+
pub enqueued_sequence_number: Option<i32>,
285+
#[serde(deserialize_with = "BrokerProperties::option_rfc2822")]
286+
pub enqueued_time_utc: Option<OffsetDateTime>,
287+
pub lock_token: String,
288+
#[serde(with = "time::serde::rfc2822")]
289+
pub locked_until_utc: OffsetDateTime,
290+
pub message_id: String,
291+
pub sequence_number: i32,
292+
pub state: String,
293+
pub time_to_live: i64,
294+
}
295+
296+
impl BrokerProperties {
297+
fn option_rfc2822<'de, D>(value: D) -> Result<Option<OffsetDateTime>, D::Error>
298+
where
299+
D: serde::Deserializer<'de>,
300+
{
301+
Ok(Some(time::serde::rfc2822::deserialize(value)?))
302+
}
303+
}
304+
305+
impl FromStr for BrokerProperties {
306+
type Err = serde_json::Error;
307+
308+
fn from_str(s: &str) -> Result<Self, Self::Err> {
309+
serde_json::from_str(s)
310+
}
311+
}

0 commit comments

Comments
 (0)