Skip to content

Commit 24c30b5

Browse files
committed
impl conversion btwn AmqpMessage & AmqpCloudEvent
Signed-off-by: minghuaw <[email protected]>
1 parent e206652 commit 24c30b5

File tree

3 files changed

+58
-0
lines changed

3 files changed

+58
-0
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ warp = ["warp-lib", "bytes", "http", "hyper"]
2525
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
2626
poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"]
2727
nats = ["nats-lib"]
28+
fe2o3-amqp = ["fe2o3-amqp-lib"]
2829

2930
[dependencies]
3031
serde = { version = "^1.0", features = ["derive"] }
@@ -52,6 +53,7 @@ axum-lib = { version = "^0.5", optional = true , package="axum"}
5253
http-body = { version = "^0.4", optional = true }
5354
poem-lib = { version = "=1.2.34", optional = true, package = "poem" }
5455
nats-lib = { version = "0.21.0", optional = true, package = "nats" }
56+
fe2o3-amqp-lib = { version = "0.4.0", optional = true, package = "fe2o3-amqp" }
5557

5658
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
5759
hostname = "^0.3"

src/binding/fe2o3_amqp/mod.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
//! Implements AMQP 1.0 binding for CloudEvents
2+
3+
use std::convert::TryFrom;
4+
5+
use fe2o3_amqp_lib::types::messaging::{ApplicationProperties, Message, Body, Data as AmqpData, Properties};
6+
use fe2o3_amqp_lib::types::primitives::{Value, Binary};
7+
8+
use crate::Event;
9+
use crate::message::Error;
10+
11+
/// Type alias for an AMQP 1.0 message
12+
///
13+
/// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of
14+
/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For
15+
/// convenience, this type alias chose `Value` as the value of the generic parameter
16+
pub type AmqpMessage = Message<Value>;
17+
18+
pub struct AmqpCloudEvent {
19+
properties: Properties,
20+
application_properties: ApplicationProperties,
21+
data: Binary,
22+
}
23+
24+
impl From<AmqpCloudEvent> for AmqpMessage {
25+
fn from(event: AmqpCloudEvent) -> Self {
26+
Message::builder()
27+
.properties(event.properties)
28+
.application_properties(event.application_properties)
29+
.data(event.data)
30+
.build()
31+
}
32+
}
33+
34+
impl TryFrom<AmqpMessage> for AmqpCloudEvent {
35+
type Error = Error;
36+
37+
fn try_from(value: AmqpMessage) -> Result<Self, Self::Error> {
38+
let data = match value.body {
39+
Body::Data(AmqpData(data)) => data,
40+
_ => return Err(Error::WrongEncoding { })
41+
};
42+
let properties = value.properties
43+
.ok_or(Error::WrongEncoding { })?;
44+
let application_properties = value.application_properties
45+
.ok_or(Error::WrongEncoding { })?;
46+
Ok(Self {
47+
properties,
48+
application_properties,
49+
data,
50+
})
51+
}
52+
}
53+
54+

src/binding/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ pub mod rdkafka;
2323
pub mod reqwest;
2424
#[cfg(feature = "warp")]
2525
pub mod warp;
26+
#[cfg(feature = "fe2o3-amqp")]
27+
pub mod fe2o3_amqp;
2628

2729
#[cfg(feature = "rdkafka")]
2830
pub(crate) mod kafka {

0 commit comments

Comments
 (0)