From da5cec549b53f6bd42fd78a6f812c7637d8fe2ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Tue, 27 May 2025 13:47:42 +0200 Subject: [PATCH 1/4] fix(client): adapt internal client_request API to be easier to implement correctly This also fixes an issue with handling DB errors when reading events --- src/client/client_request.rs | 50 ++++++++++++++++--- src/client/client_request/list_event_types.rs | 28 +---------- src/client/client_request/list_subjects.rs | 26 +--------- src/client/client_request/observe_events.rs | 1 + src/client/client_request/read_events.rs | 30 +---------- src/error.rs | 3 ++ 6 files changed, 52 insertions(+), 86 deletions(-) diff --git a/src/client/client_request.rs b/src/client/client_request.rs index 7dba5ec..ff3318b 100644 --- a/src/client/client_request.rs +++ b/src/client/client_request.rs @@ -25,8 +25,9 @@ use futures::{ }; use futures_util::io; use reqwest::Method; -use serde::Serialize; use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; +use std::pin::Pin; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio_stream::wrappers::LinesStream; use tokio_util::io::StreamReader; @@ -61,19 +62,54 @@ pub trait OneShotRequest: ClientRequest { Ok(()) } } +#[derive(Deserialize, Debug)] +#[serde(tag = "type", content = "payload", rename_all = "camelCase")] +enum StreamLineItem { + Error { + error: String, + }, + Heartbeat, + #[serde(untagged)] + Ok { + #[serde(rename = "type")] + ty: String, + payload: T, + }, +} + +impl StreamLineItem { + pub fn into_result_option(self, expected_type: &str) -> Result, ClientError> { + match self { + StreamLineItem::Error { error } => Err(ClientError::DBError(error)), + StreamLineItem::Heartbeat => Ok(None), + StreamLineItem::Ok { ty, payload } => { + if ty == expected_type { + Ok(Some(payload)) + } else { + Err(ClientError::InvalidResponseType(ty)) + } + } + } + } +} /// Represents a request to the database that expects a stream of responses pub trait StreamingRequest: ClientRequest { type ItemType: DeserializeOwned; + const ITEM_TYPE_NAME: &'static str; fn build_stream( response: reqwest::Response, - ) -> impl Stream> { - Self::lines_stream(response).map(|line| { - let line = line?; - let item = serde_json::from_str(line.as_str())?; - Ok(item) - }) + ) -> Pin>>> { + Box::pin( + Self::lines_stream(response) + .map(|line| { + let line = line?; + let item: StreamLineItem = serde_json::from_str(line.as_str())?; + item.into_result_option(Self::ITEM_TYPE_NAME) + }) + .filter_map(|o| async { o.transpose() }), + ) } fn lines_stream( diff --git a/src/client/client_request/list_event_types.rs b/src/client/client_request/list_event_types.rs index 03d9c7c..aeed53b 100644 --- a/src/client/client_request/list_event_types.rs +++ b/src/client/client_request/list_event_types.rs @@ -1,4 +1,3 @@ -use futures::{Stream, stream::StreamExt}; use reqwest::Method; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -29,30 +28,5 @@ impl ClientRequest for ListEventTypesRequest { } impl StreamingRequest for ListEventTypesRequest { type ItemType = EventType; - - fn build_stream( - response: reqwest::Response, - ) -> impl Stream> { - #[derive(Deserialize, Debug)] - #[serde(tag = "type", content = "payload", rename_all = "camelCase")] - enum LineItem { - Error { error: String }, - EventType(EventType), - } - - impl From for Result { - fn from(item: LineItem) -> Self { - match item { - LineItem::Error { error } => Err(ClientError::DBError(error)), - LineItem::EventType(event_type) => Ok(event_type), - } - } - } - - Self::lines_stream(response).map(|line| { - let line = line?; - let item: LineItem = serde_json::from_str(line.as_str())?; - item.into() - }) - } + const ITEM_TYPE_NAME: &'static str = "eventType"; } diff --git a/src/client/client_request/list_subjects.rs b/src/client/client_request/list_subjects.rs index 099ade9..a130593 100644 --- a/src/client/client_request/list_subjects.rs +++ b/src/client/client_request/list_subjects.rs @@ -1,6 +1,5 @@ -use futures::{Stream, stream::StreamExt}; use reqwest::Method; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use crate::error::ClientError; @@ -22,26 +21,5 @@ impl ClientRequest for ListSubjectsRequest<'_> { } impl StreamingRequest for ListSubjectsRequest<'_> { type ItemType = String; - - fn build_stream( - response: reqwest::Response, - ) -> impl Stream> { - #[derive(Deserialize, Debug)] - struct LineItem { - payload: LineItemPayload, - r#type: String, - } - #[derive(Deserialize, Debug)] - struct LineItemPayload { - subject: String, - } - Self::lines_stream(response).map(|line| { - let line = line?; - let item: LineItem = serde_json::from_str(line.as_str())?; - if item.r#type != "subject" { - return Err(ClientError::InvalidEventType); - } - Ok(item.payload.subject) - }) - } + const ITEM_TYPE_NAME: &'static str = "subject"; } diff --git a/src/client/client_request/observe_events.rs b/src/client/client_request/observe_events.rs index f57f463..706a444 100644 --- a/src/client/client_request/observe_events.rs +++ b/src/client/client_request/observe_events.rs @@ -25,4 +25,5 @@ impl ClientRequest for ObserveEventsRequest<'_> { impl StreamingRequest for ObserveEventsRequest<'_> { type ItemType = Event; + const ITEM_TYPE_NAME: &'static str = "event"; } diff --git a/src/client/client_request/read_events.rs b/src/client/client_request/read_events.rs index 801eaea..c0cba02 100644 --- a/src/client/client_request/read_events.rs +++ b/src/client/client_request/read_events.rs @@ -1,6 +1,5 @@ -use futures::{Stream, stream::StreamExt}; use reqwest::Method; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use crate::{client::request_options::ReadEventsRequestOptions, error::ClientError, event::Event}; @@ -24,30 +23,5 @@ impl ClientRequest for ReadEventsRequest<'_> { impl StreamingRequest for ReadEventsRequest<'_> { type ItemType = Event; - - fn build_stream( - response: reqwest::Response, - ) -> impl Stream> { - #[derive(Deserialize, Debug)] - #[serde(tag = "type", content = "payload", rename_all = "camelCase")] - enum LineItem { - Error { error: String }, - Event(Box), - } - - impl From for Result { - fn from(item: LineItem) -> Self { - match item { - LineItem::Error { error } => Err(ClientError::DBError(error)), - LineItem::Event(event_type) => Ok(*event_type), - } - } - } - - Self::lines_stream(response).map(|line| { - let line = line?; - let item: LineItem = serde_json::from_str(line.as_str())?; - item.into() - }) - } + const ITEM_TYPE_NAME: &'static str = "event"; } diff --git a/src/error.rs b/src/error.rs index 916b23b..85b3b9d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -44,6 +44,9 @@ pub enum ClientError { #[cfg(feature = "cloudevents")] #[error("The CloudEvents message is invalid: {0}")] CloudeventsMessageError(#[from] cloudevents::message::Error), + /// The database returned an invalid response type + #[error("The DB returned an invalid response type: {0}")] + InvalidResponseType(String), } /// Error type for the [`crate::container`] feature. From 0b06d1cb389cc0ffde83cb6ee447ff1e9032d6fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Tue, 27 May 2025 20:57:17 +0200 Subject: [PATCH 2/4] fix(client): remove unnecessarily restricting change to build_stream API of ClientRequest --- src/client/client_request.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/client/client_request.rs b/src/client/client_request.rs index ff3318b..59fb968 100644 --- a/src/client/client_request.rs +++ b/src/client/client_request.rs @@ -27,7 +27,6 @@ use futures_util::io; use reqwest::Method; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use std::pin::Pin; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio_stream::wrappers::LinesStream; use tokio_util::io::StreamReader; @@ -100,7 +99,7 @@ pub trait StreamingRequest: ClientRequest { fn build_stream( response: reqwest::Response, - ) -> Pin>>> { + ) -> impl Stream> { Box::pin( Self::lines_stream(response) .map(|line| { From 80c774cec462036409ba2f408bea0b347394f627 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Wed, 28 May 2025 23:40:25 +0200 Subject: [PATCH 3/4] chore(client): add comments to stream parsing to make complex parts easier to understand especially for new rustaceans MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Raphael Höser --- src/client/client_request.rs | 65 +++++++++++++++++++++++------------- 1 file changed, 41 insertions(+), 24 deletions(-) diff --git a/src/client/client_request.rs b/src/client/client_request.rs index 59fb968..af503e0 100644 --- a/src/client/client_request.rs +++ b/src/client/client_request.rs @@ -61,13 +61,20 @@ pub trait OneShotRequest: ClientRequest { Ok(()) } } + +/// A line in any json-nd stream coming from the database #[derive(Deserialize, Debug)] #[serde(tag = "type", content = "payload", rename_all = "camelCase")] enum StreamLineItem { - Error { - error: String, - }, + /// An error occured during the request + Error { error: String }, + /// A heardbeat message was sent to keep the connection alive. + /// This is only used when observing events, but it does not hurt to have it everywhere. Heartbeat, + /// A successful response from the database + /// Since the exact type of the payload is not known at this point, we use this as a fallback case. + /// Every request item gets put in here and the type can be checked later on. + /// The type name checking is only for semantic reasons, as the payload is already parsed as the correct type at this point. #[serde(untagged)] Ok { #[serde(rename = "type")] @@ -76,22 +83,6 @@ enum StreamLineItem { }, } -impl StreamLineItem { - pub fn into_result_option(self, expected_type: &str) -> Result, ClientError> { - match self { - StreamLineItem::Error { error } => Err(ClientError::DBError(error)), - StreamLineItem::Heartbeat => Ok(None), - StreamLineItem::Ok { ty, payload } => { - if ty == expected_type { - Ok(Some(payload)) - } else { - Err(ClientError::InvalidResponseType(ty)) - } - } - } - } -} - /// Represents a request to the database that expects a stream of responses pub trait StreamingRequest: ClientRequest { type ItemType: DeserializeOwned; @@ -102,12 +93,38 @@ pub trait StreamingRequest: ClientRequest { ) -> impl Stream> { Box::pin( Self::lines_stream(response) - .map(|line| { - let line = line?; - let item: StreamLineItem = serde_json::from_str(line.as_str())?; - item.into_result_option(Self::ITEM_TYPE_NAME) + .map(|maybe_line| { + let line = maybe_line?; + // This line does the heavy lifting of parsing the json-nd line into the correct type. + // There's some Rust typesystem glory involved here, so let's break it down: + // First of all `serde_json::from_str` is used to parse any json `&str` into the type we want to have (in this case a `StreamLineItem`). + // `StreamLineItem` in turn is generic over `Self::ItemType`, which is the type that is expected by the exact response implementation and can change. + // This means, that this will throw an error if the line is invalid json or the string cannot be parsed into an error, heartbeat or the expected type. + // Because of this, we can guarantee after this line, that the payload of the `StreamLineItem` is of the correct type and no further checks are needed. + Ok(serde_json::from_str::>( + line.as_str(), + )?) }) - .filter_map(|o| async { o.transpose() }), + .filter_map(|o| async { + match o { + // An error was passed by the database, so we forward it as an error. + Ok(StreamLineItem::Error { error }) => { + Some(Err(ClientError::DBError(error))) + } + // A heartbeat message was sent, which we ignore. + Ok(StreamLineItem::Heartbeat) => None, + // A successful response was sent with the correct type. + Ok(StreamLineItem::Ok { payload, ty }) if ty == Self::ITEM_TYPE_NAME => { + Some(Ok(payload)) + } + // A successful response was sent, but the type does not match the expected type. + Ok(StreamLineItem::Ok { ty, .. }) => { + Some(Err(ClientError::InvalidResponseType(ty))) + } + // An error occured while parsing the line, which we forward as an error. + Err(e) => Some(Err(e)), + } + }), ) } From 7f4af879fa933a988bf2eee884586fbc6e81c530 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Mon, 2 Jun 2025 14:45:40 +0200 Subject: [PATCH 4/4] fix(client): adapt PR to request rework --- .../client_request/run_eventql_query.rs | 30 ++----------------- 1 file changed, 2 insertions(+), 28 deletions(-) diff --git a/src/client/client_request/run_eventql_query.rs b/src/client/client_request/run_eventql_query.rs index 478739b..7ff3bd8 100644 --- a/src/client/client_request/run_eventql_query.rs +++ b/src/client/client_request/run_eventql_query.rs @@ -1,6 +1,5 @@ -use futures::{Stream, StreamExt}; use reqwest::Method; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use crate::error::ClientError; @@ -24,30 +23,5 @@ impl ClientRequest for RunEventqlQueryRequest<'_> { impl StreamingRequest for RunEventqlQueryRequest<'_> { type ItemType = EventqlRow; - - fn build_stream( - response: reqwest::Response, - ) -> impl Stream> { - #[derive(Deserialize, Debug)] - #[serde(tag = "type", content = "payload", rename_all = "camelCase")] - enum LineItem { - Error { error: String }, - Row(EventqlRow), - } - - impl From for Result { - fn from(item: LineItem) -> Self { - match item { - LineItem::Error { error } => Err(ClientError::DBError(error)), - LineItem::Row(row) => Ok(row), - } - } - } - - Self::lines_stream(response).map(|line| { - let line = line?; - let item = serde_json::from_str(line.as_str())?; - Ok(item) - }) - } + const ITEM_TYPE_NAME: &'static str = "row"; }