Skip to content

Commit f34bbe5

Browse files
committed
impl From<AmqpMessage> for AmqpCloudEvent
Signed-off-by: minghuaw <[email protected]>
1 parent 8ea7cdc commit f34bbe5

File tree

5 files changed

+74
-36
lines changed

5 files changed

+74
-36
lines changed

src/binding/fe2o3_amqp/constants.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
// Required
32
pub(super) const ID: &str = "id";
43
pub(super) const SOURCE: &str = "source";
@@ -22,4 +21,4 @@ pub(super) mod prefixed {
2221
pub const DATASCHEMA: &str = "cloudEvents:dataschema";
2322
pub const SUBJECT: &str = "cloudEvents:subject";
2423
pub const TIME: &str = "cloudEvents:time";
25-
}
24+
}

src/binding/fe2o3_amqp/deserializer.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ impl BinaryDeserializer for AmqpCloudEvent {
2727
let spec_version = {
2828
let value = self
2929
.application_properties
30+
.as_mut()
31+
.ok_or(Error::WrongEncoding {})?
3032
.remove(prefixed::SPECVERSION)
3133
.ok_or(Error::WrongEncoding {})
3234
.map(|val| match val {
@@ -47,14 +49,16 @@ impl BinaryDeserializer for AmqpCloudEvent {
4749
// remaining attributes
4850
let attributes = spec_version.attribute_names();
4951

50-
for (key, value) in self.application_properties.0.into_iter() {
51-
if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) {
52-
if attributes.contains(&key) {
53-
let value = MessageAttributeValue::try_from((key, value))?;
54-
serializer = serializer.set_attribute(key, value)?;
55-
} else {
56-
let value = MessageAttributeValue::try_from(value)?;
57-
serializer = serializer.set_extension(key, value)?;
52+
if let Some(application_properties) = self.application_properties {
53+
for (key, value) in application_properties.0.into_iter() {
54+
if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) {
55+
if attributes.contains(&key) {
56+
let value = MessageAttributeValue::try_from((key, value))?;
57+
serializer = serializer.set_attribute(key, value)?;
58+
} else {
59+
let value = MessageAttributeValue::try_from(value)?;
60+
serializer = serializer.set_extension(key, value)?;
61+
}
5862
}
5963
}
6064
}
@@ -79,8 +83,7 @@ impl StructuredDeserializer for AmqpCloudEvent {
7983
let bytes = match self.body {
8084
Body::Data(data) => data.0.into_vec(),
8185
Body::Nothing => vec![],
82-
Body::Sequence(_)
83-
| Body::Value(_) => return Err(Error::WrongEncoding { }),
86+
Body::Sequence(_) | Body::Value(_) => return Err(Error::WrongEncoding {}),
8487
};
8588
serializer.set_structured_event(bytes)
8689
}

src/binding/fe2o3_amqp/mod.rs

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
use std::collections::HashMap;
44
use std::convert::TryFrom;
55

6-
use chrono::{Utc, TimeZone};
6+
use chrono::{TimeZone, Utc};
77
use fe2o3_amqp_lib::types::messaging::{ApplicationProperties, Body, Message, Properties};
88
use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value};
99

1010
use crate::event::{AttributeValue, ExtensionValue};
11-
use crate::message::{Error, MessageAttributeValue};
11+
use crate::message::{BinaryDeserializer, Error, MessageAttributeValue};
1212
use crate::Event;
1313

1414
use self::constants::{
@@ -39,13 +39,21 @@ pub type Extensions = HashMap<String, ExtensionValue>;
3939
/// structured mode, otherwise it defaults to binary mode.
4040
pub struct AmqpCloudEvent {
4141
content_type: Option<Symbol>,
42-
application_properties: ApplicationProperties,
42+
application_properties: Option<ApplicationProperties>,
4343
body: AmqpBody,
4444
}
4545

4646
impl AmqpCloudEvent {
47-
pub fn from_event(event: Event) -> Result<Self, Error> {
48-
todo!()
47+
fn new() -> Self {
48+
Self {
49+
content_type: None,
50+
application_properties: None,
51+
body: Body::Nothing,
52+
}
53+
}
54+
55+
pub fn from_binary_event(event: Event) -> Result<Self, Error> {
56+
BinaryDeserializer::deserialize_binary(event, Self::new())
4957
}
5058
}
5159

@@ -58,13 +66,24 @@ impl From<AmqpCloudEvent> for AmqpMessage {
5866
delivery_annotations: None,
5967
message_annotations: None,
6068
properties: Some(properties),
61-
application_properties: Some(event.application_properties),
69+
application_properties: event.application_properties,
6270
body: event.body,
6371
footer: None,
6472
}
6573
}
6674
}
6775

76+
impl From<AmqpMessage> for AmqpCloudEvent {
77+
fn from(message: AmqpMessage) -> Self {
78+
let content_type = message.properties.map(|p| p.content_type).flatten();
79+
Self {
80+
content_type,
81+
application_properties: message.application_properties,
82+
body: message.body,
83+
}
84+
}
85+
}
86+
6887
impl<'a> From<AttributeValue<'a>> for SimpleValue {
6988
fn from(value: AttributeValue) -> Self {
7089
match value {
@@ -149,10 +168,10 @@ impl TryFrom<SimpleValue> for MessageAttributeValue {
149168
SimpleValue::Timestamp(val) => {
150169
let datetime = Utc.timestamp_millis(val.into_inner());
151170
Ok(MessageAttributeValue::DateTime(datetime))
152-
},
171+
}
153172
SimpleValue::Binary(val) => Ok(MessageAttributeValue::Binary(val.into_vec())),
154173
SimpleValue::String(val) => Ok(MessageAttributeValue::String(val)),
155-
_ => Err(Error::WrongEncoding { })
174+
_ => Err(Error::WrongEncoding {}),
156175
}
157176
}
158177
}
@@ -163,13 +182,13 @@ impl<'a> TryFrom<(&'a str, SimpleValue)> for MessageAttributeValue {
163182
fn try_from((key, value): (&'a str, SimpleValue)) -> Result<Self, Self::Error> {
164183
match key {
165184
// String
166-
ID | prefixed::ID
185+
ID | prefixed::ID
167186
// String
168-
| SPECVERSION | prefixed::SPECVERSION
187+
| SPECVERSION | prefixed::SPECVERSION
169188
// String
170189
| TYPE | prefixed::TYPE
171190
// String
172-
| DATACONTENTTYPE
191+
| DATACONTENTTYPE
173192
// String
174193
| SUBJECT | prefixed::SUBJECT => {
175194
let val = String::try_from(value).map_err(|_| Error::WrongEncoding {})?;

src/binding/fe2o3_amqp/serializer.rs

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,49 @@
1-
use fe2o3_amqp_types::primitives::{SimpleValue, Symbol, Binary};
2-
use fe2o3_amqp_types::messaging::{Data as AmqpData};
1+
use fe2o3_amqp_types::messaging::{ApplicationProperties, Data as AmqpData};
2+
use fe2o3_amqp_types::primitives::{Binary, SimpleValue, Symbol};
33

44
use crate::binding::header_prefix;
55
use crate::message::StructuredSerializer;
6-
use crate::{message::{BinarySerializer, MessageAttributeValue, Error}, event::SpecVersion};
6+
use crate::{
7+
event::SpecVersion,
8+
message::{BinarySerializer, Error, MessageAttributeValue},
9+
};
710

811
use super::constants::DATACONTENTTYPE;
9-
use super::{AmqpCloudEvent, ATTRIBUTE_PREFIX, AmqpBody};
12+
use super::{AmqpBody, AmqpCloudEvent, ATTRIBUTE_PREFIX};
1013

1114
impl BinarySerializer<AmqpCloudEvent> for AmqpCloudEvent {
1215
fn set_spec_version(mut self, spec_version: SpecVersion) -> crate::message::Result<Self> {
1316
let key = String::from("cloudEvents:specversion");
1417
let value = String::from(spec_version.as_str());
15-
self.application_properties.insert(key, SimpleValue::from(value));
18+
self.application_properties
19+
.get_or_insert(ApplicationProperties::default())
20+
.insert(key, SimpleValue::from(value));
1621
Ok(self)
1722
}
1823

19-
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result<Self> {
24+
fn set_attribute(
25+
mut self,
26+
name: &str,
27+
value: MessageAttributeValue,
28+
) -> crate::message::Result<Self> {
2029
// For the binary mode, the AMQP content-type property field value maps directly to the
2130
// CloudEvents datacontenttype attribute.
22-
//
31+
//
2332
// All CloudEvents attributes with exception of datacontenttype MUST be individually mapped
2433
// to and from the AMQP application-properties section.
2534
if name == DATACONTENTTYPE {
2635
self.content_type = match value {
2736
MessageAttributeValue::String(s) => Some(Symbol::from(s)),
28-
_ => return Err(Error::WrongEncoding { })
37+
_ => return Err(Error::WrongEncoding {}),
2938
}
3039
} else {
3140
// CloudEvent attributes are prefixed with "cloudEvents:" for use in the
3241
// application-properties section
3342
let key = header_prefix(ATTRIBUTE_PREFIX, name);
3443
let value = SimpleValue::from(value);
35-
self.application_properties.insert(key, value);
44+
self.application_properties
45+
.get_or_insert(ApplicationProperties::default())
46+
.insert(key, value);
3647
}
3748

3849
Ok(self)
@@ -44,10 +55,16 @@ impl BinarySerializer<AmqpCloudEvent> for AmqpCloudEvent {
4455
// systems that also process the message. Extension specifications that do this SHOULD specify
4556
// how receivers are to interpret messages if the copied values differ from the cloud-event
4657
// serialized values.
47-
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result<Self> {
58+
fn set_extension(
59+
mut self,
60+
name: &str,
61+
value: MessageAttributeValue,
62+
) -> crate::message::Result<Self> {
4863
let key = header_prefix(ATTRIBUTE_PREFIX, name);
4964
let value = SimpleValue::from(value);
50-
self.application_properties.insert(key, value);
65+
self.application_properties
66+
.get_or_insert(ApplicationProperties::default())
67+
.insert(key, value);
5168
Ok(self)
5269
}
5370

src/binding/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
pub mod actix;
55
#[cfg(feature = "axum")]
66
pub mod axum;
7+
#[cfg(feature = "fe2o3-amqp")]
8+
pub mod fe2o3_amqp;
79
#[cfg(any(
810
feature = "http-binding",
911
feature = "actix",
@@ -23,8 +25,6 @@ pub mod rdkafka;
2325
pub mod reqwest;
2426
#[cfg(feature = "warp")]
2527
pub mod warp;
26-
#[cfg(feature = "fe2o3-amqp")]
27-
pub mod fe2o3_amqp;
2828

2929
#[cfg(feature = "rdkafka")]
3030
pub(crate) mod kafka {

0 commit comments

Comments
 (0)