diff --git a/Cargo.lock b/Cargo.lock index 7159932..28254e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,20 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "getrandom 0.3.2", + "once_cell", + "serde", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -113,6 +127,21 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bit-set" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08807e080ed7f9d5433fa9b275196cfc35414f66a0c79d864dc51a0d825231a3" +dependencies = [ + "bit-vec", +] + +[[package]] +name = "bit-vec" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" + [[package]] name = "bitflags" version = "1.3.2" @@ -175,12 +204,24 @@ dependencies = [ "serde_with", ] +[[package]] +name = "borrow-or-share" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3eeab4423108c5d7c744f4d234de88d18d636100093ae04caf4825134b9c3a32" + [[package]] name = "bumpalo" version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" +[[package]] +name = "bytecount" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce" + [[package]] name = "bytes" version = "1.10.1" @@ -362,6 +403,15 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "email_address" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e079f19b08ca6239f47f8ba8509c11cf3ea30095831f7fed61441475edd8c449" +dependencies = [ + "serde", +] + [[package]] name = "encoding_rs" version = "0.8.35" @@ -417,17 +467,33 @@ dependencies = [ "chrono", "cloudevents-sdk", "eventsourcingdb-client-rust", + "futures", + "futures-util", + "jsonschema", "reqwest", "serde", "serde_json", "testcontainers", "thiserror 2.0.12", "tokio", + "tokio-stream", "tokio-test", + "tokio-util", "typed-builder", "url", ] +[[package]] +name = "fancy-regex" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e24cb5a94bcae1e5408b0effca5cd7172ea3c5755049c5f3af4cd283a165298" +dependencies = [ + "bit-set", + "regex-automata", + "regex-syntax", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -446,6 +512,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "fluent-uri" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1918b65d96df47d3591bed19c5cca17e3fa5d0707318e4b5ef2eae01764df7e5" +dependencies = [ + "borrow-or-share", + "ref-cast", + "serde", +] + [[package]] name = "fnv" version = "1.0.7" @@ -476,6 +553,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fraction" +version = "0.15.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f158e3ff0a1b334408dc9fb811cd99b446986f4d8b741bb08f9df1604085ae7" +dependencies = [ + "lazy_static", + "num", +] + [[package]] name = "futures" version = "0.3.31" @@ -1083,6 +1170,39 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonschema" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b46a0365a611fbf1d2143104dcf910aada96fafd295bab16c60b802bf6fa1d" +dependencies = [ + "ahash", + "base64 0.22.1", + "bytecount", + "email_address", + "fancy-regex", + "fraction", + "idna", + "itoa", + "num-cmp", + "num-traits", + "once_cell", + "percent-encoding", + "referencing", + "regex", + "regex-syntax", + "reqwest", + "serde", + "serde_json", + "uuid-simd", +] + +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.172" @@ -1192,12 +1312,82 @@ dependencies = [ "tempfile", ] +[[package]] +name = "num" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + +[[package]] +name = "num-cmp" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63335b2e2c34fae2fb0aa2cecfd9f0832a1e24b3b32ecec612c3426d46dc8aaa" + +[[package]] +name = "num-complex" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" +dependencies = [ + "num-traits", +] + [[package]] name = "num-conv" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-iter" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" +dependencies = [ + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1266,6 +1456,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "outref" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" + [[package]] name = "parking_lot" version = "0.12.3" @@ -1508,6 +1704,40 @@ dependencies = [ "bitflags 2.9.0", ] +[[package]] +name = "ref-cast" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a0ae411dbe946a674d89546582cea4ba2bb8defac896622d6496f14c23ba5cf" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1165225c21bff1f3bbce98f5a1f889949bc902d3575308cc7b0de30b4f6d27c7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "referencing" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8eff4fa778b5c2a57e85c5f2fe3a709c52f0e60d23146e2151cbef5893f420e" +dependencies = [ + "ahash", + "fluent-uri", + "once_cell", + "parking_lot", + "percent-encoding", + "serde_json", +] + [[package]] name = "regex" version = "1.11.1" @@ -1546,6 +1776,7 @@ dependencies = [ "base64 0.22.1", "bytes", "encoding_rs", + "futures-channel", "futures-core", "futures-util", "h2", @@ -1578,11 +1809,13 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls", + "tokio-util", "tower", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots 0.26.11", "windows-registry", @@ -2361,12 +2594,35 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "uuid-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b082222b4f6619906941c17eb2297fff4c2fb96cb60164170522942a200bd8" +dependencies = [ + "outref", + "uuid", + "vsimd", +] + [[package]] name = "vcpkg" version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "want" version = "0.3.1" @@ -2462,6 +2718,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.77" diff --git a/Cargo.toml b/Cargo.toml index c84cef7..b7f067b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,15 +11,21 @@ testcontainer = ["dep:testcontainers"] [dependencies] chrono = { version = "0.4.41", features = ["serde"] } cloudevents-sdk = { version = "0.8.0", features = ["reqwest"], optional = true } -url = "2.5.4" -reqwest = { version = "0.12.15", features = ["json"] } +futures = "0.3.31" +futures-util = "0.3.31" +jsonschema = "0.30.0" +reqwest = { version = "0.12.15", features = ["json", "stream"] } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" testcontainers = { version = "0.24.0", features = [ "http_wait", ], optional = true } thiserror = "2.0.12" +tokio = { version = "1.44.2", features = ["io-util"] } +tokio-util = { version = "0.7.15", features = ["io"] } +tokio-stream = { version = "0.1.17", features = ["io-util"] } typed-builder = "0.21.0" +url = "2.5.4" [dev-dependencies] eventsourcingdb-client-rust = { path = ".", features = ["testcontainer"] } diff --git a/README.md b/README.md index 1889728..1334066 100644 --- a/README.md +++ b/README.md @@ -12,5 +12,5 @@ Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/c - ❌ [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events) - ❌ [Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql) - ❌ [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events) -- ❌ [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery) +- 🚀 [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery) - 🚀 [Testcontainers Support](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#testcontainers-support) diff --git a/src/client.rs b/src/client.rs index ead82e4..699885f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -20,25 +20,26 @@ mod client_request; mod precondition; +use crate::{ + error::ClientError, + event::{Event, EventCandidate, ManagementEvent}, +}; use client_request::{ - ClientRequest, OneShotRequest, PingRequest, VerifyApiTokenRequest, WriteEventsRequest, + ClientRequest, ListEventTypesRequest, ListSubjectsRequest, OneShotRequest, PingRequest, + RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest, WriteEventsRequest, + list_event_types::EventType, }; - +use futures::Stream; pub use precondition::Precondition; use reqwest; use url::Url; -use crate::{ - error::ClientError, - event::{Event, EventCandidate}, -}; - /// Client for an [EventsourcingDB](https://www.eventsourcingdb.io/) instance. #[derive(Debug)] pub struct Client { base_url: Url, api_token: String, - client: reqwest::Client, + reqwest: reqwest::Client, } impl Client { @@ -47,7 +48,7 @@ impl Client { Client { base_url, api_token: api_token.into(), - client: reqwest::Client::new(), + reqwest: reqwest::Client::new(), } } @@ -93,8 +94,8 @@ impl Client { .map_err(ClientError::URLParseError)?; let request = match endpoint.method() { - reqwest::Method::GET => self.client.get(url), - reqwest::Method::POST => self.client.post(url), + reqwest::Method::GET => self.reqwest.get(url), + reqwest::Method::POST => self.reqwest.post(url), _ => return Err(ClientError::InvalidRequestMethod), } .header("Authorization", format!("Bearer {}", self.api_token)); @@ -132,6 +133,28 @@ impl Client { } } + /// Utility function to request an endpoint of the API as a stream. + /// + /// This means, that the response is streamed and returned as a stream of values. + /// + /// # Errors + /// This function will return an error if the request fails or if the URL is invalid. + async fn request_streaming( + &self, + endpoint: R, + ) -> Result>, ClientError> { + let response = self.build_request(&endpoint)?.send().await?; + + if response.status().is_success() { + Ok(endpoint.build_stream(response)) + } else { + Err(ClientError::DBApiError( + response.status(), + response.text().await.unwrap_or_default(), + )) + } + } + /// Pings the DB instance to check if it is reachable. /// /// ``` @@ -174,6 +197,130 @@ impl Client { Ok(()) } + /// Registers an event schema with the DB instance. + /// + /// ``` + /// use eventsourcingdb_client_rust::event::EventCandidate; + /// use futures::StreamExt; + /// # use serde_json::json; + /// # tokio_test::block_on(async { + /// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap(); + /// let db_url = "http://localhost:3000/"; + /// let api_token = "secrettoken"; + /// # let db_url = container.get_base_url().await.unwrap(); + /// # let api_token = container.get_api_token(); + /// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token); + /// let event_type = "io.eventsourcingdb.test"; + /// let schema = json!({ + /// "type": "object", + /// "properties": { + /// "id": { + /// "type": "string" + /// }, + /// "name": { + /// "type": "string" + /// } + /// }, + /// "required": ["id", "name"] + /// }); + /// client.register_event_schema(event_type, &schema).await.expect("Failed to list event types"); + /// # }) + /// ``` + /// + /// # Errors + /// This function will return an error if the request fails or if the provided schema is invalid. + pub async fn register_event_schema( + &self, + event_type: &str, + schema: &serde_json::Value, + ) -> Result { + self.request_oneshot(RegisterEventSchemaRequest::try_new(event_type, schema)?) + .await + } + + /// List all subjects in the DB instance. + /// + /// To get all subjects in the DB, just pass `None` as the `base_subject`. + /// ``` + /// use eventsourcingdb_client_rust::event::EventCandidate; + /// use futures::StreamExt; + /// # use serde_json::json; + /// # tokio_test::block_on(async { + /// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap(); + /// let db_url = "http://localhost:3000/"; + /// let api_token = "secrettoken"; + /// # let db_url = container.get_base_url().await.unwrap(); + /// # let api_token = container.get_api_token(); + /// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token); + /// let mut subject_stream = client.list_subjects(None).await.expect("Failed to list event types"); + /// while let Some(subject) = subject_stream.next().await { + /// println!("Found Type {}", subject.expect("Error while reading types")); + /// } + /// # }) + /// ``` + /// + /// To get all subjects under /test in the DB, just pass `Some("/test")` as the `base_subject`. + /// ``` + /// use eventsourcingdb_client_rust::event::EventCandidate; + /// use futures::StreamExt; + /// # use serde_json::json; + /// # tokio_test::block_on(async { + /// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap(); + /// let db_url = "http://localhost:3000/"; + /// let api_token = "secrettoken"; + /// # let db_url = container.get_base_url().await.unwrap(); + /// # let api_token = container.get_api_token(); + /// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token); + /// let mut subject_stream = client.list_subjects(Some("/test")).await.expect("Failed to list event types"); + /// while let Some(subject) = subject_stream.next().await { + /// println!("Found Type {}", subject.expect("Error while reading types")); + /// } + /// # }) + /// ``` + /// + /// # Errors + /// This function will return an error if the request fails or if the URL is invalid. + pub async fn list_subjects( + &self, + base_subject: Option<&str>, + ) -> Result>, ClientError> { + let response = self + .request_streaming(ListSubjectsRequest { + base_subject: base_subject.unwrap_or("/"), + }) + .await?; + Ok(response) + } + + /// List all event types in the DB instance. + /// + /// ``` + /// use eventsourcingdb_client_rust::event::EventCandidate; + /// use futures::StreamExt; + /// # use serde_json::json; + /// # tokio_test::block_on(async { + /// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap(); + /// let db_url = "http://localhost:3000/"; + /// let api_token = "secrettoken"; + /// # let db_url = container.get_base_url().await.unwrap(); + /// # let api_token = container.get_api_token(); + /// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token); + /// let mut type_stream = client.list_event_types().await.expect("Failed to list event types"); + /// while let Some(ty) = type_stream.next().await { + /// println!("Found Type {}", ty.expect("Error while reading types").name); + /// } + /// # }) + /// ``` + /// + /// # Errors + /// This function will return an error if the request fails or if the URL is invalid. + pub async fn list_event_types( + &self, + ) -> Result>, ClientError> { + let response = self.request_streaming(ListEventTypesRequest).await?; + Ok(response) + } + /// Writes events to the DB instance. /// /// ``` diff --git a/src/client/client_request.rs b/src/client/client_request.rs index 4f3311e..4b8accd 100644 --- a/src/client/client_request.rs +++ b/src/client/client_request.rs @@ -1,16 +1,28 @@ //! This is a purely internal module to represent client requests to the database. +pub mod list_event_types; +mod list_subjects; mod ping; +mod register_event_schema; mod verify_api_token; mod write_events; +pub use list_event_types::ListEventTypesRequest; +pub use list_subjects::ListSubjectsRequest; pub use ping::PingRequest; +pub use register_event_schema::RegisterEventSchemaRequest; pub use verify_api_token::VerifyApiTokenRequest; pub use write_events::WriteEventsRequest; use crate::error::ClientError; +use futures::{Stream, stream::TryStreamExt}; +use futures_util::io; use reqwest::Method; -use serde::{Serialize, de::DeserializeOwned}; +use serde::Serialize; +use serde::de::DeserializeOwned; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio_stream::wrappers::LinesStream; +use tokio_util::io::StreamReader; /// Represents a request to the database client pub trait ClientRequest { @@ -42,3 +54,23 @@ pub trait OneShotRequest: ClientRequest { Ok(()) } } + +/// Represents a request to the database that expects a stream of responses +pub trait StreamingRequest: ClientRequest { + type ItemType: DeserializeOwned; + + fn build_stream( + self, + response: reqwest::Response, + ) -> impl Stream>; + + fn lines_stream( + response: reqwest::Response, + ) -> impl Stream> { + let bytes = response + .bytes_stream() + .map_err(|err| io::Error::other(format!("Failed to read response stream: {err}"))); + let stream_reader = StreamReader::new(bytes); + LinesStream::new(BufReader::new(stream_reader).lines()).map_err(ClientError::from) + } +} diff --git a/src/client/client_request/list_event_types.rs b/src/client/client_request/list_event_types.rs new file mode 100644 index 0000000..538c8e7 --- /dev/null +++ b/src/client/client_request/list_event_types.rs @@ -0,0 +1,59 @@ +use futures::{Stream, stream::StreamExt}; +use reqwest::Method; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::error::ClientError; + +use super::{ClientRequest, StreamingRequest}; +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct EventType { + #[serde(rename = "eventType")] + pub name: String, + pub is_phantom: bool, + pub schema: Option, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ListEventTypesRequest; + +impl ClientRequest for ListEventTypesRequest { + const URL_PATH: &'static str = "/api/v1/read-event-types"; + const METHOD: Method = Method::POST; + + fn body(&self) -> Option> { + Some(Ok(self)) + } +} +impl StreamingRequest for ListEventTypesRequest { + type ItemType = EventType; + + fn build_stream( + self, + response: reqwest::Response, + ) -> impl Stream> { + #[derive(Deserialize, Debug)] + #[serde(tag = "type", content = "payload", rename_all = "camelCase")] + enum LineItem { + Error { error: String }, + EventType(EventType), + } + + impl From for Result { + fn from(item: LineItem) -> Self { + match item { + LineItem::Error { error } => Err(ClientError::DBError(error)), + LineItem::EventType(event_type) => Ok(event_type), + } + } + } + + Self::lines_stream(response).map(|line| { + let line = line?; + let item: LineItem = serde_json::from_str(line.as_str())?; + item.into() + }) + } +} diff --git a/src/client/client_request/list_subjects.rs b/src/client/client_request/list_subjects.rs new file mode 100644 index 0000000..44fb5ce --- /dev/null +++ b/src/client/client_request/list_subjects.rs @@ -0,0 +1,48 @@ +use futures::{Stream, stream::StreamExt}; +use reqwest::Method; +use serde::{Deserialize, Serialize}; + +use crate::error::ClientError; + +use super::{ClientRequest, StreamingRequest}; + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ListSubjectsRequest<'a> { + pub base_subject: &'a str, +} + +impl ClientRequest for ListSubjectsRequest<'_> { + const URL_PATH: &'static str = "/api/v1/read-subjects"; + const METHOD: Method = Method::POST; + + fn body(&self) -> Option> { + Some(Ok(self)) + } +} +impl StreamingRequest for ListSubjectsRequest<'_> { + type ItemType = String; + + fn build_stream( + self, + response: reqwest::Response, + ) -> impl Stream> { + #[derive(Deserialize, Debug)] + struct LineItem { + payload: LineItemPayload, + r#type: String, + } + #[derive(Deserialize, Debug)] + struct LineItemPayload { + subject: String, + } + Self::lines_stream(response).map(|line| { + let line = line?; + let item: LineItem = serde_json::from_str(line.as_str())?; + if item.r#type != "subject" { + return Err(ClientError::InvalidEventType); + } + Ok(item.payload.subject) + }) + } +} diff --git a/src/client/client_request/register_event_schema.rs b/src/client/client_request/register_event_schema.rs new file mode 100644 index 0000000..ed6049b --- /dev/null +++ b/src/client/client_request/register_event_schema.rs @@ -0,0 +1,42 @@ +use reqwest::Method; +use serde::Serialize; +use serde_json::Value; + +use crate::{error::ClientError, event::ManagementEvent}; + +use super::{ClientRequest, OneShotRequest}; + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct RegisterEventSchemaRequest<'a> { + event_type: &'a str, + schema: &'a Value, +} + +impl<'a> RegisterEventSchemaRequest<'a> { + pub fn try_new(event_type: &'a str, schema: &'a Value) -> Result { + if event_type.is_empty() { + return Err(ClientError::InvalidEventType); + } + jsonschema::meta::validate(schema).map_err(|_e| ClientError::JsonSchemaError)?; + Ok(Self { event_type, schema }) + } +} + +impl ClientRequest for RegisterEventSchemaRequest<'_> { + const URL_PATH: &'static str = "/api/v1/register-event-schema"; + const METHOD: Method = Method::POST; + + fn body(&self) -> Option> { + Some(Ok(self)) + } +} +impl OneShotRequest for RegisterEventSchemaRequest<'_> { + type Response = ManagementEvent; + + fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> { + (response.ty() == "io.eventsourcingdb.api.event-schema-registered") + .then_some(()) + .ok_or(ClientError::InvalidEventType) + } +} diff --git a/src/container.rs b/src/container.rs index d0af12a..7be9804 100644 --- a/src/container.rs +++ b/src/container.rs @@ -114,7 +114,7 @@ impl ContainerBuilder { Ok(Container { internal_port: self.internal_port, api_token: self.api_token.clone(), - container: GenericImage::new(self.image_name, self.image_tag) + instance: GenericImage::new(self.image_name, self.image_tag) .with_exposed_port(self.internal_port) .with_wait_for(WaitFor::Http(Box::new( HttpWaitStrategy::new("/api/v1/ping") @@ -152,7 +152,7 @@ impl ContainerBuilder { /// ``` #[derive(Debug)] pub struct Container { - container: ContainerAsync, + instance: ContainerAsync, internal_port: ContainerPort, api_token: String, } @@ -184,7 +184,7 @@ impl Container { /// # Errors /// This function will return an error if the container is not running (e.g. because it crashed) or if the host could not be retrieved pub async fn get_host(&self) -> Result { - Ok(self.container.get_host().await?) + Ok(self.instance.get_host().await?) } /// Get the mapped port for the database. @@ -194,10 +194,7 @@ impl Container { /// # Errors /// This function will return an error if the container is not running (e.g. because it crashed) or if the host could not be retrieved pub async fn get_mapped_port(&self) -> Result { - Ok(self - .container - .get_host_port_ipv4(self.internal_port) - .await?) + Ok(self.instance.get_host_port_ipv4(self.internal_port).await?) } /// Get the complete http base URL for the database. @@ -227,7 +224,7 @@ impl Container { /// # Errors /// This function will return an error if the container could not be stopped. pub async fn stop(self) -> Result<(), ContainerError> { - self.container.stop().await?; + self.instance.stop().await?; Ok(()) } diff --git a/src/error.rs b/src/error.rs index 749df80..916b23b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,9 +6,15 @@ use thiserror::Error; /// Error type for the client #[derive(Debug, Error)] pub enum ClientError { + /// An IO Error occurred + #[error("An IO error occurred: {0}")] + IoError(#[from] std::io::Error), /// The provided request method is invalid #[error("The provided request method is invalid")] InvalidRequestMethod, + /// The provided event type is invalid + #[error("The provided event type is invalid")] + InvalidEventType, /// The provided API token is invalid #[error("The provided API token is invalid")] APITokenInvalid, @@ -24,9 +30,16 @@ pub enum ClientError { /// There was a problem with the JSON serialization #[error("The JSON serialization failed: {0}")] SerdeJsonError(#[from] serde_json::Error), - /// The DB returned an error+ + /// The DB returned an error in the response + #[error("The DB returned an error in the response: {0}")] + DBError(String), + /// The DB returned an error #[error("The DB returned an error: {0}")] DBApiError(StatusCode, String), + // check if this can hold a validation error in the future + /// The passed jsonschema is invalid + #[error("The passed jsonschema is invalid")] + JsonSchemaError, /// There was a problem with the `cloudevents` message #[cfg(feature = "cloudevents")] #[error("The CloudEvents message is invalid: {0}")] diff --git a/src/lib.rs b/src/lib.rs index 344f1be..3843944 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,7 @@ #![doc = include_str!("../README.md")] +// There is a known bug in clippy: +// https://github.com/rust-lang/rust-clippy/issues/12908 +#![allow(clippy::needless_lifetimes)] #![deny( ambiguous_negative_literals, clippy::pedantic, diff --git a/tests/metadata_and_discovery.rs b/tests/metadata_and_discovery.rs new file mode 100644 index 0000000..b32fe8b --- /dev/null +++ b/tests/metadata_and_discovery.rs @@ -0,0 +1,120 @@ +use eventsourcingdb_client_rust::container::Container; +use futures::StreamExt; +use serde_json::json; + +#[tokio::test] +async fn register_event_schema() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + client + .register_event_schema( + "io.eventsourcingdb.test", + &json!({ + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "name": { + "type": "string" + } + }, + "required": ["id", "name"] + }), + ) + .await + .expect("Failed to register event schema"); +} + +#[tokio::test] +async fn register_invalid_event_schema() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let res = client + .register_event_schema( + "io.eventsourcingdb.test", + &json!({ + "x": "asd" + }), + ) + .await; + assert!(res.is_err(), "Expected an error, but got: {:?}", res); +} + +#[tokio::test] +async fn list_all_subjects() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let res = client.list_subjects(None).await; + match res { + Ok(subjects) => { + let subjects = subjects.collect::>().await; + assert!( + subjects.is_empty(), + "Expected no subjects, but got: {:?}", + subjects + ); + } + Err(err) => panic!("Failed to list subjects: {:?}", err), + } +} + +//TODO!: add list all subjects test after writing to db + +//TODO!: add list scoped subjects test after writing to db + +#[tokio::test] +async fn list_all_event_types() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let test_event_type = "io.eventsourcingdb.test"; + let schema = json!({ + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "name": { + "type": "string" + } + }, + "required": ["id", "name"] + }); + client + .register_event_schema(test_event_type, &schema) + .await + .expect("Failed to register event schema"); + let res = client.list_event_types().await; + match res { + Ok(event_types) => { + let mut event_types = event_types.collect::>().await; + assert!( + event_types.len() == 1, + "Expected one event types, but got: {:?}", + event_types + ); + assert!(event_types[0].is_ok(), "Expected event type to be ok"); + let response_event_type = event_types.pop().unwrap().unwrap(); + assert_eq!( + response_event_type.name, test_event_type, + "Expected event type to be 'io.eventsourcingdb.test', but got: {:?}", + response_event_type.name + ); + assert_eq!( + response_event_type.schema.as_ref(), + Some(&schema), + "Expected event type schema to be {:?}, but got: {:?}", + schema, + response_event_type.schema + ); + assert!( + response_event_type.is_phantom, + "Expected event type is_phantom to be true, but got: {:?}", + response_event_type.is_phantom + ); + } + Err(err) => panic!("Failed to list event types: {:?}", err), + } +} + +// TODO!: add list event types test after writing to db