Skip to content

Commit 6cab303

Browse files
committed
feat(client): add metadata and discovery compliance matching
This implementation is incomplete and lacking tests Signed-off-by: Raphael Höser <[email protected]>
1 parent 96583d4 commit 6cab303

File tree

12 files changed

+660
-43
lines changed

12 files changed

+660
-43
lines changed

Cargo.lock

Lines changed: 269 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,19 @@ testcontainer = ["dep:testcontainers"]
1212
chrono = { version = "0.4.41", features = ["serde"] }
1313
cloudevents-sdk = { version = "0.8.0", features = ["reqwest"], optional = true }
1414
url = "2.5.4"
15-
reqwest = { version = "0.12.15", features = ["json"] }
15+
reqwest = { version = "0.12.15", features = ["json", "stream"] }
1616
serde = { version = "1.0.219", features = ["derive"] }
1717
serde_json = "1.0.140"
1818
testcontainers = { version = "0.24.0", features = [
1919
"http_wait",
2020
], optional = true }
2121
thiserror = "2.0.12"
22+
jsonschema = "0.30.0"
23+
futures-util = "0.3.31"
24+
tokio-util = { version = "0.7.15", features = ["io"] }
25+
tokio-stream = { version = "0.1.17", features = ["io-util"] }
26+
futures = "0.3.31"
27+
tokio = { version = "1.44.2", features = ["io-util"] }
2228

2329
[dev-dependencies]
2430
eventsourcingdb-client-rust = { path = ".", features = ["testcontainer"] }

src/client.rs

Lines changed: 87 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@
1919
2020
mod client_request;
2121

22-
use client_request::{ClientRequest, PingRequest, VerifyApiTokenRequest};
22+
use client_request::{
23+
list_event_types::EventType, ClientRequest, ListEventTypesRequest, ListSubjectsRequest, OneShotRequest, PingRequest, RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest
24+
};
2325

26+
use futures::Stream;
2427
use reqwest;
2528
use url::Url;
2629

27-
use crate::error::ClientError;
30+
use crate::{error::ClientError, event::ManagementEvent};
2831

2932
/// Client for an [EventsourcingDB](https://www.eventsourcingdb.io/) instance.
3033
#[derive(Debug)]
@@ -72,9 +75,14 @@ impl Client {
7275

7376
/// Utility function to request an endpoint of the API.
7477
///
78+
/// This function will return a [`reqwest::RequestBuilder`] which can be used to send the request.
79+
///
7580
/// # Errors
7681
/// This function will return an error if the request fails or if the URL is invalid.
77-
async fn request<R: ClientRequest>(&self, endpoint: R) -> Result<R::Response, ClientError> {
82+
fn build_request<R: ClientRequest>(
83+
&self,
84+
endpoint: &R,
85+
) -> Result<reqwest::RequestBuilder, ClientError> {
7886
let url = self
7987
.base_url
8088
.join(endpoint.url_path())
@@ -93,15 +101,49 @@ impl Client {
93101
} else {
94102
request
95103
};
104+
Ok(request)
105+
}
96106

97-
let response = request.send().await?;
107+
/// Utility function to request an endpoint of the API as a oneshot.
108+
///
109+
/// This means, that the response is not streamed, but returned as a single value.
110+
///
111+
/// # Errors
112+
/// This function will return an error if the request fails or if the URL is invalid.
113+
async fn request_oneshot<R: OneShotRequest>(
114+
&self,
115+
endpoint: R,
116+
) -> Result<R::Response, ClientError> {
117+
let response = self.build_request(&endpoint)?.send().await?;
98118

99119
if response.status().is_success() {
100120
let result = response.json().await?;
101121
endpoint.validate_response(&result)?;
102122
Ok(result)
103123
} else {
104-
Err(ClientError::DBError(
124+
Err(ClientError::DBApiError(
125+
response.status(),
126+
response.text().await.unwrap_or_default(),
127+
))
128+
}
129+
}
130+
131+
/// Utility function to request an endpoint of the API as a stream.
132+
///
133+
/// This means, that the response is streamed and returned as a stream of values.
134+
///
135+
/// # Errors
136+
/// This function will return an error if the request fails or if the URL is invalid.
137+
async fn request_streaming<R: StreamingRequest>(
138+
&self,
139+
endpoint: R,
140+
) -> Result<impl Stream<Item = Result<R::ItemType, ClientError>>, ClientError> {
141+
let response = self.build_request(&endpoint)?.send().await?;
142+
143+
if response.status().is_success() {
144+
Ok(endpoint.build_stream(response))
145+
} else {
146+
Err(ClientError::DBApiError(
105147
response.status(),
106148
response.text().await.unwrap_or_default(),
107149
))
@@ -125,7 +167,7 @@ impl Client {
125167
/// # Errors
126168
/// This function will return an error if the request fails or if the URL is invalid.
127169
pub async fn ping(&self) -> Result<(), ClientError> {
128-
let _ = self.request(PingRequest).await?;
170+
let _ = self.request_oneshot(PingRequest).await?;
129171
Ok(())
130172
}
131173

@@ -146,7 +188,45 @@ impl Client {
146188
/// # Errors
147189
/// This function will return an error if the request fails or if the URL is invalid.
148190
pub async fn verify_api_token(&self) -> Result<(), ClientError> {
149-
let _ = self.request(VerifyApiTokenRequest).await?;
191+
let _ = self.request_oneshot(VerifyApiTokenRequest).await?;
150192
Ok(())
151193
}
194+
195+
/// Registers an event schema with the DB instance.
196+
///
197+
/// # Errors
198+
/// This function will return an error if the request fails or if the provided schema is invalid.
199+
pub async fn register_event_schema(
200+
&self,
201+
event_type: &str,
202+
schema: &serde_json::Value,
203+
) -> Result<ManagementEvent, ClientError> {
204+
self.request_oneshot(RegisterEventSchemaRequest::try_new(event_type, schema)?)
205+
.await
206+
}
207+
208+
/// List all subjects in the DB instance.
209+
///
210+
/// # Errors
211+
/// This function will return an error if the request fails or if the URL is invalid.
212+
pub async fn list_subjects(
213+
&self,
214+
base_subject: Option<&str>,
215+
) -> Result<impl Stream<Item = Result<String, ClientError>>, ClientError> {
216+
let response = self
217+
.request_streaming(ListSubjectsRequest { base_subject })
218+
.await?;
219+
Ok(response)
220+
}
221+
222+
/// List all event types in the DB instance.
223+
///
224+
/// # Errors
225+
/// This function will return an error if the request fails or if the URL is invalid.
226+
pub async fn list_event_types(
227+
&self,
228+
) -> Result<impl Stream<Item = Result<EventType, ClientError>>, ClientError> {
229+
let response = self.request_streaming(ListEventTypesRequest).await?;
230+
Ok(response)
231+
}
152232
}

src/client/client_request.rs

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,31 @@
11
//! This is a purely internal module to represent client requests to the database.
22
3-
use reqwest::Method;
4-
use serde_json::Value;
3+
pub mod list_event_types;
4+
mod list_subjects;
5+
mod ping;
6+
mod register_event_schema;
7+
mod verify_api_token;
8+
9+
pub use list_event_types::ListEventTypesRequest;
10+
pub use list_subjects::ListSubjectsRequest;
11+
pub use ping::PingRequest;
12+
pub use register_event_schema::RegisterEventSchemaRequest;
13+
pub use verify_api_token::VerifyApiTokenRequest;
514

6-
use crate::{error::ClientError, event::ManagementEvent};
15+
use crate::error::ClientError;
16+
use futures::{Stream, stream::TryStreamExt};
17+
use futures_util::io;
18+
use reqwest::Method;
19+
use serde::Serialize;
20+
use serde::de::DeserializeOwned;
21+
use tokio::io::{AsyncBufReadExt, BufReader};
22+
use tokio_stream::wrappers::LinesStream;
23+
use tokio_util::io::StreamReader;
724

825
/// Represents a request to the database client
926
pub trait ClientRequest {
1027
const URL_PATH: &'static str;
1128
const METHOD: Method;
12-
type Response: serde::de::DeserializeOwned;
1329

1430
/// Returns the URL path for the request
1531
fn url_path(&self) -> &'static str {
@@ -22,44 +38,40 @@ pub trait ClientRequest {
2238
}
2339

2440
/// Returns the body for the request
25-
fn body(&self) -> Option<Result<Value, ClientError>> {
26-
None
41+
fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
42+
None::<Result<(), _>>
2743
}
44+
}
45+
46+
/// Represents a request to the database that expects a single response
47+
pub trait OneShotRequest: ClientRequest {
48+
type Response: DeserializeOwned;
2849

2950
/// Validate the response from the database
3051
fn validate_response(&self, _response: &Self::Response) -> Result<(), ClientError> {
3152
Ok(())
3253
}
3354
}
3455

35-
/// Ping the Database instance
36-
#[derive(Debug, Clone, Copy)]
37-
pub struct PingRequest;
38-
39-
impl ClientRequest for PingRequest {
40-
const URL_PATH: &'static str = "/api/v1/ping";
41-
const METHOD: Method = Method::GET;
42-
type Response = ManagementEvent;
43-
44-
fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> {
45-
(response.ty() == "io.eventsourcingdb.api.ping-received")
46-
.then_some(())
47-
.ok_or(ClientError::PingFailed)
48-
}
49-
}
50-
51-
/// Verify the API token
52-
#[derive(Debug, Clone, Copy)]
53-
pub struct VerifyApiTokenRequest;
56+
/// Represents a request to the database that expects a stream of responses
57+
pub trait StreamingRequest: ClientRequest {
58+
type ItemType: DeserializeOwned;
5459

55-
impl ClientRequest for VerifyApiTokenRequest {
56-
const URL_PATH: &'static str = "/api/v1/verify-api-token";
57-
const METHOD: Method = Method::POST;
58-
type Response = ManagementEvent;
60+
fn build_stream(
61+
self,
62+
response: reqwest::Response,
63+
) -> impl Stream<Item = Result<Self::ItemType, ClientError>>;
5964

60-
fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> {
61-
(response.ty() == "io.eventsourcingdb.api.api-token-verified")
62-
.then_some(())
63-
.ok_or(ClientError::APITokenInvalid)
65+
fn lines_stream(
66+
response: reqwest::Response,
67+
) -> impl Stream<Item = Result<String, ClientError>> {
68+
let bytes = response.bytes_stream().map_err(|err| {
69+
io::Error::new(
70+
io::ErrorKind::Other,
71+
format!("Failed to read response stream: {err}"),
72+
)
73+
});
74+
let stream_reader = StreamReader::new(bytes);
75+
LinesStream::new(BufReader::new(stream_reader).lines()).map_err(ClientError::from)
6476
}
6577
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use futures::{Stream, stream::StreamExt};
2+
use reqwest::Method;
3+
use serde::{Deserialize, Serialize};
4+
use serde_json::Value;
5+
6+
use crate::error::ClientError;
7+
8+
use super::{ClientRequest, StreamingRequest};
9+
#[derive(Deserialize, Debug)]
10+
pub struct EventType {
11+
pub name: String,
12+
pub is_phantom: bool,
13+
pub schema: Option<Value>,
14+
}
15+
16+
#[derive(Debug, Clone, Serialize)]
17+
#[serde(rename_all = "camelCase")]
18+
pub struct ListEventTypesRequest;
19+
20+
impl ClientRequest for ListEventTypesRequest {
21+
const URL_PATH: &'static str = "/api/v1/read-event-types";
22+
const METHOD: Method = Method::POST;
23+
24+
fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
25+
Some(Ok(self))
26+
}
27+
}
28+
impl StreamingRequest for ListEventTypesRequest {
29+
type ItemType = EventType;
30+
31+
fn build_stream(
32+
self,
33+
response: reqwest::Response,
34+
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
35+
#[derive(Deserialize, Debug)]
36+
enum LineItem {
37+
Error(String),
38+
EventType(EventType),
39+
}
40+
41+
impl From<LineItem> for Result<EventType, ClientError> {
42+
fn from(item: LineItem) -> Self {
43+
match item {
44+
LineItem::Error(err) => Err(ClientError::DBError(err)),
45+
LineItem::EventType(event_type) => Ok(event_type),
46+
}
47+
}
48+
}
49+
50+
Self::lines_stream(response).map(|line| {
51+
let line = line?;
52+
let item: LineItem = serde_json::from_str(line.as_str())?;
53+
item.into()
54+
})
55+
}
56+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use futures::{Stream, stream::StreamExt};
2+
use reqwest::Method;
3+
use serde::{Deserialize, Serialize};
4+
5+
use crate::error::ClientError;
6+
7+
use super::{ClientRequest, StreamingRequest};
8+
9+
#[derive(Debug, Clone, Serialize)]
10+
#[serde(rename_all = "camelCase")]
11+
pub struct ListSubjectsRequest<'a> {
12+
pub base_subject: Option<&'a str>,
13+
}
14+
15+
impl<'a> ClientRequest for ListSubjectsRequest<'a> {
16+
const URL_PATH: &'static str = "/api/v1/read-subjects";
17+
const METHOD: Method = Method::POST;
18+
19+
fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
20+
Some(Ok(self))
21+
}
22+
}
23+
impl<'a> StreamingRequest for ListSubjectsRequest<'a> {
24+
type ItemType = String;
25+
26+
fn build_stream(
27+
self,
28+
response: reqwest::Response,
29+
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
30+
#[derive(Deserialize, Debug)]
31+
struct LineItem {
32+
payload: LineItemPayload,
33+
r#type: String,
34+
}
35+
#[derive(Deserialize, Debug)]
36+
struct LineItemPayload {
37+
subject: String,
38+
}
39+
Self::lines_stream(response).map(|line| {
40+
let line = line?;
41+
let item: LineItem = serde_json::from_str(line.as_str())?;
42+
if item.r#type != "subject" {
43+
return Err(ClientError::InvalidEventType);
44+
}
45+
Ok(item.payload.subject)
46+
})
47+
}
48+
}

src/client/client_request/ping.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use reqwest::Method;
2+
3+
use crate::{error::ClientError, event::ManagementEvent};
4+
5+
use super::{ClientRequest, OneShotRequest};
6+
7+
/// Ping the Database instance
8+
#[derive(Debug, Clone, Copy)]
9+
pub struct PingRequest;
10+
11+
impl ClientRequest for PingRequest {
12+
const URL_PATH: &'static str = "/api/v1/ping";
13+
const METHOD: Method = Method::GET;
14+
}
15+
16+
impl OneShotRequest for PingRequest {
17+
type Response = ManagementEvent;
18+
19+
fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> {
20+
(response.ty() == "io.eventsourcingdb.api.ping-received")
21+
.then_some(())
22+
.ok_or(ClientError::PingFailed)
23+
}
24+
}

0 commit comments

Comments
 (0)