5
5
6
6
use cloudevents:: {
7
7
binding:: fe2o3_amqp:: EventMessage , message:: MessageDeserializer , Event , EventBuilder ,
8
- EventBuilderV10 ,
8
+ EventBuilderV10 , AttributesReader , event :: ExtensionValue ,
9
9
} ;
10
10
use fe2o3_amqp:: { types:: messaging:: Message , Connection , Receiver , Sender , Session } ;
11
11
use serde_json:: { json, from_slice, from_str} ;
12
12
13
13
type BoxError = Box < dyn std:: error:: Error > ;
14
14
type Result < T > = std:: result:: Result < T , BoxError > ;
15
15
16
- async fn send_event ( sender : & mut Sender , i : usize , value : serde_json:: Value ) -> Result < ( ) > {
16
+ const EXAMPLE_TYPE : & str = "example.test" ;
17
+ const EXAMPLE_SOURCE : & str = "localhost" ;
18
+ const EXTENSION_NAME : & str = "ext-name" ;
19
+ const EXTENSION_VALUE : & str = "AMQP" ;
20
+
21
+ async fn send_binary_event ( sender : & mut Sender , i : usize , value : serde_json:: Value ) -> Result < ( ) > {
22
+ let event = EventBuilderV10 :: new ( )
23
+ . id ( i. to_string ( ) )
24
+ . ty ( EXAMPLE_TYPE )
25
+ . source ( EXAMPLE_SOURCE )
26
+ . extension ( EXTENSION_NAME , EXTENSION_VALUE )
27
+ . data ( "application/json" , value)
28
+ . build ( ) ?;
29
+ let event_message = EventMessage :: from_binary_event ( event) ?;
30
+ let message = Message :: from ( event_message) ;
31
+ sender. send ( message) . await ?. accepted_or ( "not accepted" ) ?;
32
+ Ok ( ( ) )
33
+ }
34
+
35
+ async fn send_structured_event ( sender : & mut Sender , i : usize , value : serde_json:: Value ) -> Result < ( ) > {
17
36
let event = EventBuilderV10 :: new ( )
18
37
. id ( i. to_string ( ) )
19
38
. ty ( "example.test" )
20
39
. source ( "localhost" )
21
40
. extension ( "ext-name" , "AMQP" )
22
41
. data ( "application/json" , value)
23
42
. build ( ) ?;
24
- let event_message = EventMessage :: from_binary_event ( event) ?;
43
+ let event_message = EventMessage :: from_structured_event ( event) ?;
25
44
let message = Message :: from ( event_message) ;
26
45
sender. send ( message) . await ?. accepted_or ( "not accepted" ) ?;
27
46
Ok ( ( ) )
@@ -36,6 +55,15 @@ async fn recv_event(receiver: &mut Receiver) -> Result<Event> {
36
55
Ok ( event)
37
56
}
38
57
58
+ fn convert_data_into_json_value ( data : & cloudevents:: Data ) -> Result < serde_json:: Value > {
59
+ let value = match data {
60
+ cloudevents:: Data :: Binary ( bytes) => from_slice ( bytes) ?,
61
+ cloudevents:: Data :: String ( s) => from_str ( s) ?,
62
+ cloudevents:: Data :: Json ( value) => value. clone ( ) ,
63
+ } ;
64
+ Ok ( value)
65
+ }
66
+
39
67
#[ tokio:: main]
40
68
async fn main ( ) {
41
69
let mut connection =
@@ -49,15 +77,32 @@ async fn main() {
49
77
. unwrap ( ) ;
50
78
51
79
let expected = json ! ( { "hello" : "world" } ) ;
52
- send_event ( & mut sender, 1 , expected. clone ( ) ) . await . unwrap ( ) ;
80
+
81
+ // Binary content mode
82
+ send_binary_event ( & mut sender, 1 , expected. clone ( ) ) . await . unwrap ( ) ;
53
83
let event = recv_event ( & mut receiver) . await . unwrap ( ) ;
54
- let data: serde_json:: Value = match event. data ( ) . unwrap ( ) {
55
- cloudevents:: Data :: Binary ( bytes) => from_slice ( bytes) . unwrap ( ) ,
56
- cloudevents:: Data :: String ( s) => from_str ( s) . unwrap ( ) ,
57
- cloudevents:: Data :: Json ( value) => value. clone ( ) ,
58
- } ;
84
+ let value = convert_data_into_json_value ( event. data ( ) . unwrap ( ) ) . unwrap ( ) ;
85
+ assert_eq ! ( event. id( ) , "1" ) ;
86
+ assert_eq ! ( event. ty( ) , EXAMPLE_TYPE ) ;
87
+ assert_eq ! ( event. source( ) , EXAMPLE_SOURCE ) ;
88
+ match event. extension ( EXTENSION_NAME ) . unwrap ( ) {
89
+ ExtensionValue :: String ( value) => assert_eq ! ( value, EXTENSION_VALUE ) ,
90
+ _ => panic ! ( "Expect a String" ) ,
91
+ }
92
+ assert_eq ! ( value, expected) ;
59
93
60
- assert_eq ! ( data, expected) ;
94
+ // Structured content mode
95
+ send_structured_event ( & mut sender, 2 , expected. clone ( ) ) . await . unwrap ( ) ;
96
+ let event = recv_event ( & mut receiver) . await . unwrap ( ) ;
97
+ let value = convert_data_into_json_value ( event. data ( ) . unwrap ( ) ) . unwrap ( ) ;
98
+ assert_eq ! ( event. id( ) , "2" ) ;
99
+ assert_eq ! ( event. ty( ) , EXAMPLE_TYPE ) ;
100
+ assert_eq ! ( event. source( ) , EXAMPLE_SOURCE ) ;
101
+ match event. extension ( EXTENSION_NAME ) . unwrap ( ) {
102
+ ExtensionValue :: String ( value) => assert_eq ! ( value, EXTENSION_VALUE ) ,
103
+ _ => panic ! ( "Expect a String" ) ,
104
+ }
105
+ assert_eq ! ( value, expected) ;
61
106
62
107
sender. close ( ) . await . unwrap ( ) ;
63
108
receiver. close ( ) . await . unwrap ( ) ;
0 commit comments