Skip to content

Commit da5cec5

Browse files
committed
fix(client): adapt internal client_request API to be easier to implement correctly
This also fixes an issue with handling DB errors when reading events
1 parent b308223 commit da5cec5

File tree

6 files changed

+52
-86
lines changed

6 files changed

+52
-86
lines changed

src/client/client_request.rs

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ use futures::{
2525
};
2626
use futures_util::io;
2727
use reqwest::Method;
28-
use serde::Serialize;
2928
use serde::de::DeserializeOwned;
29+
use serde::{Deserialize, Serialize};
30+
use std::pin::Pin;
3031
use tokio::io::{AsyncBufReadExt, BufReader};
3132
use tokio_stream::wrappers::LinesStream;
3233
use tokio_util::io::StreamReader;
@@ -61,19 +62,54 @@ pub trait OneShotRequest: ClientRequest {
6162
Ok(())
6263
}
6364
}
65+
#[derive(Deserialize, Debug)]
66+
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
67+
enum StreamLineItem<T> {
68+
Error {
69+
error: String,
70+
},
71+
Heartbeat,
72+
#[serde(untagged)]
73+
Ok {
74+
#[serde(rename = "type")]
75+
ty: String,
76+
payload: T,
77+
},
78+
}
79+
80+
impl<T> StreamLineItem<T> {
81+
pub fn into_result_option(self, expected_type: &str) -> Result<Option<T>, ClientError> {
82+
match self {
83+
StreamLineItem::Error { error } => Err(ClientError::DBError(error)),
84+
StreamLineItem::Heartbeat => Ok(None),
85+
StreamLineItem::Ok { ty, payload } => {
86+
if ty == expected_type {
87+
Ok(Some(payload))
88+
} else {
89+
Err(ClientError::InvalidResponseType(ty))
90+
}
91+
}
92+
}
93+
}
94+
}
6495

6596
/// Represents a request to the database that expects a stream of responses
6697
pub trait StreamingRequest: ClientRequest {
6798
type ItemType: DeserializeOwned;
99+
const ITEM_TYPE_NAME: &'static str;
68100

69101
fn build_stream(
70102
response: reqwest::Response,
71-
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
72-
Self::lines_stream(response).map(|line| {
73-
let line = line?;
74-
let item = serde_json::from_str(line.as_str())?;
75-
Ok(item)
76-
})
103+
) -> Pin<Box<impl Stream<Item = Result<Self::ItemType, ClientError>>>> {
104+
Box::pin(
105+
Self::lines_stream(response)
106+
.map(|line| {
107+
let line = line?;
108+
let item: StreamLineItem<Self::ItemType> = serde_json::from_str(line.as_str())?;
109+
item.into_result_option(Self::ITEM_TYPE_NAME)
110+
})
111+
.filter_map(|o| async { o.transpose() }),
112+
)
77113
}
78114

79115
fn lines_stream(
Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use futures::{Stream, stream::StreamExt};
21
use reqwest::Method;
32
use serde::{Deserialize, Serialize};
43
use serde_json::Value;
@@ -29,30 +28,5 @@ impl ClientRequest for ListEventTypesRequest {
2928
}
3029
impl StreamingRequest for ListEventTypesRequest {
3130
type ItemType = EventType;
32-
33-
fn build_stream(
34-
response: reqwest::Response,
35-
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
36-
#[derive(Deserialize, Debug)]
37-
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
38-
enum LineItem {
39-
Error { error: String },
40-
EventType(EventType),
41-
}
42-
43-
impl From<LineItem> for Result<EventType, ClientError> {
44-
fn from(item: LineItem) -> Self {
45-
match item {
46-
LineItem::Error { error } => Err(ClientError::DBError(error)),
47-
LineItem::EventType(event_type) => Ok(event_type),
48-
}
49-
}
50-
}
51-
52-
Self::lines_stream(response).map(|line| {
53-
let line = line?;
54-
let item: LineItem = serde_json::from_str(line.as_str())?;
55-
item.into()
56-
})
57-
}
31+
const ITEM_TYPE_NAME: &'static str = "eventType";
5832
}
Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
use futures::{Stream, stream::StreamExt};
21
use reqwest::Method;
3-
use serde::{Deserialize, Serialize};
2+
use serde::Serialize;
43

54
use crate::error::ClientError;
65

@@ -22,26 +21,5 @@ impl ClientRequest for ListSubjectsRequest<'_> {
2221
}
2322
impl StreamingRequest for ListSubjectsRequest<'_> {
2423
type ItemType = String;
25-
26-
fn build_stream(
27-
response: reqwest::Response,
28-
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
29-
#[derive(Deserialize, Debug)]
30-
struct LineItem {
31-
payload: LineItemPayload,
32-
r#type: String,
33-
}
34-
#[derive(Deserialize, Debug)]
35-
struct LineItemPayload {
36-
subject: String,
37-
}
38-
Self::lines_stream(response).map(|line| {
39-
let line = line?;
40-
let item: LineItem = serde_json::from_str(line.as_str())?;
41-
if item.r#type != "subject" {
42-
return Err(ClientError::InvalidEventType);
43-
}
44-
Ok(item.payload.subject)
45-
})
46-
}
24+
const ITEM_TYPE_NAME: &'static str = "subject";
4725
}

src/client/client_request/observe_events.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@ impl ClientRequest for ObserveEventsRequest<'_> {
2525

2626
impl StreamingRequest for ObserveEventsRequest<'_> {
2727
type ItemType = Event;
28+
const ITEM_TYPE_NAME: &'static str = "event";
2829
}
Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
use futures::{Stream, stream::StreamExt};
21
use reqwest::Method;
3-
use serde::{Deserialize, Serialize};
2+
use serde::Serialize;
43

54
use crate::{client::request_options::ReadEventsRequestOptions, error::ClientError, event::Event};
65

@@ -24,30 +23,5 @@ impl ClientRequest for ReadEventsRequest<'_> {
2423

2524
impl StreamingRequest for ReadEventsRequest<'_> {
2625
type ItemType = Event;
27-
28-
fn build_stream(
29-
response: reqwest::Response,
30-
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
31-
#[derive(Deserialize, Debug)]
32-
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
33-
enum LineItem {
34-
Error { error: String },
35-
Event(Box<Event>),
36-
}
37-
38-
impl From<LineItem> for Result<Event, ClientError> {
39-
fn from(item: LineItem) -> Self {
40-
match item {
41-
LineItem::Error { error } => Err(ClientError::DBError(error)),
42-
LineItem::Event(event_type) => Ok(*event_type),
43-
}
44-
}
45-
}
46-
47-
Self::lines_stream(response).map(|line| {
48-
let line = line?;
49-
let item: LineItem = serde_json::from_str(line.as_str())?;
50-
item.into()
51-
})
52-
}
26+
const ITEM_TYPE_NAME: &'static str = "event";
5327
}

src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ pub enum ClientError {
4444
#[cfg(feature = "cloudevents")]
4545
#[error("The CloudEvents message is invalid: {0}")]
4646
CloudeventsMessageError(#[from] cloudevents::message::Error),
47+
/// The database returned an invalid response type
48+
#[error("The DB returned an invalid response type: {0}")]
49+
InvalidResponseType(String),
4750
}
4851

4952
/// Error type for the [`crate::container`] feature.

0 commit comments

Comments
 (0)