@@ -19,7 +19,7 @@ pub use read_event_type::ReadEventTypeRequest;
1919pub use read_events:: ReadEventsRequest ;
2020pub use register_event_schema:: RegisterEventSchemaRequest ;
2121pub use run_eventql_query:: RunEventqlQueryRequest ;
22- use serde_json:: Value ;
22+ use serde_json:: value :: RawValue ;
2323pub use verify_api_token:: VerifyApiTokenRequest ;
2424pub use write_events:: WriteEventsRequest ;
2525
@@ -67,25 +67,13 @@ pub trait OneShotRequest: ClientRequest {
6767 }
6868}
6969
70- /// A line in any json-nd stream coming from the database
70+ /// A line in a json-nd stream coming from the database
71+ /// The body is parsed as a [`RawValue`], because some of the types need the raw string for internal usage.
7172#[ derive( Deserialize , Debug ) ]
72- #[ serde( tag = "type" , content = "payload" , rename_all = "camelCase" ) ]
73- enum StreamLineItem < T > {
74- /// An error occured during the request
75- Error { error : String } ,
76- /// A heardbeat message was sent to keep the connection alive.
77- /// This is only used when observing events, but it does not hurt to have it everywhere.
78- Heartbeat ( Value ) ,
79- /// A successful response from the database
80- /// Since the exact type of the payload is not known at this point, we use this as a fallback case.
81- /// Every request item gets put in here and the type can be checked later on.
82- /// The type name checking is only for semantic reasons, as the payload is already parsed as the correct type at this point.
83- #[ serde( untagged) ]
84- Ok {
85- #[ serde( rename = "type" ) ]
86- ty : String ,
87- payload : T ,
88- } ,
73+ struct StreamLineItem {
74+ #[ serde( rename = "type" ) ]
75+ ty : String ,
76+ payload : Box < RawValue > ,
8977}
9078
9179/// Represents a request to the database that expects a stream of responses
@@ -98,34 +86,25 @@ pub trait StreamingRequest: ClientRequest {
9886 ) -> impl Stream < Item = Result < Self :: ItemType , ClientError > > {
9987 Box :: pin (
10088 Self :: lines_stream ( response)
101- . map ( |maybe_line| {
102- let line = maybe_line?;
103- // This line does the heavy lifting of parsing the json-nd line into the correct type.
104- // There's some Rust typesystem glory involved here, so let's break it down:
105- // First of all `serde_json::from_str` is used to parse any json `&str` into the type we want to have (in this case a `StreamLineItem`).
106- // `StreamLineItem` in turn is generic over `Self::ItemType`, which is the type that is expected by the exact response implementation and can change.
107- // This means, that this will throw an error if the line is invalid json or the string cannot be parsed into an error, heartbeat or the expected type.
108- // Because of this, we can guarantee after this line, that the payload of the `StreamLineItem` is of the correct type and no further checks are needed.
109- Ok ( serde_json:: from_str :: < StreamLineItem < Self :: ItemType > > (
110- line. as_str ( ) ,
111- ) ?)
112- } )
89+ . map ( |line| Ok ( serde_json:: from_str :: < StreamLineItem > ( line?. as_str ( ) ) ?) )
11390 . filter_map ( |o| async {
11491 match o {
115- // An error was passed by the database, so we forward it as an error.
116- Ok ( StreamLineItem :: Error { error } ) => {
117- Some ( Err ( ClientError :: DBError ( error) ) )
118- }
119- // A heartbeat message was sent, which we ignore.
120- Ok ( StreamLineItem :: Heartbeat ( _value) ) => None ,
121- // A successful response was sent with the correct type.
122- Ok ( StreamLineItem :: Ok { payload, ty } ) if ty == Self :: ITEM_TYPE_NAME => {
123- Some ( Ok ( payload) )
124- }
125- // A successful response was sent, but the type does not match the expected type.
126- Ok ( StreamLineItem :: Ok { ty, .. } ) => {
127- Some ( Err ( ClientError :: InvalidResponseType ( ty) ) )
128- }
92+ // A line was successfully parsed.
93+ Ok ( StreamLineItem { payload, ty } ) => match ty. as_str ( ) {
94+ // This is the expected type, so we try to parse it.
95+ ty if ty == Self :: ITEM_TYPE_NAME => {
96+ Some ( serde_json:: from_str ( payload. get ( ) ) . map_err ( ClientError :: from) )
97+ }
98+ // Forward Errors from the DB as DBErrors.
99+ "error" => Some ( Err ( ClientError :: DBError ( payload. get ( ) . to_string ( ) ) ) ) ,
100+ // Ignore heartbeat messages.
101+ "heartbeat" => None ,
102+ other => Some ( Err ( ClientError :: InvalidResponseType ( format ! (
103+ "Expected type {}, but got {}" ,
104+ Self :: ITEM_TYPE_NAME ,
105+ other
106+ ) ) ) ) ,
107+ } ,
129108 // An error occured while parsing the line, which we forward as an error.
130109 Err ( e) => Some ( Err ( e) ) ,
131110 }
0 commit comments