@@ -8,18 +8,18 @@ use cloudevents::{
8
8
EventBuilderV10 ,
9
9
} ;
10
10
use fe2o3_amqp:: { types:: messaging:: Message , Connection , Receiver , Sender , Session } ;
11
- use serde_json:: json;
11
+ use serde_json:: { json, from_slice , from_str } ;
12
12
13
13
type BoxError = Box < dyn std:: error:: Error > ;
14
14
type Result < T > = std:: result:: Result < T , BoxError > ;
15
15
16
- async fn send_event ( sender : & mut Sender , i : usize ) -> Result < ( ) > {
16
+ async fn send_event ( sender : & mut Sender , i : usize , value : serde_json :: Value ) -> Result < ( ) > {
17
17
let event = EventBuilderV10 :: new ( )
18
18
. id ( i. to_string ( ) )
19
19
. ty ( "example.test" )
20
20
. source ( "localhost" )
21
21
. extension ( "ext-name" , "AMQP" )
22
- . data ( "application/json" , json ! ( { "hello" : "world" } ) )
22
+ . data ( "application/json" , value )
23
23
. build ( ) ?;
24
24
let event_message = EventMessage :: from_binary_event ( event) ?;
25
25
let message = Message :: from ( event_message) ;
@@ -28,9 +28,7 @@ async fn send_event(sender: &mut Sender, i: usize) -> Result<()> {
28
28
}
29
29
30
30
async fn recv_event ( receiver : & mut Receiver ) -> Result < Event > {
31
- use fe2o3_amqp:: types:: primitives:: Value ;
32
-
33
- let delivery = receiver. recv :: < Value > ( ) . await ?;
31
+ let delivery = receiver. recv ( ) . await ?;
34
32
receiver. accept ( & delivery) . await ?;
35
33
36
34
let event_message = EventMessage :: from ( delivery. into_message ( ) ) ;
@@ -50,9 +48,16 @@ async fn main() {
50
48
. await
51
49
. unwrap ( ) ;
52
50
53
- send_event ( & mut sender, 1 ) . await . unwrap ( ) ;
51
+ let expected = json ! ( { "hello" : "world" } ) ;
52
+ send_event ( & mut sender, 1 , expected. clone ( ) ) . await . unwrap ( ) ;
54
53
let event = recv_event ( & mut receiver) . await . unwrap ( ) ;
55
- println ! ( "{:?}" , event) ;
54
+ let data: serde_json:: Value = match event. data ( ) . unwrap ( ) {
55
+ cloudevents:: Data :: Binary ( bytes) => from_slice ( bytes) . unwrap ( ) ,
56
+ cloudevents:: Data :: String ( s) => from_str ( s) . unwrap ( ) ,
57
+ cloudevents:: Data :: Json ( value) => value. clone ( ) ,
58
+ } ;
59
+
60
+ assert_eq ! ( data, expected) ;
56
61
57
62
sender. close ( ) . await . unwrap ( ) ;
58
63
receiver. close ( ) . await . unwrap ( ) ;
0 commit comments