11use crate :: flow:: DateTime ;
22use crate :: flow:: FlowError ;
33use crate :: flow:: Message ;
4- use anyhow:: Context ;
54use rquickjs:: Ctx ;
65use rquickjs:: FromJs ;
76use rquickjs:: IntoJs ;
87use rquickjs:: Value ;
8+ use std:: collections:: BTreeMap ;
99
10+ /// Akin to serde_json::Value with an extra case for binary data
1011#[ derive( Clone , Debug ) ]
11- pub struct JsonValue ( pub serde_json:: Value ) ;
12+ pub enum JsonValue {
13+ Null ,
14+ Bool ( bool ) ,
15+ Number ( serde_json:: Number ) ,
16+ String ( String ) ,
17+ Bytes ( Vec < u8 > ) , // <= This case motivates the use of JsonValue vs serde_json::Value
18+ Array ( Vec < JsonValue > ) ,
19+ Object ( BTreeMap < String , JsonValue > ) ,
20+ }
1221
1322impl Default for JsonValue {
1423 fn default ( ) -> Self {
15- JsonValue ( serde_json:: Value :: Object ( Default :: default ( ) ) )
24+ JsonValue :: Object ( Default :: default ( ) )
25+ }
26+ }
27+
28+ impl JsonValue {
29+ fn string ( value : impl ToString ) -> Self {
30+ JsonValue :: String ( value. to_string ( ) )
31+ }
32+
33+ fn number ( value : impl Into < serde_json:: Number > ) -> Self {
34+ JsonValue :: Number ( value. into ( ) )
35+ }
36+
37+ fn option ( value : Option < impl Into < JsonValue > > ) -> Self {
38+ value. map ( |v| v. into ( ) ) . unwrap_or ( JsonValue :: Null )
39+ }
40+
41+ fn object < T , K > ( values : T ) -> Self
42+ where
43+ T : IntoIterator < Item = ( K , JsonValue ) > ,
44+ K : ToString ,
45+ {
46+ let object = values
47+ . into_iter ( )
48+ . map ( |( k, v) | ( k. to_string ( ) , v) )
49+ . collect ( ) ;
50+ JsonValue :: Object ( object)
1651 }
1752}
1853
1954impl From < Message > for JsonValue {
2055 fn from ( value : Message ) -> Self {
21- JsonValue ( value. json ( ) )
56+ JsonValue :: object ( [
57+ ( "topic" , JsonValue :: string ( value. topic ) ) ,
58+ ( "payload" , JsonValue :: string ( value. payload ) ) ,
59+ ( "timestamp" , JsonValue :: option ( value. timestamp ) ) ,
60+ ] )
2261 }
2362}
2463
2564impl From < DateTime > for JsonValue {
2665 fn from ( value : DateTime ) -> Self {
27- JsonValue ( value. json ( ) )
66+ JsonValue :: object ( [
67+ ( "seconds" , JsonValue :: number ( value. seconds ) ) ,
68+ ( "nanoseconds" , JsonValue :: number ( value. nanoseconds ) ) ,
69+ ] )
70+ }
71+ }
72+
73+ impl From < serde_json:: Value > for JsonValue {
74+ fn from ( value : serde_json:: Value ) -> Self {
75+ match value {
76+ serde_json:: Value :: Null => JsonValue :: Null ,
77+ serde_json:: Value :: Bool ( b) => JsonValue :: Bool ( b) ,
78+ serde_json:: Value :: Number ( n) => JsonValue :: Number ( n) ,
79+ serde_json:: Value :: String ( s) => JsonValue :: String ( s) ,
80+ serde_json:: Value :: Array ( a) => {
81+ JsonValue :: Array ( a. into_iter ( ) . map ( JsonValue :: from) . collect ( ) )
82+ }
83+ serde_json:: Value :: Object ( o) => {
84+ JsonValue :: object ( o. into_iter ( ) . map ( |( k, v) | ( k, JsonValue :: from ( v) ) ) )
85+ }
86+ }
87+ }
88+ }
89+
90+ impl From < JsonValue > for serde_json:: Value {
91+ fn from ( value : JsonValue ) -> Self {
92+ match value {
93+ JsonValue :: Null => serde_json:: Value :: Null ,
94+ JsonValue :: Bool ( b) => serde_json:: Value :: Bool ( b) ,
95+ JsonValue :: Number ( n) => serde_json:: Value :: Number ( n) ,
96+ JsonValue :: String ( s) => serde_json:: Value :: String ( s) ,
97+ JsonValue :: Bytes ( b) => serde_json:: Value :: String ( format ! ( "0x {b:?}" ) ) ,
98+ JsonValue :: Array ( a) => {
99+ serde_json:: Value :: Array ( a. into_iter ( ) . map ( serde_json:: Value :: from) . collect ( ) )
100+ }
101+ JsonValue :: Object ( o) => serde_json:: Value :: Object (
102+ o. into_iter ( )
103+ . map ( |( k, v) | ( k, serde_json:: Value :: from ( v) ) )
104+ . collect ( ) ,
105+ ) ,
106+ }
28107 }
29108}
30109
31- impl TryFrom < serde_json :: Value > for Message {
110+ impl TryFrom < BTreeMap < String , JsonValue > > for Message {
32111 type Error = FlowError ;
33112
34- fn try_from ( value : serde_json:: Value ) -> Result < Self , Self :: Error > {
35- let message = serde_json:: from_value ( value)
36- . with_context ( || "Couldn't extract message payload and topic" ) ?;
37- Ok ( message)
113+ fn try_from ( value : BTreeMap < String , JsonValue > ) -> Result < Self , Self :: Error > {
114+ let Some ( JsonValue :: String ( topic) ) = value. get ( "topic" ) else {
115+ return Err ( anyhow:: anyhow!( "Missing message topic" ) . into ( ) ) ;
116+ } ;
117+ let Some ( JsonValue :: String ( payload) ) = value. get ( "payload" ) else {
118+ return Err ( anyhow:: anyhow!( "Missing message payload" ) . into ( ) ) ;
119+ } ;
120+ let timestamp = value
121+ . get ( "timestamp" )
122+ . map ( |t| DateTime :: try_from ( t. clone ( ) ) )
123+ . transpose ( ) ?;
124+
125+ Ok ( Message {
126+ topic : topic. to_owned ( ) ,
127+ payload : payload. to_owned ( ) ,
128+ timestamp,
129+ } )
38130 }
39131}
40132
41133impl TryFrom < JsonValue > for Message {
42134 type Error = FlowError ;
43135
44136 fn try_from ( value : JsonValue ) -> Result < Self , Self :: Error > {
45- Message :: try_from ( value. 0 )
137+ let JsonValue :: Object ( object) = value else {
138+ return Err (
139+ anyhow:: anyhow!( "Expect a message object with a topic and a payload" ) . into ( ) ,
140+ ) ;
141+ } ;
142+ Message :: try_from ( object)
143+ }
144+ }
145+
146+ impl TryFrom < JsonValue > for DateTime {
147+ type Error = FlowError ;
148+
149+ fn try_from ( value : JsonValue ) -> Result < Self , Self :: Error > {
150+ let JsonValue :: Object ( object) = value else {
151+ return Err (
152+ anyhow:: anyhow!( "Expect a timestamp object with seconds and nanoseconds" ) . into ( ) ,
153+ ) ;
154+ } ;
155+ DateTime :: try_from ( object)
156+ }
157+ }
158+
159+ impl TryFrom < BTreeMap < String , JsonValue > > for DateTime {
160+ type Error = FlowError ;
161+
162+ fn try_from ( value : BTreeMap < String , JsonValue > ) -> Result < Self , Self :: Error > {
163+ let Some ( JsonValue :: Number ( seconds) ) = value. get ( "seconds" ) else {
164+ return Err ( anyhow:: anyhow!( "Missing timestamp seconds" ) . into ( ) ) ;
165+ } ;
166+ let Some ( JsonValue :: Number ( nanoseconds) ) = value. get ( "nanoseconds" ) else {
167+ return Err ( anyhow:: anyhow!( "Missing timestamp nanoseconds" ) . into ( ) ) ;
168+ } ;
169+
170+ Ok ( DateTime {
171+ seconds : seconds. as_u64 ( ) . unwrap_or_default ( ) ,
172+ nanoseconds : nanoseconds. as_u64 ( ) . unwrap_or_default ( ) as u32 ,
173+ } )
46174 }
47175}
48176
49177impl TryFrom < JsonValue > for Vec < Message > {
50178 type Error = FlowError ;
51179
52180 fn try_from ( value : JsonValue ) -> Result < Self , Self :: Error > {
53- match value. 0 {
54- serde_json:: Value :: Array ( array) => array. into_iter ( ) . map ( Message :: try_from) . collect ( ) ,
55- serde_json:: Value :: Object ( map) => {
56- Message :: try_from ( serde_json:: Value :: Object ( map) ) . map ( |message| vec ! [ message] )
57- }
58- serde_json:: Value :: Null => Ok ( vec ! [ ] ) ,
181+ match value {
182+ JsonValue :: Array ( array) => array. into_iter ( ) . map ( Message :: try_from) . collect ( ) ,
183+ JsonValue :: Object ( object) => Message :: try_from ( object) . map ( |message| vec ! [ message] ) ,
184+ JsonValue :: Null => Ok ( vec ! [ ] ) ,
59185 _ => Err (
60186 anyhow:: anyhow!( "Flow scripts are expected to return an array of messages" ) . into ( ) ,
61187 ) ,
62188 }
63189 }
64190}
65191
66- struct JsonValueRef < ' a > ( & ' a serde_json :: Value ) ;
192+ struct JsonValueRef < ' a > ( & ' a JsonValue ) ;
67193
68194impl < ' js > IntoJs < ' js > for JsonValue {
69195 fn into_js ( self , ctx : & Ctx < ' js > ) -> rquickjs:: Result < Value < ' js > > {
70- JsonValueRef ( & self . 0 ) . into_js ( ctx)
196+ JsonValueRef ( & self ) . into_js ( ctx)
71197 }
72198}
73199
74200impl < ' js > IntoJs < ' js > for & JsonValue {
75201 fn into_js ( self , ctx : & Ctx < ' js > ) -> rquickjs:: Result < Value < ' js > > {
76- JsonValueRef ( & self . 0 ) . into_js ( ctx)
202+ JsonValueRef ( self ) . into_js ( ctx)
77203 }
78204}
79205
80206impl < ' js > IntoJs < ' js > for JsonValueRef < ' _ > {
81207 fn into_js ( self , ctx : & Ctx < ' js > ) -> rquickjs:: Result < Value < ' js > > {
82208 match self . 0 {
83- serde_json :: Value :: Null => Ok ( Value :: new_null ( ctx. clone ( ) ) ) ,
84- serde_json :: Value :: Bool ( value) => Ok ( Value :: new_bool ( ctx. clone ( ) , * value) ) ,
85- serde_json :: Value :: Number ( value) => {
209+ JsonValue :: Null => Ok ( Value :: new_null ( ctx. clone ( ) ) ) ,
210+ JsonValue :: Bool ( value) => Ok ( Value :: new_bool ( ctx. clone ( ) , * value) ) ,
211+ JsonValue :: Number ( value) => {
86212 if let Some ( n) = value. as_i64 ( ) {
87213 if let Ok ( n) = i32:: try_from ( n) {
88214 return Ok ( Value :: new_int ( ctx. clone ( ) , n) ) ;
@@ -94,20 +220,24 @@ impl<'js> IntoJs<'js> for JsonValueRef<'_> {
94220 let nan = rquickjs:: String :: from_str ( ctx. clone ( ) , "NaN" ) ?;
95221 Ok ( nan. into_value ( ) )
96222 }
97- serde_json :: Value :: String ( value) => {
223+ JsonValue :: String ( value) => {
98224 let string = rquickjs:: String :: from_str ( ctx. clone ( ) , value) ?;
99225 Ok ( string. into_value ( ) )
100226 }
101- serde_json:: Value :: Array ( values) => {
227+ JsonValue :: Bytes ( value) => {
228+ let string = rquickjs:: TypedArray :: new ( ctx. clone ( ) , value. clone ( ) ) ?;
229+ Ok ( string. into_value ( ) )
230+ }
231+ JsonValue :: Array ( values) => {
102232 let array = rquickjs:: Array :: new ( ctx. clone ( ) ) ?;
103233 for ( i, value) in values. iter ( ) . enumerate ( ) {
104234 array. set ( i, JsonValueRef ( value) ) ?;
105235 }
106236 Ok ( array. into_value ( ) )
107237 }
108- serde_json :: Value :: Object ( values) => {
238+ JsonValue :: Object ( values) => {
109239 let object = rquickjs:: Object :: new ( ctx. clone ( ) ) ?;
110- for ( key, value) in values. into_iter ( ) {
240+ for ( key, value) in values. iter ( ) {
111241 object. set ( key, JsonValueRef ( value) ) ?;
112242 }
113243 Ok ( object. into_value ( ) )
@@ -130,40 +260,42 @@ impl JsonValue {
130260 return promise. finish ( ) ;
131261 }
132262 if let Some ( b) = value. as_bool ( ) {
133- return Ok ( JsonValue ( serde_json :: Value :: Bool ( b) ) ) ;
263+ return Ok ( JsonValue :: Bool ( b) ) ;
134264 }
135265 if let Some ( n) = value. as_int ( ) {
136- return Ok ( JsonValue ( serde_json :: Value :: Number ( n. into ( ) ) ) ) ;
266+ return Ok ( JsonValue :: Number ( n. into ( ) ) ) ;
137267 }
138268 if let Some ( n) = value. as_float ( ) {
139269 let js_n = serde_json:: Number :: from_f64 ( n)
140- . map ( serde_json :: Value :: Number )
141- . unwrap_or ( serde_json :: Value :: Null ) ;
142- return Ok ( JsonValue ( js_n) ) ;
270+ . map ( JsonValue :: Number )
271+ . unwrap_or ( JsonValue :: Null ) ;
272+ return Ok ( js_n) ;
143273 }
144274 if let Some ( string) = value. as_string ( ) {
145- return Ok ( JsonValue ( serde_json :: Value :: String ( string. to_string ( ) ?) ) ) ;
275+ return Ok ( JsonValue :: String ( string. to_string ( ) ?) ) ;
146276 }
147277 if let Some ( array) = value. as_array ( ) {
148- let array: rquickjs:: Result < Vec < JsonValue > > = array. iter ( ) . collect ( ) ;
149- let array = array?. into_iter ( ) . map ( |v| v. 0 ) . collect ( ) ;
150- return Ok ( JsonValue ( serde_json:: Value :: Array ( array) ) ) ;
278+ let mut js_array = Vec :: new ( ) ;
279+ for v in array. iter ( ) {
280+ js_array. push ( JsonValue :: from_js_value ( v?) ?)
281+ }
282+ return Ok ( JsonValue :: Array ( js_array) ) ;
151283 }
152284 if let Some ( object) = value. as_object ( ) {
153- let mut js_object = serde_json :: Map :: new ( ) ;
285+ let mut js_object = BTreeMap :: new ( ) ;
154286 for key in object. keys :: < String > ( ) . flatten ( ) {
155- if let Ok ( JsonValue ( v ) ) = object. get ( & key) {
156- js_object. insert ( key, v . clone ( ) ) ;
287+ if let Ok ( v ) = object. get ( & key) {
288+ js_object. insert ( key, JsonValue :: from_js_value ( v ) ? ) ;
157289 }
158290 }
159- return Ok ( JsonValue ( serde_json :: Value :: Object ( js_object) ) ) ;
291+ return Ok ( JsonValue :: Object ( js_object) ) ;
160292 }
161293
162- Ok ( JsonValue ( serde_json :: Value :: Null ) )
294+ Ok ( JsonValue :: Null )
163295 }
164296
165297 pub ( crate ) fn display ( value : Value < ' _ > ) -> String {
166- let json = JsonValue :: from_js_value ( value) . unwrap_or_default ( ) ;
167- serde_json:: to_string_pretty ( & json. 0 ) . unwrap ( )
298+ let json = serde_json :: Value :: from ( JsonValue :: from_js_value ( value) . unwrap_or_default ( ) ) ;
299+ serde_json:: to_string_pretty ( & json) . unwrap ( )
168300 }
169301}
0 commit comments