Skip to content

Commit f741bf6

Browse files
feat(reading): Add Read-events support (#10)
* feat(client): add metadata and discovery compliance matching This implementation is incomplete and lacking tests Signed-off-by: Raphael Höser <[email protected]> * fix(client): correct default for listing subjects Signed-off-by: Raphael Höser <[email protected]> * chore(client): run cargo fmt * chore(client): further formatting and added documentation * chore(formatting): undo some formatting to make PR smaller Signed-off-by: Raphael Höser <[email protected]> * chore(linting): add reasoning to allow clippy::needless_lifetimes Signed-off-by: Raphael Höser <[email protected]> * feat(client): add reading and streaming events Tests are still missing here Signed-off-by: Raphael Höser <[email protected]> * fix(test): correct compile error in doc test Signed-off-by: Raphael Höser <[email protected]> * fix(Client): fix parsing the read stream Signed-off-by: Raphael Höser <[email protected]> * chore(lints): fix lints and formatting * chore(lints): fix new qa lints * fix(client): resolve merge issues --------- Signed-off-by: Raphael Höser <[email protected]> Co-authored-by: Kuro <[email protected]>
1 parent 1c7f965 commit f741bf6

File tree

9 files changed

+517
-10
lines changed

9 files changed

+517
-10
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/c
99

1010
- 🚀 [Essentials](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#essentials)
1111
- 🚀 [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events)
12-
- [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events)
12+
- 🚀 [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events)
1313
-[Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql)
1414
-[Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events)
1515
- 🚀 [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery)

src/client.rs

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,16 @@
1919
2020
mod client_request;
2121
mod precondition;
22+
pub mod request_options;
2223

2324
use crate::{
2425
error::ClientError,
2526
event::{Event, EventCandidate, ManagementEvent},
2627
};
2728
use client_request::{
2829
ClientRequest, ListEventTypesRequest, ListSubjectsRequest, OneShotRequest, PingRequest,
29-
RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest, WriteEventsRequest,
30-
list_event_types::EventType,
30+
ReadEventsRequest, RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest,
31+
WriteEventsRequest, list_event_types::EventType,
3132
};
3233
use futures::Stream;
3334
pub use precondition::Precondition;
@@ -146,7 +147,7 @@ impl Client {
146147
let response = self.build_request(&endpoint)?.send().await?;
147148

148149
if response.status().is_success() {
149-
Ok(endpoint.build_stream(response))
150+
Ok(R::build_stream(response))
150151
} else {
151152
Err(ClientError::DBApiError(
152153
response.status(),
@@ -176,6 +177,39 @@ impl Client {
176177
Ok(())
177178
}
178179

180+
/// Reads events from the DB instance.
181+
///
182+
/// ```
183+
/// use eventsourcingdb_client_rust::event::EventCandidate;
184+
/// use futures::StreamExt;
185+
/// # use serde_json::json;
186+
/// # tokio_test::block_on(async {
187+
/// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap();
188+
/// let db_url = "http://localhost:3000/";
189+
/// let api_token = "secrettoken";
190+
/// # let db_url = container.get_base_url().await.unwrap();
191+
/// # let api_token = container.get_api_token();
192+
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
193+
/// let mut event_stream = client.read_events("/", None).await.expect("Failed to read events");
194+
/// while let Some(event) = event_stream.next().await {
195+
/// println!("Found Type {:?}", event.expect("Error while reading events"));
196+
/// }
197+
/// # })
198+
/// ```
199+
///
200+
/// # Errors
201+
/// This function will return an error if the request fails or if the URL is invalid.
202+
pub async fn read_events<'a>(
203+
&self,
204+
subject: &'a str,
205+
options: Option<request_options::ReadEventsRequestOptions<'a>>,
206+
) -> Result<impl Stream<Item = Result<Event, ClientError>>, ClientError> {
207+
let response = self
208+
.request_streaming(ReadEventsRequest { subject, options })
209+
.await?;
210+
Ok(response)
211+
}
212+
179213
/// Verifies the API token by sending a request to the DB instance.
180214
///
181215
/// ```

src/client/client_request.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,24 @@
33
pub mod list_event_types;
44
mod list_subjects;
55
mod ping;
6+
mod read_events;
67
mod register_event_schema;
78
mod verify_api_token;
89
mod write_events;
910

1011
pub use list_event_types::ListEventTypesRequest;
1112
pub use list_subjects::ListSubjectsRequest;
1213
pub use ping::PingRequest;
14+
pub use read_events::ReadEventsRequest;
1315
pub use register_event_schema::RegisterEventSchemaRequest;
1416
pub use verify_api_token::VerifyApiTokenRequest;
1517
pub use write_events::WriteEventsRequest;
1618

1719
use crate::error::ClientError;
18-
use futures::{Stream, stream::TryStreamExt};
20+
use futures::{
21+
Stream,
22+
stream::{StreamExt, TryStreamExt},
23+
};
1924
use futures_util::io;
2025
use reqwest::Method;
2126
use serde::Serialize;
@@ -60,9 +65,14 @@ pub trait StreamingRequest: ClientRequest {
6065
type ItemType: DeserializeOwned;
6166

6267
fn build_stream(
63-
self,
6468
response: reqwest::Response,
65-
) -> impl Stream<Item = Result<Self::ItemType, ClientError>>;
69+
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
70+
Self::lines_stream(response).map(|line| {
71+
let line = line?;
72+
let item = serde_json::from_str(line.as_str())?;
73+
Ok(item)
74+
})
75+
}
6676

6777
fn lines_stream(
6878
response: reqwest::Response,

src/client/client_request/list_event_types.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ impl StreamingRequest for ListEventTypesRequest {
3131
type ItemType = EventType;
3232

3333
fn build_stream(
34-
self,
3534
response: reqwest::Response,
3635
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
3736
#[derive(Deserialize, Debug)]

src/client/client_request/list_subjects.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ impl StreamingRequest for ListSubjectsRequest<'_> {
2424
type ItemType = String;
2525

2626
fn build_stream(
27-
self,
2827
response: reqwest::Response,
2928
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
3029
#[derive(Deserialize, Debug)]
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
use futures::{Stream, stream::StreamExt};
2+
use reqwest::Method;
3+
use serde::{Deserialize, Serialize};
4+
5+
use crate::{client::request_options::ReadEventsRequestOptions, error::ClientError, event::Event};
6+
7+
use super::{ClientRequest, StreamingRequest};
8+
9+
#[derive(Debug, Clone, Serialize)]
10+
pub struct ReadEventsRequest<'a> {
11+
pub subject: &'a str,
12+
#[serde(skip_serializing_if = "Option::is_none")]
13+
pub options: Option<ReadEventsRequestOptions<'a>>,
14+
}
15+
16+
impl ClientRequest for ReadEventsRequest<'_> {
17+
const URL_PATH: &'static str = "/api/v1/read-events";
18+
const METHOD: Method = Method::POST;
19+
20+
fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
21+
Some(Ok(self))
22+
}
23+
}
24+
25+
impl StreamingRequest for ReadEventsRequest<'_> {
26+
type ItemType = Event;
27+
28+
fn build_stream(
29+
response: reqwest::Response,
30+
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
31+
#[derive(Deserialize, Debug)]
32+
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
33+
enum LineItem {
34+
Error { error: String },
35+
Event(Box<Event>),
36+
}
37+
38+
impl From<LineItem> for Result<Event, ClientError> {
39+
fn from(item: LineItem) -> Self {
40+
match item {
41+
LineItem::Error { error } => Err(ClientError::DBError(error)),
42+
LineItem::Event(event_type) => Ok(*event_type),
43+
}
44+
}
45+
}
46+
47+
Self::lines_stream(response).map(|line| {
48+
let line = line?;
49+
let item: LineItem = serde_json::from_str(line.as_str())?;
50+
item.into()
51+
})
52+
}
53+
}

src/client/request_options.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
//! This module contains supporting options for the client requests.
2+
3+
use serde::Serialize;
4+
5+
/// Options for reading events from the database
6+
#[derive(Debug, Default, Clone, Serialize)]
7+
#[serde(rename_all = "camelCase")]
8+
pub struct ReadEventsRequestOptions<'a> {
9+
/// Start reading events from this start event
10+
#[serde(skip_serializing_if = "Option::is_none")]
11+
pub from_latest_event: Option<FromLatestEventOptions<'a>>,
12+
/// Lower bound of events to read
13+
#[serde(skip_serializing_if = "Option::is_none")]
14+
pub lower_bound: Option<Bound<'a>>,
15+
/// Ordering of the returned events
16+
#[serde(skip_serializing_if = "Option::is_none")]
17+
pub order: Option<Ordering>,
18+
/// Include recursive subject's events
19+
pub recursive: bool,
20+
/// Upper bound of events to read
21+
#[serde(skip_serializing_if = "Option::is_none")]
22+
pub upper_bound: Option<Bound<'a>>,
23+
}
24+
25+
/// Ordering of the responses of requests
26+
#[derive(Debug, Clone, Serialize)]
27+
#[serde(rename_all = "kebab-case")]
28+
pub enum Ordering {
29+
/// Order the responses in chronological order
30+
Chronological,
31+
/// Order the responses in reverse chronological order
32+
Antichronological,
33+
}
34+
35+
/// The type of the request bound
36+
#[derive(Debug, Clone, Serialize)]
37+
#[serde(rename_all = "kebab-case")]
38+
pub enum BoundType {
39+
/// The bound is included in the response
40+
Inclusive,
41+
/// The bound is excluded from the response
42+
Exclusive,
43+
}
44+
45+
/// A single bound for the request
46+
#[derive(Debug, Clone, Serialize)]
47+
#[serde(rename_all = "camelCase")]
48+
pub struct Bound<'a> {
49+
/// The type of the bound
50+
pub bound_type: BoundType,
51+
/// The value of the bound
52+
pub id: &'a str,
53+
}
54+
55+
/// The strategy for handling missing events
56+
#[derive(Debug, Clone, Serialize)]
57+
#[serde(rename_all = "kebab-case")]
58+
pub enum EventMissingStrategy {
59+
/// Read all events if the required one is missing
60+
ReadEverything,
61+
/// Read no events if the required one is missing
62+
ReadNothing,
63+
}
64+
65+
/// Options for reading events from the start reading at
66+
#[derive(Debug, Clone, Serialize)]
67+
#[serde(rename_all = "camelCase")]
68+
pub struct FromLatestEventOptions<'a> {
69+
/// The strategy for handling missing events
70+
pub if_event_is_missing: EventMissingStrategy,
71+
/// The subject the event should be on
72+
pub subject: &'a str,
73+
/// The type of the event to read from
74+
#[serde(rename = "type")]
75+
pub ty: &'a str,
76+
}

0 commit comments

Comments
 (0)