Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 269 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,21 @@ testcontainer = ["dep:testcontainers"]
[dependencies]
chrono = { version = "0.4.41", features = ["serde"] }
cloudevents-sdk = { version = "0.8.0", features = ["reqwest"], optional = true }
url = "2.5.4"
reqwest = { version = "0.12.15", features = ["json"] }
futures = "0.3.31"
futures-util = "0.3.31"
jsonschema = "0.30.0"
reqwest = { version = "0.12.15", features = ["json", "stream"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
testcontainers = { version = "0.24.0", features = [
"http_wait",
], optional = true }
thiserror = "2.0.12"
tokio = { version = "1.44.2", features = ["io-util"] }
tokio-util = { version = "0.7.15", features = ["io"] }
tokio-stream = { version = "0.1.17", features = ["io-util"] }
typed-builder = "0.21.0"
url = "2.5.4"

[dev-dependencies]
eventsourcingdb-client-rust = { path = ".", features = ["testcontainer"] }
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/c
- ❌ [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events)
- ❌ [Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql)
- ❌ [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events)
- [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery)
- 🚀 [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery)
- 🚀 [Testcontainers Support](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#testcontainers-support)
169 changes: 158 additions & 11 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,26 @@
mod client_request;
mod precondition;

use crate::{
error::ClientError,
event::{Event, EventCandidate, ManagementEvent},
};
use client_request::{
ClientRequest, OneShotRequest, PingRequest, VerifyApiTokenRequest, WriteEventsRequest,
ClientRequest, ListEventTypesRequest, ListSubjectsRequest, OneShotRequest, PingRequest,
RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest, WriteEventsRequest,
list_event_types::EventType,
};

use futures::Stream;
pub use precondition::Precondition;
use reqwest;
use url::Url;

use crate::{
error::ClientError,
event::{Event, EventCandidate},
};

/// Client for an [EventsourcingDB](https://www.eventsourcingdb.io/) instance.
#[derive(Debug)]
pub struct Client {
base_url: Url,
api_token: String,
client: reqwest::Client,
reqwest: reqwest::Client,
}

impl Client {
Expand All @@ -47,7 +48,7 @@ impl Client {
Client {
base_url,
api_token: api_token.into(),
client: reqwest::Client::new(),
reqwest: reqwest::Client::new(),
}
}

Expand Down Expand Up @@ -93,8 +94,8 @@ impl Client {
.map_err(ClientError::URLParseError)?;

let request = match endpoint.method() {
reqwest::Method::GET => self.client.get(url),
reqwest::Method::POST => self.client.post(url),
reqwest::Method::GET => self.reqwest.get(url),
reqwest::Method::POST => self.reqwest.post(url),
_ => return Err(ClientError::InvalidRequestMethod),
}
.header("Authorization", format!("Bearer {}", self.api_token));
Expand Down Expand Up @@ -132,6 +133,28 @@ impl Client {
}
}

/// Utility function to request an endpoint of the API as a stream.
///
/// This means, that the response is streamed and returned as a stream of values.
///
/// # Errors
/// This function will return an error if the request fails or if the URL is invalid.
async fn request_streaming<R: StreamingRequest>(
&self,
endpoint: R,
) -> Result<impl Stream<Item = Result<R::ItemType, ClientError>>, ClientError> {
let response = self.build_request(&endpoint)?.send().await?;

if response.status().is_success() {
Ok(endpoint.build_stream(response))
} else {
Err(ClientError::DBApiError(
response.status(),
response.text().await.unwrap_or_default(),
))
}
}

/// Pings the DB instance to check if it is reachable.
///
/// ```
Expand Down Expand Up @@ -174,6 +197,130 @@ impl Client {
Ok(())
}

/// Registers an event schema with the DB instance.
///
/// ```
/// use eventsourcingdb_client_rust::event::EventCandidate;
/// use futures::StreamExt;
/// # use serde_json::json;
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
/// # let api_token = container.get_api_token();
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
/// let event_type = "io.eventsourcingdb.test";
/// let schema = json!({
/// "type": "object",
/// "properties": {
/// "id": {
/// "type": "string"
/// },
/// "name": {
/// "type": "string"
/// }
/// },
/// "required": ["id", "name"]
/// });
/// client.register_event_schema(event_type, &schema).await.expect("Failed to list event types");
/// # })
/// ```
///
/// # Errors
/// This function will return an error if the request fails or if the provided schema is invalid.
pub async fn register_event_schema(
&self,
event_type: &str,
schema: &serde_json::Value,
) -> Result<ManagementEvent, ClientError> {
self.request_oneshot(RegisterEventSchemaRequest::try_new(event_type, schema)?)
.await
}

/// List all subjects in the DB instance.
///
/// To get all subjects in the DB, just pass `None` as the `base_subject`.
/// ```
/// use eventsourcingdb_client_rust::event::EventCandidate;
/// use futures::StreamExt;
/// # use serde_json::json;
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
/// # let api_token = container.get_api_token();
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
/// let mut subject_stream = client.list_subjects(None).await.expect("Failed to list event types");
/// while let Some(subject) = subject_stream.next().await {
/// println!("Found Type {}", subject.expect("Error while reading types"));
/// }
/// # })
/// ```
///
/// To get all subjects under /test in the DB, just pass `Some("/test")` as the `base_subject`.
/// ```
/// use eventsourcingdb_client_rust::event::EventCandidate;
/// use futures::StreamExt;
/// # use serde_json::json;
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
/// # let api_token = container.get_api_token();
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
/// let mut subject_stream = client.list_subjects(Some("/test")).await.expect("Failed to list event types");
/// while let Some(subject) = subject_stream.next().await {
/// println!("Found Type {}", subject.expect("Error while reading types"));
/// }
/// # })
/// ```
///
/// # Errors
/// This function will return an error if the request fails or if the URL is invalid.
pub async fn list_subjects(
&self,
base_subject: Option<&str>,
) -> Result<impl Stream<Item = Result<String, ClientError>>, ClientError> {
let response = self
.request_streaming(ListSubjectsRequest {
base_subject: base_subject.unwrap_or("/"),
})
.await?;
Ok(response)
}

/// List all event types in the DB instance.
///
/// ```
/// use eventsourcingdb_client_rust::event::EventCandidate;
/// use futures::StreamExt;
/// # use serde_json::json;
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
/// # let api_token = container.get_api_token();
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
/// let mut type_stream = client.list_event_types().await.expect("Failed to list event types");
/// while let Some(ty) = type_stream.next().await {
/// println!("Found Type {}", ty.expect("Error while reading types").name);
/// }
/// # })
/// ```
///
/// # Errors
/// This function will return an error if the request fails or if the URL is invalid.
pub async fn list_event_types(
&self,
) -> Result<impl Stream<Item = Result<EventType, ClientError>>, ClientError> {
let response = self.request_streaming(ListEventTypesRequest).await?;
Ok(response)
}

/// Writes events to the DB instance.
///
/// ```
Expand Down
34 changes: 33 additions & 1 deletion src/client/client_request.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
//! This is a purely internal module to represent client requests to the database.

pub mod list_event_types;
mod list_subjects;
mod ping;
mod register_event_schema;
mod verify_api_token;
mod write_events;

pub use list_event_types::ListEventTypesRequest;
pub use list_subjects::ListSubjectsRequest;
pub use ping::PingRequest;
pub use register_event_schema::RegisterEventSchemaRequest;
pub use verify_api_token::VerifyApiTokenRequest;
pub use write_events::WriteEventsRequest;

use crate::error::ClientError;
use futures::{Stream, stream::TryStreamExt};
use futures_util::io;
use reqwest::Method;
use serde::{Serialize, de::DeserializeOwned};
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_stream::wrappers::LinesStream;
use tokio_util::io::StreamReader;

/// Represents a request to the database client
pub trait ClientRequest {
Expand Down Expand Up @@ -42,3 +54,23 @@ pub trait OneShotRequest: ClientRequest {
Ok(())
}
}

/// Represents a request to the database that expects a stream of responses
pub trait StreamingRequest: ClientRequest {
type ItemType: DeserializeOwned;

fn build_stream(
self,
response: reqwest::Response,
) -> impl Stream<Item = Result<Self::ItemType, ClientError>>;

fn lines_stream(
response: reqwest::Response,
) -> impl Stream<Item = Result<String, ClientError>> {
let bytes = response
.bytes_stream()
.map_err(|err| io::Error::other(format!("Failed to read response stream: {err}")));
let stream_reader = StreamReader::new(bytes);
LinesStream::new(BufReader::new(stream_reader).lines()).map_err(ClientError::from)
}
}
59 changes: 59 additions & 0 deletions src/client/client_request/list_event_types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use futures::{Stream, stream::StreamExt};
use reqwest::Method;
use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::error::ClientError;

use super::{ClientRequest, StreamingRequest};
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct EventType {
#[serde(rename = "eventType")]
pub name: String,
pub is_phantom: bool,
pub schema: Option<Value>,
}

#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ListEventTypesRequest;

impl ClientRequest for ListEventTypesRequest {
const URL_PATH: &'static str = "/api/v1/read-event-types";
const METHOD: Method = Method::POST;

fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
Some(Ok(self))
}
}
impl StreamingRequest for ListEventTypesRequest {
type ItemType = EventType;

fn build_stream(
self,
response: reqwest::Response,
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
#[derive(Deserialize, Debug)]
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
enum LineItem {
Error { error: String },
EventType(EventType),
}

impl From<LineItem> for Result<EventType, ClientError> {
fn from(item: LineItem) -> Self {
match item {
LineItem::Error { error } => Err(ClientError::DBError(error)),
LineItem::EventType(event_type) => Ok(event_type),
}
}
}

Self::lines_stream(response).map(|line| {
let line = line?;
let item: LineItem = serde_json::from_str(line.as_str())?;
item.into()
})
}
}
Loading