diff --git a/Cargo.lock b/Cargo.lock index f2f3425..7159932 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -424,6 +424,7 @@ dependencies = [ "thiserror 2.0.12", "tokio", "tokio-test", + "typed-builder", "url", ] @@ -2293,6 +2294,26 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typed-builder" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce63bcaf7e9806c206f7d7b9c1f38e0dce8bb165a80af0898161058b19248534" +dependencies = [ + "typed-builder-macro", +] + +[[package]] +name = "typed-builder-macro" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60d8d828da2a3d759d3519cdf29a5bac49c77d039ad36d0782edadbf9cd5415b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "unicode-ident" version = "1.0.18" diff --git a/Cargo.toml b/Cargo.toml index 1a22ad5..c84cef7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ testcontainers = { version = "0.24.0", features = [ "http_wait", ], optional = true } thiserror = "2.0.12" +typed-builder = "0.21.0" [dev-dependencies] eventsourcingdb-client-rust = { path = ".", features = ["testcontainer"] } diff --git a/README.md b/README.md index 7e04257..1889728 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ This is a work in progress and not yet ready for production use. Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/) the SDK covers these criteria: - 🚀 [Essentials](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#essentials) -- ❌ [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events) +- 🚀 [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events) - ❌ [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events) - ❌ [Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql) - ❌ [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events) diff --git a/src/client.rs b/src/client.rs index 3d9211f..ead82e4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -18,13 +18,20 @@ //! If this works, it means that the client is correctly configured and you can use it to make requests to the DB. mod client_request; +mod precondition; -use client_request::{ClientRequest, PingRequest, VerifyApiTokenRequest}; +use client_request::{ + ClientRequest, OneShotRequest, PingRequest, VerifyApiTokenRequest, WriteEventsRequest, +}; +pub use precondition::Precondition; use reqwest; use url::Url; -use crate::error::ClientError; +use crate::{ + error::ClientError, + event::{Event, EventCandidate}, +}; /// Client for an [EventsourcingDB](https://www.eventsourcingdb.io/) instance. #[derive(Debug)] @@ -72,9 +79,14 @@ impl Client { /// Utility function to request an endpoint of the API. /// + /// This function will return a [`reqwest::RequestBuilder`] which can be used to send the request. + /// /// # Errors /// This function will return an error if the request fails or if the URL is invalid. - async fn request(&self, endpoint: R) -> Result { + fn build_request( + &self, + endpoint: &R, + ) -> Result { let url = self .base_url .join(endpoint.url_path()) @@ -93,15 +105,27 @@ impl Client { } else { request }; + Ok(request) + } - let response = request.send().await?; + /// Utility function to request an endpoint of the API as a oneshot. + /// + /// This means, that the response is not streamed, but returned as a single value. + /// + /// # Errors + /// This function will return an error if the request fails or if the URL is invalid. + async fn request_oneshot( + &self, + endpoint: R, + ) -> Result { + let response = self.build_request(&endpoint)?.send().await?; if response.status().is_success() { let result = response.json().await?; endpoint.validate_response(&result)?; Ok(result) } else { - Err(ClientError::DBError( + Err(ClientError::DBApiError( response.status(), response.text().await.unwrap_or_default(), )) @@ -125,7 +149,7 @@ impl Client { /// # Errors /// This function will return an error if the request fails or if the URL is invalid. pub async fn ping(&self) -> Result<(), ClientError> { - let _ = self.request(PingRequest).await?; + let _ = self.request_oneshot(PingRequest).await?; Ok(()) } @@ -146,7 +170,45 @@ impl Client { /// # Errors /// This function will return an error if the request fails or if the URL is invalid. pub async fn verify_api_token(&self) -> Result<(), ClientError> { - let _ = self.request(VerifyApiTokenRequest).await?; + let _ = self.request_oneshot(VerifyApiTokenRequest).await?; Ok(()) } + + /// Writes events to the DB instance. + /// + /// ``` + /// use eventsourcingdb_client_rust::event::EventCandidate; + /// # use serde_json::json; + /// # tokio_test::block_on(async { + /// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap(); + /// let db_url = "http://localhost:3000/"; + /// let api_token = "secrettoken"; + /// # let db_url = container.get_base_url().await.unwrap(); + /// # let api_token = container.get_api_token(); + /// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token); + /// let candidates = vec![ + /// EventCandidate::builder() + /// .source("https://www.eventsourcingdb.io".to_string()) + /// .data(json!({"value": 1})) + /// .subject("/test".to_string()) + /// .r#type("io.eventsourcingdb.test".to_string()) + /// .build() + /// ]; + /// let written_events = client.write_events(candidates, vec![]).await.expect("Failed to write events"); + /// # }) + /// ``` + /// + /// # Errors + /// This function will return an error if the request fails or if the URL is invalid. + pub async fn write_events( + &self, + events: Vec, + preconditions: Vec, + ) -> Result, ClientError> { + self.request_oneshot(WriteEventsRequest { + events, + preconditions, + }) + .await + } } diff --git a/src/client/client_request.rs b/src/client/client_request.rs index cc389a9..4f3311e 100644 --- a/src/client/client_request.rs +++ b/src/client/client_request.rs @@ -1,15 +1,21 @@ //! This is a purely internal module to represent client requests to the database. -use reqwest::Method; -use serde_json::Value; +mod ping; +mod verify_api_token; +mod write_events; + +pub use ping::PingRequest; +pub use verify_api_token::VerifyApiTokenRequest; +pub use write_events::WriteEventsRequest; -use crate::{error::ClientError, event::ManagementEvent}; +use crate::error::ClientError; +use reqwest::Method; +use serde::{Serialize, de::DeserializeOwned}; /// Represents a request to the database client pub trait ClientRequest { const URL_PATH: &'static str; const METHOD: Method; - type Response: serde::de::DeserializeOwned; /// Returns the URL path for the request fn url_path(&self) -> &'static str { @@ -22,44 +28,17 @@ pub trait ClientRequest { } /// Returns the body for the request - fn body(&self) -> Option> { - None + fn body(&self) -> Option> { + None::> } +} + +/// Represents a request to the database that expects a single response +pub trait OneShotRequest: ClientRequest { + type Response: DeserializeOwned; /// Validate the response from the database fn validate_response(&self, _response: &Self::Response) -> Result<(), ClientError> { Ok(()) } } - -/// Ping the Database instance -#[derive(Debug, Clone, Copy)] -pub struct PingRequest; - -impl ClientRequest for PingRequest { - const URL_PATH: &'static str = "/api/v1/ping"; - const METHOD: Method = Method::GET; - type Response = ManagementEvent; - - fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> { - (response.ty() == "io.eventsourcingdb.api.ping-received") - .then_some(()) - .ok_or(ClientError::PingFailed) - } -} - -/// Verify the API token -#[derive(Debug, Clone, Copy)] -pub struct VerifyApiTokenRequest; - -impl ClientRequest for VerifyApiTokenRequest { - const URL_PATH: &'static str = "/api/v1/verify-api-token"; - const METHOD: Method = Method::POST; - type Response = ManagementEvent; - - fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> { - (response.ty() == "io.eventsourcingdb.api.api-token-verified") - .then_some(()) - .ok_or(ClientError::APITokenInvalid) - } -} diff --git a/src/client/client_request/ping.rs b/src/client/client_request/ping.rs new file mode 100644 index 0000000..5eaedd5 --- /dev/null +++ b/src/client/client_request/ping.rs @@ -0,0 +1,24 @@ +use reqwest::Method; + +use crate::{error::ClientError, event::ManagementEvent}; + +use super::{ClientRequest, OneShotRequest}; + +/// Ping the Database instance +#[derive(Debug, Clone, Copy)] +pub struct PingRequest; + +impl ClientRequest for PingRequest { + const URL_PATH: &'static str = "/api/v1/ping"; + const METHOD: Method = Method::GET; +} + +impl OneShotRequest for PingRequest { + type Response = ManagementEvent; + + fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> { + (response.ty() == "io.eventsourcingdb.api.ping-received") + .then_some(()) + .ok_or(ClientError::PingFailed) + } +} diff --git a/src/client/client_request/verify_api_token.rs b/src/client/client_request/verify_api_token.rs new file mode 100644 index 0000000..c1c015e --- /dev/null +++ b/src/client/client_request/verify_api_token.rs @@ -0,0 +1,24 @@ +use reqwest::Method; + +use crate::{error::ClientError, event::ManagementEvent}; + +use super::{ClientRequest, OneShotRequest}; + +/// Verify the API token +#[derive(Debug, Clone, Copy)] +pub struct VerifyApiTokenRequest; + +impl ClientRequest for VerifyApiTokenRequest { + const URL_PATH: &'static str = "/api/v1/verify-api-token"; + const METHOD: Method = Method::POST; +} + +impl OneShotRequest for VerifyApiTokenRequest { + type Response = ManagementEvent; + + fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> { + (response.ty() == "io.eventsourcingdb.api.api-token-verified") + .then_some(()) + .ok_or(ClientError::APITokenInvalid) + } +} diff --git a/src/client/client_request/write_events.rs b/src/client/client_request/write_events.rs new file mode 100644 index 0000000..db40863 --- /dev/null +++ b/src/client/client_request/write_events.rs @@ -0,0 +1,26 @@ +use super::{ClientRequest, OneShotRequest}; +use crate::{ + client::Precondition, + error::ClientError, + event::{Event, EventCandidate}, +}; +use reqwest::Method; +use serde::Serialize; + +#[derive(Debug, Serialize)] +pub struct WriteEventsRequest { + pub events: Vec, + pub preconditions: Vec, +} + +impl ClientRequest for WriteEventsRequest { + const URL_PATH: &'static str = "/api/v1/write-events"; + const METHOD: Method = Method::POST; + + fn body(&self) -> Option> { + Some(Ok(self)) + } +} +impl OneShotRequest for WriteEventsRequest { + type Response = Vec; +} diff --git a/src/client/precondition.rs b/src/client/precondition.rs new file mode 100644 index 0000000..fbb47d0 --- /dev/null +++ b/src/client/precondition.rs @@ -0,0 +1,22 @@ +use serde::Serialize; + +/// Enum for different preconditions that can be used when writing events +#[derive(Debug, Serialize)] +#[serde(tag = "type", content = "payload")] +pub enum Precondition { + /// Check if the subject with the given path has no other events + #[serde(rename = "isSubjectPristine")] + IsSubjectPristine { + /// The subject to check + subject: String, + }, + /// Check if the subject with the given path has no other events + #[serde(rename = "isSubjectOnEventId")] + IsSubjectOnEventId { + /// The subject to check + subject: String, + /// The event ID to check against + #[serde(rename = "eventId")] + event_id: String, + }, +} diff --git a/src/error.rs b/src/error.rs index 89b61b9..749df80 100644 --- a/src/error.rs +++ b/src/error.rs @@ -26,7 +26,7 @@ pub enum ClientError { SerdeJsonError(#[from] serde_json::Error), /// The DB returned an error+ #[error("The DB returned an error: {0}")] - DBError(StatusCode, String), + DBApiError(StatusCode, String), /// There was a problem with the `cloudevents` message #[cfg(feature = "cloudevents")] #[error("The CloudEvents message is invalid: {0}")] @@ -46,3 +46,12 @@ pub enum ContainerError { #[error("URL parsing error: {0}")] URLParseError(#[from] url::ParseError), } + +/// Error type for the event +#[derive(Debug, thiserror::Error)] +pub enum EventError { + /// The passed cloudevent is invalid + #[cfg(feature = "cloudevents")] + #[error("The passed cloudevent is invalid")] + InvalidCloudevent, +} diff --git a/src/event.rs b/src/event.rs index 09f88e3..f927015 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,5 +1,14 @@ //! This module holds all event types that are send between the client and the database. -mod management_event; +mod event_types; +mod trace_info; -pub use management_event::ManagementEvent; +// Reexport relevant types to flatten the module graph for consumers and +// keep private encapsulation of implementation details. +pub use event_types::event::Event; +pub use event_types::event_candidate::EventCandidate; +pub use event_types::management_event::ManagementEvent; +pub use trace_info::TraceInfo; + +#[cfg(feature="cloudevents")] +use crate::error::EventError; diff --git a/src/event/event_types.rs b/src/event/event_types.rs new file mode 100644 index 0000000..02837c9 --- /dev/null +++ b/src/event/event_types.rs @@ -0,0 +1,5 @@ +//! This module holds all possible event types this sdk works with. + +pub mod event; +pub mod event_candidate; +pub mod management_event; diff --git a/src/event/event_types/event.rs b/src/event/event_types/event.rs new file mode 100644 index 0000000..1c730ce --- /dev/null +++ b/src/event/event_types/event.rs @@ -0,0 +1,127 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::event::{trace_info::TraceInfo, EventCandidate}; + + +/// Represents an event that has been received from the DB. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Event { + data: Value, + datacontenttype: String, + hash: String, + id: String, + predecessorhash: String, + source: String, + specversion: String, + subject: String, + time: DateTime, + #[serde(flatten)] + traceinfo: Option, + r#type: String, +} + +impl Event { + /// Get the data of an event. + #[must_use] + pub fn data(&self) -> &Value { + &self.data + } + /// Get the data content type of an event. + #[must_use] + pub fn datacontenttype(&self) -> &str { + &self.datacontenttype + } + /// Get the hash of an event. + #[must_use] + pub fn hash(&self) -> &str { + &self.hash + } + /// Get the ID of an event. + /// In eventsourcingdb, this is the sequence number of the event. + #[must_use] + pub fn id(&self) -> &str { + &self.id + } + /// Get the predecessor hash of an event. + #[must_use] + pub fn predecessorhash(&self) -> &str { + &self.predecessorhash + } + /// Get the source of an event. + #[must_use] + pub fn source(&self) -> &str { + &self.source + } + /// Get the spec version of an event. + /// This is always `1.0`. + #[must_use] + pub fn specversion(&self) -> &str { + &self.specversion + } + /// Get the subject of an event. + #[must_use] + pub fn subject(&self) -> &str { + &self.subject + } + /// Get the time of an event. + #[must_use] + pub fn time(&self) -> &DateTime { + &self.time + } + /// Get the traceparent of an event. + #[must_use] + pub fn traceparent(&self) -> Option<&str> { + self.traceinfo.as_ref().map(TraceInfo::traceparent) + } + /// Get the tracestate of an event. + #[must_use] + pub fn tracestate(&self) -> Option<&str> { + self.traceinfo.as_ref().and_then(|t| t.tracestate()) + } + /// Get the traceinfo of an event. + #[must_use] + pub fn traceinfo(&self) -> Option<&TraceInfo> { + self.traceinfo.as_ref() + } + /// Get the type of an event. + #[must_use] + pub fn ty(&self) -> &str { + &self.r#type + } +} + +impl From for EventCandidate { + fn from(event: Event) -> Self { + Self { + data: event.data, + source: event.source, + subject: event.subject, + r#type: event.r#type, + traceinfo: event.traceinfo, + } + } +} + +#[cfg(feature = "cloudevents")] +impl From for cloudevents::Event { + fn from(event: Event) -> Self { + let mut builder = cloudevents::EventBuilderV10::new() + .source(event.source) + .subject(event.subject) + .ty(event.r#type) + .id(event.id) + .time(event.time.to_string()) + .data(event.datacontenttype, event.data); + + if let Some(traceinfo) = event.traceinfo { + builder = builder.extension("traceparent", traceinfo.traceparent()); + if let Some(tracestate) = traceinfo.tracestate() { + builder = builder.extension("tracestate", tracestate); + } + } + + builder.build().expect("Failed to build cloudevent") + } +} diff --git a/src/event/event_types/event_candidate.rs b/src/event/event_types/event_candidate.rs new file mode 100644 index 0000000..9d5947d --- /dev/null +++ b/src/event/event_types/event_candidate.rs @@ -0,0 +1,57 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use typed_builder::TypedBuilder; +use crate::event::trace_info::TraceInfo; + +#[cfg(feature = "cloudevents")] +use crate::error::EventError; + +/// Represents an event candidate that can be sent to the DB. +/// This is a simplified version of the [`super::Event`] type. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] +pub struct EventCandidate { + /// The data of the event, serialized as JSON + #[builder(setter(into))] + pub data: Value, + /// The source of the event. + /// This has to be a valid URI. + #[builder(setter(into))] + pub source: String, + /// The subject of the event. + /// This has to start with a `/`. + #[builder(setter(into))] + pub subject: String, + /// The type of the event. + /// This has to be a reverse domain name. + #[builder(setter(into))] + pub r#type: String, + /// The traceparent of the event. + /// This is used for distributed tracing. + #[builder(default, setter(strip_option))] + #[serde(skip_serializing_if = "Option::is_none", flatten)] + pub traceinfo: Option, +} + +#[cfg(feature = "cloudevents")] +impl TryFrom for EventCandidate { + type Error = EventError; + fn try_from(event: cloudevents::Event) -> Result { + let data = match event.data() { + Some(cloudevents::Data::Json(json)) => json.to_owned(), + _ => return Err(EventError::InvalidCloudevent), + }; + let subject = match event.subject() { + Some(subject) => subject.to_string(), + None => return Err(EventError::InvalidCloudevent), + }; + let traceinfo = TraceInfo::from_cloudevent(&event)?; + + Ok(Self { + data, + source: event.source().to_string(), + subject, + r#type: event.ty().to_string(), + traceinfo, + }) + } +} diff --git a/src/event/management_event.rs b/src/event/event_types/management_event.rs similarity index 100% rename from src/event/management_event.rs rename to src/event/event_types/management_event.rs diff --git a/src/event/trace_info.rs b/src/event/trace_info.rs new file mode 100644 index 0000000..7647423 --- /dev/null +++ b/src/event/trace_info.rs @@ -0,0 +1,70 @@ +//! This module holds supporting traits for the "Tracing" feature of eventsourcingdb. + +use serde::{Deserialize, Serialize}; + +/// Represents the trace information of an event. +/// This is used for distributed tracing. +/// It can either be a traceparent or a traceparent and tracestate. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum TraceInfo { + // LEAVE ORDER AS IS + // This is important for deserialization as the traceparent is always present + + /// The traceparent and tracestate of the event. + /// This is used for distributed tracing. + WithState { + /// The traceparent of the event. + /// This is used for distributed tracing. + traceparent: String, + /// The tracestate of the event. + /// This is used for distributed tracing. + tracestate: String, + }, + /// The traceparent of the event. + /// This is used for distributed tracing. + Traceparent { + /// The traceparent of the event. + /// This is used for distributed tracing. + traceparent: String, + }, +} + +impl TraceInfo { + /// Get the traceparent of the event. + #[must_use] + pub fn traceparent(&self) -> &str { + match self { + Self::Traceparent { traceparent } | Self::WithState { traceparent, .. } => traceparent, + } + } + /// Get the tracestate of the event. + #[must_use] + pub fn tracestate(&self) -> Option<&str> { + match self { + Self::Traceparent { .. } => None, + Self::WithState { tracestate, .. } => Some(tracestate), + } + } + + /// Create a new `TraceInfo` from a cloudevent. + /// This will return None if the cloudevent does not contain a traceparent or tracestate. + /// + /// # Errors + /// If the cloudevent contains a tracestate but no traceparent, an error will be returned. + #[cfg(feature="cloudevents")] + pub fn from_cloudevent(event: &cloudevents::Event) -> Result, EventError> { + let traceparent = event.extension("traceparent").map(ToString::to_string); + let tracestate = event.extension("tracestate").map(ToString::to_string); + + match (traceparent, tracestate) { + (Some(traceparent), Some(tracestate)) => Ok(Some(Self::WithState { + traceparent, + tracestate, + })), + (Some(traceparent), None) => Ok(Some(Self::Traceparent { traceparent })), + (None, None) => Ok(None), + (None, Some(_)) => Err(EventError::InvalidCloudevent), + } + } +} diff --git a/tests/utils/mod.rs b/tests/utils/mod.rs new file mode 100644 index 0000000..917dfd1 --- /dev/null +++ b/tests/utils/mod.rs @@ -0,0 +1,76 @@ +use chrono::{TimeDelta, Utc}; +use eventsourcingdb_client_rust::event::{Event, EventCandidate}; +use serde_json::{Value, json}; + +pub fn create_test_eventcandidate( + subject: impl ToString, + data: impl Into, +) -> EventCandidate { + EventCandidate::builder() + .source("https://www.eventsourcingdb.io".to_string()) + .data(data.into()) + .subject(subject.to_string()) + .r#type("io.eventsourcingdb.test".to_string()) + .build() +} + +pub fn create_numbered_eventcandidates(count: usize) -> Vec { + (0..count) + .map(|_| create_test_eventcandidate("/test", json!({"value": count}))) + .collect() +} + +pub fn assert_event_match_eventcandidate( + event: &Event, + event_candidate: &EventCandidate, + previous_event_hash: Option<&str>, + expected_id: Option, +) { + // check provided data + assert_eq!(event.data(), &event_candidate.data, "Data mismatch"); + assert_eq!(event.source(), &event_candidate.source, "Source mismatch"); + assert_eq!( + event.subject(), + &event_candidate.subject, + "Subject mismatch" + ); + assert_eq!(event.ty(), &event_candidate.r#type, "Type mismatch"); + assert_eq!( + event.traceinfo(), + event_candidate.traceinfo.as_ref(), + "Traceparent mismatch" + ); + + // Check added metadata + assert_eq!( + event.datacontenttype(), + "application/json", + "Data content type should be application/json" + ); + assert_eq!(event.hash().len(), 64, "Hash should be present"); + assert_eq!( + event.id(), + expected_id.unwrap_or_default().to_string(), + "ID should be present" + ); + assert_eq!( + event.predecessorhash(), + previous_event_hash + .unwrap_or("0000000000000000000000000000000000000000000000000000000000000000"), + "Time should be present" + ); + assert_eq!(event.specversion(), "1.0", "Spec version should be 1.0"); + assert!( + (Utc::now() - *event.time()) < TimeDelta::seconds(60), + "Time should be present" + ); +} + +pub fn assert_events_match_eventcandidates(events: &[Event], event_candidates: &[EventCandidate]) { + assert_eq!(events.len(), event_candidates.len(), "Length mismatch"); + let mut previous_event_hash: Option<&str> = None; + for (i, (event, event_candidate)) in events.iter().zip(event_candidates.iter()).enumerate() { + assert_event_match_eventcandidate(event, event_candidate, previous_event_hash, Some(i)); + previous_event_hash = Some(event.hash()); + } +} diff --git a/tests/write_events.rs b/tests/write_events.rs new file mode 100644 index 0000000..4883614 --- /dev/null +++ b/tests/write_events.rs @@ -0,0 +1,338 @@ +mod utils; + +use eventsourcingdb_client_rust::{ + client::Precondition, container::Container, event::{EventCandidate, TraceInfo} +}; +use serde_json::json; +use utils::{ + assert_event_match_eventcandidate, assert_events_match_eventcandidates, + create_numbered_eventcandidates, create_test_eventcandidate, +}; + +#[tokio::test] +async fn write_single_event() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let event = create_test_eventcandidate("/test", json!({"value": 1})); + let result = client.write_events(vec![event.clone()], vec![]).await; + assert!(result.is_ok(), "Failed to write events: {:?}", result); + let mut response = result.unwrap(); + assert_eq!(response.len(), 1, "Expected one event in the response"); + let response_event = response.pop().unwrap(); + + assert_event_match_eventcandidate(&response_event, &event, None, None); +} + +#[tokio::test] +async fn write_multiple_events() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let event_candidates = create_numbered_eventcandidates(10); + + let result = client.write_events(event_candidates.clone(), vec![]).await; + assert!(result.is_ok(), "Failed to write events: {:?}", result); + let response = result.unwrap(); + + assert_events_match_eventcandidates(&response, &event_candidates); +} + +#[tokio::test] +async fn write_event_with_is_pristine_condition_on_empty_subject() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let event_candidate = create_test_eventcandidate("/test/42", json!({"value": 1})); + let result = client + .write_events( + vec![event_candidate.clone()], + vec![ + Precondition::IsSubjectPristine { + subject: event_candidate.subject.clone(), + }, + ], + ) + .await; + assert!(result.is_ok(), "Failed to write events: {:?}", result); + let mut response = result.unwrap(); + assert_eq!(response.len(), 1, "Expected one event in the response"); + let response_event = response.pop().unwrap(); + + assert_event_match_eventcandidate(&response_event, &event_candidate, None, None); +} + +#[tokio::test] +async fn write_event_with_is_pristine_condition_on_non_empty_subject() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let event_candidate = create_test_eventcandidate("/test", json!({"value": 1})); + client + .write_events(vec![event_candidate.clone()], vec![]) + .await + .expect("Failed to write initial event"); + let result = client + .write_events( + vec![event_candidate.clone()], + vec![ + Precondition::IsSubjectPristine { + subject: event_candidate.subject.clone(), + }, + ], + ) + .await; + assert!(result.is_err(), "Expected an error, but got: {:?}", result); +} + +#[tokio::test] +async fn write_events_with_is_pristine_condition_on_empty_subject() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let event_candidates = vec![ + create_test_eventcandidate("/test/42", json!({"value": 1})), + create_test_eventcandidate("/test/42", json!({"value": 1})), + ]; + let result = client + .write_events( + event_candidates.clone(), + vec![ + Precondition::IsSubjectPristine { + subject: event_candidates[1].subject.clone(), + }, + ], + ) + .await; + assert!(result.is_ok(), "Failed to write events: {:?}", result); + let response = result.unwrap(); + assert_events_match_eventcandidates(&response, &event_candidates); +} + +#[tokio::test] +async fn write_events_with_is_pristine_condition_on_non_empty_subject() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let fill_event_candidate = create_test_eventcandidate("/test", json!({"value": 1})); + client + .write_events(vec![fill_event_candidate.clone()], vec![]) + .await + .expect("Failed to write initial event"); + let event_candidates = vec![ + create_test_eventcandidate("/test2", json!({"value": 1})), + fill_event_candidate.clone(), + ]; + let result = client + .write_events( + event_candidates, + vec![ + Precondition::IsSubjectPristine { + subject: fill_event_candidate.subject.clone(), + }, + ], + ) + .await; + assert!(result.is_err(), "Expected an error, but got: {:?}", result); +} + +#[tokio::test] +async fn write_event_with_is_subject_on_event_id_condition_on_empty_subject() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let event_candidate = create_test_eventcandidate("/test/42", json!({"value": 1})); + let result = client + .write_events( + vec![event_candidate.clone()], + vec![ + Precondition::IsSubjectOnEventId { + subject: event_candidate.subject.clone(), + event_id: "100".to_string(), + }, + ], + ) + .await; + assert!(result.is_err(), "Expected an error, but got: {:?}", result); +} + +#[tokio::test] +async fn write_event_with_is_subject_on_event_id_condition_on_non_empty_subject_correct_id() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let event_candidate = create_test_eventcandidate("/test", json!({"value": 1})); + let written = client + .write_events(vec![event_candidate.clone()], vec![]) + .await + .expect("Failed to write initial event") + .pop() + .unwrap(); + let result = client + .write_events( + vec![event_candidate.clone()], + vec![ + Precondition::IsSubjectOnEventId { + subject: event_candidate.subject.clone(), + event_id: written.id().to_string(), + }, + ], + ) + .await; + assert!(result.is_ok(), "Writing the event failed: {:?}", result); +} + +#[tokio::test] +async fn write_event_with_is_subject_on_event_id_condition_on_non_empty_subject_wrong_id() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let event_candidate = create_test_eventcandidate("/test", json!({"value": 1})); + client + .write_events(vec![event_candidate.clone()], vec![]) + .await + .expect("Failed to write initial event") + .pop() + .unwrap(); + let result = client + .write_events( + vec![event_candidate.clone()], + vec![ + Precondition::IsSubjectOnEventId { + subject: event_candidate.subject.clone(), + event_id: 100.to_string(), + }, + ], + ) + .await; + assert!(result.is_err(), "Expected an error, but got: {:?}", result); +} + +#[tokio::test] +async fn write_events_with_is_subject_on_event_id_condition_on_empty_subject() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let event_candidates = vec![ + create_test_eventcandidate("/test/42", json!({"value": 1})), + create_test_eventcandidate("/test/42", json!({"value": 1})), + ]; + let result = client + .write_events( + event_candidates.clone(), + vec![ + Precondition::IsSubjectOnEventId { + subject: event_candidates[1].subject.clone(), + event_id: "100".to_string(), + }, + ], + ) + .await; + assert!(result.is_err(), "Expected an error, but got: {:?}", result); +} + +#[tokio::test] +async fn write_events_with_is_subject_on_event_id_condition_on_non_empty_subject_correct_id() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let fill_event_candidate = create_test_eventcandidate("/test", json!({"value": 1})); + let written = client + .write_events(vec![fill_event_candidate.clone()], vec![]) + .await + .expect("Failed to write initial event") + .pop() + .unwrap(); + let event_candidates = vec![ + create_test_eventcandidate("/test2", json!({"value": 1})), + fill_event_candidate.clone(), + ]; + let result = client + .write_events( + event_candidates, + vec![ + Precondition::IsSubjectOnEventId { + subject: fill_event_candidate.subject.clone(), + event_id: written.id().to_string(), + }, + ], + ) + .await; + assert!(result.is_ok(), "Writing the events failed: {:?}", result); +} + +#[tokio::test] +async fn write_events_with_is_subject_on_event_id_condition_on_non_empty_subject_wrong_id() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let fill_event_candidate = create_test_eventcandidate("/test", json!({"value": 1})); + client + .write_events(vec![fill_event_candidate.clone()], vec![]) + .await + .expect("Failed to write initial event") + .pop() + .unwrap(); + let event_candidates = vec![ + create_test_eventcandidate("/test2", json!({"value": 1})), + fill_event_candidate.clone(), + ]; + let result = client + .write_events( + event_candidates, + vec![ + Precondition::IsSubjectOnEventId { + subject: fill_event_candidate.subject.clone(), + event_id: 100.to_string(), + }, + ], + ) + .await; + assert!(result.is_err(), "Expected an error, but got: {:?}", result); +} + +#[tokio::test] +async fn write_single_event_with_traceparent() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let event = EventCandidate::builder() + .source("https://www.eventsourcingdb.io".to_string()) + .data(json!({"value": 1})) + .subject("/test".to_string()) + .r#type("io.eventsourcingdb.test".to_string()) + .traceinfo(TraceInfo::Traceparent { + traceparent: "00-01234567012345670123456701234567-0123456701234567-00".to_string(), + }) + .build(); + let result = client.write_events(vec![event.clone()], vec![]).await; + assert!(result.is_ok(), "Failed to write events: {:?}", result); + let mut response = result.unwrap(); + assert_eq!(response.len(), 1, "Expected one event in the response"); + let response_event = response.pop().unwrap(); + + assert_event_match_eventcandidate(&response_event, &event, None, None); +} + +#[tokio::test] +async fn write_single_event_with_traceparent_and_state() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let event = EventCandidate::builder() + .source("https://www.eventsourcingdb.io".to_string()) + .data(json!({"value": 1})) + .subject("/test".to_string()) + .r#type("io.eventsourcingdb.test".to_string()) + .traceinfo(TraceInfo::WithState { + traceparent: "00-01234567012345670123456701234567-0123456701234567-00".to_string(), + tracestate: "state=12345".to_string(), + }) + .build(); + let result = client.write_events(vec![event.clone()], vec![]).await; + assert!(result.is_ok(), "Failed to write events: {:?}", result); + let mut response = result.unwrap(); + assert_eq!(response.len(), 1, "Expected one event in the response"); + let response_event = response.pop().unwrap(); + + println!("Response event: {:?}", response_event); + + assert_event_match_eventcandidate(&response_event, &event, None, None); +}