Skip to content

Commit 2399f54

Browse files
committed
Revert "removed AmqpCloudEvent"
This reverts commit 1214976. Signed-off-by: minghuaw <[email protected]>
1 parent 07f4729 commit 2399f54

File tree

3 files changed

+77
-67
lines changed

3 files changed

+77
-67
lines changed

src/binding/fe2o3_amqp/deserializer.rs

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ use crate::{
1313

1414
use super::{
1515
constants::{prefixed, DATACONTENTTYPE},
16-
ATTRIBUTE_PREFIX, AmqpMessage,
16+
AmqpCloudEvent, ATTRIBUTE_PREFIX,
1717
};
1818

19-
impl BinaryDeserializer for AmqpMessage {
19+
impl BinaryDeserializer for AmqpCloudEvent {
2020
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(
2121
mut self,
2222
mut serializer: V,
@@ -27,8 +27,6 @@ impl BinaryDeserializer for AmqpMessage {
2727
let spec_version = {
2828
let value = self
2929
.application_properties
30-
.as_mut()
31-
.ok_or(Error::WrongEncoding { })?
3230
.remove(prefixed::SPECVERSION)
3331
.ok_or(Error::WrongEncoding {})
3432
.map(|val| match val {
@@ -40,24 +38,23 @@ impl BinaryDeserializer for AmqpMessage {
4038
serializer = serializer.set_spec_version(spec_version.clone())?;
4139

4240
// datacontenttype
43-
serializer = match self.properties.map(|p| p.content_type) {
44-
Some(Some(Symbol(content_type))) => serializer
41+
serializer = match self.content_type {
42+
Some(Symbol(content_type)) => serializer
4543
.set_attribute(DATACONTENTTYPE, MessageAttributeValue::String(content_type))?,
46-
_ => serializer,
44+
None => serializer,
4745
};
4846

4947
// remaining attributes
5048
let attributes = spec_version.attribute_names();
51-
if let Some(application_properties) = self.application_properties {
52-
for (key, value) in application_properties.0.into_iter() {
53-
if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) {
54-
if attributes.contains(&key) {
55-
let value = MessageAttributeValue::try_from((key, value))?;
56-
serializer = serializer.set_attribute(key, value)?;
57-
} else {
58-
let value = MessageAttributeValue::try_from(value)?;
59-
serializer = serializer.set_extension(key, value)?;
60-
}
49+
50+
for (key, value) in self.application_properties.0.into_iter() {
51+
if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) {
52+
if attributes.contains(&key) {
53+
let value = MessageAttributeValue::try_from((key, value))?;
54+
serializer = serializer.set_attribute(key, value)?;
55+
} else {
56+
let value = MessageAttributeValue::try_from(value)?;
57+
serializer = serializer.set_extension(key, value)?;
6158
}
6259
}
6360
}
@@ -73,7 +70,7 @@ impl BinaryDeserializer for AmqpMessage {
7370
}
7471
}
7572

76-
impl StructuredDeserializer for AmqpMessage {
73+
impl StructuredDeserializer for AmqpCloudEvent {
7774
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(
7875
self,
7976
serializer: V,
@@ -89,14 +86,12 @@ impl StructuredDeserializer for AmqpMessage {
8986
}
9087
}
9188

92-
impl MessageDeserializer for AmqpMessage {
89+
impl MessageDeserializer for AmqpCloudEvent {
9390
fn encoding(&self) -> Encoding {
9491
match self
95-
.properties
92+
.content_type
9693
.as_ref()
97-
.map(|p| p.content_type.as_ref()
98-
.map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER))
99-
).flatten()
94+
.map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER))
10095
{
10196
Some(true) => Encoding::STRUCTURED,
10297
Some(false) => Encoding::BINARY,

src/binding/fe2o3_amqp/mod.rs

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
//! Implements AMQP 1.0 binding for CloudEvents
22
3+
use std::collections::HashMap;
34
use std::convert::TryFrom;
45

56
use chrono::{Utc, TimeZone};
6-
use fe2o3_amqp_lib::types::messaging::{Body, Message};
7-
use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Timestamp, Value};
7+
use fe2o3_amqp_lib::types::messaging::{ApplicationProperties, Body, Message, Properties};
8+
use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value};
89

9-
use crate::event::{AttributeValue};
10+
use crate::event::{AttributeValue, ExtensionValue};
1011
use crate::message::{Error, MessageAttributeValue};
12+
use crate::Event;
1113

1214
use self::constants::{
1315
prefixed, DATACONTENTTYPE, DATASCHEMA, ID, SOURCE, SPECVERSION, SUBJECT, TIME, TYPE,
@@ -25,8 +27,43 @@ mod constants;
2527
/// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of
2628
/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For
2729
/// convenience, this type alias chose `Value` as the value of the generic parameter
28-
type AmqpMessage = Message<Value>;
29-
type AmqpBody = Body<Value>;
30+
pub type AmqpMessage = Message<Value>;
31+
32+
pub type AmqpBody = Body<Value>;
33+
34+
pub type Extensions = HashMap<String, ExtensionValue>;
35+
36+
/// The receiver of the event can distinguish between the two modes by inspecting the content-type
37+
/// message property field. If the value is prefixed with the CloudEvents media type
38+
/// application/cloudevents, indicating the use of a known event format, the receiver uses
39+
/// structured mode, otherwise it defaults to binary mode.
40+
pub struct AmqpCloudEvent {
41+
content_type: Option<Symbol>,
42+
application_properties: ApplicationProperties,
43+
body: AmqpBody,
44+
}
45+
46+
impl AmqpCloudEvent {
47+
pub fn from_event(event: Event) -> Result<Self, Error> {
48+
todo!()
49+
}
50+
}
51+
52+
impl From<AmqpCloudEvent> for AmqpMessage {
53+
fn from(event: AmqpCloudEvent) -> Self {
54+
let mut properties = Properties::default();
55+
properties.content_type = event.content_type;
56+
Message {
57+
header: None,
58+
delivery_annotations: None,
59+
message_annotations: None,
60+
properties: Some(properties),
61+
application_properties: Some(event.application_properties),
62+
body: event.body,
63+
footer: None,
64+
}
65+
}
66+
}
3067

3168
impl<'a> From<AttributeValue<'a>> for SimpleValue {
3269
fn from(value: AttributeValue) -> Self {

src/binding/fe2o3_amqp/serializer.rs

Lines changed: 17 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,37 @@
1-
use fe2o3_amqp_types::messaging::{Data as AmqpData, Properties, ApplicationProperties};
2-
use fe2o3_amqp_types::primitives::{Binary, SimpleValue, Symbol};
1+
use fe2o3_amqp_types::primitives::{SimpleValue, Symbol, Binary};
2+
use fe2o3_amqp_types::messaging::{Data as AmqpData};
33

4-
use crate::binding::header_prefix;
54
use crate::message::StructuredSerializer;
6-
use crate::{
7-
event::SpecVersion,
8-
message::{BinarySerializer, Error, MessageAttributeValue},
9-
};
5+
use crate::{message::{BinarySerializer, MessageAttributeValue, Error}, event::SpecVersion};
106

117
use super::constants::DATACONTENTTYPE;
12-
use super::{AmqpBody, AmqpMessage, ATTRIBUTE_PREFIX};
8+
use super::{AmqpCloudEvent, ATTRIBUTE_PREFIX, AmqpBody};
139

14-
impl BinarySerializer<AmqpMessage> for AmqpMessage {
10+
impl BinarySerializer<AmqpCloudEvent> for AmqpCloudEvent {
1511
fn set_spec_version(mut self, spec_version: SpecVersion) -> crate::message::Result<Self> {
1612
let key = String::from("cloudEvents:specversion");
1713
let value = String::from(spec_version.as_str());
18-
self.application_properties
19-
.get_or_insert(ApplicationProperties::default())
20-
.insert(key, SimpleValue::from(value));
14+
self.application_properties.insert(key, SimpleValue::from(value));
2115
Ok(self)
2216
}
2317

24-
fn set_attribute(
25-
mut self,
26-
name: &str,
27-
value: MessageAttributeValue,
28-
) -> crate::message::Result<Self> {
18+
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result<Self> {
2919
// For the binary mode, the AMQP content-type property field value maps directly to the
3020
// CloudEvents datacontenttype attribute.
31-
//
21+
//
3222
// All CloudEvents attributes with exception of datacontenttype MUST be individually mapped
3323
// to and from the AMQP application-properties section.
3424
if name == DATACONTENTTYPE {
35-
self.properties
36-
.get_or_insert(Properties::default())
37-
.content_type = match value {
25+
self.content_type = match value {
3826
MessageAttributeValue::String(s) => Some(Symbol::from(s)),
39-
_ => return Err(Error::WrongEncoding {}),
27+
_ => return Err(Error::WrongEncoding { })
4028
}
4129
} else {
4230
// CloudEvent attributes are prefixed with "cloudEvents:" for use in the
4331
// application-properties section
44-
let key = header_prefix(ATTRIBUTE_PREFIX, name);
32+
let key = format!("{}:{}", ATTRIBUTE_PREFIX, name);
4533
let value = SimpleValue::from(value);
46-
self.application_properties
47-
.get_or_insert(ApplicationProperties::default())
48-
.insert(key, value);
34+
self.application_properties.insert(key, value);
4935
}
5036

5137
Ok(self)
@@ -57,16 +43,10 @@ impl BinarySerializer<AmqpMessage> for AmqpMessage {
5743
// systems that also process the message. Extension specifications that do this SHOULD specify
5844
// how receivers are to interpret messages if the copied values differ from the cloud-event
5945
// serialized values.
60-
fn set_extension(
61-
mut self,
62-
name: &str,
63-
value: MessageAttributeValue,
64-
) -> crate::message::Result<Self> {
65-
let key = header_prefix(ATTRIBUTE_PREFIX, name);
46+
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result<Self> {
47+
let key = format!("{}:{}", ATTRIBUTE_PREFIX, name);
6648
let value = SimpleValue::from(value);
67-
self.application_properties
68-
.get_or_insert(ApplicationProperties::default())
69-
.insert(key, value);
49+
self.application_properties.insert(key, value);
7050
Ok(self)
7151
}
7252

@@ -81,11 +61,9 @@ impl BinarySerializer<AmqpMessage> for AmqpMessage {
8161
}
8262
}
8363

84-
impl StructuredSerializer<AmqpMessage> for AmqpMessage {
64+
impl StructuredSerializer<AmqpCloudEvent> for AmqpCloudEvent {
8565
fn set_structured_event(mut self, bytes: Vec<u8>) -> crate::message::Result<Self> {
86-
self.properties
87-
.get_or_insert(Properties::default())
88-
.content_type = Some(Symbol::from("application/cloudevents+json; charset=utf-8"));
66+
self.content_type = Some(Symbol::from("application/cloudevents+json; charset=utf-8"));
8967
self.body = AmqpBody::Data(AmqpData(Binary::from(bytes)));
9068
Ok(self)
9169
}

0 commit comments

Comments
 (0)