@@ -7,10 +7,14 @@ pub use self::{
77} ;
88use crate :: utils:: { craft_peek_lock_url, get_head_url} ;
99use azure_core:: {
10- auth:: Secret , error:: Error , from_json, headers, hmac:: hmac_sha256, CollectedResponse ,
11- HttpClient , Method , Request , StatusCode , Url ,
10+ auth:: Secret ,
11+ error:: Error ,
12+ from_json,
13+ headers:: { self , HeaderName , HeaderValue , CONTENT_TYPE } ,
14+ hmac:: hmac_sha256,
15+ CollectedResponse , HttpClient , Method , Request , StatusCode , Url ,
1216} ;
13- use serde:: Deserialize ;
17+ use serde:: { Deserialize , Serialize } ;
1418use std:: {
1519 str:: FromStr ,
1620 time:: Duration ,
@@ -21,6 +25,7 @@ use url::form_urlencoded::{self, Serializer};
2125
2226/// Default duration for the SAS token in days — We might want to make this configurable at some point
2327const DEFAULT_SAS_DURATION : u64 = 3_600 ; // seconds = 1 hour
28+ const BROKER_PROPERTIES : HeaderName = HeaderName :: from_static ( "brokerproperties" ) ;
2429
2530/// Prepares an HTTP request
2631fn finalize_request (
@@ -89,17 +94,20 @@ async fn send_message(
8994 policy_name : & str ,
9095 signing_key : & Secret ,
9196 msg : & str ,
97+ send_message_options : Option < SendMessageOptions > ,
9298) -> azure_core:: Result < ( ) > {
9399 let url = format ! ( "https://{namespace}.servicebus.windows.net/{queue_or_topic}/messages" ) ;
94100
95- let req = finalize_request (
101+ let mut req = finalize_request (
96102 & url,
97103 Method :: Post ,
98104 Some ( msg. to_string ( ) ) ,
99105 policy_name,
100106 signing_key,
101107 ) ?;
102108
109+ req. insert_headers ( & send_message_options) ;
110+
103111 http_client
104112 . as_ref ( )
105113 . execute_request_check_status ( & req)
@@ -284,7 +292,7 @@ impl PeekLockResponse {
284292pub struct BrokerProperties {
285293 pub delivery_count : i32 ,
286294 pub enqueued_sequence_number : Option < i32 > ,
287- #[ serde( deserialize_with = "BrokerProperties::option_rfc2822 " ) ]
295+ #[ serde( with = "time::serde::rfc2822::option " ) ]
288296 pub enqueued_time_utc : Option < OffsetDateTime > ,
289297 pub lock_token : String ,
290298 #[ serde( with = "time::serde::rfc2822" ) ]
@@ -295,19 +303,105 @@ pub struct BrokerProperties {
295303 pub time_to_live : f64 ,
296304}
297305
298- impl BrokerProperties {
299- fn option_rfc2822 < ' de , D > ( value : D ) -> Result < Option < OffsetDateTime > , D :: Error >
300- where
301- D : serde:: Deserializer < ' de > ,
302- {
303- Ok ( Some ( time:: serde:: rfc2822:: deserialize ( value) ?) )
304- }
305- }
306-
307306impl FromStr for BrokerProperties {
308307 type Err = azure_core:: Error ;
309308
310309 fn from_str ( s : & str ) -> Result < Self , Self :: Err > {
311310 from_json ( s)
312311 }
313312}
313+
314+ #[ derive( Debug , Default ) ]
315+ pub struct SendMessageOptions {
316+ pub content_type : Option < String > ,
317+ pub broker_properties : Option < SettableBrokerProperties > ,
318+ }
319+
320+ impl headers:: AsHeaders for SendMessageOptions {
321+ type Iter = std:: vec:: IntoIter < ( HeaderName , HeaderValue ) > ;
322+
323+ fn as_headers ( & self ) -> Self :: Iter {
324+ let mut headers: Vec < ( HeaderName , HeaderValue ) > = vec ! [ ] ;
325+
326+ if let Some ( content_type) = & self . content_type {
327+ headers. push ( ( CONTENT_TYPE , content_type. into ( ) ) ) ;
328+ }
329+
330+ if let Some ( broker_properties) = & self . broker_properties {
331+ headers. push ( (
332+ BROKER_PROPERTIES ,
333+ serde_json:: to_string ( broker_properties) . unwrap ( ) . into ( ) ,
334+ ) ) ;
335+ }
336+
337+ headers. into_iter ( )
338+ }
339+ }
340+
341+ #[ derive( Clone , Debug , Serialize , Default ) ]
342+ #[ serde( rename_all = "PascalCase" ) ]
343+ pub struct SettableBrokerProperties {
344+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
345+ pub correlation_id : Option < String > ,
346+
347+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
348+ pub session_id : Option < String > ,
349+
350+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
351+ pub message_id : Option < String > ,
352+
353+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
354+ pub label : Option < String > ,
355+
356+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
357+ pub reply_to : Option < String > ,
358+
359+ #[ serde(
360+ skip_serializing_if = "Option::is_none" ,
361+ serialize_with = "duration_to_seconds_f64"
362+ ) ]
363+ pub time_to_live : Option < Duration > ,
364+
365+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
366+ pub to : Option < String > ,
367+
368+ #[ serde(
369+ with = "time::serde::rfc2822::option" ,
370+ skip_serializing_if = "Option::is_none"
371+ ) ]
372+ pub scheduled_enqueue_time_utc : Option < OffsetDateTime > ,
373+
374+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
375+ pub reply_to_session_id : Option < String > ,
376+
377+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
378+ pub partition_key : Option < String > ,
379+ }
380+
381+ fn duration_to_seconds_f64 < S > ( duration : & Option < Duration > , serializer : S ) -> Result < S :: Ok , S :: Error >
382+ where
383+ S : serde:: Serializer ,
384+ {
385+ if let Some ( duration) = duration {
386+ serializer. serialize_f64 ( duration. as_secs_f64 ( ) )
387+ } else {
388+ serializer. serialize_none ( )
389+ }
390+ }
391+
392+ #[ cfg( test) ]
393+ mod tests {
394+ use std:: time:: Duration ;
395+
396+ use crate :: service_bus:: SettableBrokerProperties ;
397+
398+ #[ test]
399+ fn test_duration_serialize ( ) {
400+ let dur = SettableBrokerProperties {
401+ time_to_live : Some ( Duration :: from_millis ( 4444 ) ) ,
402+ ..Default :: default ( )
403+ } ;
404+ let dur_str = serde_json:: to_string ( & dur) . unwrap ( ) ;
405+ assert_eq ! ( dur_str, r#"{"TimeToLive":4.444}"# ) ;
406+ }
407+ }
0 commit comments