Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/c

- 🚀 [Essentials](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#essentials)
- 🚀 [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events)
- [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events)
- 🚀 [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events)
- ❌ [Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql)
- ❌ [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events)
- 🚀 [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery)
Expand Down
40 changes: 37 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@

mod client_request;
mod precondition;
pub mod request_options;

use crate::{
error::ClientError,
event::{Event, EventCandidate, ManagementEvent},
};
use client_request::{
ClientRequest, ListEventTypesRequest, ListSubjectsRequest, OneShotRequest, PingRequest,
RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest, WriteEventsRequest,
list_event_types::EventType,
ReadEventsRequest, RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest,
WriteEventsRequest, list_event_types::EventType,
};
use futures::Stream;
pub use precondition::Precondition;
Expand Down Expand Up @@ -146,7 +147,7 @@ impl Client {
let response = self.build_request(&endpoint)?.send().await?;

if response.status().is_success() {
Ok(endpoint.build_stream(response))
Ok(R::build_stream(response))
} else {
Err(ClientError::DBApiError(
response.status(),
Expand Down Expand Up @@ -176,6 +177,39 @@ impl Client {
Ok(())
}

/// Reads events from the DB instance.
///
/// ```
/// use eventsourcingdb_client_rust::event::EventCandidate;
/// use futures::StreamExt;
/// # use serde_json::json;
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
/// # let api_token = container.get_api_token();
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
/// let mut event_stream = client.read_events("/", None).await.expect("Failed to read events");
/// while let Some(event) = event_stream.next().await {
/// println!("Found Type {:?}", event.expect("Error while reading events"));
/// }
/// # })
/// ```
///
/// # Errors
/// This function will return an error if the request fails or if the URL is invalid.
pub async fn read_events<'a>(
&self,
subject: &'a str,
options: Option<request_options::ReadEventsRequestOptions<'a>>,
) -> Result<impl Stream<Item = Result<Event, ClientError>>, ClientError> {
let response = self
.request_streaming(ReadEventsRequest { subject, options })
.await?;
Ok(response)
}

/// Verifies the API token by sending a request to the DB instance.
///
/// ```
Expand Down
16 changes: 13 additions & 3 deletions src/client/client_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,24 @@
pub mod list_event_types;
mod list_subjects;
mod ping;
mod read_events;
mod register_event_schema;
mod verify_api_token;
mod write_events;

pub use list_event_types::ListEventTypesRequest;
pub use list_subjects::ListSubjectsRequest;
pub use ping::PingRequest;
pub use read_events::ReadEventsRequest;
pub use register_event_schema::RegisterEventSchemaRequest;
pub use verify_api_token::VerifyApiTokenRequest;
pub use write_events::WriteEventsRequest;

use crate::error::ClientError;
use futures::{Stream, stream::TryStreamExt};
use futures::{
Stream,
stream::{StreamExt, TryStreamExt},
};
use futures_util::io;
use reqwest::Method;
use serde::Serialize;
Expand Down Expand Up @@ -60,9 +65,14 @@ pub trait StreamingRequest: ClientRequest {
type ItemType: DeserializeOwned;

fn build_stream(
self,
response: reqwest::Response,
) -> impl Stream<Item = Result<Self::ItemType, ClientError>>;
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
Self::lines_stream(response).map(|line| {
let line = line?;
let item = serde_json::from_str(line.as_str())?;
Ok(item)
})
}

fn lines_stream(
response: reqwest::Response,
Expand Down
1 change: 0 additions & 1 deletion src/client/client_request/list_event_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ impl StreamingRequest for ListEventTypesRequest {
type ItemType = EventType;

fn build_stream(
self,
response: reqwest::Response,
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
#[derive(Deserialize, Debug)]
Expand Down
1 change: 0 additions & 1 deletion src/client/client_request/list_subjects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ impl StreamingRequest for ListSubjectsRequest<'_> {
type ItemType = String;

fn build_stream(
self,
response: reqwest::Response,
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
#[derive(Deserialize, Debug)]
Expand Down
53 changes: 53 additions & 0 deletions src/client/client_request/read_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use futures::{Stream, stream::StreamExt};
use reqwest::Method;
use serde::{Deserialize, Serialize};

use crate::{client::request_options::ReadEventsRequestOptions, error::ClientError, event::Event};

use super::{ClientRequest, StreamingRequest};

#[derive(Debug, Clone, Serialize)]
pub struct ReadEventsRequest<'a> {
pub subject: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
pub options: Option<ReadEventsRequestOptions<'a>>,
}

impl ClientRequest for ReadEventsRequest<'_> {
const URL_PATH: &'static str = "/api/v1/read-events";
const METHOD: Method = Method::POST;

fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
Some(Ok(self))
}
}

impl StreamingRequest for ReadEventsRequest<'_> {
type ItemType = Event;

fn build_stream(
response: reqwest::Response,
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
#[derive(Deserialize, Debug)]
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
enum LineItem {
Error { error: String },
Event(Box<Event>),
}

impl From<LineItem> for Result<Event, ClientError> {
fn from(item: LineItem) -> Self {
match item {
LineItem::Error { error } => Err(ClientError::DBError(error)),
LineItem::Event(event_type) => Ok(*event_type),
}
}
}

Self::lines_stream(response).map(|line| {
let line = line?;
let item: LineItem = serde_json::from_str(line.as_str())?;
item.into()
})
}
}
76 changes: 76 additions & 0 deletions src/client/request_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
//! This module contains supporting options for the client requests.
use serde::Serialize;

/// Options for reading events from the database
#[derive(Debug, Default, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReadEventsRequestOptions<'a> {
/// Start reading events from this start event
#[serde(skip_serializing_if = "Option::is_none")]
pub from_latest_event: Option<FromLatestEventOptions<'a>>,
/// Lower bound of events to read
#[serde(skip_serializing_if = "Option::is_none")]
pub lower_bound: Option<Bound<'a>>,
/// Ordering of the returned events
#[serde(skip_serializing_if = "Option::is_none")]
pub order: Option<Ordering>,
/// Include recursive subject's events
pub recursive: bool,
/// Upper bound of events to read
#[serde(skip_serializing_if = "Option::is_none")]
pub upper_bound: Option<Bound<'a>>,
}

/// Ordering of the responses of requests
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum Ordering {
/// Order the responses in chronological order
Chronological,
/// Order the responses in reverse chronological order
Antichronological,
}

/// The type of the request bound
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum BoundType {
/// The bound is included in the response
Inclusive,
/// The bound is excluded from the response
Exclusive,
}

/// A single bound for the request
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Bound<'a> {
/// The type of the bound
pub bound_type: BoundType,
/// The value of the bound
pub id: &'a str,
}

/// The strategy for handling missing events
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum EventMissingStrategy {
/// Read all events if the required one is missing
ReadEverything,
/// Read no events if the required one is missing
ReadNothing,
}

/// Options for reading events from the start reading at
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FromLatestEventOptions<'a> {
/// The strategy for handling missing events
pub if_event_is_missing: EventMissingStrategy,
/// The subject the event should be on
pub subject: &'a str,
/// The type of the event to read from
#[serde(rename = "type")]
pub ty: &'a str,
}
Loading