Skip to content

Commit e516489

Browse files
committed
initial impl of deserializers
Signed-off-by: minghuaw <[email protected]>
1 parent 4017ffc commit e516489

File tree

6 files changed

+265
-196
lines changed

6 files changed

+265
-196
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ http-body = { version = "^0.4", optional = true }
5454
poem-lib = { version = "=1.2.34", optional = true, package = "poem" }
5555
nats-lib = { version = "0.21.0", optional = true, package = "nats" }
5656
fe2o3-amqp-lib = { version = "0.4.0", optional = true, package = "fe2o3-amqp" }
57-
fe2o3-amqp-types = { version = "0.3.3", optional = true }
57+
fe2o3-amqp-types = { version = "0.3.4", optional = true }
5858

5959
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
6060
hostname = "^0.3"

src/binding/fe2o3_amqp/constants.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
2+
// Required
3+
pub(super) const ID: &str = "id";
4+
pub(super) const SOURCE: &str = "source";
5+
pub(super) const SPECVERSION: &str = "specversion";
6+
pub(super) const TYPE: &str = "type";
7+
8+
// Optional
9+
pub(super) const DATACONTENTTYPE: &str = "datacontenttype";
10+
pub(super) const DATASCHEMA: &str = "dataschema";
11+
pub(super) const SUBJECT: &str = "subject";
12+
pub(super) const TIME: &str = "time";
13+
14+
pub(super) mod prefixed {
15+
// Required
16+
pub const ID: &str = "cloudEvents:id";
17+
pub const SOURCE: &str = "cloudEvents:source";
18+
pub const SPECVERSION: &str = "cloudEvents:specversion";
19+
pub const TYPE: &str = "cloudEvents:type";
20+
21+
// Optional
22+
pub const DATASCHEMA: &str = "cloudEvents:dataschema";
23+
pub const SUBJECT: &str = "cloudEvents:subject";
24+
pub const TIME: &str = "cloudEvents:time";
25+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use std::convert::TryFrom;
2+
3+
use fe2o3_amqp_types::primitives::{SimpleValue, Symbol};
4+
5+
use crate::{
6+
binding::CLOUDEVENTS_JSON_HEADER,
7+
event::SpecVersion,
8+
message::{
9+
BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
10+
MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
11+
},
12+
};
13+
14+
use super::{
15+
constants::{self, prefixed, DATACONTENTTYPE},
16+
AmqpCloudEvent, ATTRIBUTE_PREFIX,
17+
};
18+
19+
impl BinaryDeserializer for AmqpCloudEvent {
20+
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(
21+
mut self,
22+
mut serializer: V,
23+
) -> Result<R> {
24+
use fe2o3_amqp_types::messaging::Body;
25+
26+
// specversion
27+
let spec_version = {
28+
let value = self
29+
.application_properties
30+
.remove(prefixed::SPECVERSION)
31+
.ok_or(Error::WrongEncoding {})
32+
.map(|val| match val {
33+
SimpleValue::String(s) => Ok(s),
34+
_ => Err(Error::WrongEncoding {}),
35+
})??;
36+
SpecVersion::try_from(&value[..])?
37+
};
38+
serializer = serializer.set_spec_version(spec_version.clone())?;
39+
40+
// datacontenttype
41+
serializer = match self.content_type {
42+
Some(Symbol(content_type)) => serializer
43+
.set_attribute(DATACONTENTTYPE, MessageAttributeValue::String(content_type))?,
44+
None => serializer,
45+
};
46+
47+
// remaining attributes
48+
let attributes = spec_version.attribute_names();
49+
50+
for (key, value) in self.application_properties.0.into_iter() {
51+
if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) {
52+
if attributes.contains(&key) {
53+
let value = MessageAttributeValue::try_from((key, value))?;
54+
serializer = serializer.set_attribute(key, value)?;
55+
} else {
56+
let value = MessageAttributeValue::try_from(value)?;
57+
serializer = serializer.set_extension(key, value)?;
58+
}
59+
}
60+
}
61+
62+
match self.body {
63+
Body::Data(data) => {
64+
let bytes = data.0.into_vec();
65+
serializer.end_with_data(bytes)
66+
}
67+
Body::Nothing => serializer.end(),
68+
Body::Sequence(_) | Body::Value(_) => Err(Error::WrongEncoding {}),
69+
}
70+
}
71+
}
72+
73+
impl StructuredDeserializer for AmqpCloudEvent {
74+
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(
75+
self,
76+
serializer: V,
77+
) -> Result<R> {
78+
use fe2o3_amqp_types::messaging::Body;
79+
let bytes = match self.body {
80+
Body::Data(data) => data.0.into_vec(),
81+
Body::Nothing => vec![],
82+
Body::Sequence(_)
83+
| Body::Value(_) => return Err(Error::WrongEncoding { }),
84+
};
85+
serializer.set_structured_event(bytes)
86+
}
87+
}
88+
89+
impl MessageDeserializer for AmqpCloudEvent {
90+
fn encoding(&self) -> Encoding {
91+
match self
92+
.content_type
93+
.as_ref()
94+
.map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER))
95+
{
96+
Some(true) => Encoding::STRUCTURED,
97+
Some(false) => Encoding::BINARY,
98+
None => Encoding::UNKNOWN,
99+
}
100+
}
101+
}

src/binding/fe2o3_amqp/extensions.rs

Lines changed: 0 additions & 77 deletions
This file was deleted.

0 commit comments

Comments
 (0)