Skip to content

Commit 01f491b

Browse files
committed
treat missing cloutEvents prefix as extension
Signed-off-by: minghuaw <[email protected]>
1 parent 5bfb7fe commit 01f491b

File tree

2 files changed

+14
-6
lines changed

2 files changed

+14
-6
lines changed

example-projects/fe2o3-amqp-example/src/main.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
//! You need a running AMQP 1.0 broker to try out this example.
44
//! With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis
55
6-
use cloudevents::{binding::fe2o3_amqp::EventMessage, Event, EventBuilderV10, EventBuilder, message::MessageDeserializer};
7-
use fe2o3_amqp::{Connection, Sender, Receiver, types::messaging::Message, Session};
6+
use cloudevents::{
7+
binding::fe2o3_amqp::EventMessage, message::MessageDeserializer, Event, EventBuilder,
8+
EventBuilderV10,
9+
};
10+
use fe2o3_amqp::{types::messaging::Message, Connection, Receiver, Sender, Session};
811
use serde_json::json;
912

1013
type BoxError = Box<dyn std::error::Error>;
@@ -15,12 +18,12 @@ async fn send_event(sender: &mut Sender, i: usize) -> Result<()> {
1518
.id(i.to_string())
1619
.ty("example.test")
1720
.source("localhost")
21+
.extension("ext-name", "AMQP")
1822
.data("application/json", json!({"hello": "world"}))
1923
.build()?;
20-
let event_message = EventMessage::from_structured_event(event)?;
24+
let event_message = EventMessage::from_binary_event(event)?;
2125
let message = Message::from(event_message);
22-
sender.send(message).await?
23-
.accepted_or("not accepted")?;
26+
sender.send(message).await?.accepted_or("not accepted")?;
2427
Ok(())
2528
}
2629

@@ -43,7 +46,9 @@ async fn main() {
4346
.unwrap();
4447
let mut session = Session::begin(&mut connection).await.unwrap();
4548
let mut sender = Sender::attach(&mut session, "sender", "q1").await.unwrap();
46-
let mut receiver = Receiver::attach(&mut session, "receiver", "q1").await.unwrap();
49+
let mut receiver = Receiver::attach(&mut session, "receiver", "q1")
50+
.await
51+
.unwrap();
4752

4853
send_event(&mut sender, 1).await.unwrap();
4954
let event = recv_event(&mut receiver).await.unwrap();

src/binding/fe2o3_amqp/deserializer.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ impl BinaryDeserializer for EventMessage {
5959
let value = MessageAttributeValue::try_from(value)?;
6060
serializer = serializer.set_extension(key, value)?;
6161
}
62+
} else {
63+
let value = MessageAttributeValue::try_from(value)?;
64+
serializer = serializer.set_extension(&key, value)?;
6265
}
6366
}
6467
}

0 commit comments

Comments
 (0)