From 6cab3030300922f9c3bae4cf5125473c5b65c614 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Tue, 13 May 2025 21:18:23 +0200 Subject: [PATCH 01/12] feat(client): add metadata and discovery compliance matching MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This implementation is incomplete and lacking tests Signed-off-by: Raphael Höser --- Cargo.lock | 269 ++++++++++++++++++ Cargo.toml | 8 +- src/client.rs | 94 +++++- src/client/client_request.rs | 78 ++--- src/client/client_request/list_event_types.rs | 56 ++++ src/client/client_request/list_subjects.rs | 48 ++++ src/client/client_request/ping.rs | 24 ++ .../client_request/register_event_schema.rs | 42 +++ src/client/client_request/verify_api_token.rs | 24 ++ src/error.rs | 17 +- src/lib.rs | 1 + tests/metadata_and_discovery.rs | 42 +++ 12 files changed, 660 insertions(+), 43 deletions(-) create mode 100644 src/client/client_request/list_event_types.rs create mode 100644 src/client/client_request/list_subjects.rs create mode 100644 src/client/client_request/ping.rs create mode 100644 src/client/client_request/register_event_schema.rs create mode 100644 src/client/client_request/verify_api_token.rs create mode 100644 tests/metadata_and_discovery.rs diff --git a/Cargo.lock b/Cargo.lock index f2f3425..fe7bbe3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,20 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "getrandom 0.3.2", + "once_cell", + "serde", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -113,6 +127,21 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bit-set" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08807e080ed7f9d5433fa9b275196cfc35414f66a0c79d864dc51a0d825231a3" +dependencies = [ + "bit-vec", +] + +[[package]] +name = "bit-vec" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" + [[package]] name = "bitflags" version = "1.3.2" @@ -175,12 +204,24 @@ dependencies = [ "serde_with", ] +[[package]] +name = "borrow-or-share" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3eeab4423108c5d7c744f4d234de88d18d636100093ae04caf4825134b9c3a32" + [[package]] name = "bumpalo" version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" +[[package]] +name = "bytecount" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce" + [[package]] name = "bytes" version = "1.10.1" @@ -362,6 +403,15 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "email_address" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e079f19b08ca6239f47f8ba8509c11cf3ea30095831f7fed61441475edd8c449" +dependencies = [ + "serde", +] + [[package]] name = "encoding_rs" version = "0.8.35" @@ -417,16 +467,32 @@ dependencies = [ "chrono", "cloudevents-sdk", "eventsourcingdb-client-rust", + "futures", + "futures-util", + "jsonschema", "reqwest", "serde", "serde_json", "testcontainers", "thiserror 2.0.12", "tokio", + "tokio-stream", "tokio-test", + "tokio-util", "url", ] +[[package]] +name = "fancy-regex" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e24cb5a94bcae1e5408b0effca5cd7172ea3c5755049c5f3af4cd283a165298" +dependencies = [ + "bit-set", + "regex-automata", + "regex-syntax", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -445,6 +511,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "fluent-uri" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1918b65d96df47d3591bed19c5cca17e3fa5d0707318e4b5ef2eae01764df7e5" +dependencies = [ + "borrow-or-share", + "ref-cast", + "serde", +] + [[package]] name = "fnv" version = "1.0.7" @@ -475,6 +552,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fraction" +version = "0.15.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f158e3ff0a1b334408dc9fb811cd99b446986f4d8b741bb08f9df1604085ae7" +dependencies = [ + "lazy_static", + "num", +] + [[package]] name = "futures" version = "0.3.31" @@ -1082,6 +1169,39 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonschema" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b46a0365a611fbf1d2143104dcf910aada96fafd295bab16c60b802bf6fa1d" +dependencies = [ + "ahash", + "base64 0.22.1", + "bytecount", + "email_address", + "fancy-regex", + "fraction", + "idna", + "itoa", + "num-cmp", + "num-traits", + "once_cell", + "percent-encoding", + "referencing", + "regex", + "regex-syntax", + "reqwest", + "serde", + "serde_json", + "uuid-simd", +] + +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.172" @@ -1191,12 +1311,82 @@ dependencies = [ "tempfile", ] +[[package]] +name = "num" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + +[[package]] +name = "num-cmp" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63335b2e2c34fae2fb0aa2cecfd9f0832a1e24b3b32ecec612c3426d46dc8aaa" + +[[package]] +name = "num-complex" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" +dependencies = [ + "num-traits", +] + [[package]] name = "num-conv" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-iter" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" +dependencies = [ + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1265,6 +1455,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "outref" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" + [[package]] name = "parking_lot" version = "0.12.3" @@ -1507,6 +1703,40 @@ dependencies = [ "bitflags 2.9.0", ] +[[package]] +name = "ref-cast" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a0ae411dbe946a674d89546582cea4ba2bb8defac896622d6496f14c23ba5cf" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1165225c21bff1f3bbce98f5a1f889949bc902d3575308cc7b0de30b4f6d27c7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "referencing" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8eff4fa778b5c2a57e85c5f2fe3a709c52f0e60d23146e2151cbef5893f420e" +dependencies = [ + "ahash", + "fluent-uri", + "once_cell", + "parking_lot", + "percent-encoding", + "serde_json", +] + [[package]] name = "regex" version = "1.11.1" @@ -1545,6 +1775,7 @@ dependencies = [ "base64 0.22.1", "bytes", "encoding_rs", + "futures-channel", "futures-core", "futures-util", "h2", @@ -1577,11 +1808,13 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls", + "tokio-util", "tower", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots 0.26.11", "windows-registry", @@ -2340,12 +2573,35 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "uuid-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b082222b4f6619906941c17eb2297fff4c2fb96cb60164170522942a200bd8" +dependencies = [ + "outref", + "uuid", + "vsimd", +] + [[package]] name = "vcpkg" version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "want" version = "0.3.1" @@ -2441,6 +2697,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.77" diff --git a/Cargo.toml b/Cargo.toml index 1a22ad5..c58ee8e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,13 +12,19 @@ testcontainer = ["dep:testcontainers"] chrono = { version = "0.4.41", features = ["serde"] } cloudevents-sdk = { version = "0.8.0", features = ["reqwest"], optional = true } url = "2.5.4" -reqwest = { version = "0.12.15", features = ["json"] } +reqwest = { version = "0.12.15", features = ["json", "stream"] } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" testcontainers = { version = "0.24.0", features = [ "http_wait", ], optional = true } thiserror = "2.0.12" +jsonschema = "0.30.0" +futures-util = "0.3.31" +tokio-util = { version = "0.7.15", features = ["io"] } +tokio-stream = { version = "0.1.17", features = ["io-util"] } +futures = "0.3.31" +tokio = { version = "1.44.2", features = ["io-util"] } [dev-dependencies] eventsourcingdb-client-rust = { path = ".", features = ["testcontainer"] } diff --git a/src/client.rs b/src/client.rs index 3d9211f..31dfb4c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -19,12 +19,15 @@ mod client_request; -use client_request::{ClientRequest, PingRequest, VerifyApiTokenRequest}; +use client_request::{ + list_event_types::EventType, ClientRequest, ListEventTypesRequest, ListSubjectsRequest, OneShotRequest, PingRequest, RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest +}; +use futures::Stream; use reqwest; use url::Url; -use crate::error::ClientError; +use crate::{error::ClientError, event::ManagementEvent}; /// Client for an [EventsourcingDB](https://www.eventsourcingdb.io/) instance. #[derive(Debug)] @@ -72,9 +75,14 @@ impl Client { /// Utility function to request an endpoint of the API. /// + /// This function will return a [`reqwest::RequestBuilder`] which can be used to send the request. + /// /// # Errors /// This function will return an error if the request fails or if the URL is invalid. - async fn request(&self, endpoint: R) -> Result { + fn build_request( + &self, + endpoint: &R, + ) -> Result { let url = self .base_url .join(endpoint.url_path()) @@ -93,15 +101,49 @@ impl Client { } else { request }; + Ok(request) + } - let response = request.send().await?; + /// Utility function to request an endpoint of the API as a oneshot. + /// + /// This means, that the response is not streamed, but returned as a single value. + /// + /// # Errors + /// This function will return an error if the request fails or if the URL is invalid. + async fn request_oneshot( + &self, + endpoint: R, + ) -> Result { + let response = self.build_request(&endpoint)?.send().await?; if response.status().is_success() { let result = response.json().await?; endpoint.validate_response(&result)?; Ok(result) } else { - Err(ClientError::DBError( + Err(ClientError::DBApiError( + response.status(), + response.text().await.unwrap_or_default(), + )) + } + } + + /// Utility function to request an endpoint of the API as a stream. + /// + /// This means, that the response is streamed and returned as a stream of values. + /// + /// # Errors + /// This function will return an error if the request fails or if the URL is invalid. + async fn request_streaming( + &self, + endpoint: R, + ) -> Result>, ClientError> { + let response = self.build_request(&endpoint)?.send().await?; + + if response.status().is_success() { + Ok(endpoint.build_stream(response)) + } else { + Err(ClientError::DBApiError( response.status(), response.text().await.unwrap_or_default(), )) @@ -125,7 +167,7 @@ impl Client { /// # Errors /// This function will return an error if the request fails or if the URL is invalid. pub async fn ping(&self) -> Result<(), ClientError> { - let _ = self.request(PingRequest).await?; + let _ = self.request_oneshot(PingRequest).await?; Ok(()) } @@ -146,7 +188,45 @@ impl Client { /// # Errors /// This function will return an error if the request fails or if the URL is invalid. pub async fn verify_api_token(&self) -> Result<(), ClientError> { - let _ = self.request(VerifyApiTokenRequest).await?; + let _ = self.request_oneshot(VerifyApiTokenRequest).await?; Ok(()) } + + /// Registers an event schema with the DB instance. + /// + /// # Errors + /// This function will return an error if the request fails or if the provided schema is invalid. + pub async fn register_event_schema( + &self, + event_type: &str, + schema: &serde_json::Value, + ) -> Result { + self.request_oneshot(RegisterEventSchemaRequest::try_new(event_type, schema)?) + .await + } + + /// List all subjects in the DB instance. + /// + /// # Errors + /// This function will return an error if the request fails or if the URL is invalid. + pub async fn list_subjects( + &self, + base_subject: Option<&str>, + ) -> Result>, ClientError> { + let response = self + .request_streaming(ListSubjectsRequest { base_subject }) + .await?; + Ok(response) + } + + /// List all event types in the DB instance. + /// + /// # Errors + /// This function will return an error if the request fails or if the URL is invalid. + pub async fn list_event_types( + &self, + ) -> Result>, ClientError> { + let response = self.request_streaming(ListEventTypesRequest).await?; + Ok(response) + } } diff --git a/src/client/client_request.rs b/src/client/client_request.rs index cc389a9..91cfdf7 100644 --- a/src/client/client_request.rs +++ b/src/client/client_request.rs @@ -1,15 +1,31 @@ //! This is a purely internal module to represent client requests to the database. -use reqwest::Method; -use serde_json::Value; +pub mod list_event_types; +mod list_subjects; +mod ping; +mod register_event_schema; +mod verify_api_token; + +pub use list_event_types::ListEventTypesRequest; +pub use list_subjects::ListSubjectsRequest; +pub use ping::PingRequest; +pub use register_event_schema::RegisterEventSchemaRequest; +pub use verify_api_token::VerifyApiTokenRequest; -use crate::{error::ClientError, event::ManagementEvent}; +use crate::error::ClientError; +use futures::{Stream, stream::TryStreamExt}; +use futures_util::io; +use reqwest::Method; +use serde::Serialize; +use serde::de::DeserializeOwned; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio_stream::wrappers::LinesStream; +use tokio_util::io::StreamReader; /// Represents a request to the database client pub trait ClientRequest { const URL_PATH: &'static str; const METHOD: Method; - type Response: serde::de::DeserializeOwned; /// Returns the URL path for the request fn url_path(&self) -> &'static str { @@ -22,9 +38,14 @@ pub trait ClientRequest { } /// Returns the body for the request - fn body(&self) -> Option> { - None + fn body(&self) -> Option> { + None::> } +} + +/// Represents a request to the database that expects a single response +pub trait OneShotRequest: ClientRequest { + type Response: DeserializeOwned; /// Validate the response from the database fn validate_response(&self, _response: &Self::Response) -> Result<(), ClientError> { @@ -32,34 +53,25 @@ pub trait ClientRequest { } } -/// Ping the Database instance -#[derive(Debug, Clone, Copy)] -pub struct PingRequest; - -impl ClientRequest for PingRequest { - const URL_PATH: &'static str = "/api/v1/ping"; - const METHOD: Method = Method::GET; - type Response = ManagementEvent; - - fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> { - (response.ty() == "io.eventsourcingdb.api.ping-received") - .then_some(()) - .ok_or(ClientError::PingFailed) - } -} - -/// Verify the API token -#[derive(Debug, Clone, Copy)] -pub struct VerifyApiTokenRequest; +/// Represents a request to the database that expects a stream of responses +pub trait StreamingRequest: ClientRequest { + type ItemType: DeserializeOwned; -impl ClientRequest for VerifyApiTokenRequest { - const URL_PATH: &'static str = "/api/v1/verify-api-token"; - const METHOD: Method = Method::POST; - type Response = ManagementEvent; + fn build_stream( + self, + response: reqwest::Response, + ) -> impl Stream>; - fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> { - (response.ty() == "io.eventsourcingdb.api.api-token-verified") - .then_some(()) - .ok_or(ClientError::APITokenInvalid) + fn lines_stream( + response: reqwest::Response, + ) -> impl Stream> { + let bytes = response.bytes_stream().map_err(|err| { + io::Error::new( + io::ErrorKind::Other, + format!("Failed to read response stream: {err}"), + ) + }); + let stream_reader = StreamReader::new(bytes); + LinesStream::new(BufReader::new(stream_reader).lines()).map_err(ClientError::from) } } diff --git a/src/client/client_request/list_event_types.rs b/src/client/client_request/list_event_types.rs new file mode 100644 index 0000000..4a21d4c --- /dev/null +++ b/src/client/client_request/list_event_types.rs @@ -0,0 +1,56 @@ +use futures::{Stream, stream::StreamExt}; +use reqwest::Method; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::error::ClientError; + +use super::{ClientRequest, StreamingRequest}; +#[derive(Deserialize, Debug)] +pub struct EventType { + pub name: String, + pub is_phantom: bool, + pub schema: Option, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ListEventTypesRequest; + +impl ClientRequest for ListEventTypesRequest { + const URL_PATH: &'static str = "/api/v1/read-event-types"; + const METHOD: Method = Method::POST; + + fn body(&self) -> Option> { + Some(Ok(self)) + } +} +impl StreamingRequest for ListEventTypesRequest { + type ItemType = EventType; + + fn build_stream( + self, + response: reqwest::Response, + ) -> impl Stream> { + #[derive(Deserialize, Debug)] + enum LineItem { + Error(String), + EventType(EventType), + } + + impl From for Result { + fn from(item: LineItem) -> Self { + match item { + LineItem::Error(err) => Err(ClientError::DBError(err)), + LineItem::EventType(event_type) => Ok(event_type), + } + } + } + + Self::lines_stream(response).map(|line| { + let line = line?; + let item: LineItem = serde_json::from_str(line.as_str())?; + item.into() + }) + } +} diff --git a/src/client/client_request/list_subjects.rs b/src/client/client_request/list_subjects.rs new file mode 100644 index 0000000..2be1456 --- /dev/null +++ b/src/client/client_request/list_subjects.rs @@ -0,0 +1,48 @@ +use futures::{Stream, stream::StreamExt}; +use reqwest::Method; +use serde::{Deserialize, Serialize}; + +use crate::error::ClientError; + +use super::{ClientRequest, StreamingRequest}; + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ListSubjectsRequest<'a> { + pub base_subject: Option<&'a str>, +} + +impl<'a> ClientRequest for ListSubjectsRequest<'a> { + const URL_PATH: &'static str = "/api/v1/read-subjects"; + const METHOD: Method = Method::POST; + + fn body(&self) -> Option> { + Some(Ok(self)) + } +} +impl<'a> StreamingRequest for ListSubjectsRequest<'a> { + type ItemType = String; + + fn build_stream( + self, + response: reqwest::Response, + ) -> impl Stream> { + #[derive(Deserialize, Debug)] + struct LineItem { + payload: LineItemPayload, + r#type: String, + } + #[derive(Deserialize, Debug)] + struct LineItemPayload { + subject: String, + } + Self::lines_stream(response).map(|line| { + let line = line?; + let item: LineItem = serde_json::from_str(line.as_str())?; + if item.r#type != "subject" { + return Err(ClientError::InvalidEventType); + } + Ok(item.payload.subject) + }) + } +} diff --git a/src/client/client_request/ping.rs b/src/client/client_request/ping.rs new file mode 100644 index 0000000..5eaedd5 --- /dev/null +++ b/src/client/client_request/ping.rs @@ -0,0 +1,24 @@ +use reqwest::Method; + +use crate::{error::ClientError, event::ManagementEvent}; + +use super::{ClientRequest, OneShotRequest}; + +/// Ping the Database instance +#[derive(Debug, Clone, Copy)] +pub struct PingRequest; + +impl ClientRequest for PingRequest { + const URL_PATH: &'static str = "/api/v1/ping"; + const METHOD: Method = Method::GET; +} + +impl OneShotRequest for PingRequest { + type Response = ManagementEvent; + + fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> { + (response.ty() == "io.eventsourcingdb.api.ping-received") + .then_some(()) + .ok_or(ClientError::PingFailed) + } +} diff --git a/src/client/client_request/register_event_schema.rs b/src/client/client_request/register_event_schema.rs new file mode 100644 index 0000000..f42720a --- /dev/null +++ b/src/client/client_request/register_event_schema.rs @@ -0,0 +1,42 @@ +use reqwest::Method; +use serde::Serialize; +use serde_json::Value; + +use crate::{error::ClientError, event::ManagementEvent}; + +use super::{ClientRequest, OneShotRequest}; + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct RegisterEventSchemaRequest<'a> { + event_type: &'a str, + schema: &'a Value, +} + +impl<'a> RegisterEventSchemaRequest<'a> { + pub fn try_new(event_type: &'a str, schema: &'a Value) -> Result { + if event_type.is_empty() { + return Err(ClientError::InvalidEventType); + } + jsonschema::meta::validate(schema).map_err(|_e| ClientError::JsonSchemaError)?; + Ok(Self { event_type, schema }) + } +} + +impl<'a> ClientRequest for RegisterEventSchemaRequest<'a> { + const URL_PATH: &'static str = "/api/v1/register-event-schema"; + const METHOD: Method = Method::POST; + + fn body(&self) -> Option> { + Some(Ok(self)) + } +} +impl<'a> OneShotRequest for RegisterEventSchemaRequest<'a> { + type Response = ManagementEvent; + + fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> { + (response.ty() == "io.eventsourcingdb.api.event-schema-registered") + .then_some(()) + .ok_or(ClientError::InvalidEventType) + } +} diff --git a/src/client/client_request/verify_api_token.rs b/src/client/client_request/verify_api_token.rs new file mode 100644 index 0000000..c1c015e --- /dev/null +++ b/src/client/client_request/verify_api_token.rs @@ -0,0 +1,24 @@ +use reqwest::Method; + +use crate::{error::ClientError, event::ManagementEvent}; + +use super::{ClientRequest, OneShotRequest}; + +/// Verify the API token +#[derive(Debug, Clone, Copy)] +pub struct VerifyApiTokenRequest; + +impl ClientRequest for VerifyApiTokenRequest { + const URL_PATH: &'static str = "/api/v1/verify-api-token"; + const METHOD: Method = Method::POST; +} + +impl OneShotRequest for VerifyApiTokenRequest { + type Response = ManagementEvent; + + fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> { + (response.ty() == "io.eventsourcingdb.api.api-token-verified") + .then_some(()) + .ok_or(ClientError::APITokenInvalid) + } +} diff --git a/src/error.rs b/src/error.rs index 89b61b9..ec007f4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,9 +6,15 @@ use thiserror::Error; /// Error type for the client #[derive(Debug, Error)] pub enum ClientError { + /// An IO Error occurred + #[error("An IO error occurred: {0}")] + IoError(#[from] std::io::Error), /// The provided request method is invalid #[error("The provided request method is invalid")] InvalidRequestMethod, + /// The provided event type is invalid + #[error("The provided event type is invalid")] + InvalidEventType, /// The provided API token is invalid #[error("The provided API token is invalid")] APITokenInvalid, @@ -24,9 +30,16 @@ pub enum ClientError { /// There was a problem with the JSON serialization #[error("The JSON serialization failed: {0}")] SerdeJsonError(#[from] serde_json::Error), - /// The DB returned an error+ + /// The DB returned an error #[error("The DB returned an error: {0}")] - DBError(StatusCode, String), + DBApiError(StatusCode, String), + /// The DB returned an error in the response + #[error("The DB returned an error in the response: {0}")] + DBError(String), + // check if this can hold a validation error in the future + /// The passed jsonschema is invalid + #[error("The passed jsonschema is invalid")] + JsonSchemaError, /// There was a problem with the `cloudevents` message #[cfg(feature = "cloudevents")] #[error("The CloudEvents message is invalid: {0}")] diff --git a/src/lib.rs b/src/lib.rs index 344f1be..90b8923 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ #![doc = include_str!("../README.md")] +#![allow(clippy::needless_lifetimes)] #![deny( ambiguous_negative_literals, clippy::pedantic, diff --git a/tests/metadata_and_discovery.rs b/tests/metadata_and_discovery.rs new file mode 100644 index 0000000..02ce240 --- /dev/null +++ b/tests/metadata_and_discovery.rs @@ -0,0 +1,42 @@ +use eventsourcingdb_client_rust::container::Container; +use serde_json::json; + +#[tokio::test] +async fn register_event_schema() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + client + .register_event_schema( + "io.eventsourcingdb.test", + &json!({ + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "name": { + "type": "string" + } + }, + "required": ["id", "name"] + }), + ) + .await + .expect("Failed to register event schema"); +} + + +#[tokio::test] +async fn register_invalid_event_schema() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let res = client + .register_event_schema( + "io.eventsourcingdb.test", + &json!({ + "x": "asd" + }), + ) + .await; + assert!(res.is_err(), "Expected an error, but got: {:?}", res); +} From 6c86db26b9533560c73d7a0d7be8132cb54ef320 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Tue, 13 May 2025 22:07:22 +0200 Subject: [PATCH 02/12] fix(client): correct default for listing subjects MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Raphael Höser --- README.md | 2 +- src/client.rs | 8 +- src/client/client_request/list_event_types.rs | 7 +- src/client/client_request/list_subjects.rs | 2 +- tests/metadata_and_discovery.rs | 80 ++++++++++++++++++- 5 files changed, 92 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 7e04257..59c3054 100644 --- a/README.md +++ b/README.md @@ -12,5 +12,5 @@ Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/c - ❌ [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events) - ❌ [Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql) - ❌ [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events) -- ❌ [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery) +- 🚀 [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery) - 🚀 [Testcontainers Support](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#testcontainers-support) diff --git a/src/client.rs b/src/client.rs index 31dfb4c..ca2a179 100644 --- a/src/client.rs +++ b/src/client.rs @@ -20,7 +20,9 @@ mod client_request; use client_request::{ - list_event_types::EventType, ClientRequest, ListEventTypesRequest, ListSubjectsRequest, OneShotRequest, PingRequest, RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest + ClientRequest, ListEventTypesRequest, ListSubjectsRequest, OneShotRequest, PingRequest, + RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest, + list_event_types::EventType, }; use futures::Stream; @@ -214,7 +216,9 @@ impl Client { base_subject: Option<&str>, ) -> Result>, ClientError> { let response = self - .request_streaming(ListSubjectsRequest { base_subject }) + .request_streaming(ListSubjectsRequest { + base_subject: base_subject.unwrap_or("/"), + }) .await?; Ok(response) } diff --git a/src/client/client_request/list_event_types.rs b/src/client/client_request/list_event_types.rs index 4a21d4c..538c8e7 100644 --- a/src/client/client_request/list_event_types.rs +++ b/src/client/client_request/list_event_types.rs @@ -7,7 +7,9 @@ use crate::error::ClientError; use super::{ClientRequest, StreamingRequest}; #[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] pub struct EventType { + #[serde(rename = "eventType")] pub name: String, pub is_phantom: bool, pub schema: Option, @@ -33,15 +35,16 @@ impl StreamingRequest for ListEventTypesRequest { response: reqwest::Response, ) -> impl Stream> { #[derive(Deserialize, Debug)] + #[serde(tag = "type", content = "payload", rename_all = "camelCase")] enum LineItem { - Error(String), + Error { error: String }, EventType(EventType), } impl From for Result { fn from(item: LineItem) -> Self { match item { - LineItem::Error(err) => Err(ClientError::DBError(err)), + LineItem::Error { error } => Err(ClientError::DBError(error)), LineItem::EventType(event_type) => Ok(event_type), } } diff --git a/src/client/client_request/list_subjects.rs b/src/client/client_request/list_subjects.rs index 2be1456..c70ea36 100644 --- a/src/client/client_request/list_subjects.rs +++ b/src/client/client_request/list_subjects.rs @@ -9,7 +9,7 @@ use super::{ClientRequest, StreamingRequest}; #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] pub struct ListSubjectsRequest<'a> { - pub base_subject: Option<&'a str>, + pub base_subject: &'a str, } impl<'a> ClientRequest for ListSubjectsRequest<'a> { diff --git a/tests/metadata_and_discovery.rs b/tests/metadata_and_discovery.rs index 02ce240..b32fe8b 100644 --- a/tests/metadata_and_discovery.rs +++ b/tests/metadata_and_discovery.rs @@ -1,4 +1,5 @@ use eventsourcingdb_client_rust::container::Container; +use futures::StreamExt; use serde_json::json; #[tokio::test] @@ -25,7 +26,6 @@ async fn register_event_schema() { .expect("Failed to register event schema"); } - #[tokio::test] async fn register_invalid_event_schema() { let container = Container::start_default().await.unwrap(); @@ -40,3 +40,81 @@ async fn register_invalid_event_schema() { .await; assert!(res.is_err(), "Expected an error, but got: {:?}", res); } + +#[tokio::test] +async fn list_all_subjects() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let res = client.list_subjects(None).await; + match res { + Ok(subjects) => { + let subjects = subjects.collect::>().await; + assert!( + subjects.is_empty(), + "Expected no subjects, but got: {:?}", + subjects + ); + } + Err(err) => panic!("Failed to list subjects: {:?}", err), + } +} + +//TODO!: add list all subjects test after writing to db + +//TODO!: add list scoped subjects test after writing to db + +#[tokio::test] +async fn list_all_event_types() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let test_event_type = "io.eventsourcingdb.test"; + let schema = json!({ + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "name": { + "type": "string" + } + }, + "required": ["id", "name"] + }); + client + .register_event_schema(test_event_type, &schema) + .await + .expect("Failed to register event schema"); + let res = client.list_event_types().await; + match res { + Ok(event_types) => { + let mut event_types = event_types.collect::>().await; + assert!( + event_types.len() == 1, + "Expected one event types, but got: {:?}", + event_types + ); + assert!(event_types[0].is_ok(), "Expected event type to be ok"); + let response_event_type = event_types.pop().unwrap().unwrap(); + assert_eq!( + response_event_type.name, test_event_type, + "Expected event type to be 'io.eventsourcingdb.test', but got: {:?}", + response_event_type.name + ); + assert_eq!( + response_event_type.schema.as_ref(), + Some(&schema), + "Expected event type schema to be {:?}, but got: {:?}", + schema, + response_event_type.schema + ); + assert!( + response_event_type.is_phantom, + "Expected event type is_phantom to be true, but got: {:?}", + response_event_type.is_phantom + ); + } + Err(err) => panic!("Failed to list event types: {:?}", err), + } +} + +// TODO!: add list event types test after writing to db From e70b6ee9ec6fd16ad50bebe8747bec1ff2816477 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Thu, 15 May 2025 13:37:18 +0200 Subject: [PATCH 03/12] chore(client): run cargo fmt --- src/container.rs | 10 +-- src/event.rs | 2 +- src/event/event_types/event.rs | 3 +- src/event/event_types/event_candidate.rs | 2 +- src/event/event_types/management_event.rs | 10 +-- src/event/trace_info.rs | 3 +- tests/essentials.rs | 5 +- tests/write_events.rs | 96 +++++++++-------------- 8 files changed, 57 insertions(+), 74 deletions(-) diff --git a/src/container.rs b/src/container.rs index b5833ea..d0af12a 100644 --- a/src/container.rs +++ b/src/container.rs @@ -137,10 +137,10 @@ impl ContainerBuilder { } /// A running test container for the [EventSourcingDB](https://www.eventsourcingdb.io/). -/// +/// /// Aside from managing the container, this struct also provides methods to get the data needed to connect to /// the database or even a fully configured client. -/// +/// /// You'll most likely want to use the [`Container::start_default`] method to create a new container instance for your tests. /// For more details, see the [`crate::container`] module documentation. /// ``` @@ -167,7 +167,7 @@ impl Container { } /// Shortcut method to start the container with default settings. - /// + /// /// This is the same as calling [`Container::builder`] and then [`ContainerBuilder::start`]. /// In most cases this will create a contaienr with the latest image tag and a working configuration. /// @@ -178,7 +178,7 @@ impl Container { } /// Get the host of the container. - /// + /// /// This is the host that you can use to connect to the database. In most cases this will be `localhost`. /// /// # Errors @@ -188,7 +188,7 @@ impl Container { } /// Get the mapped port for the database. - /// + /// /// This is the port that you can use to connect to the database. This will be a random port that is mapped to the internal port configured via [`ContainerBuilder::with_port`]. /// /// # Errors diff --git a/src/event.rs b/src/event.rs index f927015..be7fe0f 100644 --- a/src/event.rs +++ b/src/event.rs @@ -10,5 +10,5 @@ pub use event_types::event_candidate::EventCandidate; pub use event_types::management_event::ManagementEvent; pub use trace_info::TraceInfo; -#[cfg(feature="cloudevents")] +#[cfg(feature = "cloudevents")] use crate::error::EventError; diff --git a/src/event/event_types/event.rs b/src/event/event_types/event.rs index 1c730ce..93ed39d 100644 --- a/src/event/event_types/event.rs +++ b/src/event/event_types/event.rs @@ -2,8 +2,7 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::event::{trace_info::TraceInfo, EventCandidate}; - +use crate::event::{EventCandidate, trace_info::TraceInfo}; /// Represents an event that has been received from the DB. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] diff --git a/src/event/event_types/event_candidate.rs b/src/event/event_types/event_candidate.rs index 9d5947d..fd21a0c 100644 --- a/src/event/event_types/event_candidate.rs +++ b/src/event/event_types/event_candidate.rs @@ -1,7 +1,7 @@ +use crate::event::trace_info::TraceInfo; use serde::{Deserialize, Serialize}; use serde_json::Value; use typed_builder::TypedBuilder; -use crate::event::trace_info::TraceInfo; #[cfg(feature = "cloudevents")] use crate::error::EventError; diff --git a/src/event/event_types/management_event.rs b/src/event/event_types/management_event.rs index f47893c..56c34dd 100644 --- a/src/event/event_types/management_event.rs +++ b/src/event/event_types/management_event.rs @@ -1,11 +1,11 @@ +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use chrono::{DateTime, Utc}; /// Represents a management event that has been received from the DB. -/// +/// /// For management requests like [`crate::client::Client::ping`] and [`crate::client::Client::verify_api_token`] the DB will send a management event. -/// +/// /// Compared to a normal Event, this does not contain the following fields: /// - hash /// - predecessorhash @@ -18,7 +18,7 @@ pub struct ManagementEvent { source: String, specversion: String, subject: String, - time: DateTime, + time: DateTime, r#type: String, } @@ -61,7 +61,7 @@ impl ManagementEvent { &self.time } /// Get the type of an event. - /// + /// /// This method is called `ty` to avoid conflicts with the `type` keyword in Rust. #[must_use] pub fn ty(&self) -> &str { diff --git a/src/event/trace_info.rs b/src/event/trace_info.rs index 7647423..598a415 100644 --- a/src/event/trace_info.rs +++ b/src/event/trace_info.rs @@ -10,7 +10,6 @@ use serde::{Deserialize, Serialize}; pub enum TraceInfo { // LEAVE ORDER AS IS // This is important for deserialization as the traceparent is always present - /// The traceparent and tracestate of the event. /// This is used for distributed tracing. WithState { @@ -52,7 +51,7 @@ impl TraceInfo { /// /// # Errors /// If the cloudevent contains a tracestate but no traceparent, an error will be returned. - #[cfg(feature="cloudevents")] + #[cfg(feature = "cloudevents")] pub fn from_cloudevent(event: &cloudevents::Event) -> Result, EventError> { let traceparent = event.extension("traceparent").map(ToString::to_string); let tracestate = event.extension("tracestate").map(ToString::to_string); diff --git a/tests/essentials.rs b/tests/essentials.rs index 49b2615..f4a835d 100644 --- a/tests/essentials.rs +++ b/tests/essentials.rs @@ -18,7 +18,10 @@ async fn ping_unavailable_server_errors() { async fn verify_api_token() { let container = Container::start_default().await.unwrap(); let client = container.get_client().await.unwrap(); - client.verify_api_token().await.expect("Failed to verify API token"); + client + .verify_api_token() + .await + .expect("Failed to verify API token"); } #[tokio::test] diff --git a/tests/write_events.rs b/tests/write_events.rs index 4883614..ff38cf4 100644 --- a/tests/write_events.rs +++ b/tests/write_events.rs @@ -1,7 +1,9 @@ mod utils; use eventsourcingdb_client_rust::{ - client::Precondition, container::Container, event::{EventCandidate, TraceInfo} + client::Precondition, + container::Container, + event::{EventCandidate, TraceInfo}, }; use serde_json::json; use utils::{ @@ -46,11 +48,9 @@ async fn write_event_with_is_pristine_condition_on_empty_subject() { let result = client .write_events( vec![event_candidate.clone()], - vec![ - Precondition::IsSubjectPristine { - subject: event_candidate.subject.clone(), - }, - ], + vec![Precondition::IsSubjectPristine { + subject: event_candidate.subject.clone(), + }], ) .await; assert!(result.is_ok(), "Failed to write events: {:?}", result); @@ -74,11 +74,9 @@ async fn write_event_with_is_pristine_condition_on_non_empty_subject() { let result = client .write_events( vec![event_candidate.clone()], - vec![ - Precondition::IsSubjectPristine { - subject: event_candidate.subject.clone(), - }, - ], + vec![Precondition::IsSubjectPristine { + subject: event_candidate.subject.clone(), + }], ) .await; assert!(result.is_err(), "Expected an error, but got: {:?}", result); @@ -96,11 +94,9 @@ async fn write_events_with_is_pristine_condition_on_empty_subject() { let result = client .write_events( event_candidates.clone(), - vec![ - Precondition::IsSubjectPristine { - subject: event_candidates[1].subject.clone(), - }, - ], + vec![Precondition::IsSubjectPristine { + subject: event_candidates[1].subject.clone(), + }], ) .await; assert!(result.is_ok(), "Failed to write events: {:?}", result); @@ -125,11 +121,9 @@ async fn write_events_with_is_pristine_condition_on_non_empty_subject() { let result = client .write_events( event_candidates, - vec![ - Precondition::IsSubjectPristine { - subject: fill_event_candidate.subject.clone(), - }, - ], + vec![Precondition::IsSubjectPristine { + subject: fill_event_candidate.subject.clone(), + }], ) .await; assert!(result.is_err(), "Expected an error, but got: {:?}", result); @@ -144,12 +138,10 @@ async fn write_event_with_is_subject_on_event_id_condition_on_empty_subject() { let result = client .write_events( vec![event_candidate.clone()], - vec![ - Precondition::IsSubjectOnEventId { - subject: event_candidate.subject.clone(), - event_id: "100".to_string(), - }, - ], + vec![Precondition::IsSubjectOnEventId { + subject: event_candidate.subject.clone(), + event_id: "100".to_string(), + }], ) .await; assert!(result.is_err(), "Expected an error, but got: {:?}", result); @@ -170,12 +162,10 @@ async fn write_event_with_is_subject_on_event_id_condition_on_non_empty_subject_ let result = client .write_events( vec![event_candidate.clone()], - vec![ - Precondition::IsSubjectOnEventId { - subject: event_candidate.subject.clone(), - event_id: written.id().to_string(), - }, - ], + vec![Precondition::IsSubjectOnEventId { + subject: event_candidate.subject.clone(), + event_id: written.id().to_string(), + }], ) .await; assert!(result.is_ok(), "Writing the event failed: {:?}", result); @@ -196,12 +186,10 @@ async fn write_event_with_is_subject_on_event_id_condition_on_non_empty_subject_ let result = client .write_events( vec![event_candidate.clone()], - vec![ - Precondition::IsSubjectOnEventId { - subject: event_candidate.subject.clone(), - event_id: 100.to_string(), - }, - ], + vec![Precondition::IsSubjectOnEventId { + subject: event_candidate.subject.clone(), + event_id: 100.to_string(), + }], ) .await; assert!(result.is_err(), "Expected an error, but got: {:?}", result); @@ -219,12 +207,10 @@ async fn write_events_with_is_subject_on_event_id_condition_on_empty_subject() { let result = client .write_events( event_candidates.clone(), - vec![ - Precondition::IsSubjectOnEventId { - subject: event_candidates[1].subject.clone(), - event_id: "100".to_string(), - }, - ], + vec![Precondition::IsSubjectOnEventId { + subject: event_candidates[1].subject.clone(), + event_id: "100".to_string(), + }], ) .await; assert!(result.is_err(), "Expected an error, but got: {:?}", result); @@ -249,12 +235,10 @@ async fn write_events_with_is_subject_on_event_id_condition_on_non_empty_subject let result = client .write_events( event_candidates, - vec![ - Precondition::IsSubjectOnEventId { - subject: fill_event_candidate.subject.clone(), - event_id: written.id().to_string(), - }, - ], + vec![Precondition::IsSubjectOnEventId { + subject: fill_event_candidate.subject.clone(), + event_id: written.id().to_string(), + }], ) .await; assert!(result.is_ok(), "Writing the events failed: {:?}", result); @@ -279,12 +263,10 @@ async fn write_events_with_is_subject_on_event_id_condition_on_non_empty_subject let result = client .write_events( event_candidates, - vec![ - Precondition::IsSubjectOnEventId { - subject: fill_event_candidate.subject.clone(), - event_id: 100.to_string(), - }, - ], + vec![Precondition::IsSubjectOnEventId { + subject: fill_event_candidate.subject.clone(), + event_id: 100.to_string(), + }], ) .await; assert!(result.is_err(), "Expected an error, but got: {:?}", result); From a6c814f55ecec2b997b9ac938915380c794aa250 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Thu, 15 May 2025 13:57:12 +0200 Subject: [PATCH 04/12] chore(client): further formatting and added documentation --- Cargo.toml | 10 +++--- src/client.rs | 96 ++++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 93 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0473abd..b7f067b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,9 @@ testcontainer = ["dep:testcontainers"] [dependencies] chrono = { version = "0.4.41", features = ["serde"] } cloudevents-sdk = { version = "0.8.0", features = ["reqwest"], optional = true } -url = "2.5.4" +futures = "0.3.31" +futures-util = "0.3.31" +jsonschema = "0.30.0" reqwest = { version = "0.12.15", features = ["json", "stream"] } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" @@ -19,13 +21,11 @@ testcontainers = { version = "0.24.0", features = [ "http_wait", ], optional = true } thiserror = "2.0.12" -jsonschema = "0.30.0" -futures-util = "0.3.31" +tokio = { version = "1.44.2", features = ["io-util"] } tokio-util = { version = "0.7.15", features = ["io"] } tokio-stream = { version = "0.1.17", features = ["io-util"] } -futures = "0.3.31" -tokio = { version = "1.44.2", features = ["io-util"] } typed-builder = "0.21.0" +url = "2.5.4" [dev-dependencies] eventsourcingdb-client-rust = { path = ".", features = ["testcontainer"] } diff --git a/src/client.rs b/src/client.rs index a8cc1d4..933f93e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -20,22 +20,20 @@ mod client_request; mod precondition; +use crate::{ + error::ClientError, + event::{Event, EventCandidate, ManagementEvent}, +}; use client_request::{ ClientRequest, ListEventTypesRequest, ListSubjectsRequest, OneShotRequest, PingRequest, RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest, WriteEventsRequest, list_event_types::EventType, }; - use futures::Stream; pub use precondition::Precondition; use reqwest; use url::Url; -use crate::{ - error::ClientError, - event::{Event, EventCandidate, ManagementEvent}, -}; - /// Client for an [EventsourcingDB](https://www.eventsourcingdb.io/) instance. #[derive(Debug)] pub struct Client { @@ -84,8 +82,6 @@ impl Client { /// /// This function will return a [`reqwest::RequestBuilder`] which can be used to send the request. /// - /// This function will return a [`reqwest::RequestBuilder`] which can be used to send the request. - /// /// # Errors /// This function will return an error if the request fails or if the URL is invalid. fn build_request( @@ -203,6 +199,34 @@ impl Client { /// Registers an event schema with the DB instance. /// + /// ``` + /// use eventsourcingdb_client_rust::event::EventCandidate; + /// use futures::StreamExt; + /// # use serde_json::json; + /// # tokio_test::block_on(async { + /// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap(); + /// let db_url = "http://localhost:3000/"; + /// let api_token = "secrettoken"; + /// # let db_url = container.get_base_url().await.unwrap(); + /// # let api_token = container.get_api_token(); + /// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token); + /// let event_type = "io.eventsourcingdb.test"; + /// let schema = json!({ + /// "type": "object", + /// "properties": { + /// "id": { + /// "type": "string" + /// }, + /// "name": { + /// "type": "string" + /// } + /// }, + /// "required": ["id", "name"] + /// }); + /// client.register_event_schema(event_type, &schema).await.expect("Failed to list event types"); + /// # }) + /// ``` + /// /// # Errors /// This function will return an error if the request fails or if the provided schema is invalid. pub async fn register_event_schema( @@ -216,6 +240,44 @@ impl Client { /// List all subjects in the DB instance. /// + /// To get all subjects in the DB, just pass `None` as the `base_subject`. + /// ``` + /// use eventsourcingdb_client_rust::event::EventCandidate; + /// use futures::StreamExt; + /// # use serde_json::json; + /// # tokio_test::block_on(async { + /// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap(); + /// let db_url = "http://localhost:3000/"; + /// let api_token = "secrettoken"; + /// # let db_url = container.get_base_url().await.unwrap(); + /// # let api_token = container.get_api_token(); + /// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token); + /// let mut subject_stream = client.list_subjects(None).await.expect("Failed to list event types"); + /// while let Some(subject) = subject_stream.next().await { + /// println!("Found Type {}", subject.expect("Error while reading types")); + /// } + /// # }) + /// ``` + /// + /// To get all subjects under /test in the DB, just pass `Some("/test")` as the `base_subject`. + /// ``` + /// use eventsourcingdb_client_rust::event::EventCandidate; + /// use futures::StreamExt; + /// # use serde_json::json; + /// # tokio_test::block_on(async { + /// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap(); + /// let db_url = "http://localhost:3000/"; + /// let api_token = "secrettoken"; + /// # let db_url = container.get_base_url().await.unwrap(); + /// # let api_token = container.get_api_token(); + /// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token); + /// let mut subject_stream = client.list_subjects(Some("/test")).await.expect("Failed to list event types"); + /// while let Some(subject) = subject_stream.next().await { + /// println!("Found Type {}", subject.expect("Error while reading types")); + /// } + /// # }) + /// ``` + /// /// # Errors /// This function will return an error if the request fails or if the URL is invalid. pub async fn list_subjects( @@ -232,6 +294,24 @@ impl Client { /// List all event types in the DB instance. /// + /// ``` + /// use eventsourcingdb_client_rust::event::EventCandidate; + /// use futures::StreamExt; + /// # use serde_json::json; + /// # tokio_test::block_on(async { + /// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap(); + /// let db_url = "http://localhost:3000/"; + /// let api_token = "secrettoken"; + /// # let db_url = container.get_base_url().await.unwrap(); + /// # let api_token = container.get_api_token(); + /// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token); + /// let mut type_stream = client.list_event_types().await.expect("Failed to list event types"); + /// while let Some(ty) = type_stream.next().await { + /// println!("Found Type {}", ty.expect("Error while reading types").name); + /// } + /// # }) + /// ``` + /// /// # Errors /// This function will return an error if the request fails or if the URL is invalid. pub async fn list_event_types( From 9d6d4fd1cd68f350d4bd61de904f691e6dea94f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Thu, 15 May 2025 17:27:10 +0200 Subject: [PATCH 05/12] chore(formatting): undo some formatting to make PR smaller MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Raphael Höser --- src/container.rs | 10 +-- src/event.rs | 2 +- src/event/event_types/event.rs | 3 +- src/event/event_types/event_candidate.rs | 2 +- src/event/event_types/management_event.rs | 10 +-- src/event/trace_info.rs | 3 +- tests/essentials.rs | 5 +- tests/write_events.rs | 96 ++++++++++++++--------- 8 files changed, 74 insertions(+), 57 deletions(-) diff --git a/src/container.rs b/src/container.rs index d0af12a..b5833ea 100644 --- a/src/container.rs +++ b/src/container.rs @@ -137,10 +137,10 @@ impl ContainerBuilder { } /// A running test container for the [EventSourcingDB](https://www.eventsourcingdb.io/). -/// +/// /// Aside from managing the container, this struct also provides methods to get the data needed to connect to /// the database or even a fully configured client. -/// +/// /// You'll most likely want to use the [`Container::start_default`] method to create a new container instance for your tests. /// For more details, see the [`crate::container`] module documentation. /// ``` @@ -167,7 +167,7 @@ impl Container { } /// Shortcut method to start the container with default settings. - /// + /// /// This is the same as calling [`Container::builder`] and then [`ContainerBuilder::start`]. /// In most cases this will create a contaienr with the latest image tag and a working configuration. /// @@ -178,7 +178,7 @@ impl Container { } /// Get the host of the container. - /// + /// /// This is the host that you can use to connect to the database. In most cases this will be `localhost`. /// /// # Errors @@ -188,7 +188,7 @@ impl Container { } /// Get the mapped port for the database. - /// + /// /// This is the port that you can use to connect to the database. This will be a random port that is mapped to the internal port configured via [`ContainerBuilder::with_port`]. /// /// # Errors diff --git a/src/event.rs b/src/event.rs index be7fe0f..f927015 100644 --- a/src/event.rs +++ b/src/event.rs @@ -10,5 +10,5 @@ pub use event_types::event_candidate::EventCandidate; pub use event_types::management_event::ManagementEvent; pub use trace_info::TraceInfo; -#[cfg(feature = "cloudevents")] +#[cfg(feature="cloudevents")] use crate::error::EventError; diff --git a/src/event/event_types/event.rs b/src/event/event_types/event.rs index 93ed39d..1c730ce 100644 --- a/src/event/event_types/event.rs +++ b/src/event/event_types/event.rs @@ -2,7 +2,8 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::event::{EventCandidate, trace_info::TraceInfo}; +use crate::event::{trace_info::TraceInfo, EventCandidate}; + /// Represents an event that has been received from the DB. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] diff --git a/src/event/event_types/event_candidate.rs b/src/event/event_types/event_candidate.rs index fd21a0c..9d5947d 100644 --- a/src/event/event_types/event_candidate.rs +++ b/src/event/event_types/event_candidate.rs @@ -1,7 +1,7 @@ -use crate::event::trace_info::TraceInfo; use serde::{Deserialize, Serialize}; use serde_json::Value; use typed_builder::TypedBuilder; +use crate::event::trace_info::TraceInfo; #[cfg(feature = "cloudevents")] use crate::error::EventError; diff --git a/src/event/event_types/management_event.rs b/src/event/event_types/management_event.rs index 56c34dd..f47893c 100644 --- a/src/event/event_types/management_event.rs +++ b/src/event/event_types/management_event.rs @@ -1,11 +1,11 @@ -use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value; +use chrono::{DateTime, Utc}; /// Represents a management event that has been received from the DB. -/// +/// /// For management requests like [`crate::client::Client::ping`] and [`crate::client::Client::verify_api_token`] the DB will send a management event. -/// +/// /// Compared to a normal Event, this does not contain the following fields: /// - hash /// - predecessorhash @@ -18,7 +18,7 @@ pub struct ManagementEvent { source: String, specversion: String, subject: String, - time: DateTime, + time: DateTime, r#type: String, } @@ -61,7 +61,7 @@ impl ManagementEvent { &self.time } /// Get the type of an event. - /// + /// /// This method is called `ty` to avoid conflicts with the `type` keyword in Rust. #[must_use] pub fn ty(&self) -> &str { diff --git a/src/event/trace_info.rs b/src/event/trace_info.rs index 598a415..7647423 100644 --- a/src/event/trace_info.rs +++ b/src/event/trace_info.rs @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize}; pub enum TraceInfo { // LEAVE ORDER AS IS // This is important for deserialization as the traceparent is always present + /// The traceparent and tracestate of the event. /// This is used for distributed tracing. WithState { @@ -51,7 +52,7 @@ impl TraceInfo { /// /// # Errors /// If the cloudevent contains a tracestate but no traceparent, an error will be returned. - #[cfg(feature = "cloudevents")] + #[cfg(feature="cloudevents")] pub fn from_cloudevent(event: &cloudevents::Event) -> Result, EventError> { let traceparent = event.extension("traceparent").map(ToString::to_string); let tracestate = event.extension("tracestate").map(ToString::to_string); diff --git a/tests/essentials.rs b/tests/essentials.rs index f4a835d..49b2615 100644 --- a/tests/essentials.rs +++ b/tests/essentials.rs @@ -18,10 +18,7 @@ async fn ping_unavailable_server_errors() { async fn verify_api_token() { let container = Container::start_default().await.unwrap(); let client = container.get_client().await.unwrap(); - client - .verify_api_token() - .await - .expect("Failed to verify API token"); + client.verify_api_token().await.expect("Failed to verify API token"); } #[tokio::test] diff --git a/tests/write_events.rs b/tests/write_events.rs index ff38cf4..4883614 100644 --- a/tests/write_events.rs +++ b/tests/write_events.rs @@ -1,9 +1,7 @@ mod utils; use eventsourcingdb_client_rust::{ - client::Precondition, - container::Container, - event::{EventCandidate, TraceInfo}, + client::Precondition, container::Container, event::{EventCandidate, TraceInfo} }; use serde_json::json; use utils::{ @@ -48,9 +46,11 @@ async fn write_event_with_is_pristine_condition_on_empty_subject() { let result = client .write_events( vec![event_candidate.clone()], - vec![Precondition::IsSubjectPristine { - subject: event_candidate.subject.clone(), - }], + vec![ + Precondition::IsSubjectPristine { + subject: event_candidate.subject.clone(), + }, + ], ) .await; assert!(result.is_ok(), "Failed to write events: {:?}", result); @@ -74,9 +74,11 @@ async fn write_event_with_is_pristine_condition_on_non_empty_subject() { let result = client .write_events( vec![event_candidate.clone()], - vec![Precondition::IsSubjectPristine { - subject: event_candidate.subject.clone(), - }], + vec![ + Precondition::IsSubjectPristine { + subject: event_candidate.subject.clone(), + }, + ], ) .await; assert!(result.is_err(), "Expected an error, but got: {:?}", result); @@ -94,9 +96,11 @@ async fn write_events_with_is_pristine_condition_on_empty_subject() { let result = client .write_events( event_candidates.clone(), - vec![Precondition::IsSubjectPristine { - subject: event_candidates[1].subject.clone(), - }], + vec![ + Precondition::IsSubjectPristine { + subject: event_candidates[1].subject.clone(), + }, + ], ) .await; assert!(result.is_ok(), "Failed to write events: {:?}", result); @@ -121,9 +125,11 @@ async fn write_events_with_is_pristine_condition_on_non_empty_subject() { let result = client .write_events( event_candidates, - vec![Precondition::IsSubjectPristine { - subject: fill_event_candidate.subject.clone(), - }], + vec![ + Precondition::IsSubjectPristine { + subject: fill_event_candidate.subject.clone(), + }, + ], ) .await; assert!(result.is_err(), "Expected an error, but got: {:?}", result); @@ -138,10 +144,12 @@ async fn write_event_with_is_subject_on_event_id_condition_on_empty_subject() { let result = client .write_events( vec![event_candidate.clone()], - vec![Precondition::IsSubjectOnEventId { - subject: event_candidate.subject.clone(), - event_id: "100".to_string(), - }], + vec![ + Precondition::IsSubjectOnEventId { + subject: event_candidate.subject.clone(), + event_id: "100".to_string(), + }, + ], ) .await; assert!(result.is_err(), "Expected an error, but got: {:?}", result); @@ -162,10 +170,12 @@ async fn write_event_with_is_subject_on_event_id_condition_on_non_empty_subject_ let result = client .write_events( vec![event_candidate.clone()], - vec![Precondition::IsSubjectOnEventId { - subject: event_candidate.subject.clone(), - event_id: written.id().to_string(), - }], + vec![ + Precondition::IsSubjectOnEventId { + subject: event_candidate.subject.clone(), + event_id: written.id().to_string(), + }, + ], ) .await; assert!(result.is_ok(), "Writing the event failed: {:?}", result); @@ -186,10 +196,12 @@ async fn write_event_with_is_subject_on_event_id_condition_on_non_empty_subject_ let result = client .write_events( vec![event_candidate.clone()], - vec![Precondition::IsSubjectOnEventId { - subject: event_candidate.subject.clone(), - event_id: 100.to_string(), - }], + vec![ + Precondition::IsSubjectOnEventId { + subject: event_candidate.subject.clone(), + event_id: 100.to_string(), + }, + ], ) .await; assert!(result.is_err(), "Expected an error, but got: {:?}", result); @@ -207,10 +219,12 @@ async fn write_events_with_is_subject_on_event_id_condition_on_empty_subject() { let result = client .write_events( event_candidates.clone(), - vec![Precondition::IsSubjectOnEventId { - subject: event_candidates[1].subject.clone(), - event_id: "100".to_string(), - }], + vec![ + Precondition::IsSubjectOnEventId { + subject: event_candidates[1].subject.clone(), + event_id: "100".to_string(), + }, + ], ) .await; assert!(result.is_err(), "Expected an error, but got: {:?}", result); @@ -235,10 +249,12 @@ async fn write_events_with_is_subject_on_event_id_condition_on_non_empty_subject let result = client .write_events( event_candidates, - vec![Precondition::IsSubjectOnEventId { - subject: fill_event_candidate.subject.clone(), - event_id: written.id().to_string(), - }], + vec![ + Precondition::IsSubjectOnEventId { + subject: fill_event_candidate.subject.clone(), + event_id: written.id().to_string(), + }, + ], ) .await; assert!(result.is_ok(), "Writing the events failed: {:?}", result); @@ -263,10 +279,12 @@ async fn write_events_with_is_subject_on_event_id_condition_on_non_empty_subject let result = client .write_events( event_candidates, - vec![Precondition::IsSubjectOnEventId { - subject: fill_event_candidate.subject.clone(), - event_id: 100.to_string(), - }], + vec![ + Precondition::IsSubjectOnEventId { + subject: fill_event_candidate.subject.clone(), + event_id: 100.to_string(), + }, + ], ) .await; assert!(result.is_err(), "Expected an error, but got: {:?}", result); From 31b6bb4719a366b04b20cad9f5dbc2087556a7e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Thu, 15 May 2025 17:32:35 +0200 Subject: [PATCH 06/12] chore(linting): add reasoning to allow clippy::needless_lifetimes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Raphael Höser --- src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 90b8923..3843944 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,6 @@ #![doc = include_str!("../README.md")] +// There is a known bug in clippy: +// https://github.com/rust-lang/rust-clippy/issues/12908 #![allow(clippy::needless_lifetimes)] #![deny( ambiguous_negative_literals, From e0026e80ff8b64942a463be752dbf08f09fda7ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Thu, 15 May 2025 21:16:08 +0200 Subject: [PATCH 07/12] feat(client): add reading and streaming events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tests are still missing here Signed-off-by: Raphael Höser --- README.md | 2 +- src/client.rs | 40 +++++++++- src/client/client_request.rs | 16 +++- src/client/client_request/list_event_types.rs | 1 - src/client/client_request/list_subjects.rs | 1 - src/client/client_request/read_events.rs | 26 +++++++ src/client/request_options.rs | 76 +++++++++++++++++++ 7 files changed, 153 insertions(+), 9 deletions(-) create mode 100644 src/client/client_request/read_events.rs create mode 100644 src/client/request_options.rs diff --git a/README.md b/README.md index 1334066..9d26a12 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/c - 🚀 [Essentials](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#essentials) - 🚀 [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events) -- ❌ [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events) +- 🚀 [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events) - ❌ [Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql) - ❌ [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events) - 🚀 [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery) diff --git a/src/client.rs b/src/client.rs index 933f93e..cdc7a6c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -19,6 +19,7 @@ mod client_request; mod precondition; +pub mod request_options; use crate::{ error::ClientError, @@ -26,8 +27,8 @@ use crate::{ }; use client_request::{ ClientRequest, ListEventTypesRequest, ListSubjectsRequest, OneShotRequest, PingRequest, - RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest, WriteEventsRequest, - list_event_types::EventType, + ReadEventsRequest, RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest, + WriteEventsRequest, list_event_types::EventType, }; use futures::Stream; pub use precondition::Precondition; @@ -146,7 +147,7 @@ impl Client { let response = self.build_request(&endpoint)?.send().await?; if response.status().is_success() { - Ok(endpoint.build_stream(response)) + Ok(R::build_stream(response)) } else { Err(ClientError::DBApiError( response.status(), @@ -176,6 +177,39 @@ impl Client { Ok(()) } + /// Reads events from the DB instance. + /// + /// ``` + /// use eventsourcingdb_client_rust::event::EventCandidate; + /// use futures::StreamExt; + /// # use serde_json::json; + /// # tokio_test::block_on(async { + /// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap(); + /// let db_url = "http://localhost:3000/"; + /// let api_token = "secrettoken"; + /// # let db_url = container.get_base_url().await.unwrap(); + /// # let api_token = container.get_api_token(); + /// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token); + /// let mut event_stream = client.read_events("/").await.expect("Failed to read events"); + /// while let Some(event) = event_stream.next().await { + /// println!("Found Type {:?}", event.expect("Error while reading events")); + /// } + /// # }) + /// ``` + /// + /// # Errors + /// This function will return an error if the request fails or if the URL is invalid. + pub async fn read_events<'a>( + &self, + subject: &'a str, + options: Option>, + ) -> Result>, ClientError> { + let response = self + .request_streaming(ReadEventsRequest { subject, options }) + .await?; + Ok(response) + } + /// Verifies the API token by sending a request to the DB instance. /// /// ``` diff --git a/src/client/client_request.rs b/src/client/client_request.rs index 1467dbe..33ffd30 100644 --- a/src/client/client_request.rs +++ b/src/client/client_request.rs @@ -3,6 +3,7 @@ pub mod list_event_types; mod list_subjects; mod ping; +mod read_events; mod register_event_schema; mod verify_api_token; mod write_events; @@ -10,12 +11,16 @@ mod write_events; pub use list_event_types::ListEventTypesRequest; pub use list_subjects::ListSubjectsRequest; pub use ping::PingRequest; +pub use read_events::ReadEventsRequest; pub use register_event_schema::RegisterEventSchemaRequest; pub use verify_api_token::VerifyApiTokenRequest; pub use write_events::WriteEventsRequest; use crate::error::ClientError; -use futures::{Stream, stream::TryStreamExt}; +use futures::{ + Stream, + stream::{StreamExt, TryStreamExt}, +}; use futures_util::io; use reqwest::Method; use serde::Serialize; @@ -60,9 +65,14 @@ pub trait StreamingRequest: ClientRequest { type ItemType: DeserializeOwned; fn build_stream( - self, response: reqwest::Response, - ) -> impl Stream>; + ) -> impl Stream> { + Self::lines_stream(response).map(|line| { + let line = line?; + let item = serde_json::from_str(line.as_str())?; + Ok(item) + }) + } fn lines_stream( response: reqwest::Response, diff --git a/src/client/client_request/list_event_types.rs b/src/client/client_request/list_event_types.rs index 538c8e7..03d9c7c 100644 --- a/src/client/client_request/list_event_types.rs +++ b/src/client/client_request/list_event_types.rs @@ -31,7 +31,6 @@ impl StreamingRequest for ListEventTypesRequest { type ItemType = EventType; fn build_stream( - self, response: reqwest::Response, ) -> impl Stream> { #[derive(Deserialize, Debug)] diff --git a/src/client/client_request/list_subjects.rs b/src/client/client_request/list_subjects.rs index c70ea36..78c2fb4 100644 --- a/src/client/client_request/list_subjects.rs +++ b/src/client/client_request/list_subjects.rs @@ -24,7 +24,6 @@ impl<'a> StreamingRequest for ListSubjectsRequest<'a> { type ItemType = String; fn build_stream( - self, response: reqwest::Response, ) -> impl Stream> { #[derive(Deserialize, Debug)] diff --git a/src/client/client_request/read_events.rs b/src/client/client_request/read_events.rs new file mode 100644 index 0000000..47c5b58 --- /dev/null +++ b/src/client/client_request/read_events.rs @@ -0,0 +1,26 @@ +use reqwest::Method; +use serde::Serialize; + +use crate::{client::request_options::ReadEventsRequestOptions, error::ClientError, event::Event}; + +use super::{ClientRequest, StreamingRequest}; + +#[derive(Debug, Clone, Serialize)] +pub struct ReadEventsRequest<'a> { + pub subject: &'a str, + #[serde(skip_serializing_if = "Option::is_none")] + pub options: Option>, +} + +impl<'a> ClientRequest for ReadEventsRequest<'a> { + const URL_PATH: &'static str = "/api/v1/read-events"; + const METHOD: Method = Method::POST; + + fn body(&self) -> Option> { + Some(Ok(self)) + } +} + +impl<'a> StreamingRequest for ReadEventsRequest<'a> { + type ItemType = Event; +} diff --git a/src/client/request_options.rs b/src/client/request_options.rs new file mode 100644 index 0000000..9db6c7f --- /dev/null +++ b/src/client/request_options.rs @@ -0,0 +1,76 @@ +//! This module contains supporting options for the client requests. + +use serde::Serialize; + +/// Options for reading events from the database +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ReadEventsRequestOptions<'a> { + /// Start reading events from this start event + #[serde(skip_serializing_if = "Option::is_none")] + pub from_latest_event: Option>, + /// Lower bound of events to read + #[serde(skip_serializing_if = "Option::is_none")] + pub lower_bound: Option>, + /// Ordering of the returned events + #[serde(skip_serializing_if = "Option::is_none")] + pub order: Option, + /// Include recursive subject's events + pub recursive: bool, + /// Upper bound of events to read + #[serde(skip_serializing_if = "Option::is_none")] + pub upper_bound: Option>, +} + +/// Ordering of the responses of requests +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "kebab-case")] +pub enum Ordering { + /// Order the responses in chronological order + Chronological, + /// Order the responses in reverse chronological order + Antichronological, +} + +/// The type of the request bound +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "kebab-case")] +pub enum BoundType { + /// The bound is included in the response + Inclusive, + /// The bound is excluded from the response + Exclusive, +} + +/// A single bound for the request +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Bound<'a> { + /// The type of the bound + pub bound_type: BoundType, + /// The value of the bound + pub id: &'a str, +} + +/// The strategy for handling missing events +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "kebab-case")] +pub enum EventMissingStrategy { + /// Read all events if the required one is missing + ReadEverything, + /// Read no events if the required one is missing + ReadNothing, +} + +/// Options for reading events from the start reading at +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct FromLatestEventOptions<'a> { + /// The strategy for handling missing events + pub if_event_is_missing: EventMissingStrategy, + /// The subject the event should be on + pub subject: &'a str, + /// The type of the event to read from + #[serde(rename = "type")] + pub ty: &'a str, +} From 5c45884e5190c1fac41c552a14eb3178abf4cd67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Fri, 16 May 2025 19:50:36 +0200 Subject: [PATCH 08/12] fix(test): correct compile error in doc test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Raphael Höser --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index cdc7a6c..51d3146 100644 --- a/src/client.rs +++ b/src/client.rs @@ -190,7 +190,7 @@ impl Client { /// # let db_url = container.get_base_url().await.unwrap(); /// # let api_token = container.get_api_token(); /// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token); - /// let mut event_stream = client.read_events("/").await.expect("Failed to read events"); + /// let mut event_stream = client.read_events("/", None).await.expect("Failed to read events"); /// while let Some(event) = event_stream.next().await { /// println!("Found Type {:?}", event.expect("Error while reading events")); /// } From 3bbeb7bc96f420341f5022482972bfb24c0a7bf5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Sun, 18 May 2025 21:20:25 +0200 Subject: [PATCH 09/12] fix(Client): fix parsing the read stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Raphael Höser --- src/client/client_request/read_events.rs | 29 +- src/client/request_options.rs | 2 +- tests/read_events.rs | 336 +++++++++++++++++++++++ tests/utils/mod.rs | 2 +- 4 files changed, 366 insertions(+), 3 deletions(-) create mode 100644 tests/read_events.rs diff --git a/src/client/client_request/read_events.rs b/src/client/client_request/read_events.rs index 47c5b58..618fb46 100644 --- a/src/client/client_request/read_events.rs +++ b/src/client/client_request/read_events.rs @@ -1,5 +1,6 @@ +use futures::{Stream, stream::StreamExt}; use reqwest::Method; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use crate::{client::request_options::ReadEventsRequestOptions, error::ClientError, event::Event}; @@ -23,4 +24,30 @@ impl<'a> ClientRequest for ReadEventsRequest<'a> { impl<'a> StreamingRequest for ReadEventsRequest<'a> { type ItemType = Event; + + fn build_stream( + response: reqwest::Response, + ) -> impl Stream> { + #[derive(Deserialize, Debug)] + #[serde(tag = "type", content = "payload", rename_all = "camelCase")] + enum LineItem { + Error { error: String }, + Event(Event), + } + + impl From for Result { + fn from(item: LineItem) -> Self { + match item { + LineItem::Error { error } => Err(ClientError::DBError(error)), + LineItem::Event(event_type) => Ok(event_type), + } + } + } + + Self::lines_stream(response).map(|line| { + let line = line?; + let item: LineItem = serde_json::from_str(line.as_str())?; + item.into() + }) + } } diff --git a/src/client/request_options.rs b/src/client/request_options.rs index 9db6c7f..40647ce 100644 --- a/src/client/request_options.rs +++ b/src/client/request_options.rs @@ -3,7 +3,7 @@ use serde::Serialize; /// Options for reading events from the database -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Default, Clone, Serialize)] #[serde(rename_all = "camelCase")] pub struct ReadEventsRequestOptions<'a> { /// Start reading events from this start event diff --git a/tests/read_events.rs b/tests/read_events.rs new file mode 100644 index 0000000..4fa4cb8 --- /dev/null +++ b/tests/read_events.rs @@ -0,0 +1,336 @@ +mod utils; + +use eventsourcingdb_client_rust::{ + client::request_options::{ + EventMissingStrategy, FromLatestEventOptions, Ordering, ReadEventsRequestOptions, + }, + container::Container, +}; +use futures::TryStreamExt; +use serde_json::json; +use utils::{ + assert_event_match_eventcandidate, create_numbered_eventcandidates, create_test_eventcandidate, +}; + +#[tokio::test] +async fn make_read_call() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let events_stream = client + .read_events("/", None) + .await + .expect("Failed to read events"); + let events: Result, _> = events_stream.try_collect().await; + assert!(events.is_ok(), "Failed to write events: {:?}", events); + let events = events.expect("Failed to read events"); + assert_eq!(events.len(), 0); +} + +#[tokio::test] +async fn make_read_call_with_event() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let event_candidate = create_test_eventcandidate("/test", json!({"value": 1})); + let written = client + .write_events(vec![event_candidate.clone()], vec![]) + .await + .expect("Unable to write event"); + + let events_stream = client + .read_events("/test", None) + .await + .expect("Failed to request events"); + let events: Vec<_> = events_stream + .try_collect() + .await + .expect("Failed to read events"); + + assert_eq!(events, written); +} + +#[tokio::test] +async fn make_read_call_with_multiple_events() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let event_candidates = create_numbered_eventcandidates(10); + let written = client + .write_events(event_candidates.clone(), vec![]) + .await + .expect("Failed to write events"); + + let events_stream = client + .read_events("/test", None) + .await + .expect("Failed to request events"); + let events: Vec<_> = events_stream + .try_collect() + .await + .expect("Failed to read events"); + + assert_eq!(events, written); +} + +#[tokio::test] +async fn read_from_exact_topic() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let event_candidate = create_test_eventcandidate("/test", json!({"value": 1})); + client + .write_events(vec![event_candidate.clone()], vec![]) + .await + .expect("Unable to write event"); + client + .write_events( + vec![create_test_eventcandidate("/wrong", json!({"value": 1}))], + vec![], + ) + .await + .expect("Unable to write event"); + + let events_stream = client + .read_events("/test", None) + .await + .expect("Failed to request events"); + let events: Vec<_> = events_stream + .try_collect() + .await + .expect("Failed to read events"); + + assert_eq!(events.len(), 1); + assert_event_match_eventcandidate(&events[0], &event_candidate, None, None); +} + +#[tokio::test] +async fn read_recursive() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let event_candidate_parent = create_test_eventcandidate("/test", json!({"value": 1})); + let event_candidate_child = create_test_eventcandidate("/test/sub", json!({"value": 2})); + let written = client + .write_events( + vec![ + event_candidate_parent.clone(), + event_candidate_child.clone(), + ], + vec![], + ) + .await + .expect("Unable to write event"); + + let events_stream = client + .read_events( + "/test", + Some(ReadEventsRequestOptions { + recursive: true, + ..Default::default() + }), + ) + .await + .expect("Failed to request events"); + let events: Vec<_> = events_stream + .try_collect() + .await + .expect("Failed to read events"); + + assert_eq!(events, written); +} + +#[tokio::test] +async fn read_not_recursive() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let event_candidate_parent = create_test_eventcandidate("/test", json!({"value": 1})); + let event_candidate_child = create_test_eventcandidate("/test/sub", json!({"value": 2})); + client + .write_events( + vec![ + event_candidate_parent.clone(), + event_candidate_child.clone(), + ], + vec![], + ) + .await + .expect("Unable to write event"); + + let events_stream = client + .read_events( + "/test", + Some(ReadEventsRequestOptions { + recursive: false, + ..Default::default() + }), + ) + .await + .expect("Failed to request events"); + let events: Vec<_> = events_stream + .try_collect() + .await + .expect("Failed to read events"); + assert_eq!(events.len(), 1); + assert_event_match_eventcandidate(&events[0], &event_candidate_parent, None, None); +} + +#[tokio::test] +async fn read_chronological() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let event_candidates = create_numbered_eventcandidates(10); + let written = client + .write_events(event_candidates.clone(), vec![]) + .await + .expect("Failed to write events"); + + let events_stream = client + .read_events( + "/test", + Some(ReadEventsRequestOptions { + order: Some(Ordering::Chronological), + ..Default::default() + }), + ) + .await + .expect("Failed to request events"); + let events: Vec<_> = events_stream + .try_collect() + .await + .expect("Failed to read events"); + + assert_eq!(events, written); +} + +#[tokio::test] +async fn read_antichronological() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let event_candidates = create_numbered_eventcandidates(10); + let written = client + .write_events(event_candidates.clone(), vec![]) + .await + .expect("Failed to write events"); + + let events_stream = client + .read_events( + "/test", + Some(ReadEventsRequestOptions { + order: Some(Ordering::Antichronological), + ..Default::default() + }), + ) + .await + .expect("Failed to request events"); + let events: Vec<_> = events_stream + .try_collect() + .await + .expect("Failed to read events"); + + // Reverse the reversed results from DB should result in the original order + let reversed_events: Vec<_> = events.iter().rev().cloned().collect(); + assert_eq!(reversed_events, written); +} + +#[tokio::test] +async fn read_everything_from_missing_latest_event() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let event_candidates = create_numbered_eventcandidates(10); + let written = client + .write_events(event_candidates.clone(), vec![]) + .await + .expect("Failed to write events"); + + let events_stream = client + .read_events( + "/test", + Some(ReadEventsRequestOptions { + from_latest_event: Some(FromLatestEventOptions { + subject: "/", + ty: "io.eventsourcingdb.test.does-not-exist", + if_event_is_missing: EventMissingStrategy::ReadEverything, + }), + ..Default::default() + }), + ) + .await + .expect("Failed to request events"); + let events: Vec<_> = events_stream + .try_collect() + .await + .expect("Failed to read events"); + + assert_eq!(events, written); +} + +#[tokio::test] +async fn read_nothing_from_missing_latest_event() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let event_candidates = create_numbered_eventcandidates(10); + client + .write_events(event_candidates.clone(), vec![]) + .await + .expect("Failed to write events"); + + let events_stream = client + .read_events( + "/test", + Some(ReadEventsRequestOptions { + from_latest_event: Some(FromLatestEventOptions { + subject: "/", + ty: "io.eventsourcingdb.test.does-not-exist", + if_event_is_missing: EventMissingStrategy::ReadNothing, + }), + ..Default::default() + }), + ) + .await + .expect("Failed to request events"); + let events: Vec<_> = events_stream + .try_collect() + .await + .expect("Failed to read events"); + + assert_eq!(events.len(), 0); +} + +#[tokio::test] +async fn read_from_latest_event() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let event_candidates = create_numbered_eventcandidates(10); + client + .write_events(event_candidates.clone(), vec![]) + .await + .expect("Failed to write events"); + client + .write_events( + vec![create_test_eventcandidate("/marker", json!({"value": 1}))], + vec![], + ) + .await + .expect("Failed to write events"); + let written = client + .write_events(event_candidates.clone(), vec![]) + .await + .expect("Failed to write events"); + + let events_stream = client + .read_events( + "/test", + Some(ReadEventsRequestOptions { + from_latest_event: Some(FromLatestEventOptions { + subject: "/marker", + ty: "io.eventsourcingdb.test", + if_event_is_missing: EventMissingStrategy::ReadNothing, + }), + ..Default::default() + }), + ) + .await + .expect("Failed to request events"); + let events: Vec<_> = events_stream + .try_collect() + .await + .expect("Failed to read events"); + + assert_eq!(events, written); +} diff --git a/tests/utils/mod.rs b/tests/utils/mod.rs index 917dfd1..8b97d20 100644 --- a/tests/utils/mod.rs +++ b/tests/utils/mod.rs @@ -57,7 +57,7 @@ pub fn assert_event_match_eventcandidate( event.predecessorhash(), previous_event_hash .unwrap_or("0000000000000000000000000000000000000000000000000000000000000000"), - "Time should be present" + "Previous hash should be present" ); assert_eq!(event.specversion(), "1.0", "Spec version should be 1.0"); assert!( From c1aac65f72b0e0b3335833f7b2b9c59603fc4519 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Tue, 20 May 2025 20:02:32 +0200 Subject: [PATCH 10/12] chore(lints): fix lints and formatting --- src/client.rs | 10 +++++----- src/client/client_request.rs | 9 +++------ src/client/client_request/list_subjects.rs | 4 ++-- src/client/client_request/register_event_schema.rs | 4 ++-- src/container.rs | 13 +++++-------- 5 files changed, 17 insertions(+), 23 deletions(-) diff --git a/src/client.rs b/src/client.rs index 933f93e..699885f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -39,7 +39,7 @@ use url::Url; pub struct Client { base_url: Url, api_token: String, - client: reqwest::Client, + reqwest: reqwest::Client, } impl Client { @@ -48,7 +48,7 @@ impl Client { Client { base_url, api_token: api_token.into(), - client: reqwest::Client::new(), + reqwest: reqwest::Client::new(), } } @@ -94,8 +94,8 @@ impl Client { .map_err(ClientError::URLParseError)?; let request = match endpoint.method() { - reqwest::Method::GET => self.client.get(url), - reqwest::Method::POST => self.client.post(url), + reqwest::Method::GET => self.reqwest.get(url), + reqwest::Method::POST => self.reqwest.post(url), _ => return Err(ClientError::InvalidRequestMethod), } .header("Authorization", format!("Bearer {}", self.api_token)); @@ -311,7 +311,7 @@ impl Client { /// } /// # }) /// ``` - /// + /// /// # Errors /// This function will return an error if the request fails or if the URL is invalid. pub async fn list_event_types( diff --git a/src/client/client_request.rs b/src/client/client_request.rs index 1467dbe..4b8accd 100644 --- a/src/client/client_request.rs +++ b/src/client/client_request.rs @@ -67,12 +67,9 @@ pub trait StreamingRequest: ClientRequest { fn lines_stream( response: reqwest::Response, ) -> impl Stream> { - let bytes = response.bytes_stream().map_err(|err| { - io::Error::new( - io::ErrorKind::Other, - format!("Failed to read response stream: {err}"), - ) - }); + let bytes = response + .bytes_stream() + .map_err(|err| io::Error::other(format!("Failed to read response stream: {err}"))); let stream_reader = StreamReader::new(bytes); LinesStream::new(BufReader::new(stream_reader).lines()).map_err(ClientError::from) } diff --git a/src/client/client_request/list_subjects.rs b/src/client/client_request/list_subjects.rs index c70ea36..44fb5ce 100644 --- a/src/client/client_request/list_subjects.rs +++ b/src/client/client_request/list_subjects.rs @@ -12,7 +12,7 @@ pub struct ListSubjectsRequest<'a> { pub base_subject: &'a str, } -impl<'a> ClientRequest for ListSubjectsRequest<'a> { +impl ClientRequest for ListSubjectsRequest<'_> { const URL_PATH: &'static str = "/api/v1/read-subjects"; const METHOD: Method = Method::POST; @@ -20,7 +20,7 @@ impl<'a> ClientRequest for ListSubjectsRequest<'a> { Some(Ok(self)) } } -impl<'a> StreamingRequest for ListSubjectsRequest<'a> { +impl StreamingRequest for ListSubjectsRequest<'_> { type ItemType = String; fn build_stream( diff --git a/src/client/client_request/register_event_schema.rs b/src/client/client_request/register_event_schema.rs index f42720a..ed6049b 100644 --- a/src/client/client_request/register_event_schema.rs +++ b/src/client/client_request/register_event_schema.rs @@ -23,7 +23,7 @@ impl<'a> RegisterEventSchemaRequest<'a> { } } -impl<'a> ClientRequest for RegisterEventSchemaRequest<'a> { +impl ClientRequest for RegisterEventSchemaRequest<'_> { const URL_PATH: &'static str = "/api/v1/register-event-schema"; const METHOD: Method = Method::POST; @@ -31,7 +31,7 @@ impl<'a> ClientRequest for RegisterEventSchemaRequest<'a> { Some(Ok(self)) } } -impl<'a> OneShotRequest for RegisterEventSchemaRequest<'a> { +impl OneShotRequest for RegisterEventSchemaRequest<'_> { type Response = ManagementEvent; fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> { diff --git a/src/container.rs b/src/container.rs index d0af12a..7be9804 100644 --- a/src/container.rs +++ b/src/container.rs @@ -114,7 +114,7 @@ impl ContainerBuilder { Ok(Container { internal_port: self.internal_port, api_token: self.api_token.clone(), - container: GenericImage::new(self.image_name, self.image_tag) + instance: GenericImage::new(self.image_name, self.image_tag) .with_exposed_port(self.internal_port) .with_wait_for(WaitFor::Http(Box::new( HttpWaitStrategy::new("/api/v1/ping") @@ -152,7 +152,7 @@ impl ContainerBuilder { /// ``` #[derive(Debug)] pub struct Container { - container: ContainerAsync, + instance: ContainerAsync, internal_port: ContainerPort, api_token: String, } @@ -184,7 +184,7 @@ impl Container { /// # Errors /// This function will return an error if the container is not running (e.g. because it crashed) or if the host could not be retrieved pub async fn get_host(&self) -> Result { - Ok(self.container.get_host().await?) + Ok(self.instance.get_host().await?) } /// Get the mapped port for the database. @@ -194,10 +194,7 @@ impl Container { /// # Errors /// This function will return an error if the container is not running (e.g. because it crashed) or if the host could not be retrieved pub async fn get_mapped_port(&self) -> Result { - Ok(self - .container - .get_host_port_ipv4(self.internal_port) - .await?) + Ok(self.instance.get_host_port_ipv4(self.internal_port).await?) } /// Get the complete http base URL for the database. @@ -227,7 +224,7 @@ impl Container { /// # Errors /// This function will return an error if the container could not be stopped. pub async fn stop(self) -> Result<(), ContainerError> { - self.container.stop().await?; + self.instance.stop().await?; Ok(()) } From 587a2deae7ab729b1f096e0c968878d76cff2e04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Tue, 20 May 2025 20:16:08 +0200 Subject: [PATCH 11/12] chore(lints): fix new qa lints --- src/client/client_request/read_events.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/client/client_request/read_events.rs b/src/client/client_request/read_events.rs index 618fb46..801eaea 100644 --- a/src/client/client_request/read_events.rs +++ b/src/client/client_request/read_events.rs @@ -13,7 +13,7 @@ pub struct ReadEventsRequest<'a> { pub options: Option>, } -impl<'a> ClientRequest for ReadEventsRequest<'a> { +impl ClientRequest for ReadEventsRequest<'_> { const URL_PATH: &'static str = "/api/v1/read-events"; const METHOD: Method = Method::POST; @@ -22,7 +22,7 @@ impl<'a> ClientRequest for ReadEventsRequest<'a> { } } -impl<'a> StreamingRequest for ReadEventsRequest<'a> { +impl StreamingRequest for ReadEventsRequest<'_> { type ItemType = Event; fn build_stream( @@ -32,14 +32,14 @@ impl<'a> StreamingRequest for ReadEventsRequest<'a> { #[serde(tag = "type", content = "payload", rename_all = "camelCase")] enum LineItem { Error { error: String }, - Event(Event), + Event(Box), } impl From for Result { fn from(item: LineItem) -> Self { match item { LineItem::Error { error } => Err(ClientError::DBError(error)), - LineItem::Event(event_type) => Ok(event_type), + LineItem::Event(event_type) => Ok(*event_type), } } } From a047d2164a7ebd6c40465a776a3a34b1ba807035 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Thu, 22 May 2025 19:49:36 +0200 Subject: [PATCH 12/12] fix(client): resolve merge issues --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index 26371c8..113e125 100644 --- a/src/client.rs +++ b/src/client.rs @@ -147,7 +147,7 @@ impl Client { let response = self.build_request(&endpoint)?.send().await?; if response.status().is_success() { - Ok(endpoint.build_stream(response)) + Ok(R::build_stream(response)) } else { Err(ClientError::DBApiError( response.status(),