Skip to content

Commit 38c27cd

Browse files
authored
Merge branch 'main' into Add-metadata
2 parents 909aee4 + b5c163a commit 38c27cd

File tree

7 files changed

+69
-113
lines changed

7 files changed

+69
-113
lines changed

src/client/client_request.rs

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ use futures::{
2727
};
2828
use futures_util::io;
2929
use reqwest::Method;
30-
use serde::Serialize;
3130
use serde::de::DeserializeOwned;
31+
use serde::{Deserialize, Serialize};
3232
use tokio::io::{AsyncBufReadExt, BufReader};
3333
use tokio_stream::wrappers::LinesStream;
3434
use tokio_util::io::StreamReader;
@@ -64,18 +64,70 @@ pub trait OneShotRequest: ClientRequest {
6464
}
6565
}
6666

67+
/// A line in any json-nd stream coming from the database
68+
#[derive(Deserialize, Debug)]
69+
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
70+
enum StreamLineItem<T> {
71+
/// An error occured during the request
72+
Error { error: String },
73+
/// A heardbeat message was sent to keep the connection alive.
74+
/// This is only used when observing events, but it does not hurt to have it everywhere.
75+
Heartbeat,
76+
/// A successful response from the database
77+
/// Since the exact type of the payload is not known at this point, we use this as a fallback case.
78+
/// Every request item gets put in here and the type can be checked later on.
79+
/// The type name checking is only for semantic reasons, as the payload is already parsed as the correct type at this point.
80+
#[serde(untagged)]
81+
Ok {
82+
#[serde(rename = "type")]
83+
ty: String,
84+
payload: T,
85+
},
86+
}
87+
6788
/// Represents a request to the database that expects a stream of responses
6889
pub trait StreamingRequest: ClientRequest {
6990
type ItemType: DeserializeOwned;
91+
const ITEM_TYPE_NAME: &'static str;
7092

7193
fn build_stream(
7294
response: reqwest::Response,
7395
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
74-
Self::lines_stream(response).map(|line| {
75-
let line = line?;
76-
let item = serde_json::from_str(line.as_str())?;
77-
Ok(item)
78-
})
96+
Box::pin(
97+
Self::lines_stream(response)
98+
.map(|maybe_line| {
99+
let line = maybe_line?;
100+
// This line does the heavy lifting of parsing the json-nd line into the correct type.
101+
// There's some Rust typesystem glory involved here, so let's break it down:
102+
// First of all `serde_json::from_str` is used to parse any json `&str` into the type we want to have (in this case a `StreamLineItem`).
103+
// `StreamLineItem` in turn is generic over `Self::ItemType`, which is the type that is expected by the exact response implementation and can change.
104+
// This means, that this will throw an error if the line is invalid json or the string cannot be parsed into an error, heartbeat or the expected type.
105+
// Because of this, we can guarantee after this line, that the payload of the `StreamLineItem` is of the correct type and no further checks are needed.
106+
Ok(serde_json::from_str::<StreamLineItem<Self::ItemType>>(
107+
line.as_str(),
108+
)?)
109+
})
110+
.filter_map(|o| async {
111+
match o {
112+
// An error was passed by the database, so we forward it as an error.
113+
Ok(StreamLineItem::Error { error }) => {
114+
Some(Err(ClientError::DBError(error)))
115+
}
116+
// A heartbeat message was sent, which we ignore.
117+
Ok(StreamLineItem::Heartbeat) => None,
118+
// A successful response was sent with the correct type.
119+
Ok(StreamLineItem::Ok { payload, ty }) if ty == Self::ITEM_TYPE_NAME => {
120+
Some(Ok(payload))
121+
}
122+
// A successful response was sent, but the type does not match the expected type.
123+
Ok(StreamLineItem::Ok { ty, .. }) => {
124+
Some(Err(ClientError::InvalidResponseType(ty)))
125+
}
126+
// An error occured while parsing the line, which we forward as an error.
127+
Err(e) => Some(Err(e)),
128+
}
129+
}),
130+
)
79131
}
80132

81133
fn lines_stream(
Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use futures::{Stream, stream::StreamExt};
21
use reqwest::Method;
32
use serde::{Deserialize, Serialize};
43
use serde_json::Value;
@@ -29,30 +28,5 @@ impl ClientRequest for ListEventTypesRequest {
2928
}
3029
impl StreamingRequest for ListEventTypesRequest {
3130
type ItemType = EventType;
32-
33-
fn build_stream(
34-
response: reqwest::Response,
35-
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
36-
#[derive(Deserialize, Debug)]
37-
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
38-
enum LineItem {
39-
Error { error: String },
40-
EventType(EventType),
41-
}
42-
43-
impl From<LineItem> for Result<EventType, ClientError> {
44-
fn from(item: LineItem) -> Self {
45-
match item {
46-
LineItem::Error { error } => Err(ClientError::DBError(error)),
47-
LineItem::EventType(event_type) => Ok(event_type),
48-
}
49-
}
50-
}
51-
52-
Self::lines_stream(response).map(|line| {
53-
let line = line?;
54-
let item: LineItem = serde_json::from_str(line.as_str())?;
55-
item.into()
56-
})
57-
}
31+
const ITEM_TYPE_NAME: &'static str = "eventType";
5832
}
Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
use futures::{Stream, stream::StreamExt};
21
use reqwest::Method;
3-
use serde::{Deserialize, Serialize};
2+
use serde::Serialize;
43

54
use crate::error::ClientError;
65

@@ -22,26 +21,5 @@ impl ClientRequest for ListSubjectsRequest<'_> {
2221
}
2322
impl StreamingRequest for ListSubjectsRequest<'_> {
2423
type ItemType = String;
25-
26-
fn build_stream(
27-
response: reqwest::Response,
28-
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
29-
#[derive(Deserialize, Debug)]
30-
struct LineItem {
31-
payload: LineItemPayload,
32-
r#type: String,
33-
}
34-
#[derive(Deserialize, Debug)]
35-
struct LineItemPayload {
36-
subject: String,
37-
}
38-
Self::lines_stream(response).map(|line| {
39-
let line = line?;
40-
let item: LineItem = serde_json::from_str(line.as_str())?;
41-
if item.r#type != "subject" {
42-
return Err(ClientError::InvalidEventType);
43-
}
44-
Ok(item.payload.subject)
45-
})
46-
}
24+
const ITEM_TYPE_NAME: &'static str = "subject";
4725
}

src/client/client_request/observe_events.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@ impl ClientRequest for ObserveEventsRequest<'_> {
2525

2626
impl StreamingRequest for ObserveEventsRequest<'_> {
2727
type ItemType = Event;
28+
const ITEM_TYPE_NAME: &'static str = "event";
2829
}
Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
use futures::{Stream, stream::StreamExt};
21
use reqwest::Method;
3-
use serde::{Deserialize, Serialize};
2+
use serde::Serialize;
43

54
use crate::{client::request_options::ReadEventsRequestOptions, error::ClientError, event::Event};
65

@@ -24,30 +23,5 @@ impl ClientRequest for ReadEventsRequest<'_> {
2423

2524
impl StreamingRequest for ReadEventsRequest<'_> {
2625
type ItemType = Event;
27-
28-
fn build_stream(
29-
response: reqwest::Response,
30-
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
31-
#[derive(Deserialize, Debug)]
32-
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
33-
enum LineItem {
34-
Error { error: String },
35-
Event(Box<Event>),
36-
}
37-
38-
impl From<LineItem> for Result<Event, ClientError> {
39-
fn from(item: LineItem) -> Self {
40-
match item {
41-
LineItem::Error { error } => Err(ClientError::DBError(error)),
42-
LineItem::Event(event_type) => Ok(*event_type),
43-
}
44-
}
45-
}
46-
47-
Self::lines_stream(response).map(|line| {
48-
let line = line?;
49-
let item: LineItem = serde_json::from_str(line.as_str())?;
50-
item.into()
51-
})
52-
}
26+
const ITEM_TYPE_NAME: &'static str = "event";
5327
}
Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
use futures::{Stream, StreamExt};
21
use reqwest::Method;
3-
use serde::{Deserialize, Serialize};
2+
use serde::Serialize;
43

54
use crate::error::ClientError;
65

@@ -24,30 +23,5 @@ impl ClientRequest for RunEventqlQueryRequest<'_> {
2423

2524
impl StreamingRequest for RunEventqlQueryRequest<'_> {
2625
type ItemType = EventqlRow;
27-
28-
fn build_stream(
29-
response: reqwest::Response,
30-
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
31-
#[derive(Deserialize, Debug)]
32-
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
33-
enum LineItem {
34-
Error { error: String },
35-
Row(EventqlRow),
36-
}
37-
38-
impl From<LineItem> for Result<EventqlRow, ClientError> {
39-
fn from(item: LineItem) -> Self {
40-
match item {
41-
LineItem::Error { error } => Err(ClientError::DBError(error)),
42-
LineItem::Row(row) => Ok(row),
43-
}
44-
}
45-
}
46-
47-
Self::lines_stream(response).map(|line| {
48-
let line = line?;
49-
let item = serde_json::from_str(line.as_str())?;
50-
Ok(item)
51-
})
52-
}
26+
const ITEM_TYPE_NAME: &'static str = "row";
5327
}

src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ pub enum ClientError {
4444
#[cfg(feature = "cloudevents")]
4545
#[error("The CloudEvents message is invalid: {0}")]
4646
CloudeventsMessageError(#[from] cloudevents::message::Error),
47+
/// The database returned an invalid response type
48+
#[error("The DB returned an invalid response type: {0}")]
49+
InvalidResponseType(String),
4750
}
4851

4952
/// Error type for the [`crate::container`] feature.

0 commit comments

Comments
 (0)