Skip to content

Commit e521068

Browse files
feat(client): add writing events to client adhering to the "Writing Events" compliance criteria (#7)
* feat(client): add writing events to client adhering to the "Writing Events" compliance criteria * feat(client): rework write events based on forward looking learnings in #8 Signed-off-by: Raphael Höser <[email protected]> * chore(client): restructure event module to avoid allowing clippy warnings --------- Signed-off-by: Raphael Höser <[email protected]>
1 parent 96583d4 commit e521068

18 files changed

+899
-49
lines changed

Cargo.lock

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ testcontainers = { version = "0.24.0", features = [
1919
"http_wait",
2020
], optional = true }
2121
thiserror = "2.0.12"
22+
typed-builder = "0.21.0"
2223

2324
[dev-dependencies]
2425
eventsourcingdb-client-rust = { path = ".", features = ["testcontainer"] }

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ This is a work in progress and not yet ready for production use.
88
Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/) the SDK covers these criteria:
99

1010
- 🚀 [Essentials](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#essentials)
11-
- [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events)
11+
- 🚀 [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events)
1212
-[Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events)
1313
-[Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql)
1414
-[Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events)

src/client.rs

Lines changed: 69 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,20 @@
1818
//! If this works, it means that the client is correctly configured and you can use it to make requests to the DB.
1919
2020
mod client_request;
21+
mod precondition;
2122

22-
use client_request::{ClientRequest, PingRequest, VerifyApiTokenRequest};
23+
use client_request::{
24+
ClientRequest, OneShotRequest, PingRequest, VerifyApiTokenRequest, WriteEventsRequest,
25+
};
2326

27+
pub use precondition::Precondition;
2428
use reqwest;
2529
use url::Url;
2630

27-
use crate::error::ClientError;
31+
use crate::{
32+
error::ClientError,
33+
event::{Event, EventCandidate},
34+
};
2835

2936
/// Client for an [EventsourcingDB](https://www.eventsourcingdb.io/) instance.
3037
#[derive(Debug)]
@@ -72,9 +79,14 @@ impl Client {
7279

7380
/// Utility function to request an endpoint of the API.
7481
///
82+
/// This function will return a [`reqwest::RequestBuilder`] which can be used to send the request.
83+
///
7584
/// # Errors
7685
/// This function will return an error if the request fails or if the URL is invalid.
77-
async fn request<R: ClientRequest>(&self, endpoint: R) -> Result<R::Response, ClientError> {
86+
fn build_request<R: ClientRequest>(
87+
&self,
88+
endpoint: &R,
89+
) -> Result<reqwest::RequestBuilder, ClientError> {
7890
let url = self
7991
.base_url
8092
.join(endpoint.url_path())
@@ -93,15 +105,27 @@ impl Client {
93105
} else {
94106
request
95107
};
108+
Ok(request)
109+
}
96110

97-
let response = request.send().await?;
111+
/// Utility function to request an endpoint of the API as a oneshot.
112+
///
113+
/// This means, that the response is not streamed, but returned as a single value.
114+
///
115+
/// # Errors
116+
/// This function will return an error if the request fails or if the URL is invalid.
117+
async fn request_oneshot<R: OneShotRequest>(
118+
&self,
119+
endpoint: R,
120+
) -> Result<R::Response, ClientError> {
121+
let response = self.build_request(&endpoint)?.send().await?;
98122

99123
if response.status().is_success() {
100124
let result = response.json().await?;
101125
endpoint.validate_response(&result)?;
102126
Ok(result)
103127
} else {
104-
Err(ClientError::DBError(
128+
Err(ClientError::DBApiError(
105129
response.status(),
106130
response.text().await.unwrap_or_default(),
107131
))
@@ -125,7 +149,7 @@ impl Client {
125149
/// # Errors
126150
/// This function will return an error if the request fails or if the URL is invalid.
127151
pub async fn ping(&self) -> Result<(), ClientError> {
128-
let _ = self.request(PingRequest).await?;
152+
let _ = self.request_oneshot(PingRequest).await?;
129153
Ok(())
130154
}
131155

@@ -146,7 +170,45 @@ impl Client {
146170
/// # Errors
147171
/// This function will return an error if the request fails or if the URL is invalid.
148172
pub async fn verify_api_token(&self) -> Result<(), ClientError> {
149-
let _ = self.request(VerifyApiTokenRequest).await?;
173+
let _ = self.request_oneshot(VerifyApiTokenRequest).await?;
150174
Ok(())
151175
}
176+
177+
/// Writes events to the DB instance.
178+
///
179+
/// ```
180+
/// use eventsourcingdb_client_rust::event::EventCandidate;
181+
/// # use serde_json::json;
182+
/// # tokio_test::block_on(async {
183+
/// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap();
184+
/// let db_url = "http://localhost:3000/";
185+
/// let api_token = "secrettoken";
186+
/// # let db_url = container.get_base_url().await.unwrap();
187+
/// # let api_token = container.get_api_token();
188+
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
189+
/// let candidates = vec![
190+
/// EventCandidate::builder()
191+
/// .source("https://www.eventsourcingdb.io".to_string())
192+
/// .data(json!({"value": 1}))
193+
/// .subject("/test".to_string())
194+
/// .r#type("io.eventsourcingdb.test".to_string())
195+
/// .build()
196+
/// ];
197+
/// let written_events = client.write_events(candidates, vec![]).await.expect("Failed to write events");
198+
/// # })
199+
/// ```
200+
///
201+
/// # Errors
202+
/// This function will return an error if the request fails or if the URL is invalid.
203+
pub async fn write_events(
204+
&self,
205+
events: Vec<EventCandidate>,
206+
preconditions: Vec<Precondition>,
207+
) -> Result<Vec<Event>, ClientError> {
208+
self.request_oneshot(WriteEventsRequest {
209+
events,
210+
preconditions,
211+
})
212+
.await
213+
}
152214
}

src/client/client_request.rs

Lines changed: 17 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
//! This is a purely internal module to represent client requests to the database.
22
3-
use reqwest::Method;
4-
use serde_json::Value;
3+
mod ping;
4+
mod verify_api_token;
5+
mod write_events;
6+
7+
pub use ping::PingRequest;
8+
pub use verify_api_token::VerifyApiTokenRequest;
9+
pub use write_events::WriteEventsRequest;
510

6-
use crate::{error::ClientError, event::ManagementEvent};
11+
use crate::error::ClientError;
12+
use reqwest::Method;
13+
use serde::{Serialize, de::DeserializeOwned};
714

815
/// Represents a request to the database client
916
pub trait ClientRequest {
1017
const URL_PATH: &'static str;
1118
const METHOD: Method;
12-
type Response: serde::de::DeserializeOwned;
1319

1420
/// Returns the URL path for the request
1521
fn url_path(&self) -> &'static str {
@@ -22,44 +28,17 @@ pub trait ClientRequest {
2228
}
2329

2430
/// Returns the body for the request
25-
fn body(&self) -> Option<Result<Value, ClientError>> {
26-
None
31+
fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
32+
None::<Result<(), _>>
2733
}
34+
}
35+
36+
/// Represents a request to the database that expects a single response
37+
pub trait OneShotRequest: ClientRequest {
38+
type Response: DeserializeOwned;
2839

2940
/// Validate the response from the database
3041
fn validate_response(&self, _response: &Self::Response) -> Result<(), ClientError> {
3142
Ok(())
3243
}
3344
}
34-
35-
/// Ping the Database instance
36-
#[derive(Debug, Clone, Copy)]
37-
pub struct PingRequest;
38-
39-
impl ClientRequest for PingRequest {
40-
const URL_PATH: &'static str = "/api/v1/ping";
41-
const METHOD: Method = Method::GET;
42-
type Response = ManagementEvent;
43-
44-
fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> {
45-
(response.ty() == "io.eventsourcingdb.api.ping-received")
46-
.then_some(())
47-
.ok_or(ClientError::PingFailed)
48-
}
49-
}
50-
51-
/// Verify the API token
52-
#[derive(Debug, Clone, Copy)]
53-
pub struct VerifyApiTokenRequest;
54-
55-
impl ClientRequest for VerifyApiTokenRequest {
56-
const URL_PATH: &'static str = "/api/v1/verify-api-token";
57-
const METHOD: Method = Method::POST;
58-
type Response = ManagementEvent;
59-
60-
fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> {
61-
(response.ty() == "io.eventsourcingdb.api.api-token-verified")
62-
.then_some(())
63-
.ok_or(ClientError::APITokenInvalid)
64-
}
65-
}

src/client/client_request/ping.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use reqwest::Method;
2+
3+
use crate::{error::ClientError, event::ManagementEvent};
4+
5+
use super::{ClientRequest, OneShotRequest};
6+
7+
/// Ping the Database instance
8+
#[derive(Debug, Clone, Copy)]
9+
pub struct PingRequest;
10+
11+
impl ClientRequest for PingRequest {
12+
const URL_PATH: &'static str = "/api/v1/ping";
13+
const METHOD: Method = Method::GET;
14+
}
15+
16+
impl OneShotRequest for PingRequest {
17+
type Response = ManagementEvent;
18+
19+
fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> {
20+
(response.ty() == "io.eventsourcingdb.api.ping-received")
21+
.then_some(())
22+
.ok_or(ClientError::PingFailed)
23+
}
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use reqwest::Method;
2+
3+
use crate::{error::ClientError, event::ManagementEvent};
4+
5+
use super::{ClientRequest, OneShotRequest};
6+
7+
/// Verify the API token
8+
#[derive(Debug, Clone, Copy)]
9+
pub struct VerifyApiTokenRequest;
10+
11+
impl ClientRequest for VerifyApiTokenRequest {
12+
const URL_PATH: &'static str = "/api/v1/verify-api-token";
13+
const METHOD: Method = Method::POST;
14+
}
15+
16+
impl OneShotRequest for VerifyApiTokenRequest {
17+
type Response = ManagementEvent;
18+
19+
fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> {
20+
(response.ty() == "io.eventsourcingdb.api.api-token-verified")
21+
.then_some(())
22+
.ok_or(ClientError::APITokenInvalid)
23+
}
24+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use super::{ClientRequest, OneShotRequest};
2+
use crate::{
3+
client::Precondition,
4+
error::ClientError,
5+
event::{Event, EventCandidate},
6+
};
7+
use reqwest::Method;
8+
use serde::Serialize;
9+
10+
#[derive(Debug, Serialize)]
11+
pub struct WriteEventsRequest {
12+
pub events: Vec<EventCandidate>,
13+
pub preconditions: Vec<Precondition>,
14+
}
15+
16+
impl ClientRequest for WriteEventsRequest {
17+
const URL_PATH: &'static str = "/api/v1/write-events";
18+
const METHOD: Method = Method::POST;
19+
20+
fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
21+
Some(Ok(self))
22+
}
23+
}
24+
impl OneShotRequest for WriteEventsRequest {
25+
type Response = Vec<Event>;
26+
}

src/client/precondition.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use serde::Serialize;
2+
3+
/// Enum for different preconditions that can be used when writing events
4+
#[derive(Debug, Serialize)]
5+
#[serde(tag = "type", content = "payload")]
6+
pub enum Precondition {
7+
/// Check if the subject with the given path has no other events
8+
#[serde(rename = "isSubjectPristine")]
9+
IsSubjectPristine {
10+
/// The subject to check
11+
subject: String,
12+
},
13+
/// Check if the subject with the given path has no other events
14+
#[serde(rename = "isSubjectOnEventId")]
15+
IsSubjectOnEventId {
16+
/// The subject to check
17+
subject: String,
18+
/// The event ID to check against
19+
#[serde(rename = "eventId")]
20+
event_id: String,
21+
},
22+
}

src/error.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pub enum ClientError {
2626
SerdeJsonError(#[from] serde_json::Error),
2727
/// The DB returned an error+
2828
#[error("The DB returned an error: {0}")]
29-
DBError(StatusCode, String),
29+
DBApiError(StatusCode, String),
3030
/// There was a problem with the `cloudevents` message
3131
#[cfg(feature = "cloudevents")]
3232
#[error("The CloudEvents message is invalid: {0}")]
@@ -46,3 +46,12 @@ pub enum ContainerError {
4646
#[error("URL parsing error: {0}")]
4747
URLParseError(#[from] url::ParseError),
4848
}
49+
50+
/// Error type for the event
51+
#[derive(Debug, thiserror::Error)]
52+
pub enum EventError {
53+
/// The passed cloudevent is invalid
54+
#[cfg(feature = "cloudevents")]
55+
#[error("The passed cloudevent is invalid")]
56+
InvalidCloudevent,
57+
}

0 commit comments

Comments
 (0)