Skip to content

Commit d688a1a

Browse files
Updated AMQP builders to match Rust SDK guidelines (Azure#1857)
* Updated AMQP builders to match Rust SDK guidelines * PR feedback
1 parent e6b21a0 commit d688a1a

File tree

9 files changed

+456
-367
lines changed

9 files changed

+456
-367
lines changed

sdk/core/azure_core_amqp/src/fe2o3/messaging/message_fields.rs

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -340,43 +340,54 @@ impl From<fe2o3_amqp_types::messaging::Properties> for AmqpMessageProperties {
340340
let mut amqp_message_properties_builder = AmqpMessageProperties::builder();
341341

342342
if let Some(message_id) = properties.message_id {
343-
amqp_message_properties_builder.with_message_id(message_id);
343+
amqp_message_properties_builder =
344+
amqp_message_properties_builder.with_message_id(message_id);
344345
}
345346
if let Some(user_id) = properties.user_id {
346-
amqp_message_properties_builder.with_user_id(user_id.to_vec());
347+
amqp_message_properties_builder =
348+
amqp_message_properties_builder.with_user_id(user_id.to_vec());
347349
}
348350
if let Some(to) = properties.to {
349-
amqp_message_properties_builder.with_to(to);
351+
amqp_message_properties_builder = amqp_message_properties_builder.with_to(to);
350352
}
351353
if let Some(subject) = properties.subject {
352-
amqp_message_properties_builder.with_subject(subject);
354+
amqp_message_properties_builder = amqp_message_properties_builder.with_subject(subject);
353355
}
354356
if let Some(reply_to) = properties.reply_to {
355-
amqp_message_properties_builder.with_reply_to(reply_to);
357+
amqp_message_properties_builder =
358+
amqp_message_properties_builder.with_reply_to(reply_to);
356359
}
357360
if let Some(correlation_id) = properties.correlation_id {
358-
amqp_message_properties_builder.with_correlation_id(correlation_id);
361+
amqp_message_properties_builder =
362+
amqp_message_properties_builder.with_correlation_id(correlation_id);
359363
}
360364
if let Some(content_type) = properties.content_type {
361-
amqp_message_properties_builder.with_content_type(content_type);
365+
amqp_message_properties_builder =
366+
amqp_message_properties_builder.with_content_type(content_type);
362367
}
363368
if let Some(content_encoding) = properties.content_encoding {
364-
amqp_message_properties_builder.with_content_encoding(content_encoding);
369+
amqp_message_properties_builder =
370+
amqp_message_properties_builder.with_content_encoding(content_encoding);
365371
}
366372
if let Some(absolute_expiry_time) = properties.absolute_expiry_time {
367-
amqp_message_properties_builder.with_absolute_expiry_time(absolute_expiry_time);
373+
amqp_message_properties_builder =
374+
amqp_message_properties_builder.with_absolute_expiry_time(absolute_expiry_time);
368375
}
369376
if let Some(creation_time) = properties.creation_time {
370-
amqp_message_properties_builder.with_creation_time(creation_time);
377+
amqp_message_properties_builder =
378+
amqp_message_properties_builder.with_creation_time(creation_time);
371379
}
372380
if let Some(group_id) = properties.group_id {
373-
amqp_message_properties_builder.with_group_id(group_id);
381+
amqp_message_properties_builder =
382+
amqp_message_properties_builder.with_group_id(group_id);
374383
}
375384
if let Some(group_sequence) = properties.group_sequence {
376-
amqp_message_properties_builder.with_group_sequence(group_sequence);
385+
amqp_message_properties_builder =
386+
amqp_message_properties_builder.with_group_sequence(group_sequence);
377387
}
378388
if let Some(reply_to_group_id) = properties.reply_to_group_id {
379-
amqp_message_properties_builder.with_reply_to_group_id(reply_to_group_id);
389+
amqp_message_properties_builder =
390+
amqp_message_properties_builder.with_reply_to_group_id(reply_to_group_id);
380391
}
381392
amqp_message_properties_builder.build()
382393
}

sdk/core/azure_core_amqp/src/fe2o3/messaging/message_source.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@ impl From<fe2o3_amqp_types::messaging::Source> for AmqpSource {
7777
let mut amqp_source_builder = AmqpSource::builder();
7878

7979
if let Some(address) = source.address {
80-
amqp_source_builder.with_address(address);
80+
amqp_source_builder = amqp_source_builder.with_address(address);
8181
}
82-
amqp_source_builder
82+
amqp_source_builder = amqp_source_builder
8383
.with_durable(source.durable.into())
8484
.with_expiry_policy(source.expiry_policy.into())
8585
.with_timeout(source.timeout)
@@ -92,26 +92,29 @@ impl From<fe2o3_amqp_types::messaging::Source> for AmqpSource {
9292
.map(|(k, v)| (k.into(), v.into()))
9393
.collect();
9494

95-
amqp_source_builder.with_dynamic_node_properties(dynamic_node_properties);
95+
amqp_source_builder =
96+
amqp_source_builder.with_dynamic_node_properties(dynamic_node_properties);
9697
}
9798
if let Some(distribution_mode) = source.distribution_mode {
98-
amqp_source_builder.with_distribution_mode(distribution_mode.into());
99+
amqp_source_builder =
100+
amqp_source_builder.with_distribution_mode(distribution_mode.into());
99101
}
100102
if let Some(filter) = source.filter {
101103
let filter: AmqpOrderedMap<AmqpSymbol, AmqpValue> = filter
102104
.into_iter()
103105
.map(|(k, v)| (k.into(), v.into()))
104106
.collect();
105-
amqp_source_builder.with_filter(filter);
107+
amqp_source_builder = amqp_source_builder.with_filter(filter);
106108
}
107109
if let Some(default_outcome) = source.default_outcome {
108-
amqp_source_builder.with_default_outcome(default_outcome.into());
110+
amqp_source_builder = amqp_source_builder.with_default_outcome(default_outcome.into());
109111
}
110112
if let Some(outcomes) = source.outcomes {
111-
amqp_source_builder.with_outcomes(outcomes.into_iter().map(|o| o.into()).collect());
113+
amqp_source_builder =
114+
amqp_source_builder.with_outcomes(outcomes.into_iter().map(|o| o.into()).collect());
112115
}
113116
if let Some(capabilities) = source.capabilities {
114-
amqp_source_builder
117+
amqp_source_builder = amqp_source_builder
115118
.with_capabilities(capabilities.into_iter().map(|c| c.into()).collect());
116119
}
117120
amqp_source_builder.build()

sdk/core/azure_core_amqp/src/fe2o3/messaging/mod.rs

Lines changed: 111 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,14 @@ where
7777
let mut amqp_message_builder = AmqpMessage::builder();
7878

7979
if let Some(application_properties) = message.application_properties {
80-
amqp_message_builder.with_application_properties(application_properties.into());
80+
amqp_message_builder =
81+
amqp_message_builder.with_application_properties(application_properties.into());
8182
}
8283

8384
let body = message.body;
8485
if body.is_empty() {
8586
let body = AmqpMessageBody::Empty;
86-
amqp_message_builder.with_body(body);
87+
amqp_message_builder = amqp_message_builder.with_body(body);
8788
} else if body.is_data() {
8889
let data = body.try_into_data().map_err(|_| {
8990
Error::message(
@@ -92,7 +93,7 @@ where
9293
)
9394
})?;
9495
let body = AmqpMessageBody::Binary(data.map(|x| x.to_vec()).collect());
95-
amqp_message_builder.with_body(body);
96+
amqp_message_builder = amqp_message_builder.with_body(body);
9697
} else if body.is_value() {
9798
let value = body.try_into_value().map_err(|_| {
9899
Error::message(
@@ -103,7 +104,7 @@ where
103104
// Because a conversion exists between fe2o3 values and AmqpValue types,
104105
// this try_into will always succeed.
105106
let value = value.try_into().unwrap();
106-
amqp_message_builder.with_body(AmqpMessageBody::Value(value));
107+
amqp_message_builder = amqp_message_builder.with_body(AmqpMessageBody::Value(value));
107108
} else if body.is_sequence() {
108109
let sequence = body.try_into_sequence().map_err(|_| {
109110
Error::message(
@@ -125,27 +126,29 @@ where
125126
})
126127
.collect(),
127128
);
128-
amqp_message_builder.with_body(body);
129+
amqp_message_builder = amqp_message_builder.with_body(body);
129130
}
130131

131132
if let Some(header) = message.header {
132-
amqp_message_builder.with_header(header.into());
133+
amqp_message_builder = amqp_message_builder.with_header(header.into());
133134
}
134135

135136
if let Some(properties) = message.properties {
136-
amqp_message_builder.with_properties(properties);
137+
amqp_message_builder = amqp_message_builder.with_properties(properties);
137138
}
138139

139140
if let Some(delivery_annotations) = message.delivery_annotations {
140-
amqp_message_builder.with_delivery_annotations(delivery_annotations.0.into());
141+
amqp_message_builder =
142+
amqp_message_builder.with_delivery_annotations(delivery_annotations.0.into());
141143
}
142144

143145
if let Some(message_annotations) = message.message_annotations {
144-
amqp_message_builder.with_message_annotations(message_annotations.0.into());
146+
amqp_message_builder =
147+
amqp_message_builder.with_message_annotations(message_annotations.0.into());
145148
}
146149

147150
if let Some(footer) = message.footer {
148-
amqp_message_builder.with_footer(footer.0.into());
151+
amqp_message_builder = amqp_message_builder.with_footer(footer.0.into());
149152
}
150153

151154
Ok(amqp_message_builder.build())
@@ -211,114 +214,134 @@ impl
211214
) -> Self {
212215
let mut amqp_message_builder = AmqpMessage::builder();
213216

214-
amqp_message_builder.with_body(AmqpMessageBody::Empty);
217+
amqp_message_builder = amqp_message_builder.with_body(AmqpMessageBody::Empty);
215218

216219
if let Some(application_properties) = message.application_properties {
217-
amqp_message_builder.with_application_properties(application_properties.into());
220+
amqp_message_builder =
221+
amqp_message_builder.with_application_properties(application_properties.into());
218222
}
219223

220224
if let Some(header) = message.header {
221-
amqp_message_builder.with_header(header.into());
225+
amqp_message_builder = amqp_message_builder.with_header(header.into());
222226
}
223227

224228
if let Some(properties) = message.properties {
225229
info!("Converting properties to AmqpMessageProperties");
226-
amqp_message_builder.with_properties(properties);
230+
amqp_message_builder = amqp_message_builder.with_properties(properties);
227231
}
228232

229233
if let Some(delivery_annotations) = message.delivery_annotations {
230-
amqp_message_builder.with_delivery_annotations(delivery_annotations.0.into());
234+
amqp_message_builder =
235+
amqp_message_builder.with_delivery_annotations(delivery_annotations.0.into());
231236
}
232237

233238
if let Some(message_annotations) = message.message_annotations {
234-
amqp_message_builder.with_message_annotations(message_annotations.0.into());
239+
amqp_message_builder =
240+
amqp_message_builder.with_message_annotations(message_annotations.0.into());
235241
}
236242

237243
if let Some(footer) = message.footer {
238-
amqp_message_builder.with_footer(footer.0.into());
244+
amqp_message_builder = amqp_message_builder.with_footer(footer.0.into());
239245
}
240246

241247
amqp_message_builder.build()
242248
}
243249
}
244250

251+
fn fe2o3_message_from_amqp_message(
252+
message: &AmqpMessage,
253+
) -> fe2o3_amqp_types::messaging::Message<
254+
fe2o3_amqp_types::messaging::Body<fe2o3_amqp_types::primitives::Value>,
255+
> {
256+
let message_builder = fe2o3_amqp_types::messaging::Message::builder()
257+
.application_properties(message.application_properties().map(|x| x.clone().into()))
258+
.properties(message.properties().map(|p| p.clone().into()))
259+
.header(message.header().map(|x| x.clone().into()))
260+
.delivery_annotations(message.delivery_annotations().map(|x| x.clone().into()))
261+
.message_annotations(message.message_annotations().map(|x| x.clone().into()))
262+
.footer(message.footer().map(|x| x.clone().into()));
263+
264+
match message.body() {
265+
AmqpMessageBody::Value(value) => {
266+
let value: fe2o3_amqp_types::primitives::Value = value.clone().into();
267+
let value = fe2o3_amqp_types::messaging::Body::Value(value.into_body());
268+
let message_builder = message_builder.body(value);
269+
message_builder.build()
270+
}
271+
AmqpMessageBody::Binary(data) => {
272+
let data: Vec<serde_bytes::ByteBuf> = data
273+
.clone()
274+
.into_iter()
275+
.map(serde_bytes::ByteBuf::from)
276+
.collect();
277+
let message_builder = message_builder.body(fe2o3_amqp_types::messaging::Body::Data(
278+
data.into_iter().map(|x| x.into()).collect(),
279+
));
280+
message_builder.build()
281+
}
282+
AmqpMessageBody::Empty => message_builder
283+
.body(fe2o3_amqp_types::messaging::Body::Empty)
284+
.build(),
285+
AmqpMessageBody::Sequence(sequence) => {
286+
let sequence: TransparentVec<
287+
fe2o3_amqp_types::primitives::List<fe2o3_amqp_types::primitives::Value>,
288+
> = sequence
289+
.iter()
290+
.map(|x| {
291+
let mut l = fe2o3_amqp_types::primitives::List::new();
292+
let c = x
293+
.clone()
294+
.0
295+
.into_iter()
296+
.map(|v| Into::<fe2o3_amqp_types::primitives::Value>::into(v.clone()));
297+
for v in c {
298+
l.push(v);
299+
}
300+
l
301+
})
302+
.collect();
303+
let amqp_sequence = TransparentVec::<
304+
fe2o3_amqp_types::messaging::AmqpSequence<fe2o3_amqp_types::primitives::Value>,
305+
>::new(
306+
sequence
307+
.into_iter()
308+
.map(|x| {
309+
x.into_iter()
310+
.collect::<fe2o3_amqp_types::primitives::List<
311+
fe2o3_amqp_types::primitives::Value,
312+
>>()
313+
.into()
314+
})
315+
.collect::<Vec<
316+
fe2o3_amqp_types::messaging::AmqpSequence<
317+
fe2o3_amqp_types::primitives::Value,
318+
>,
319+
>>(),
320+
);
321+
322+
let message_builder =
323+
message_builder.body(fe2o3_amqp_types::messaging::Body::Sequence(amqp_sequence));
324+
message_builder.build()
325+
}
326+
}
327+
}
328+
245329
impl From<AmqpMessage>
246330
for fe2o3_amqp_types::messaging::Message<
247331
fe2o3_amqp_types::messaging::Body<fe2o3_amqp_types::primitives::Value>,
248332
>
249333
{
250334
fn from(message: AmqpMessage) -> Self {
251-
let message_builder = fe2o3_amqp_types::messaging::Message::builder()
252-
.application_properties(message.application_properties().map(|x| x.clone().into()))
253-
.properties(message.properties().map(|p| p.clone().into()))
254-
.header(message.header().map(|x| x.clone().into()))
255-
.delivery_annotations(message.delivery_annotations().map(|x| x.clone().into()))
256-
.message_annotations(message.message_annotations().map(|x| x.clone().into()))
257-
.footer(message.footer().map(|x| x.clone().into()));
258-
259-
match message.body() {
260-
AmqpMessageBody::Value(value) => {
261-
let value: fe2o3_amqp_types::primitives::Value = value.clone().into();
262-
let value = fe2o3_amqp_types::messaging::Body::Value(value.into_body());
263-
let message_builder = message_builder.body(value);
264-
message_builder.build()
265-
}
266-
AmqpMessageBody::Binary(data) => {
267-
let data: Vec<serde_bytes::ByteBuf> = data
268-
.clone()
269-
.into_iter()
270-
.map(serde_bytes::ByteBuf::from)
271-
.collect();
272-
let message_builder =
273-
message_builder.body(fe2o3_amqp_types::messaging::Body::Data(
274-
data.into_iter().map(|x| x.into()).collect(),
275-
));
276-
message_builder.build()
277-
}
278-
AmqpMessageBody::Empty => message_builder
279-
.body(fe2o3_amqp_types::messaging::Body::Empty)
280-
.build(),
281-
AmqpMessageBody::Sequence(sequence) => {
282-
let sequence: TransparentVec<
283-
fe2o3_amqp_types::primitives::List<fe2o3_amqp_types::primitives::Value>,
284-
> = sequence
285-
.iter()
286-
.map(|x| {
287-
let mut l = fe2o3_amqp_types::primitives::List::new();
288-
let c =
289-
x.clone().0.into_iter().map(|v| {
290-
Into::<fe2o3_amqp_types::primitives::Value>::into(v.clone())
291-
});
292-
for v in c {
293-
l.push(v);
294-
}
295-
l
296-
})
297-
.collect();
298-
let amqp_sequence = TransparentVec::<
299-
fe2o3_amqp_types::messaging::AmqpSequence<fe2o3_amqp_types::primitives::Value>,
300-
>::new(
301-
sequence
302-
.into_iter()
303-
.map(|x| {
304-
x.into_iter()
305-
.collect::<fe2o3_amqp_types::primitives::List<
306-
fe2o3_amqp_types::primitives::Value,
307-
>>()
308-
.into()
309-
})
310-
.collect::<Vec<
311-
fe2o3_amqp_types::messaging::AmqpSequence<
312-
fe2o3_amqp_types::primitives::Value,
313-
>,
314-
>>(),
315-
);
316-
317-
let message_builder = message_builder
318-
.body(fe2o3_amqp_types::messaging::Body::Sequence(amqp_sequence));
319-
message_builder.build()
320-
}
321-
}
335+
fe2o3_message_from_amqp_message(&message)
336+
}
337+
}
338+
impl From<&AmqpMessage>
339+
for fe2o3_amqp_types::messaging::Message<
340+
fe2o3_amqp_types::messaging::Body<fe2o3_amqp_types::primitives::Value>,
341+
>
342+
{
343+
fn from(message: &AmqpMessage) -> Self {
344+
fe2o3_message_from_amqp_message(message)
322345
}
323346
}
324347

0 commit comments

Comments
 (0)