Skip to content

Commit 11b4dc5

Browse files
committed
feat(client): add writing events to client adhering to the "Writing Events" compliance criteria
1 parent 96583d4 commit 11b4dc5

File tree

12 files changed

+799
-7
lines changed

12 files changed

+799
-7
lines changed

Cargo.lock

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ testcontainers = { version = "0.24.0", features = [
1919
"http_wait",
2020
], optional = true }
2121
thiserror = "2.0.12"
22+
typed-builder = "0.21.0"
2223

2324
[dev-dependencies]
2425
eventsourcingdb-client-rust = { path = ".", features = ["testcontainer"] }

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ This is a work in progress and not yet ready for production use.
88
Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/) the SDK covers these criteria:
99

1010
- 🚀 [Essentials](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#essentials)
11-
- [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events)
11+
- 🚀 [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events)
1212
-[Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events)
1313
-[Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql)
1414
-[Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events)

src/client.rs

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,18 @@
1818
//! If this works, it means that the client is correctly configured and you can use it to make requests to the DB.
1919
2020
mod client_request;
21+
mod precondition;
2122

22-
use client_request::{ClientRequest, PingRequest, VerifyApiTokenRequest};
23+
use client_request::{ClientRequest, PingRequest, VerifyApiTokenRequest, WriteEvents};
2324

25+
pub use precondition::Precondition;
2426
use reqwest;
2527
use url::Url;
2628

27-
use crate::error::ClientError;
29+
use crate::{
30+
error::ClientError,
31+
event::{Event, EventCandidate},
32+
};
2833

2934
/// Client for an [EventsourcingDB](https://www.eventsourcingdb.io/) instance.
3035
#[derive(Debug)]
@@ -149,4 +154,42 @@ impl Client {
149154
let _ = self.request(VerifyApiTokenRequest).await?;
150155
Ok(())
151156
}
157+
158+
/// Writes events to the DB instance.
159+
///
160+
/// ```
161+
/// use eventsourcingdb_client_rust::event::EventCandidate;
162+
/// # use serde_json::json;
163+
/// # tokio_test::block_on(async {
164+
/// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap();
165+
/// let db_url = "http://localhost:3000/";
166+
/// let api_token = "secrettoken";
167+
/// # let db_url = container.get_base_url().await.unwrap();
168+
/// # let api_token = container.get_api_token();
169+
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
170+
/// let candidates = vec![
171+
/// EventCandidate::builder()
172+
/// .source("https://www.eventsourcingdb.io".to_string())
173+
/// .data(json!({"value": 1}))
174+
/// .subject("/test".to_string())
175+
/// .r#type("io.eventsourcingdb.test".to_string())
176+
/// .build()
177+
/// ];
178+
/// let written_events = client.write_events(candidates, vec![]).await.expect("Failed to write events");
179+
/// # })
180+
/// ```
181+
///
182+
/// # Errors
183+
/// This function will return an error if the request fails or if the URL is invalid.
184+
pub async fn write_events(
185+
&self,
186+
events: Vec<EventCandidate>,
187+
preconditions: Vec<Precondition>,
188+
) -> Result<Vec<Event>, ClientError> {
189+
self.request(WriteEvents {
190+
events,
191+
preconditions,
192+
})
193+
.await
194+
}
152195
}

src/client/client_request.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
//! This is a purely internal module to represent client requests to the database.
22
33
use reqwest::Method;
4-
use serde_json::Value;
4+
use serde::Serialize;
55

6-
use crate::{error::ClientError, event::ManagementEvent};
6+
use crate::{
7+
error::ClientError,
8+
event::{Event, EventCandidate, ManagementEvent},
9+
};
10+
11+
use super::precondition::Precondition;
712

813
/// Represents a request to the database client
914
pub trait ClientRequest {
@@ -22,8 +27,8 @@ pub trait ClientRequest {
2227
}
2328

2429
/// Returns the body for the request
25-
fn body(&self) -> Option<Result<Value, ClientError>> {
26-
None
30+
fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
31+
None::<Result<(), ClientError>>
2732
}
2833

2934
/// Validate the response from the database
@@ -63,3 +68,19 @@ impl ClientRequest for VerifyApiTokenRequest {
6368
.ok_or(ClientError::APITokenInvalid)
6469
}
6570
}
71+
72+
#[derive(Debug, Serialize)]
73+
pub struct WriteEvents {
74+
pub events: Vec<EventCandidate>,
75+
pub preconditions: Vec<Precondition>,
76+
}
77+
78+
impl ClientRequest for WriteEvents {
79+
const URL_PATH: &'static str = "/api/v1/write-events";
80+
const METHOD: Method = Method::POST;
81+
type Response = Vec<Event>;
82+
83+
fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
84+
Some(Ok(self))
85+
}
86+
}

src/client/precondition.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use serde::Serialize;
2+
3+
/// Enum for different preconditions that can be used when writing events
4+
#[derive(Debug, Serialize)]
5+
#[serde(tag = "type", content = "payload")]
6+
pub enum Precondition {
7+
/// Check if the subject with the given path has no other events
8+
#[serde(rename = "isSubjectPristine")]
9+
IsSubjectPristine {
10+
/// The subject to check
11+
subject: String,
12+
},
13+
/// Check if the subject with the given path has no other events
14+
#[serde(rename = "isSubjectOnEventId")]
15+
IsSubjectOnEventId {
16+
/// The subject to check
17+
subject: String,
18+
/// The event ID to check against
19+
#[serde(rename = "eventId")]
20+
event_id: String,
21+
},
22+
}

src/error.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,12 @@ pub enum ContainerError {
4646
#[error("URL parsing error: {0}")]
4747
URLParseError(#[from] url::ParseError),
4848
}
49+
50+
/// Error type for the event
51+
#[derive(Debug, thiserror::Error)]
52+
pub enum EventError {
53+
/// The passed cloudevent is invalid
54+
#[cfg(feature = "cloudevents")]
55+
#[error("The passed cloudevent is invalid")]
56+
InvalidCloudevent,
57+
}

src/event.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,83 @@
11
//! This module holds all event types that are send between the client and the database.
22
3+
// Allow module inception here, since "event" is the expected as the name for both modules.
4+
// Renaming would be possible, but would probably lead to more confusion.
5+
#[allow(clippy::module_inception)]
6+
mod event;
7+
mod event_candidate;
38
mod management_event;
49

10+
pub use event::Event;
11+
pub use event_candidate::EventCandidate;
512
pub use management_event::ManagementEvent;
13+
use serde::{Deserialize, Serialize};
14+
15+
#[cfg(feature="cloudevents")]
16+
use crate::error::EventError;
17+
18+
/// Represents the trace information of an event.
19+
/// This is used for distributed tracing.
20+
/// It can either be a traceparent or a traceparent and tracestate.
21+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
22+
#[serde(untagged)]
23+
pub enum TraceInfo {
24+
// LEAVE ORDER AS IS
25+
// This is important for deserialization as the traceparent is always present
26+
27+
/// The traceparent and tracestate of the event.
28+
/// This is used for distributed tracing.
29+
WithState {
30+
/// The traceparent of the event.
31+
/// This is used for distributed tracing.
32+
traceparent: String,
33+
/// The tracestate of the event.
34+
/// This is used for distributed tracing.
35+
tracestate: String,
36+
},
37+
/// The traceparent of the event.
38+
/// This is used for distributed tracing.
39+
Traceparent {
40+
/// The traceparent of the event.
41+
/// This is used for distributed tracing.
42+
traceparent: String,
43+
},
44+
}
45+
46+
impl TraceInfo {
47+
/// Get the traceparent of the event.
48+
#[must_use]
49+
pub fn traceparent(&self) -> &str {
50+
match self {
51+
Self::Traceparent { traceparent } | Self::WithState { traceparent, .. } => traceparent,
52+
}
53+
}
54+
/// Get the tracestate of the event.
55+
#[must_use]
56+
pub fn tracestate(&self) -> Option<&str> {
57+
match self {
58+
Self::Traceparent { .. } => None,
59+
Self::WithState { tracestate, .. } => Some(tracestate),
60+
}
61+
}
62+
63+
/// Create a new `TraceInfo` from a cloudevent.
64+
/// This will return None if the cloudevent does not contain a traceparent or tracestate.
65+
///
66+
/// # Errors
67+
/// If the cloudevent contains a tracestate but no traceparent, an error will be returned.
68+
#[cfg(feature="cloudevents")]
69+
pub fn from_cloudevent(event: &cloudevents::Event) -> Result<Option<Self>, EventError> {
70+
let traceparent = event.extension("traceparent").map(ToString::to_string);
71+
let tracestate = event.extension("tracestate").map(ToString::to_string);
72+
73+
match (traceparent, tracestate) {
74+
(Some(traceparent), Some(tracestate)) => Ok(Some(Self::WithState {
75+
traceparent,
76+
tracestate,
77+
})),
78+
(Some(traceparent), None) => Ok(Some(Self::Traceparent { traceparent })),
79+
(None, None) => Ok(None),
80+
(None, Some(_)) => Err(EventError::InvalidCloudevent),
81+
}
82+
}
83+
}

0 commit comments

Comments
 (0)