Skip to content

Commit 34f9829

Browse files
committed
added amqp example
Signed-off-by: minghuaw <[email protected]>
1 parent adb721f commit 34f9829

File tree

3 files changed

+69
-1
lines changed

3 files changed

+69
-1
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
[package]
2+
name = "fe2o3-amqp-example"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
cloudevents-sdk = { path = "../..", features = ["fe2o3-amqp"] }
10+
fe2o3-amqp = "0.4.0"
11+
tokio = { version = "1", features = ["macros", "net", "rt", "rt-multi-thread"] }
12+
serde_json = "1"
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
//! AMQP 1.0 binding example
2+
//!
3+
//! You need a running AMQP 1.0 broker to try out this example.
4+
//! With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis
5+
6+
use cloudevents::{binding::fe2o3_amqp::EventMessage, message::BinaryDeserializer, Event, EventBuilderV10, EventBuilder};
7+
use fe2o3_amqp::{Connection, Sender, Receiver, types::messaging::Message, Session};
8+
use serde_json::json;
9+
10+
type BoxError = Box<dyn std::error::Error>;
11+
type Result<T> = std::result::Result<T, BoxError>;
12+
13+
async fn send_event(sender: &mut Sender, i: usize) -> Result<()> {
14+
let event = EventBuilderV10::new()
15+
.id(i.to_string())
16+
.ty("example.test")
17+
.source("localhost")
18+
.data("application/json", json!({"hello": "world"}))
19+
.build()?;
20+
let event_message = EventMessage::from_binary_event(event)?;
21+
let message = Message::from(event_message);
22+
sender.send(message).await?
23+
.accepted_or("not accepted")?;
24+
Ok(())
25+
}
26+
27+
async fn recv_event(receiver: &mut Receiver) -> Result<Event> {
28+
use fe2o3_amqp::types::primitives::Value;
29+
30+
let delivery = receiver.recv::<Value>().await?;
31+
receiver.accept(&delivery).await?;
32+
33+
let event_message = EventMessage::from(delivery.into_message());
34+
let event = event_message.into_event()?;
35+
Ok(event)
36+
}
37+
38+
#[tokio::main]
39+
async fn main() {
40+
let mut connection =
41+
Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672")
42+
.await
43+
.unwrap();
44+
let mut session = Session::begin(&mut connection).await.unwrap();
45+
let mut sender = Sender::attach(&mut session, "sender", "q1").await.unwrap();
46+
let mut receiver = Receiver::attach(&mut session, "receiver", "q1").await.unwrap();
47+
48+
send_event(&mut sender, 1).await.unwrap();
49+
let event = recv_event(&mut receiver).await.unwrap();
50+
println!("{:?}", event);
51+
52+
sender.close().await.unwrap();
53+
receiver.close().await.unwrap();
54+
session.end().await.unwrap();
55+
connection.close().await.unwrap();
56+
}

src/binding/fe2o3_amqp/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,4 +222,4 @@ impl<'a> TryFrom<(&'a str, SimpleValue)> for MessageAttributeValue {
222222
}
223223
}
224224
}
225-
}
225+
}

0 commit comments

Comments
 (0)