From 11b4dc5758306c2f52eaa5778c9502b7b1403fe2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Tue, 13 May 2025 14:51:53 +0200 Subject: [PATCH 1/3] feat(client): add writing events to client adhering to the "Writing Events" compliance criteria --- Cargo.lock | 21 +++ Cargo.toml | 1 + README.md | 2 +- src/client.rs | 47 ++++- src/client/client_request.rs | 29 ++- src/client/precondition.rs | 22 +++ src/error.rs | 9 + src/event.rs | 78 ++++++++ src/event/event.rs | 126 +++++++++++++ src/event/event_candidate.rs | 57 ++++++ tests/utils/mod.rs | 76 ++++++++ tests/write_events.rs | 338 +++++++++++++++++++++++++++++++++++ 12 files changed, 799 insertions(+), 7 deletions(-) create mode 100644 src/client/precondition.rs create mode 100644 src/event/event.rs create mode 100644 src/event/event_candidate.rs create mode 100644 tests/utils/mod.rs create mode 100644 tests/write_events.rs diff --git a/Cargo.lock b/Cargo.lock index f2f3425..7159932 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -424,6 +424,7 @@ dependencies = [ "thiserror 2.0.12", "tokio", "tokio-test", + "typed-builder", "url", ] @@ -2293,6 +2294,26 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typed-builder" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce63bcaf7e9806c206f7d7b9c1f38e0dce8bb165a80af0898161058b19248534" +dependencies = [ + "typed-builder-macro", +] + +[[package]] +name = "typed-builder-macro" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60d8d828da2a3d759d3519cdf29a5bac49c77d039ad36d0782edadbf9cd5415b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "unicode-ident" version = "1.0.18" diff --git a/Cargo.toml b/Cargo.toml index 1a22ad5..c84cef7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ testcontainers = { version = "0.24.0", features = [ "http_wait", ], optional = true } thiserror = "2.0.12" +typed-builder = "0.21.0" [dev-dependencies] eventsourcingdb-client-rust = { path = ".", features = ["testcontainer"] } diff --git a/README.md b/README.md index 7e04257..1889728 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ This is a work in progress and not yet ready for production use. Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/) the SDK covers these criteria: - πŸš€ [Essentials](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#essentials) -- ❌ [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events) +- πŸš€ [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events) - ❌ [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events) - ❌ [Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql) - ❌ [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events) diff --git a/src/client.rs b/src/client.rs index 3d9211f..92927a4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -18,13 +18,18 @@ //! If this works, it means that the client is correctly configured and you can use it to make requests to the DB. mod client_request; +mod precondition; -use client_request::{ClientRequest, PingRequest, VerifyApiTokenRequest}; +use client_request::{ClientRequest, PingRequest, VerifyApiTokenRequest, WriteEvents}; +pub use precondition::Precondition; use reqwest; use url::Url; -use crate::error::ClientError; +use crate::{ + error::ClientError, + event::{Event, EventCandidate}, +}; /// Client for an [EventsourcingDB](https://www.eventsourcingdb.io/) instance. #[derive(Debug)] @@ -149,4 +154,42 @@ impl Client { let _ = self.request(VerifyApiTokenRequest).await?; Ok(()) } + + /// Writes events to the DB instance. + /// + /// ``` + /// use eventsourcingdb_client_rust::event::EventCandidate; + /// # use serde_json::json; + /// # tokio_test::block_on(async { + /// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap(); + /// let db_url = "http://localhost:3000/"; + /// let api_token = "secrettoken"; + /// # let db_url = container.get_base_url().await.unwrap(); + /// # let api_token = container.get_api_token(); + /// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token); + /// let candidates = vec![ + /// EventCandidate::builder() + /// .source("https://www.eventsourcingdb.io".to_string()) + /// .data(json!({"value": 1})) + /// .subject("/test".to_string()) + /// .r#type("io.eventsourcingdb.test".to_string()) + /// .build() + /// ]; + /// let written_events = client.write_events(candidates, vec![]).await.expect("Failed to write events"); + /// # }) + /// ``` + /// + /// # Errors + /// This function will return an error if the request fails or if the URL is invalid. + pub async fn write_events( + &self, + events: Vec, + preconditions: Vec, + ) -> Result, ClientError> { + self.request(WriteEvents { + events, + preconditions, + }) + .await + } } diff --git a/src/client/client_request.rs b/src/client/client_request.rs index cc389a9..e6db97b 100644 --- a/src/client/client_request.rs +++ b/src/client/client_request.rs @@ -1,9 +1,14 @@ //! This is a purely internal module to represent client requests to the database. use reqwest::Method; -use serde_json::Value; +use serde::Serialize; -use crate::{error::ClientError, event::ManagementEvent}; +use crate::{ + error::ClientError, + event::{Event, EventCandidate, ManagementEvent}, +}; + +use super::precondition::Precondition; /// Represents a request to the database client pub trait ClientRequest { @@ -22,8 +27,8 @@ pub trait ClientRequest { } /// Returns the body for the request - fn body(&self) -> Option> { - None + fn body(&self) -> Option> { + None::> } /// Validate the response from the database @@ -63,3 +68,19 @@ impl ClientRequest for VerifyApiTokenRequest { .ok_or(ClientError::APITokenInvalid) } } + +#[derive(Debug, Serialize)] +pub struct WriteEvents { + pub events: Vec, + pub preconditions: Vec, +} + +impl ClientRequest for WriteEvents { + const URL_PATH: &'static str = "/api/v1/write-events"; + const METHOD: Method = Method::POST; + type Response = Vec; + + fn body(&self) -> Option> { + Some(Ok(self)) + } +} diff --git a/src/client/precondition.rs b/src/client/precondition.rs new file mode 100644 index 0000000..fbb47d0 --- /dev/null +++ b/src/client/precondition.rs @@ -0,0 +1,22 @@ +use serde::Serialize; + +/// Enum for different preconditions that can be used when writing events +#[derive(Debug, Serialize)] +#[serde(tag = "type", content = "payload")] +pub enum Precondition { + /// Check if the subject with the given path has no other events + #[serde(rename = "isSubjectPristine")] + IsSubjectPristine { + /// The subject to check + subject: String, + }, + /// Check if the subject with the given path has no other events + #[serde(rename = "isSubjectOnEventId")] + IsSubjectOnEventId { + /// The subject to check + subject: String, + /// The event ID to check against + #[serde(rename = "eventId")] + event_id: String, + }, +} diff --git a/src/error.rs b/src/error.rs index 89b61b9..0b03859 100644 --- a/src/error.rs +++ b/src/error.rs @@ -46,3 +46,12 @@ pub enum ContainerError { #[error("URL parsing error: {0}")] URLParseError(#[from] url::ParseError), } + +/// Error type for the event +#[derive(Debug, thiserror::Error)] +pub enum EventError { + /// The passed cloudevent is invalid + #[cfg(feature = "cloudevents")] + #[error("The passed cloudevent is invalid")] + InvalidCloudevent, +} diff --git a/src/event.rs b/src/event.rs index 09f88e3..7fc0dc1 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,5 +1,83 @@ //! This module holds all event types that are send between the client and the database. +// Allow module inception here, since "event" is the expected as the name for both modules. +// Renaming would be possible, but would probably lead to more confusion. +#[allow(clippy::module_inception)] +mod event; +mod event_candidate; mod management_event; +pub use event::Event; +pub use event_candidate::EventCandidate; pub use management_event::ManagementEvent; +use serde::{Deserialize, Serialize}; + +#[cfg(feature="cloudevents")] +use crate::error::EventError; + +/// Represents the trace information of an event. +/// This is used for distributed tracing. +/// It can either be a traceparent or a traceparent and tracestate. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum TraceInfo { + // LEAVE ORDER AS IS + // This is important for deserialization as the traceparent is always present + + /// The traceparent and tracestate of the event. + /// This is used for distributed tracing. + WithState { + /// The traceparent of the event. + /// This is used for distributed tracing. + traceparent: String, + /// The tracestate of the event. + /// This is used for distributed tracing. + tracestate: String, + }, + /// The traceparent of the event. + /// This is used for distributed tracing. + Traceparent { + /// The traceparent of the event. + /// This is used for distributed tracing. + traceparent: String, + }, +} + +impl TraceInfo { + /// Get the traceparent of the event. + #[must_use] + pub fn traceparent(&self) -> &str { + match self { + Self::Traceparent { traceparent } | Self::WithState { traceparent, .. } => traceparent, + } + } + /// Get the tracestate of the event. + #[must_use] + pub fn tracestate(&self) -> Option<&str> { + match self { + Self::Traceparent { .. } => None, + Self::WithState { tracestate, .. } => Some(tracestate), + } + } + + /// Create a new `TraceInfo` from a cloudevent. + /// This will return None if the cloudevent does not contain a traceparent or tracestate. + /// + /// # Errors + /// If the cloudevent contains a tracestate but no traceparent, an error will be returned. + #[cfg(feature="cloudevents")] + pub fn from_cloudevent(event: &cloudevents::Event) -> Result, EventError> { + let traceparent = event.extension("traceparent").map(ToString::to_string); + let tracestate = event.extension("tracestate").map(ToString::to_string); + + match (traceparent, tracestate) { + (Some(traceparent), Some(tracestate)) => Ok(Some(Self::WithState { + traceparent, + tracestate, + })), + (Some(traceparent), None) => Ok(Some(Self::Traceparent { traceparent })), + (None, None) => Ok(None), + (None, Some(_)) => Err(EventError::InvalidCloudevent), + } + } +} diff --git a/src/event/event.rs b/src/event/event.rs new file mode 100644 index 0000000..911f4f5 --- /dev/null +++ b/src/event/event.rs @@ -0,0 +1,126 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use super::{EventCandidate, TraceInfo}; + +/// Represents an event that has been received from the DB. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Event { + data: Value, + datacontenttype: String, + hash: String, + id: String, + predecessorhash: String, + source: String, + specversion: String, + subject: String, + time: DateTime, + #[serde(flatten)] + traceinfo: Option, + r#type: String, +} + +impl Event { + /// Get the data of an event. + #[must_use] + pub fn data(&self) -> &Value { + &self.data + } + /// Get the data content type of an event. + #[must_use] + pub fn datacontenttype(&self) -> &str { + &self.datacontenttype + } + /// Get the hash of an event. + #[must_use] + pub fn hash(&self) -> &str { + &self.hash + } + /// Get the ID of an event. + /// In eventsourcingdb, this is the sequence number of the event. + #[must_use] + pub fn id(&self) -> &str { + &self.id + } + /// Get the predecessor hash of an event. + #[must_use] + pub fn predecessorhash(&self) -> &str { + &self.predecessorhash + } + /// Get the source of an event. + #[must_use] + pub fn source(&self) -> &str { + &self.source + } + /// Get the spec version of an event. + /// This is always `1.0`. + #[must_use] + pub fn specversion(&self) -> &str { + &self.specversion + } + /// Get the subject of an event. + #[must_use] + pub fn subject(&self) -> &str { + &self.subject + } + /// Get the time of an event. + #[must_use] + pub fn time(&self) -> &DateTime { + &self.time + } + /// Get the traceparent of an event. + #[must_use] + pub fn traceparent(&self) -> Option<&str> { + self.traceinfo.as_ref().map(TraceInfo::traceparent) + } + /// Get the tracestate of an event. + #[must_use] + pub fn tracestate(&self) -> Option<&str> { + self.traceinfo.as_ref().and_then(|t| t.tracestate()) + } + /// Get the traceinfo of an event. + #[must_use] + pub fn traceinfo(&self) -> Option<&TraceInfo> { + self.traceinfo.as_ref() + } + /// Get the type of an event. + #[must_use] + pub fn ty(&self) -> &str { + &self.r#type + } +} + +impl From for EventCandidate { + fn from(event: Event) -> Self { + Self { + data: event.data, + source: event.source, + subject: event.subject, + r#type: event.r#type, + traceinfo: event.traceinfo, + } + } +} + +#[cfg(feature = "cloudevents")] +impl From for cloudevents::Event { + fn from(event: Event) -> Self { + let mut builder = cloudevents::EventBuilderV10::new() + .source(event.source) + .subject(event.subject) + .ty(event.r#type) + .id(event.id) + .time(event.time.to_string()) + .data(event.datacontenttype, event.data); + + if let Some(traceinfo) = event.traceinfo { + builder = builder.extension("traceparent", traceinfo.traceparent()); + if let Some(tracestate) = traceinfo.tracestate() { + builder = builder.extension("tracestate", tracestate); + } + } + + builder.build().expect("Failed to build cloudevent") + } +} diff --git a/src/event/event_candidate.rs b/src/event/event_candidate.rs new file mode 100644 index 0000000..33a6b5a --- /dev/null +++ b/src/event/event_candidate.rs @@ -0,0 +1,57 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use typed_builder::TypedBuilder; + +use super::TraceInfo; +#[cfg(feature = "cloudevents")] +use crate::error::EventError; + +/// Represents an event candidate that can be sent to the DB. +/// This is a simplified version of the [`super::Event`] type. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] +pub struct EventCandidate { + /// The data of the event, serialized as JSON + #[builder(setter(into))] + pub data: Value, + /// The source of the event. + /// This has to be a valid URI. + #[builder(setter(into))] + pub source: String, + /// The subject of the event. + /// This has to start with a `/`. + #[builder(setter(into))] + pub subject: String, + /// The type of the event. + /// This has to be a reverse domain name. + #[builder(setter(into))] + pub r#type: String, + /// The traceparent of the event. + /// This is used for distributed tracing. + #[builder(default, setter(strip_option))] + #[serde(skip_serializing_if = "Option::is_none", flatten)] + pub traceinfo: Option, +} + +#[cfg(feature = "cloudevents")] +impl TryFrom for EventCandidate { + type Error = EventError; + fn try_from(event: cloudevents::Event) -> Result { + let data = match event.data() { + Some(cloudevents::Data::Json(json)) => json.to_owned(), + _ => return Err(EventError::InvalidCloudevent), + }; + let subject = match event.subject() { + Some(subject) => subject.to_string(), + None => return Err(EventError::InvalidCloudevent), + }; + let traceinfo = TraceInfo::from_cloudevent(&event)?; + + Ok(Self { + data, + source: event.source().to_string(), + subject, + r#type: event.ty().to_string(), + traceinfo, + }) + } +} diff --git a/tests/utils/mod.rs b/tests/utils/mod.rs new file mode 100644 index 0000000..917dfd1 --- /dev/null +++ b/tests/utils/mod.rs @@ -0,0 +1,76 @@ +use chrono::{TimeDelta, Utc}; +use eventsourcingdb_client_rust::event::{Event, EventCandidate}; +use serde_json::{Value, json}; + +pub fn create_test_eventcandidate( + subject: impl ToString, + data: impl Into, +) -> EventCandidate { + EventCandidate::builder() + .source("https://www.eventsourcingdb.io".to_string()) + .data(data.into()) + .subject(subject.to_string()) + .r#type("io.eventsourcingdb.test".to_string()) + .build() +} + +pub fn create_numbered_eventcandidates(count: usize) -> Vec { + (0..count) + .map(|_| create_test_eventcandidate("/test", json!({"value": count}))) + .collect() +} + +pub fn assert_event_match_eventcandidate( + event: &Event, + event_candidate: &EventCandidate, + previous_event_hash: Option<&str>, + expected_id: Option, +) { + // check provided data + assert_eq!(event.data(), &event_candidate.data, "Data mismatch"); + assert_eq!(event.source(), &event_candidate.source, "Source mismatch"); + assert_eq!( + event.subject(), + &event_candidate.subject, + "Subject mismatch" + ); + assert_eq!(event.ty(), &event_candidate.r#type, "Type mismatch"); + assert_eq!( + event.traceinfo(), + event_candidate.traceinfo.as_ref(), + "Traceparent mismatch" + ); + + // Check added metadata + assert_eq!( + event.datacontenttype(), + "application/json", + "Data content type should be application/json" + ); + assert_eq!(event.hash().len(), 64, "Hash should be present"); + assert_eq!( + event.id(), + expected_id.unwrap_or_default().to_string(), + "ID should be present" + ); + assert_eq!( + event.predecessorhash(), + previous_event_hash + .unwrap_or("0000000000000000000000000000000000000000000000000000000000000000"), + "Time should be present" + ); + assert_eq!(event.specversion(), "1.0", "Spec version should be 1.0"); + assert!( + (Utc::now() - *event.time()) < TimeDelta::seconds(60), + "Time should be present" + ); +} + +pub fn assert_events_match_eventcandidates(events: &[Event], event_candidates: &[EventCandidate]) { + assert_eq!(events.len(), event_candidates.len(), "Length mismatch"); + let mut previous_event_hash: Option<&str> = None; + for (i, (event, event_candidate)) in events.iter().zip(event_candidates.iter()).enumerate() { + assert_event_match_eventcandidate(event, event_candidate, previous_event_hash, Some(i)); + previous_event_hash = Some(event.hash()); + } +} diff --git a/tests/write_events.rs b/tests/write_events.rs new file mode 100644 index 0000000..4883614 --- /dev/null +++ b/tests/write_events.rs @@ -0,0 +1,338 @@ +mod utils; + +use eventsourcingdb_client_rust::{ + client::Precondition, container::Container, event::{EventCandidate, TraceInfo} +}; +use serde_json::json; +use utils::{ + assert_event_match_eventcandidate, assert_events_match_eventcandidates, + create_numbered_eventcandidates, create_test_eventcandidate, +}; + +#[tokio::test] +async fn write_single_event() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let event = create_test_eventcandidate("/test", json!({"value": 1})); + let result = client.write_events(vec![event.clone()], vec![]).await; + assert!(result.is_ok(), "Failed to write events: {:?}", result); + let mut response = result.unwrap(); + assert_eq!(response.len(), 1, "Expected one event in the response"); + let response_event = response.pop().unwrap(); + + assert_event_match_eventcandidate(&response_event, &event, None, None); +} + +#[tokio::test] +async fn write_multiple_events() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let event_candidates = create_numbered_eventcandidates(10); + + let result = client.write_events(event_candidates.clone(), vec![]).await; + assert!(result.is_ok(), "Failed to write events: {:?}", result); + let response = result.unwrap(); + + assert_events_match_eventcandidates(&response, &event_candidates); +} + +#[tokio::test] +async fn write_event_with_is_pristine_condition_on_empty_subject() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let event_candidate = create_test_eventcandidate("/test/42", json!({"value": 1})); + let result = client + .write_events( + vec![event_candidate.clone()], + vec![ + Precondition::IsSubjectPristine { + subject: event_candidate.subject.clone(), + }, + ], + ) + .await; + assert!(result.is_ok(), "Failed to write events: {:?}", result); + let mut response = result.unwrap(); + assert_eq!(response.len(), 1, "Expected one event in the response"); + let response_event = response.pop().unwrap(); + + assert_event_match_eventcandidate(&response_event, &event_candidate, None, None); +} + +#[tokio::test] +async fn write_event_with_is_pristine_condition_on_non_empty_subject() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let event_candidate = create_test_eventcandidate("/test", json!({"value": 1})); + client + .write_events(vec![event_candidate.clone()], vec![]) + .await + .expect("Failed to write initial event"); + let result = client + .write_events( + vec![event_candidate.clone()], + vec![ + Precondition::IsSubjectPristine { + subject: event_candidate.subject.clone(), + }, + ], + ) + .await; + assert!(result.is_err(), "Expected an error, but got: {:?}", result); +} + +#[tokio::test] +async fn write_events_with_is_pristine_condition_on_empty_subject() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let event_candidates = vec![ + create_test_eventcandidate("/test/42", json!({"value": 1})), + create_test_eventcandidate("/test/42", json!({"value": 1})), + ]; + let result = client + .write_events( + event_candidates.clone(), + vec![ + Precondition::IsSubjectPristine { + subject: event_candidates[1].subject.clone(), + }, + ], + ) + .await; + assert!(result.is_ok(), "Failed to write events: {:?}", result); + let response = result.unwrap(); + assert_events_match_eventcandidates(&response, &event_candidates); +} + +#[tokio::test] +async fn write_events_with_is_pristine_condition_on_non_empty_subject() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let fill_event_candidate = create_test_eventcandidate("/test", json!({"value": 1})); + client + .write_events(vec![fill_event_candidate.clone()], vec![]) + .await + .expect("Failed to write initial event"); + let event_candidates = vec![ + create_test_eventcandidate("/test2", json!({"value": 1})), + fill_event_candidate.clone(), + ]; + let result = client + .write_events( + event_candidates, + vec![ + Precondition::IsSubjectPristine { + subject: fill_event_candidate.subject.clone(), + }, + ], + ) + .await; + assert!(result.is_err(), "Expected an error, but got: {:?}", result); +} + +#[tokio::test] +async fn write_event_with_is_subject_on_event_id_condition_on_empty_subject() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let event_candidate = create_test_eventcandidate("/test/42", json!({"value": 1})); + let result = client + .write_events( + vec![event_candidate.clone()], + vec![ + Precondition::IsSubjectOnEventId { + subject: event_candidate.subject.clone(), + event_id: "100".to_string(), + }, + ], + ) + .await; + assert!(result.is_err(), "Expected an error, but got: {:?}", result); +} + +#[tokio::test] +async fn write_event_with_is_subject_on_event_id_condition_on_non_empty_subject_correct_id() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let event_candidate = create_test_eventcandidate("/test", json!({"value": 1})); + let written = client + .write_events(vec![event_candidate.clone()], vec![]) + .await + .expect("Failed to write initial event") + .pop() + .unwrap(); + let result = client + .write_events( + vec![event_candidate.clone()], + vec![ + Precondition::IsSubjectOnEventId { + subject: event_candidate.subject.clone(), + event_id: written.id().to_string(), + }, + ], + ) + .await; + assert!(result.is_ok(), "Writing the event failed: {:?}", result); +} + +#[tokio::test] +async fn write_event_with_is_subject_on_event_id_condition_on_non_empty_subject_wrong_id() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let event_candidate = create_test_eventcandidate("/test", json!({"value": 1})); + client + .write_events(vec![event_candidate.clone()], vec![]) + .await + .expect("Failed to write initial event") + .pop() + .unwrap(); + let result = client + .write_events( + vec![event_candidate.clone()], + vec![ + Precondition::IsSubjectOnEventId { + subject: event_candidate.subject.clone(), + event_id: 100.to_string(), + }, + ], + ) + .await; + assert!(result.is_err(), "Expected an error, but got: {:?}", result); +} + +#[tokio::test] +async fn write_events_with_is_subject_on_event_id_condition_on_empty_subject() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let event_candidates = vec![ + create_test_eventcandidate("/test/42", json!({"value": 1})), + create_test_eventcandidate("/test/42", json!({"value": 1})), + ]; + let result = client + .write_events( + event_candidates.clone(), + vec![ + Precondition::IsSubjectOnEventId { + subject: event_candidates[1].subject.clone(), + event_id: "100".to_string(), + }, + ], + ) + .await; + assert!(result.is_err(), "Expected an error, but got: {:?}", result); +} + +#[tokio::test] +async fn write_events_with_is_subject_on_event_id_condition_on_non_empty_subject_correct_id() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let fill_event_candidate = create_test_eventcandidate("/test", json!({"value": 1})); + let written = client + .write_events(vec![fill_event_candidate.clone()], vec![]) + .await + .expect("Failed to write initial event") + .pop() + .unwrap(); + let event_candidates = vec![ + create_test_eventcandidate("/test2", json!({"value": 1})), + fill_event_candidate.clone(), + ]; + let result = client + .write_events( + event_candidates, + vec![ + Precondition::IsSubjectOnEventId { + subject: fill_event_candidate.subject.clone(), + event_id: written.id().to_string(), + }, + ], + ) + .await; + assert!(result.is_ok(), "Writing the events failed: {:?}", result); +} + +#[tokio::test] +async fn write_events_with_is_subject_on_event_id_condition_on_non_empty_subject_wrong_id() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let fill_event_candidate = create_test_eventcandidate("/test", json!({"value": 1})); + client + .write_events(vec![fill_event_candidate.clone()], vec![]) + .await + .expect("Failed to write initial event") + .pop() + .unwrap(); + let event_candidates = vec![ + create_test_eventcandidate("/test2", json!({"value": 1})), + fill_event_candidate.clone(), + ]; + let result = client + .write_events( + event_candidates, + vec![ + Precondition::IsSubjectOnEventId { + subject: fill_event_candidate.subject.clone(), + event_id: 100.to_string(), + }, + ], + ) + .await; + assert!(result.is_err(), "Expected an error, but got: {:?}", result); +} + +#[tokio::test] +async fn write_single_event_with_traceparent() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let event = EventCandidate::builder() + .source("https://www.eventsourcingdb.io".to_string()) + .data(json!({"value": 1})) + .subject("/test".to_string()) + .r#type("io.eventsourcingdb.test".to_string()) + .traceinfo(TraceInfo::Traceparent { + traceparent: "00-01234567012345670123456701234567-0123456701234567-00".to_string(), + }) + .build(); + let result = client.write_events(vec![event.clone()], vec![]).await; + assert!(result.is_ok(), "Failed to write events: {:?}", result); + let mut response = result.unwrap(); + assert_eq!(response.len(), 1, "Expected one event in the response"); + let response_event = response.pop().unwrap(); + + assert_event_match_eventcandidate(&response_event, &event, None, None); +} + +#[tokio::test] +async fn write_single_event_with_traceparent_and_state() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let event = EventCandidate::builder() + .source("https://www.eventsourcingdb.io".to_string()) + .data(json!({"value": 1})) + .subject("/test".to_string()) + .r#type("io.eventsourcingdb.test".to_string()) + .traceinfo(TraceInfo::WithState { + traceparent: "00-01234567012345670123456701234567-0123456701234567-00".to_string(), + tracestate: "state=12345".to_string(), + }) + .build(); + let result = client.write_events(vec![event.clone()], vec![]).await; + assert!(result.is_ok(), "Failed to write events: {:?}", result); + let mut response = result.unwrap(); + assert_eq!(response.len(), 1, "Expected one event in the response"); + let response_event = response.pop().unwrap(); + + println!("Response event: {:?}", response_event); + + assert_event_match_eventcandidate(&response_event, &event, None, None); +} From ce372f714146b662d0972633dc7b0b49a5f2f1bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Tue, 13 May 2025 21:31:46 +0200 Subject: [PATCH 2/3] feat(client): rework write events based on forward looking learnings in #8 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Raphael HΓΆser --- src/client.rs | 33 +++++++-- src/client/client_request.rs | 72 ++++--------------- src/client/client_request/ping.rs | 24 +++++++ src/client/client_request/verify_api_token.rs | 24 +++++++ src/client/client_request/write_events.rs | 26 +++++++ src/error.rs | 2 +- 6 files changed, 116 insertions(+), 65 deletions(-) create mode 100644 src/client/client_request/ping.rs create mode 100644 src/client/client_request/verify_api_token.rs create mode 100644 src/client/client_request/write_events.rs diff --git a/src/client.rs b/src/client.rs index 92927a4..ead82e4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -20,7 +20,9 @@ mod client_request; mod precondition; -use client_request::{ClientRequest, PingRequest, VerifyApiTokenRequest, WriteEvents}; +use client_request::{ + ClientRequest, OneShotRequest, PingRequest, VerifyApiTokenRequest, WriteEventsRequest, +}; pub use precondition::Precondition; use reqwest; @@ -77,9 +79,14 @@ impl Client { /// Utility function to request an endpoint of the API. /// + /// This function will return a [`reqwest::RequestBuilder`] which can be used to send the request. + /// /// # Errors /// This function will return an error if the request fails or if the URL is invalid. - async fn request(&self, endpoint: R) -> Result { + fn build_request( + &self, + endpoint: &R, + ) -> Result { let url = self .base_url .join(endpoint.url_path()) @@ -98,15 +105,27 @@ impl Client { } else { request }; + Ok(request) + } - let response = request.send().await?; + /// Utility function to request an endpoint of the API as a oneshot. + /// + /// This means, that the response is not streamed, but returned as a single value. + /// + /// # Errors + /// This function will return an error if the request fails or if the URL is invalid. + async fn request_oneshot( + &self, + endpoint: R, + ) -> Result { + let response = self.build_request(&endpoint)?.send().await?; if response.status().is_success() { let result = response.json().await?; endpoint.validate_response(&result)?; Ok(result) } else { - Err(ClientError::DBError( + Err(ClientError::DBApiError( response.status(), response.text().await.unwrap_or_default(), )) @@ -130,7 +149,7 @@ impl Client { /// # Errors /// This function will return an error if the request fails or if the URL is invalid. pub async fn ping(&self) -> Result<(), ClientError> { - let _ = self.request(PingRequest).await?; + let _ = self.request_oneshot(PingRequest).await?; Ok(()) } @@ -151,7 +170,7 @@ impl Client { /// # Errors /// This function will return an error if the request fails or if the URL is invalid. pub async fn verify_api_token(&self) -> Result<(), ClientError> { - let _ = self.request(VerifyApiTokenRequest).await?; + let _ = self.request_oneshot(VerifyApiTokenRequest).await?; Ok(()) } @@ -186,7 +205,7 @@ impl Client { events: Vec, preconditions: Vec, ) -> Result, ClientError> { - self.request(WriteEvents { + self.request_oneshot(WriteEventsRequest { events, preconditions, }) diff --git a/src/client/client_request.rs b/src/client/client_request.rs index e6db97b..4f3311e 100644 --- a/src/client/client_request.rs +++ b/src/client/client_request.rs @@ -1,20 +1,21 @@ //! This is a purely internal module to represent client requests to the database. -use reqwest::Method; -use serde::Serialize; +mod ping; +mod verify_api_token; +mod write_events; -use crate::{ - error::ClientError, - event::{Event, EventCandidate, ManagementEvent}, -}; +pub use ping::PingRequest; +pub use verify_api_token::VerifyApiTokenRequest; +pub use write_events::WriteEventsRequest; -use super::precondition::Precondition; +use crate::error::ClientError; +use reqwest::Method; +use serde::{Serialize, de::DeserializeOwned}; /// Represents a request to the database client pub trait ClientRequest { const URL_PATH: &'static str; const METHOD: Method; - type Response: serde::de::DeserializeOwned; /// Returns the URL path for the request fn url_path(&self) -> &'static str { @@ -28,59 +29,16 @@ pub trait ClientRequest { /// Returns the body for the request fn body(&self) -> Option> { - None::> + None::> } +} + +/// Represents a request to the database that expects a single response +pub trait OneShotRequest: ClientRequest { + type Response: DeserializeOwned; /// Validate the response from the database fn validate_response(&self, _response: &Self::Response) -> Result<(), ClientError> { Ok(()) } } - -/// Ping the Database instance -#[derive(Debug, Clone, Copy)] -pub struct PingRequest; - -impl ClientRequest for PingRequest { - const URL_PATH: &'static str = "/api/v1/ping"; - const METHOD: Method = Method::GET; - type Response = ManagementEvent; - - fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> { - (response.ty() == "io.eventsourcingdb.api.ping-received") - .then_some(()) - .ok_or(ClientError::PingFailed) - } -} - -/// Verify the API token -#[derive(Debug, Clone, Copy)] -pub struct VerifyApiTokenRequest; - -impl ClientRequest for VerifyApiTokenRequest { - const URL_PATH: &'static str = "/api/v1/verify-api-token"; - const METHOD: Method = Method::POST; - type Response = ManagementEvent; - - fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> { - (response.ty() == "io.eventsourcingdb.api.api-token-verified") - .then_some(()) - .ok_or(ClientError::APITokenInvalid) - } -} - -#[derive(Debug, Serialize)] -pub struct WriteEvents { - pub events: Vec, - pub preconditions: Vec, -} - -impl ClientRequest for WriteEvents { - const URL_PATH: &'static str = "/api/v1/write-events"; - const METHOD: Method = Method::POST; - type Response = Vec; - - fn body(&self) -> Option> { - Some(Ok(self)) - } -} diff --git a/src/client/client_request/ping.rs b/src/client/client_request/ping.rs new file mode 100644 index 0000000..5eaedd5 --- /dev/null +++ b/src/client/client_request/ping.rs @@ -0,0 +1,24 @@ +use reqwest::Method; + +use crate::{error::ClientError, event::ManagementEvent}; + +use super::{ClientRequest, OneShotRequest}; + +/// Ping the Database instance +#[derive(Debug, Clone, Copy)] +pub struct PingRequest; + +impl ClientRequest for PingRequest { + const URL_PATH: &'static str = "/api/v1/ping"; + const METHOD: Method = Method::GET; +} + +impl OneShotRequest for PingRequest { + type Response = ManagementEvent; + + fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> { + (response.ty() == "io.eventsourcingdb.api.ping-received") + .then_some(()) + .ok_or(ClientError::PingFailed) + } +} diff --git a/src/client/client_request/verify_api_token.rs b/src/client/client_request/verify_api_token.rs new file mode 100644 index 0000000..c1c015e --- /dev/null +++ b/src/client/client_request/verify_api_token.rs @@ -0,0 +1,24 @@ +use reqwest::Method; + +use crate::{error::ClientError, event::ManagementEvent}; + +use super::{ClientRequest, OneShotRequest}; + +/// Verify the API token +#[derive(Debug, Clone, Copy)] +pub struct VerifyApiTokenRequest; + +impl ClientRequest for VerifyApiTokenRequest { + const URL_PATH: &'static str = "/api/v1/verify-api-token"; + const METHOD: Method = Method::POST; +} + +impl OneShotRequest for VerifyApiTokenRequest { + type Response = ManagementEvent; + + fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> { + (response.ty() == "io.eventsourcingdb.api.api-token-verified") + .then_some(()) + .ok_or(ClientError::APITokenInvalid) + } +} diff --git a/src/client/client_request/write_events.rs b/src/client/client_request/write_events.rs new file mode 100644 index 0000000..db40863 --- /dev/null +++ b/src/client/client_request/write_events.rs @@ -0,0 +1,26 @@ +use super::{ClientRequest, OneShotRequest}; +use crate::{ + client::Precondition, + error::ClientError, + event::{Event, EventCandidate}, +}; +use reqwest::Method; +use serde::Serialize; + +#[derive(Debug, Serialize)] +pub struct WriteEventsRequest { + pub events: Vec, + pub preconditions: Vec, +} + +impl ClientRequest for WriteEventsRequest { + const URL_PATH: &'static str = "/api/v1/write-events"; + const METHOD: Method = Method::POST; + + fn body(&self) -> Option> { + Some(Ok(self)) + } +} +impl OneShotRequest for WriteEventsRequest { + type Response = Vec; +} diff --git a/src/error.rs b/src/error.rs index 0b03859..749df80 100644 --- a/src/error.rs +++ b/src/error.rs @@ -26,7 +26,7 @@ pub enum ClientError { SerdeJsonError(#[from] serde_json::Error), /// The DB returned an error+ #[error("The DB returned an error: {0}")] - DBError(StatusCode, String), + DBApiError(StatusCode, String), /// There was a problem with the `cloudevents` message #[cfg(feature = "cloudevents")] #[error("The CloudEvents message is invalid: {0}")] From 2b3253b76ab34bbbb4bfff03d9d45cf931dd9ad1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Thu, 15 May 2025 11:10:04 +0200 Subject: [PATCH 3/3] chore(client): restructure event module to avoid allowing clippy warnings --- src/event.rs | 85 ++----------------- src/event/event_types.rs | 5 ++ src/event/{ => event_types}/event.rs | 3 +- .../{ => event_types}/event_candidate.rs | 2 +- .../{ => event_types}/management_event.rs | 0 src/event/trace_info.rs | 70 +++++++++++++++ 6 files changed, 86 insertions(+), 79 deletions(-) create mode 100644 src/event/event_types.rs rename src/event/{ => event_types}/event.rs (98%) rename src/event/{ => event_types}/event_candidate.rs (97%) rename src/event/{ => event_types}/management_event.rs (100%) create mode 100644 src/event/trace_info.rs diff --git a/src/event.rs b/src/event.rs index 7fc0dc1..f927015 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,83 +1,14 @@ //! This module holds all event types that are send between the client and the database. -// Allow module inception here, since "event" is the expected as the name for both modules. -// Renaming would be possible, but would probably lead to more confusion. -#[allow(clippy::module_inception)] -mod event; -mod event_candidate; -mod management_event; +mod event_types; +mod trace_info; -pub use event::Event; -pub use event_candidate::EventCandidate; -pub use management_event::ManagementEvent; -use serde::{Deserialize, Serialize}; +// Reexport relevant types to flatten the module graph for consumers and +// keep private encapsulation of implementation details. +pub use event_types::event::Event; +pub use event_types::event_candidate::EventCandidate; +pub use event_types::management_event::ManagementEvent; +pub use trace_info::TraceInfo; #[cfg(feature="cloudevents")] use crate::error::EventError; - -/// Represents the trace information of an event. -/// This is used for distributed tracing. -/// It can either be a traceparent or a traceparent and tracestate. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(untagged)] -pub enum TraceInfo { - // LEAVE ORDER AS IS - // This is important for deserialization as the traceparent is always present - - /// The traceparent and tracestate of the event. - /// This is used for distributed tracing. - WithState { - /// The traceparent of the event. - /// This is used for distributed tracing. - traceparent: String, - /// The tracestate of the event. - /// This is used for distributed tracing. - tracestate: String, - }, - /// The traceparent of the event. - /// This is used for distributed tracing. - Traceparent { - /// The traceparent of the event. - /// This is used for distributed tracing. - traceparent: String, - }, -} - -impl TraceInfo { - /// Get the traceparent of the event. - #[must_use] - pub fn traceparent(&self) -> &str { - match self { - Self::Traceparent { traceparent } | Self::WithState { traceparent, .. } => traceparent, - } - } - /// Get the tracestate of the event. - #[must_use] - pub fn tracestate(&self) -> Option<&str> { - match self { - Self::Traceparent { .. } => None, - Self::WithState { tracestate, .. } => Some(tracestate), - } - } - - /// Create a new `TraceInfo` from a cloudevent. - /// This will return None if the cloudevent does not contain a traceparent or tracestate. - /// - /// # Errors - /// If the cloudevent contains a tracestate but no traceparent, an error will be returned. - #[cfg(feature="cloudevents")] - pub fn from_cloudevent(event: &cloudevents::Event) -> Result, EventError> { - let traceparent = event.extension("traceparent").map(ToString::to_string); - let tracestate = event.extension("tracestate").map(ToString::to_string); - - match (traceparent, tracestate) { - (Some(traceparent), Some(tracestate)) => Ok(Some(Self::WithState { - traceparent, - tracestate, - })), - (Some(traceparent), None) => Ok(Some(Self::Traceparent { traceparent })), - (None, None) => Ok(None), - (None, Some(_)) => Err(EventError::InvalidCloudevent), - } - } -} diff --git a/src/event/event_types.rs b/src/event/event_types.rs new file mode 100644 index 0000000..02837c9 --- /dev/null +++ b/src/event/event_types.rs @@ -0,0 +1,5 @@ +//! This module holds all possible event types this sdk works with. + +pub mod event; +pub mod event_candidate; +pub mod management_event; diff --git a/src/event/event.rs b/src/event/event_types/event.rs similarity index 98% rename from src/event/event.rs rename to src/event/event_types/event.rs index 911f4f5..1c730ce 100644 --- a/src/event/event.rs +++ b/src/event/event_types/event.rs @@ -2,7 +2,8 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use super::{EventCandidate, TraceInfo}; +use crate::event::{trace_info::TraceInfo, EventCandidate}; + /// Represents an event that has been received from the DB. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] diff --git a/src/event/event_candidate.rs b/src/event/event_types/event_candidate.rs similarity index 97% rename from src/event/event_candidate.rs rename to src/event/event_types/event_candidate.rs index 33a6b5a..9d5947d 100644 --- a/src/event/event_candidate.rs +++ b/src/event/event_types/event_candidate.rs @@ -1,8 +1,8 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use typed_builder::TypedBuilder; +use crate::event::trace_info::TraceInfo; -use super::TraceInfo; #[cfg(feature = "cloudevents")] use crate::error::EventError; diff --git a/src/event/management_event.rs b/src/event/event_types/management_event.rs similarity index 100% rename from src/event/management_event.rs rename to src/event/event_types/management_event.rs diff --git a/src/event/trace_info.rs b/src/event/trace_info.rs new file mode 100644 index 0000000..7647423 --- /dev/null +++ b/src/event/trace_info.rs @@ -0,0 +1,70 @@ +//! This module holds supporting traits for the "Tracing" feature of eventsourcingdb. + +use serde::{Deserialize, Serialize}; + +/// Represents the trace information of an event. +/// This is used for distributed tracing. +/// It can either be a traceparent or a traceparent and tracestate. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum TraceInfo { + // LEAVE ORDER AS IS + // This is important for deserialization as the traceparent is always present + + /// The traceparent and tracestate of the event. + /// This is used for distributed tracing. + WithState { + /// The traceparent of the event. + /// This is used for distributed tracing. + traceparent: String, + /// The tracestate of the event. + /// This is used for distributed tracing. + tracestate: String, + }, + /// The traceparent of the event. + /// This is used for distributed tracing. + Traceparent { + /// The traceparent of the event. + /// This is used for distributed tracing. + traceparent: String, + }, +} + +impl TraceInfo { + /// Get the traceparent of the event. + #[must_use] + pub fn traceparent(&self) -> &str { + match self { + Self::Traceparent { traceparent } | Self::WithState { traceparent, .. } => traceparent, + } + } + /// Get the tracestate of the event. + #[must_use] + pub fn tracestate(&self) -> Option<&str> { + match self { + Self::Traceparent { .. } => None, + Self::WithState { tracestate, .. } => Some(tracestate), + } + } + + /// Create a new `TraceInfo` from a cloudevent. + /// This will return None if the cloudevent does not contain a traceparent or tracestate. + /// + /// # Errors + /// If the cloudevent contains a tracestate but no traceparent, an error will be returned. + #[cfg(feature="cloudevents")] + pub fn from_cloudevent(event: &cloudevents::Event) -> Result, EventError> { + let traceparent = event.extension("traceparent").map(ToString::to_string); + let tracestate = event.extension("tracestate").map(ToString::to_string); + + match (traceparent, tracestate) { + (Some(traceparent), Some(tracestate)) => Ok(Some(Self::WithState { + traceparent, + tracestate, + })), + (Some(traceparent), None) => Ok(Some(Self::Traceparent { traceparent })), + (None, None) => Ok(None), + (None, Some(_)) => Err(EventError::InvalidCloudevent), + } + } +}