Skip to content

Commit 6409e82

Browse files
authored
Adding BrokerProperties when sending message to queue or topic (#1693)
* Adding BrokerProperties when sending message to queue or topic * PR feedback: BrokerProperties to SendMessageOptions
1 parent 2f22149 commit 6409e82

File tree

9 files changed

+169
-22
lines changed

9 files changed

+169
-22
lines changed

sdk/messaging_servicebus/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ tracing = "0.1.40"
2121
url = "2.2"
2222
bytes = "1.0"
2323
serde = "1.0"
24+
serde_json = "1.0"
2425

2526
[dev-dependencies]
2627
futures = "0.3"

sdk/messaging_servicebus/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ async fn main() -> azure_core::Result<()> {
2323
policy_key,
2424
)?;
2525
26-
client.send_message("hello world").await?;
26+
client.send_message("hello world", None).await?;
2727
2828
let received_message = client.receive_and_delete_message().await?;
2929
println!("Received Message: {}", received_message);

sdk/messaging_servicebus/examples/service_bus00.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ async fn main() {
2828
let message_to_send = "hello, world!";
2929

3030
client
31-
.send_message(message_to_send)
31+
.send_message(message_to_send, None)
3232
.await
3333
.expect("Failed to send message while testing receive");
3434

sdk/messaging_servicebus/examples/service_bus01.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ async fn main() {
3434
let message_to_send = "hello, world!";
3535

3636
sender
37-
.send_message(message_to_send)
37+
.send_message(message_to_send, None)
3838
.await
3939
.expect("Failed to send message while testing receive");
4040

sdk/messaging_servicebus/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ async fn main() -> azure_core::Result<()> {
2222
policy_key,
2323
)?;
2424
25-
client.send_message("hello world").await?;
25+
client.send_message("hello world", None).await?;
2626
2727
let received_message = client.receive_and_delete_message().await?;
2828
println!("Received Message: {}", received_message);

sdk/messaging_servicebus/src/service_bus/mod.rs

Lines changed: 108 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,14 @@ pub use self::{
77
};
88
use crate::utils::{craft_peek_lock_url, get_head_url};
99
use azure_core::{
10-
auth::Secret, error::Error, from_json, headers, hmac::hmac_sha256, CollectedResponse,
11-
HttpClient, Method, Request, StatusCode, Url,
10+
auth::Secret,
11+
error::Error,
12+
from_json,
13+
headers::{self, HeaderName, HeaderValue, CONTENT_TYPE},
14+
hmac::hmac_sha256,
15+
CollectedResponse, HttpClient, Method, Request, StatusCode, Url,
1216
};
13-
use serde::Deserialize;
17+
use serde::{Deserialize, Serialize};
1418
use std::{
1519
str::FromStr,
1620
time::Duration,
@@ -21,6 +25,7 @@ use url::form_urlencoded::{self, Serializer};
2125

2226
/// Default duration for the SAS token in days — We might want to make this configurable at some point
2327
const DEFAULT_SAS_DURATION: u64 = 3_600; // seconds = 1 hour
28+
const BROKER_PROPERTIES: HeaderName = HeaderName::from_static("brokerproperties");
2429

2530
/// Prepares an HTTP request
2631
fn finalize_request(
@@ -89,17 +94,20 @@ async fn send_message(
8994
policy_name: &str,
9095
signing_key: &Secret,
9196
msg: &str,
97+
send_message_options: Option<SendMessageOptions>,
9298
) -> azure_core::Result<()> {
9399
let url = format!("https://{namespace}.servicebus.windows.net/{queue_or_topic}/messages");
94100

95-
let req = finalize_request(
101+
let mut req = finalize_request(
96102
&url,
97103
Method::Post,
98104
Some(msg.to_string()),
99105
policy_name,
100106
signing_key,
101107
)?;
102108

109+
req.insert_headers(&send_message_options);
110+
103111
http_client
104112
.as_ref()
105113
.execute_request_check_status(&req)
@@ -284,7 +292,7 @@ impl PeekLockResponse {
284292
pub struct BrokerProperties {
285293
pub delivery_count: i32,
286294
pub enqueued_sequence_number: Option<i32>,
287-
#[serde(deserialize_with = "BrokerProperties::option_rfc2822")]
295+
#[serde(with = "time::serde::rfc2822::option")]
288296
pub enqueued_time_utc: Option<OffsetDateTime>,
289297
pub lock_token: String,
290298
#[serde(with = "time::serde::rfc2822")]
@@ -295,19 +303,105 @@ pub struct BrokerProperties {
295303
pub time_to_live: f64,
296304
}
297305

298-
impl BrokerProperties {
299-
fn option_rfc2822<'de, D>(value: D) -> Result<Option<OffsetDateTime>, D::Error>
300-
where
301-
D: serde::Deserializer<'de>,
302-
{
303-
Ok(Some(time::serde::rfc2822::deserialize(value)?))
304-
}
305-
}
306-
307306
impl FromStr for BrokerProperties {
308307
type Err = azure_core::Error;
309308

310309
fn from_str(s: &str) -> Result<Self, Self::Err> {
311310
from_json(s)
312311
}
313312
}
313+
314+
#[derive(Debug, Default)]
315+
pub struct SendMessageOptions {
316+
pub content_type: Option<String>,
317+
pub broker_properties: Option<SettableBrokerProperties>,
318+
}
319+
320+
impl headers::AsHeaders for SendMessageOptions {
321+
type Iter = std::vec::IntoIter<(HeaderName, HeaderValue)>;
322+
323+
fn as_headers(&self) -> Self::Iter {
324+
let mut headers: Vec<(HeaderName, HeaderValue)> = vec![];
325+
326+
if let Some(content_type) = &self.content_type {
327+
headers.push((CONTENT_TYPE, content_type.into()));
328+
}
329+
330+
if let Some(broker_properties) = &self.broker_properties {
331+
headers.push((
332+
BROKER_PROPERTIES,
333+
serde_json::to_string(broker_properties).unwrap().into(),
334+
));
335+
}
336+
337+
headers.into_iter()
338+
}
339+
}
340+
341+
#[derive(Clone, Debug, Serialize, Default)]
342+
#[serde(rename_all = "PascalCase")]
343+
pub struct SettableBrokerProperties {
344+
#[serde(skip_serializing_if = "Option::is_none")]
345+
pub correlation_id: Option<String>,
346+
347+
#[serde(skip_serializing_if = "Option::is_none")]
348+
pub session_id: Option<String>,
349+
350+
#[serde(skip_serializing_if = "Option::is_none")]
351+
pub message_id: Option<String>,
352+
353+
#[serde(skip_serializing_if = "Option::is_none")]
354+
pub label: Option<String>,
355+
356+
#[serde(skip_serializing_if = "Option::is_none")]
357+
pub reply_to: Option<String>,
358+
359+
#[serde(
360+
skip_serializing_if = "Option::is_none",
361+
serialize_with = "duration_to_seconds_f64"
362+
)]
363+
pub time_to_live: Option<Duration>,
364+
365+
#[serde(skip_serializing_if = "Option::is_none")]
366+
pub to: Option<String>,
367+
368+
#[serde(
369+
with = "time::serde::rfc2822::option",
370+
skip_serializing_if = "Option::is_none"
371+
)]
372+
pub scheduled_enqueue_time_utc: Option<OffsetDateTime>,
373+
374+
#[serde(skip_serializing_if = "Option::is_none")]
375+
pub reply_to_session_id: Option<String>,
376+
377+
#[serde(skip_serializing_if = "Option::is_none")]
378+
pub partition_key: Option<String>,
379+
}
380+
381+
fn duration_to_seconds_f64<S>(duration: &Option<Duration>, serializer: S) -> Result<S::Ok, S::Error>
382+
where
383+
S: serde::Serializer,
384+
{
385+
if let Some(duration) = duration {
386+
serializer.serialize_f64(duration.as_secs_f64())
387+
} else {
388+
serializer.serialize_none()
389+
}
390+
}
391+
392+
#[cfg(test)]
393+
mod tests {
394+
use std::time::Duration;
395+
396+
use crate::service_bus::SettableBrokerProperties;
397+
398+
#[test]
399+
fn test_duration_serialize() {
400+
let dur = SettableBrokerProperties {
401+
time_to_live: Some(Duration::from_millis(4444)),
402+
..Default::default()
403+
};
404+
let dur_str = serde_json::to_string(&dur).unwrap();
405+
assert_eq!(dur_str, r#"{"TimeToLive":4.444}"#);
406+
}
407+
}

sdk/messaging_servicebus/src/service_bus/queue_client.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use std::time::Duration;
1111

1212
use azure_core::{auth::Secret, error::Error, HttpClient};
1313

14+
use super::SendMessageOptions;
15+
1416
/// Client object that allows interaction with the `ServiceBus` API
1517
#[derive(Debug, Clone)]
1618
pub struct QueueClient {
@@ -49,14 +51,19 @@ impl QueueClient {
4951
}
5052

5153
/// Sends a message to the queue
52-
pub async fn send_message(&self, msg: &str) -> Result<(), Error> {
54+
pub async fn send_message(
55+
&self,
56+
msg: &str,
57+
send_message_options: Option<SendMessageOptions>,
58+
) -> Result<(), Error> {
5359
send_message(
5460
&self.http_client,
5561
&self.namespace,
5662
&self.queue,
5763
&self.policy_name,
5864
&self.signing_key,
5965
msg,
66+
send_message_options,
6067
)
6168
.await
6269
}

sdk/messaging_servicebus/src/service_bus/topic_client.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use std::time::Duration;
1111

1212
use azure_core::{auth::Secret, error::Error, HttpClient};
1313

14+
use super::SendMessageOptions;
15+
1416
/// Client object that allows interaction with the `ServiceBus` API
1517
#[derive(Debug, Clone)]
1618
pub struct TopicClient {
@@ -73,14 +75,19 @@ impl TopicSender {
7375
Self { topic_client }
7476
}
7577
/// Sends a message to the topic
76-
pub async fn send_message(&self, msg: &str) -> Result<(), Error> {
78+
pub async fn send_message(
79+
&self,
80+
msg: &str,
81+
send_message_options: Option<SendMessageOptions>,
82+
) -> Result<(), Error> {
7783
send_message(
7884
&self.topic_client.http_client,
7985
&self.topic_client.namespace,
8086
&self.topic_client.topic,
8187
&self.topic_client.policy_name,
8288
&self.topic_client.signing_key,
8389
msg,
90+
send_message_options,
8491
)
8592
.await
8693
}

sdk/messaging_servicebus/tests/service_bus.rs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,51 @@
11
#![cfg(all(test, feature = "test_e2e"))] // to run this, do: `cargo test --features test_e2e`
22

3-
use azure_messaging_servicebus::service_bus::QueueClient;
3+
use azure_messaging_servicebus::service_bus::{
4+
QueueClient, SendMessageOptions, SettableBrokerProperties,
5+
};
46
use std::time::Duration;
7+
use time::OffsetDateTime;
58

69
#[tokio::test]
710
async fn send_message_test() {
811
let client = create_client().unwrap();
912
client
10-
.send_message("hello, world!")
13+
.send_message("hello, world!", None)
14+
.await
15+
.expect("Failed to send message");
16+
}
17+
18+
#[tokio::test]
19+
async fn send_message_delayed_test() {
20+
let client = create_client().unwrap();
21+
client
22+
.send_message(
23+
"hello, world!",
24+
Some(SendMessageOptions {
25+
broker_properties: Some(SettableBrokerProperties {
26+
scheduled_enqueue_time_utc: Some(
27+
OffsetDateTime::now_utc() + Duration::from_secs(60),
28+
),
29+
..Default::default()
30+
}),
31+
..Default::default()
32+
}),
33+
)
34+
.await
35+
.expect("Failed to send message");
36+
}
37+
38+
#[tokio::test]
39+
async fn send_message_with_content_type() {
40+
let client = create_client().unwrap();
41+
client
42+
.send_message(
43+
r#"{"message": "content"}"#,
44+
Some(SendMessageOptions {
45+
content_type: Some("application/json".into()),
46+
..Default::default()
47+
}),
48+
)
1149
.await
1250
.expect("Failed to send message");
1351
}

0 commit comments

Comments
 (0)