Skip to content

Commit 31c269b

Browse files
committed
implemented From<Event> for AmqpCloudEvent
Signed-off-by: minghuaw <[email protected]>
1 parent 567b8af commit 31c269b

File tree

2 files changed

+190
-21
lines changed

2 files changed

+190
-21
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ warp = ["warp-lib", "bytes", "http", "hyper"]
2525
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
2626
poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"]
2727
nats = ["nats-lib"]
28-
fe2o3-amqp = ["fe2o3-amqp-lib"]
28+
fe2o3-amqp = ["fe2o3-amqp-lib", "fe2o3-amqp-types"]
2929

3030
[dependencies]
3131
serde = { version = "^1.0", features = ["derive"] }
@@ -54,6 +54,7 @@ http-body = { version = "^0.4", optional = true }
5454
poem-lib = { version = "=1.2.34", optional = true, package = "poem" }
5555
nats-lib = { version = "0.21.0", optional = true, package = "nats" }
5656
fe2o3-amqp-lib = { version = "0.4.0", optional = true, package = "fe2o3-amqp" }
57+
fe2o3-amqp-types = { version = "0.3.3", optional = true }
5758

5859
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
5960
hostname = "^0.3"

src/binding/fe2o3_amqp/mod.rs

Lines changed: 188 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,53 +2,221 @@
22
33
use std::convert::TryFrom;
44

5-
use fe2o3_amqp_lib::types::messaging::{ApplicationProperties, Message, Body, Data as AmqpData, Properties};
6-
use fe2o3_amqp_lib::types::primitives::{Value, Binary};
5+
use fe2o3_amqp_lib::types::messaging::{
6+
ApplicationProperties, Body, Data as AmqpData, Message, Properties,
7+
};
8+
use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value};
79

8-
use crate::Event;
9-
use crate::message::Error;
10+
use crate::event::{AttributeValue, Attributes};
11+
use crate::message::{Error, MessageAttributeValue};
12+
use crate::{Event};
1013

1114
/// Type alias for an AMQP 1.0 message
12-
///
15+
///
1316
/// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of
14-
/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For
17+
/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For
1518
/// convenience, this type alias chose `Value` as the value of the generic parameter
1619
pub type AmqpMessage = Message<Value>;
1720

21+
pub type AmqpBody = Body<Value>;
22+
1823
pub struct AmqpCloudEvent {
1924
properties: Properties,
2025
application_properties: ApplicationProperties,
21-
data: Binary,
26+
body: AmqpBody,
2227
}
2328

2429
impl From<AmqpCloudEvent> for AmqpMessage {
2530
fn from(event: AmqpCloudEvent) -> Self {
26-
Message::builder()
27-
.properties(event.properties)
28-
.application_properties(event.application_properties)
29-
.data(event.data)
30-
.build()
31+
Message {
32+
header: None,
33+
delivery_annotations: None,
34+
message_annotations: None,
35+
properties: Some(event.properties),
36+
application_properties: Some(event.application_properties),
37+
body: event.body,
38+
footer: None,
39+
}
3140
}
3241
}
3342

3443
impl TryFrom<AmqpMessage> for AmqpCloudEvent {
3544
type Error = Error;
3645

3746
fn try_from(value: AmqpMessage) -> Result<Self, Self::Error> {
38-
let data = match value.body {
39-
Body::Data(AmqpData(data)) => data,
40-
_ => return Err(Error::WrongEncoding { })
47+
let body = match value.body {
48+
Body::Data(data) => Body::Data(data),
49+
_ => return Err(Error::WrongEncoding {}),
4150
};
42-
let properties = value.properties
43-
.ok_or(Error::WrongEncoding { })?;
44-
let application_properties = value.application_properties
45-
.ok_or(Error::WrongEncoding { })?;
51+
let properties = value.properties.ok_or(Error::WrongEncoding {})?;
52+
let application_properties = value
53+
.application_properties
54+
.ok_or(Error::WrongEncoding {})?;
4655
Ok(Self {
4756
properties,
4857
application_properties,
49-
data,
58+
body,
5059
})
5160
}
5261
}
5362

63+
impl<'a> From<AttributeValue<'a>> for SimpleValue {
64+
fn from(value: AttributeValue) -> Self {
65+
match value {
66+
AttributeValue::SpecVersion(spec_ver) => {
67+
SimpleValue::String(String::from(spec_ver.as_str()))
68+
}
69+
AttributeValue::String(s) => SimpleValue::String(String::from(s)),
70+
AttributeValue::URI(uri) => SimpleValue::String(String::from(uri.as_str())),
71+
AttributeValue::URIRef(uri) => SimpleValue::String(uri.clone()),
72+
AttributeValue::Boolean(val) => SimpleValue::Bool(*val),
73+
AttributeValue::Integer(val) => SimpleValue::Long(*val),
74+
AttributeValue::Time(datetime) => {
75+
let millis = datetime.timestamp_millis();
76+
let timestamp = Timestamp::from_milliseconds(millis);
77+
SimpleValue::Timestamp(timestamp)
78+
}
79+
}
80+
}
81+
}
82+
83+
impl<'a> From<AttributeValue<'a>> for Value {
84+
fn from(value: AttributeValue) -> Self {
85+
match value {
86+
AttributeValue::SpecVersion(spec_ver) => Value::String(String::from(spec_ver.as_str())),
87+
AttributeValue::String(s) => Value::String(String::from(s)),
88+
AttributeValue::URI(uri) => Value::String(String::from(uri.as_str())),
89+
AttributeValue::URIRef(uri) => Value::String(uri.clone()),
90+
AttributeValue::Boolean(val) => Value::Bool(*val),
91+
AttributeValue::Integer(val) => Value::Long(*val),
92+
AttributeValue::Time(datetime) => {
93+
let millis = datetime.timestamp_millis();
94+
let timestamp = Timestamp::from_milliseconds(millis);
95+
Value::Timestamp(timestamp)
96+
}
97+
}
98+
}
99+
}
100+
101+
impl From<MessageAttributeValue> for SimpleValue {
102+
fn from(value: MessageAttributeValue) -> Self {
103+
match value {
104+
MessageAttributeValue::String(s) => SimpleValue::String(String::from(s)),
105+
MessageAttributeValue::Uri(uri) => SimpleValue::String(String::from(uri.as_str())),
106+
MessageAttributeValue::UriRef(uri) => SimpleValue::String(uri.clone()),
107+
MessageAttributeValue::Boolean(val) => SimpleValue::Bool(val),
108+
MessageAttributeValue::Integer(val) => SimpleValue::Long(val),
109+
MessageAttributeValue::DateTime(datetime) => {
110+
let millis = datetime.timestamp_millis();
111+
let timestamp = Timestamp::from_milliseconds(millis);
112+
SimpleValue::Timestamp(timestamp)
113+
}
114+
MessageAttributeValue::Binary(val) => SimpleValue::Binary(Binary::from(val)),
115+
}
116+
}
117+
}
118+
119+
impl From<MessageAttributeValue> for Value {
120+
fn from(value: MessageAttributeValue) -> Self {
121+
match value {
122+
MessageAttributeValue::String(s) => Value::String(String::from(s)),
123+
MessageAttributeValue::Uri(uri) => Value::String(String::from(uri.as_str())),
124+
MessageAttributeValue::UriRef(uri) => Value::String(uri.clone()),
125+
MessageAttributeValue::Boolean(val) => Value::Bool(val),
126+
MessageAttributeValue::Integer(val) => Value::Long(val),
127+
MessageAttributeValue::DateTime(datetime) => {
128+
let millis = datetime.timestamp_millis();
129+
let timestamp = Timestamp::from_milliseconds(millis);
130+
Value::Timestamp(timestamp)
131+
}
132+
MessageAttributeValue::Binary(val) => Value::Binary(Binary::from(val)),
133+
}
134+
}
135+
}
136+
137+
/// The `BinarySerializer`/`StructuredSerializer` traits are not implemented because
138+
/// "datacontenttype" needs special treatment in AMQP. However, `StructureSerializer` doesn't
139+
/// provide access to "datacontenttype"
140+
impl TryFrom<Event> for AmqpCloudEvent {
141+
type Error = Error;
142+
143+
fn try_from(mut event: Event) -> Result<Self, Self::Error> {
144+
let mut properties = Properties::default();
145+
properties.content_type = match &mut event.attributes {
146+
Attributes::V03(attributes) => attributes.datacontenttype.take(),
147+
Attributes::V10(attributes) => attributes.datacontenttype.take(),
148+
}.map(Symbol::from);
149+
150+
let mut application_properties = ApplicationProperties::default();
151+
for (key, value) in event.attributes.iter() {
152+
if key == "datacontenttype" {
153+
continue;
154+
} else {
155+
let key = format!("cloudEvents:{}", key);
156+
application_properties.insert(key, SimpleValue::from(value));
157+
}
158+
}
159+
160+
let body = match event.data {
161+
Some(data) => match data {
162+
crate::Data::Binary(data) => Body::Data(AmqpData(Binary::from(data))),
163+
crate::Data::String(val) => Body::Data(AmqpData(Binary::from(val))),
164+
crate::Data::Json(val) => {
165+
let bytes = serde_json::to_vec(&val)?;
166+
Body::Data(AmqpData(Binary::from(bytes)))
167+
},
168+
},
169+
None => AmqpBody::Nothing,
170+
};
171+
172+
Ok(Self {
173+
properties,
174+
application_properties,
175+
body,
176+
})
177+
}
178+
}
179+
180+
// impl BinarySerializer<AmqpCloudEvent> for AmqpCloudEvent {
181+
// fn set_spec_version(mut self, spec_version: SpecVersion) -> crate::message::Result<Self> {
182+
// let key = String::from("cloudEvents:specversion");
183+
// let value = String::from(spec_version.as_str());
184+
// self.application_properties.insert(key, SimpleValue::from(value));
185+
// Ok(self)
186+
// }
187+
188+
// fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result<Self> {
189+
// if name == "datacontenttype" {
190+
// self.properties.content_type = match value {
191+
// MessageAttributeValue::String(s) => Some(Symbol::from(s)),
192+
// _ => return Err(Error::WrongEncoding { })
193+
// }
194+
// } else {
195+
// let key = format!("cloudEvents:{}", name);
196+
// let value = SimpleValue::from(value);
197+
// self.application_properties.insert(key, value);
198+
// }
199+
200+
// Ok(self)
201+
// }
202+
203+
// fn set_extension(self, name: &str, value: MessageAttributeValue) -> crate::message::Result<Self> {
204+
// todo!()
205+
// }
206+
207+
// fn end_with_data(mut self, bytes: Vec<u8>) -> crate::message::Result<Self> {
208+
// let data = Binary::from(bytes);
209+
// self.body = Body::Data(AmqpData(data));
210+
// Ok(self)
211+
// }
212+
213+
// fn end(self) -> crate::message::Result<Self> {
214+
// Ok(self)
215+
// }
216+
// }
54217

218+
// impl StructuredSerializer<AmqpCloudEvent> for AmqpCloudEvent {
219+
// fn set_structured_event(self, bytes: Vec<u8>) -> crate::message::Result<Self> {
220+
// todo!()
221+
// }
222+
// }

0 commit comments

Comments
 (0)