Skip to content

Commit a29e06c

Browse files
committed
added udf to handle extensions
Signed-off-by: minghuaw <[email protected]>
1 parent 31c269b commit a29e06c

File tree

2 files changed

+140
-35
lines changed

2 files changed

+140
-35
lines changed

src/binding/fe2o3_amqp/extensions.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
use std::collections::HashMap;
2+
3+
use fe2o3_amqp_types::messaging::{ApplicationProperties, Properties};
4+
5+
use crate::{event::ExtensionValue, message::Error, Event};
6+
7+
use super::{
8+
from_event_attributes, from_event_data, AmqpBody, AmqpCloudEvent, AmqpMessage, Extensions,
9+
};
10+
11+
pub struct ExtensionsHandler<F>
12+
where
13+
F: FnOnce(Extensions) -> AmqpMessage,
14+
{
15+
handler: F,
16+
}
17+
18+
impl<F> ExtensionsHandler<F>
19+
where
20+
F: FnOnce(Extensions) -> AmqpMessage,
21+
{
22+
pub fn new(handler: F) -> Self {
23+
Self { handler }
24+
}
25+
26+
pub fn from_event(self, event: Event) -> Result<AmqpCloudEventExt<F>, Error> {
27+
let (content_type, application_properties) = from_event_attributes(event.attributes);
28+
let body = from_event_data(event.data)?;
29+
let inner = AmqpCloudEvent {
30+
content_type,
31+
application_properties,
32+
body,
33+
};
34+
Ok(AmqpCloudEventExt {
35+
inner,
36+
extensions: event.extensions,
37+
handler: self.handler,
38+
})
39+
}
40+
}
41+
42+
pub struct AmqpCloudEventExt<F>
43+
where
44+
F: FnOnce(Extensions) -> AmqpMessage,
45+
{
46+
inner: AmqpCloudEvent,
47+
extensions: Extensions,
48+
handler: F,
49+
}
50+
51+
impl<F> AmqpCloudEventExt<F> where F: FnOnce(Extensions) -> AmqpMessage {}
52+
53+
impl<F> From<AmqpCloudEventExt<F>> for AmqpMessage
54+
where
55+
F: FnOnce(Extensions) -> AmqpMessage,
56+
{
57+
fn from(mut value: AmqpCloudEventExt<F>) -> Self {
58+
let mut message = (value.handler)(value.extensions);
59+
60+
// Set content_type to "datacontenttype"
61+
let properties = message.properties.get_or_insert(Properties::default());
62+
properties.content_type = value.inner.content_type;
63+
64+
// Append ApplicationProperties
65+
let application_properties = message
66+
.application_properties
67+
.get_or_insert(ApplicationProperties::default());
68+
application_properties
69+
.0
70+
.append(&mut value.inner.application_properties.0);
71+
72+
// Overrite the message body
73+
message.body = value.inner.body;
74+
75+
message
76+
}
77+
}

src/binding/fe2o3_amqp/mod.rs

Lines changed: 63 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
//! Implements AMQP 1.0 binding for CloudEvents
22
3+
use std::collections::HashMap;
34
use std::convert::TryFrom;
45

56
use fe2o3_amqp_lib::types::messaging::{
67
ApplicationProperties, Body, Data as AmqpData, Message, Properties,
78
};
89
use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value};
910

10-
use crate::event::{AttributeValue, Attributes};
11+
use crate::event::{AttributeValue, Attributes, ExtensionValue};
1112
use crate::message::{Error, MessageAttributeValue};
12-
use crate::{Event};
13+
use crate::{Event, AttributesReader, Data};
14+
15+
use self::extensions::ExtensionsHandler;
16+
17+
pub mod extensions;
1318

1419
/// Type alias for an AMQP 1.0 message
1520
///
@@ -20,19 +25,36 @@ pub type AmqpMessage = Message<Value>;
2025

2126
pub type AmqpBody = Body<Value>;
2227

28+
pub type Extensions = HashMap<String, ExtensionValue>;
29+
2330
pub struct AmqpCloudEvent {
24-
properties: Properties,
31+
content_type: Option<Symbol>,
2532
application_properties: ApplicationProperties,
2633
body: AmqpBody,
2734
}
2835

36+
impl AmqpCloudEvent {
37+
pub fn with_extensions_handler<F>(handler: F) -> ExtensionsHandler<F>
38+
where
39+
F: FnOnce(Extensions) -> AmqpMessage
40+
{
41+
ExtensionsHandler::new(handler)
42+
}
43+
44+
pub fn from_event(event: Event) -> Result<Self, Error> {
45+
Self::try_from(event)
46+
}
47+
}
48+
2949
impl From<AmqpCloudEvent> for AmqpMessage {
3050
fn from(event: AmqpCloudEvent) -> Self {
51+
let mut properties = Properties::default();
52+
properties.content_type = event.content_type;
3153
Message {
3254
header: None,
3355
delivery_annotations: None,
3456
message_annotations: None,
35-
properties: Some(event.properties),
57+
properties: Some(properties),
3658
application_properties: Some(event.application_properties),
3759
body: event.body,
3860
footer: None,
@@ -48,12 +70,13 @@ impl TryFrom<AmqpMessage> for AmqpCloudEvent {
4870
Body::Data(data) => Body::Data(data),
4971
_ => return Err(Error::WrongEncoding {}),
5072
};
51-
let properties = value.properties.ok_or(Error::WrongEncoding {})?;
73+
let content_type = value.properties.ok_or(Error::WrongEncoding {})?
74+
.content_type.take();
5275
let application_properties = value
5376
.application_properties
5477
.ok_or(Error::WrongEncoding {})?;
5578
Ok(Self {
56-
properties,
79+
content_type,
5780
application_properties,
5881
body,
5982
})
@@ -140,43 +163,48 @@ impl From<MessageAttributeValue> for Value {
140163
impl TryFrom<Event> for AmqpCloudEvent {
141164
type Error = Error;
142165

143-
fn try_from(mut event: Event) -> Result<Self, Self::Error> {
144-
let mut properties = Properties::default();
145-
properties.content_type = match &mut event.attributes {
146-
Attributes::V03(attributes) => attributes.datacontenttype.take(),
147-
Attributes::V10(attributes) => attributes.datacontenttype.take(),
148-
}.map(Symbol::from);
149-
150-
let mut application_properties = ApplicationProperties::default();
151-
for (key, value) in event.attributes.iter() {
152-
if key == "datacontenttype" {
153-
continue;
154-
} else {
155-
let key = format!("cloudEvents:{}", key);
156-
application_properties.insert(key, SimpleValue::from(value));
157-
}
158-
}
159-
160-
let body = match event.data {
161-
Some(data) => match data {
162-
crate::Data::Binary(data) => Body::Data(AmqpData(Binary::from(data))),
163-
crate::Data::String(val) => Body::Data(AmqpData(Binary::from(val))),
164-
crate::Data::Json(val) => {
165-
let bytes = serde_json::to_vec(&val)?;
166-
Body::Data(AmqpData(Binary::from(bytes)))
167-
},
168-
},
169-
None => AmqpBody::Nothing,
170-
};
166+
fn try_from(event: Event) -> Result<Self, Self::Error> {
167+
let (content_type, application_properties) = from_event_attributes(event.attributes);
168+
let body = from_event_data(event.data)?;
171169

172170
Ok(Self {
173-
properties,
171+
content_type,
174172
application_properties,
175173
body,
176174
})
177175
}
178176
}
179177

178+
fn from_event_attributes(attributes: Attributes) -> (Option<Symbol>, ApplicationProperties) {
179+
let content_type = attributes.datacontenttype().map(Symbol::from);
180+
181+
let mut application_properties = ApplicationProperties::default();
182+
for (key, value) in attributes.iter() {
183+
if key == "datacontenttype" {
184+
continue;
185+
} else {
186+
let key = format!("cloudEvents:{}", key);
187+
application_properties.insert(key, SimpleValue::from(value));
188+
}
189+
}
190+
(content_type, application_properties)
191+
}
192+
193+
fn from_event_data(data: Option<Data>) -> Result<AmqpBody, Error> {
194+
let body = match data {
195+
Some(data) => match data {
196+
crate::Data::Binary(data) => Body::Data(AmqpData(Binary::from(data))),
197+
crate::Data::String(val) => Body::Data(AmqpData(Binary::from(val))),
198+
crate::Data::Json(val) => {
199+
let bytes = serde_json::to_vec(&val)?;
200+
Body::Data(AmqpData(Binary::from(bytes)))
201+
},
202+
},
203+
None => AmqpBody::Nothing,
204+
};
205+
Ok(body)
206+
}
207+
180208
// impl BinarySerializer<AmqpCloudEvent> for AmqpCloudEvent {
181209
// fn set_spec_version(mut self, spec_version: SpecVersion) -> crate::message::Result<Self> {
182210
// let key = String::from("cloudEvents:specversion");

0 commit comments

Comments
 (0)