Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 58 additions & 6 deletions src/client/client_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use futures::{
};
use futures_util::io;
use reqwest::Method;
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_stream::wrappers::LinesStream;
use tokio_util::io::StreamReader;
Expand Down Expand Up @@ -64,18 +64,70 @@ pub trait OneShotRequest: ClientRequest {
}
}

/// A line in any json-nd stream coming from the database
#[derive(Deserialize, Debug)]
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
enum StreamLineItem<T> {
/// An error occured during the request
Error { error: String },
/// A heardbeat message was sent to keep the connection alive.
/// This is only used when observing events, but it does not hurt to have it everywhere.
Heartbeat,
/// A successful response from the database
/// Since the exact type of the payload is not known at this point, we use this as a fallback case.
/// Every request item gets put in here and the type can be checked later on.
/// The type name checking is only for semantic reasons, as the payload is already parsed as the correct type at this point.
#[serde(untagged)]
Ok {
#[serde(rename = "type")]
ty: String,
payload: T,
},
}

/// Represents a request to the database that expects a stream of responses
pub trait StreamingRequest: ClientRequest {
type ItemType: DeserializeOwned;
const ITEM_TYPE_NAME: &'static str;

fn build_stream(
response: reqwest::Response,
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
Self::lines_stream(response).map(|line| {
let line = line?;
let item = serde_json::from_str(line.as_str())?;
Ok(item)
})
Box::pin(
Self::lines_stream(response)
.map(|maybe_line| {
let line = maybe_line?;
// This line does the heavy lifting of parsing the json-nd line into the correct type.
// There's some Rust typesystem glory involved here, so let's break it down:
// First of all `serde_json::from_str` is used to parse any json `&str` into the type we want to have (in this case a `StreamLineItem`).
// `StreamLineItem` in turn is generic over `Self::ItemType`, which is the type that is expected by the exact response implementation and can change.
// This means, that this will throw an error if the line is invalid json or the string cannot be parsed into an error, heartbeat or the expected type.
// Because of this, we can guarantee after this line, that the payload of the `StreamLineItem` is of the correct type and no further checks are needed.
Ok(serde_json::from_str::<StreamLineItem<Self::ItemType>>(
line.as_str(),
)?)
})
.filter_map(|o| async {
match o {
// An error was passed by the database, so we forward it as an error.
Ok(StreamLineItem::Error { error }) => {
Some(Err(ClientError::DBError(error)))
}
// A heartbeat message was sent, which we ignore.
Ok(StreamLineItem::Heartbeat) => None,
// A successful response was sent with the correct type.
Ok(StreamLineItem::Ok { payload, ty }) if ty == Self::ITEM_TYPE_NAME => {
Some(Ok(payload))
}
// A successful response was sent, but the type does not match the expected type.
Ok(StreamLineItem::Ok { ty, .. }) => {
Some(Err(ClientError::InvalidResponseType(ty)))
}
// An error occured while parsing the line, which we forward as an error.
Err(e) => Some(Err(e)),
}
}),
)
}

fn lines_stream(
Expand Down
28 changes: 1 addition & 27 deletions src/client/client_request/list_event_types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use futures::{Stream, stream::StreamExt};
use reqwest::Method;
use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand Down Expand Up @@ -29,30 +28,5 @@ impl ClientRequest for ListEventTypesRequest {
}
impl StreamingRequest for ListEventTypesRequest {
type ItemType = EventType;

fn build_stream(
response: reqwest::Response,
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
#[derive(Deserialize, Debug)]
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
enum LineItem {
Error { error: String },
EventType(EventType),
}

impl From<LineItem> for Result<EventType, ClientError> {
fn from(item: LineItem) -> Self {
match item {
LineItem::Error { error } => Err(ClientError::DBError(error)),
LineItem::EventType(event_type) => Ok(event_type),
}
}
}

Self::lines_stream(response).map(|line| {
let line = line?;
let item: LineItem = serde_json::from_str(line.as_str())?;
item.into()
})
}
const ITEM_TYPE_NAME: &'static str = "eventType";
}
26 changes: 2 additions & 24 deletions src/client/client_request/list_subjects.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use futures::{Stream, stream::StreamExt};
use reqwest::Method;
use serde::{Deserialize, Serialize};
use serde::Serialize;

use crate::error::ClientError;

Expand All @@ -22,26 +21,5 @@ impl ClientRequest for ListSubjectsRequest<'_> {
}
impl StreamingRequest for ListSubjectsRequest<'_> {
type ItemType = String;

fn build_stream(
response: reqwest::Response,
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
#[derive(Deserialize, Debug)]
struct LineItem {
payload: LineItemPayload,
r#type: String,
}
#[derive(Deserialize, Debug)]
struct LineItemPayload {
subject: String,
}
Self::lines_stream(response).map(|line| {
let line = line?;
let item: LineItem = serde_json::from_str(line.as_str())?;
if item.r#type != "subject" {
return Err(ClientError::InvalidEventType);
}
Ok(item.payload.subject)
})
}
const ITEM_TYPE_NAME: &'static str = "subject";
}
1 change: 1 addition & 0 deletions src/client/client_request/observe_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ impl ClientRequest for ObserveEventsRequest<'_> {

impl StreamingRequest for ObserveEventsRequest<'_> {
type ItemType = Event;
const ITEM_TYPE_NAME: &'static str = "event";
}
30 changes: 2 additions & 28 deletions src/client/client_request/read_events.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use futures::{Stream, stream::StreamExt};
use reqwest::Method;
use serde::{Deserialize, Serialize};
use serde::Serialize;

use crate::{client::request_options::ReadEventsRequestOptions, error::ClientError, event::Event};

Expand All @@ -24,30 +23,5 @@ impl ClientRequest for ReadEventsRequest<'_> {

impl StreamingRequest for ReadEventsRequest<'_> {
type ItemType = Event;

fn build_stream(
response: reqwest::Response,
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
#[derive(Deserialize, Debug)]
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
enum LineItem {
Error { error: String },
Event(Box<Event>),
}

impl From<LineItem> for Result<Event, ClientError> {
fn from(item: LineItem) -> Self {
match item {
LineItem::Error { error } => Err(ClientError::DBError(error)),
LineItem::Event(event_type) => Ok(*event_type),
}
}
}

Self::lines_stream(response).map(|line| {
let line = line?;
let item: LineItem = serde_json::from_str(line.as_str())?;
item.into()
})
}
const ITEM_TYPE_NAME: &'static str = "event";
}
30 changes: 2 additions & 28 deletions src/client/client_request/run_eventql_query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use futures::{Stream, StreamExt};
use reqwest::Method;
use serde::{Deserialize, Serialize};
use serde::Serialize;

use crate::error::ClientError;

Expand All @@ -24,30 +23,5 @@ impl ClientRequest for RunEventqlQueryRequest<'_> {

impl StreamingRequest for RunEventqlQueryRequest<'_> {
type ItemType = EventqlRow;

fn build_stream(
response: reqwest::Response,
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
#[derive(Deserialize, Debug)]
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
enum LineItem {
Error { error: String },
Row(EventqlRow),
}

impl From<LineItem> for Result<EventqlRow, ClientError> {
fn from(item: LineItem) -> Self {
match item {
LineItem::Error { error } => Err(ClientError::DBError(error)),
LineItem::Row(row) => Ok(row),
}
}
}

Self::lines_stream(response).map(|line| {
let line = line?;
let item = serde_json::from_str(line.as_str())?;
Ok(item)
})
}
const ITEM_TYPE_NAME: &'static str = "row";
}
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub enum ClientError {
#[cfg(feature = "cloudevents")]
#[error("The CloudEvents message is invalid: {0}")]
CloudeventsMessageError(#[from] cloudevents::message::Error),
/// The database returned an invalid response type
#[error("The DB returned an invalid response type: {0}")]
InvalidResponseType(String),
}

/// Error type for the [`crate::container`] feature.
Expand Down