Skip to content

Commit 19d1a0e

Browse files
feat(client): add metadata and discovery compliance matching (#8)
* feat(client): add metadata and discovery compliance matching This implementation is incomplete and lacking tests Signed-off-by: Raphael Höser <[email protected]> * fix(client): correct default for listing subjects Signed-off-by: Raphael Höser <[email protected]> * chore(client): run cargo fmt * chore(client): further formatting and added documentation * chore(formatting): undo some formatting to make PR smaller Signed-off-by: Raphael Höser <[email protected]> * chore(linting): add reasoning to allow clippy::needless_lifetimes Signed-off-by: Raphael Höser <[email protected]> * chore(lints): fix lints and formatting --------- Signed-off-by: Raphael Höser <[email protected]> Co-authored-by: Kuro <[email protected]>
1 parent 0f3ab07 commit 19d1a0e

File tree

12 files changed

+760
-24
lines changed

12 files changed

+760
-24
lines changed

Cargo.lock

Lines changed: 269 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,21 @@ testcontainer = ["dep:testcontainers"]
1111
[dependencies]
1212
chrono = { version = "0.4.41", features = ["serde"] }
1313
cloudevents-sdk = { version = "0.8.0", features = ["reqwest"], optional = true }
14-
url = "2.5.4"
15-
reqwest = { version = "0.12.15", features = ["json"] }
14+
futures = "0.3.31"
15+
futures-util = "0.3.31"
16+
jsonschema = "0.30.0"
17+
reqwest = { version = "0.12.15", features = ["json", "stream"] }
1618
serde = { version = "1.0.219", features = ["derive"] }
1719
serde_json = "1.0.140"
1820
testcontainers = { version = "0.24.0", features = [
1921
"http_wait",
2022
], optional = true }
2123
thiserror = "2.0.12"
24+
tokio = { version = "1.44.2", features = ["io-util"] }
25+
tokio-util = { version = "0.7.15", features = ["io"] }
26+
tokio-stream = { version = "0.1.17", features = ["io-util"] }
2227
typed-builder = "0.21.0"
28+
url = "2.5.4"
2329

2430
[dev-dependencies]
2531
eventsourcingdb-client-rust = { path = ".", features = ["testcontainer"] }

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/c
1212
-[Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events)
1313
-[Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql)
1414
-[Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events)
15-
- [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery)
15+
- 🚀 [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery)
1616
- 🚀 [Testcontainers Support](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#testcontainers-support)

src/client.rs

Lines changed: 158 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,26 @@
2020
mod client_request;
2121
mod precondition;
2222

23+
use crate::{
24+
error::ClientError,
25+
event::{Event, EventCandidate, ManagementEvent},
26+
};
2327
use client_request::{
24-
ClientRequest, OneShotRequest, PingRequest, VerifyApiTokenRequest, WriteEventsRequest,
28+
ClientRequest, ListEventTypesRequest, ListSubjectsRequest, OneShotRequest, PingRequest,
29+
RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest, WriteEventsRequest,
30+
list_event_types::EventType,
2531
};
26-
32+
use futures::Stream;
2733
pub use precondition::Precondition;
2834
use reqwest;
2935
use url::Url;
3036

31-
use crate::{
32-
error::ClientError,
33-
event::{Event, EventCandidate},
34-
};
35-
3637
/// Client for an [EventsourcingDB](https://www.eventsourcingdb.io/) instance.
3738
#[derive(Debug)]
3839
pub struct Client {
3940
base_url: Url,
4041
api_token: String,
41-
client: reqwest::Client,
42+
reqwest: reqwest::Client,
4243
}
4344

4445
impl Client {
@@ -47,7 +48,7 @@ impl Client {
4748
Client {
4849
base_url,
4950
api_token: api_token.into(),
50-
client: reqwest::Client::new(),
51+
reqwest: reqwest::Client::new(),
5152
}
5253
}
5354

@@ -93,8 +94,8 @@ impl Client {
9394
.map_err(ClientError::URLParseError)?;
9495

9596
let request = match endpoint.method() {
96-
reqwest::Method::GET => self.client.get(url),
97-
reqwest::Method::POST => self.client.post(url),
97+
reqwest::Method::GET => self.reqwest.get(url),
98+
reqwest::Method::POST => self.reqwest.post(url),
9899
_ => return Err(ClientError::InvalidRequestMethod),
99100
}
100101
.header("Authorization", format!("Bearer {}", self.api_token));
@@ -132,6 +133,28 @@ impl Client {
132133
}
133134
}
134135

136+
/// Utility function to request an endpoint of the API as a stream.
137+
///
138+
/// This means, that the response is streamed and returned as a stream of values.
139+
///
140+
/// # Errors
141+
/// This function will return an error if the request fails or if the URL is invalid.
142+
async fn request_streaming<R: StreamingRequest>(
143+
&self,
144+
endpoint: R,
145+
) -> Result<impl Stream<Item = Result<R::ItemType, ClientError>>, ClientError> {
146+
let response = self.build_request(&endpoint)?.send().await?;
147+
148+
if response.status().is_success() {
149+
Ok(endpoint.build_stream(response))
150+
} else {
151+
Err(ClientError::DBApiError(
152+
response.status(),
153+
response.text().await.unwrap_or_default(),
154+
))
155+
}
156+
}
157+
135158
/// Pings the DB instance to check if it is reachable.
136159
///
137160
/// ```
@@ -174,6 +197,130 @@ impl Client {
174197
Ok(())
175198
}
176199

200+
/// Registers an event schema with the DB instance.
201+
///
202+
/// ```
203+
/// use eventsourcingdb_client_rust::event::EventCandidate;
204+
/// use futures::StreamExt;
205+
/// # use serde_json::json;
206+
/// # tokio_test::block_on(async {
207+
/// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap();
208+
/// let db_url = "http://localhost:3000/";
209+
/// let api_token = "secrettoken";
210+
/// # let db_url = container.get_base_url().await.unwrap();
211+
/// # let api_token = container.get_api_token();
212+
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
213+
/// let event_type = "io.eventsourcingdb.test";
214+
/// let schema = json!({
215+
/// "type": "object",
216+
/// "properties": {
217+
/// "id": {
218+
/// "type": "string"
219+
/// },
220+
/// "name": {
221+
/// "type": "string"
222+
/// }
223+
/// },
224+
/// "required": ["id", "name"]
225+
/// });
226+
/// client.register_event_schema(event_type, &schema).await.expect("Failed to list event types");
227+
/// # })
228+
/// ```
229+
///
230+
/// # Errors
231+
/// This function will return an error if the request fails or if the provided schema is invalid.
232+
pub async fn register_event_schema(
233+
&self,
234+
event_type: &str,
235+
schema: &serde_json::Value,
236+
) -> Result<ManagementEvent, ClientError> {
237+
self.request_oneshot(RegisterEventSchemaRequest::try_new(event_type, schema)?)
238+
.await
239+
}
240+
241+
/// List all subjects in the DB instance.
242+
///
243+
/// To get all subjects in the DB, just pass `None` as the `base_subject`.
244+
/// ```
245+
/// use eventsourcingdb_client_rust::event::EventCandidate;
246+
/// use futures::StreamExt;
247+
/// # use serde_json::json;
248+
/// # tokio_test::block_on(async {
249+
/// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap();
250+
/// let db_url = "http://localhost:3000/";
251+
/// let api_token = "secrettoken";
252+
/// # let db_url = container.get_base_url().await.unwrap();
253+
/// # let api_token = container.get_api_token();
254+
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
255+
/// let mut subject_stream = client.list_subjects(None).await.expect("Failed to list event types");
256+
/// while let Some(subject) = subject_stream.next().await {
257+
/// println!("Found Type {}", subject.expect("Error while reading types"));
258+
/// }
259+
/// # })
260+
/// ```
261+
///
262+
/// To get all subjects under /test in the DB, just pass `Some("/test")` as the `base_subject`.
263+
/// ```
264+
/// use eventsourcingdb_client_rust::event::EventCandidate;
265+
/// use futures::StreamExt;
266+
/// # use serde_json::json;
267+
/// # tokio_test::block_on(async {
268+
/// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap();
269+
/// let db_url = "http://localhost:3000/";
270+
/// let api_token = "secrettoken";
271+
/// # let db_url = container.get_base_url().await.unwrap();
272+
/// # let api_token = container.get_api_token();
273+
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
274+
/// let mut subject_stream = client.list_subjects(Some("/test")).await.expect("Failed to list event types");
275+
/// while let Some(subject) = subject_stream.next().await {
276+
/// println!("Found Type {}", subject.expect("Error while reading types"));
277+
/// }
278+
/// # })
279+
/// ```
280+
///
281+
/// # Errors
282+
/// This function will return an error if the request fails or if the URL is invalid.
283+
pub async fn list_subjects(
284+
&self,
285+
base_subject: Option<&str>,
286+
) -> Result<impl Stream<Item = Result<String, ClientError>>, ClientError> {
287+
let response = self
288+
.request_streaming(ListSubjectsRequest {
289+
base_subject: base_subject.unwrap_or("/"),
290+
})
291+
.await?;
292+
Ok(response)
293+
}
294+
295+
/// List all event types in the DB instance.
296+
///
297+
/// ```
298+
/// use eventsourcingdb_client_rust::event::EventCandidate;
299+
/// use futures::StreamExt;
300+
/// # use serde_json::json;
301+
/// # tokio_test::block_on(async {
302+
/// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap();
303+
/// let db_url = "http://localhost:3000/";
304+
/// let api_token = "secrettoken";
305+
/// # let db_url = container.get_base_url().await.unwrap();
306+
/// # let api_token = container.get_api_token();
307+
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
308+
/// let mut type_stream = client.list_event_types().await.expect("Failed to list event types");
309+
/// while let Some(ty) = type_stream.next().await {
310+
/// println!("Found Type {}", ty.expect("Error while reading types").name);
311+
/// }
312+
/// # })
313+
/// ```
314+
///
315+
/// # Errors
316+
/// This function will return an error if the request fails or if the URL is invalid.
317+
pub async fn list_event_types(
318+
&self,
319+
) -> Result<impl Stream<Item = Result<EventType, ClientError>>, ClientError> {
320+
let response = self.request_streaming(ListEventTypesRequest).await?;
321+
Ok(response)
322+
}
323+
177324
/// Writes events to the DB instance.
178325
///
179326
/// ```

src/client/client_request.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,28 @@
11
//! This is a purely internal module to represent client requests to the database.
22
3+
pub mod list_event_types;
4+
mod list_subjects;
35
mod ping;
6+
mod register_event_schema;
47
mod verify_api_token;
58
mod write_events;
69

10+
pub use list_event_types::ListEventTypesRequest;
11+
pub use list_subjects::ListSubjectsRequest;
712
pub use ping::PingRequest;
13+
pub use register_event_schema::RegisterEventSchemaRequest;
814
pub use verify_api_token::VerifyApiTokenRequest;
915
pub use write_events::WriteEventsRequest;
1016

1117
use crate::error::ClientError;
18+
use futures::{Stream, stream::TryStreamExt};
19+
use futures_util::io;
1220
use reqwest::Method;
13-
use serde::{Serialize, de::DeserializeOwned};
21+
use serde::Serialize;
22+
use serde::de::DeserializeOwned;
23+
use tokio::io::{AsyncBufReadExt, BufReader};
24+
use tokio_stream::wrappers::LinesStream;
25+
use tokio_util::io::StreamReader;
1426

1527
/// Represents a request to the database client
1628
pub trait ClientRequest {
@@ -42,3 +54,23 @@ pub trait OneShotRequest: ClientRequest {
4254
Ok(())
4355
}
4456
}
57+
58+
/// Represents a request to the database that expects a stream of responses
59+
pub trait StreamingRequest: ClientRequest {
60+
type ItemType: DeserializeOwned;
61+
62+
fn build_stream(
63+
self,
64+
response: reqwest::Response,
65+
) -> impl Stream<Item = Result<Self::ItemType, ClientError>>;
66+
67+
fn lines_stream(
68+
response: reqwest::Response,
69+
) -> impl Stream<Item = Result<String, ClientError>> {
70+
let bytes = response
71+
.bytes_stream()
72+
.map_err(|err| io::Error::other(format!("Failed to read response stream: {err}")));
73+
let stream_reader = StreamReader::new(bytes);
74+
LinesStream::new(BufReader::new(stream_reader).lines()).map_err(ClientError::from)
75+
}
76+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use futures::{Stream, stream::StreamExt};
2+
use reqwest::Method;
3+
use serde::{Deserialize, Serialize};
4+
use serde_json::Value;
5+
6+
use crate::error::ClientError;
7+
8+
use super::{ClientRequest, StreamingRequest};
9+
#[derive(Deserialize, Debug)]
10+
#[serde(rename_all = "camelCase")]
11+
pub struct EventType {
12+
#[serde(rename = "eventType")]
13+
pub name: String,
14+
pub is_phantom: bool,
15+
pub schema: Option<Value>,
16+
}
17+
18+
#[derive(Debug, Clone, Serialize)]
19+
#[serde(rename_all = "camelCase")]
20+
pub struct ListEventTypesRequest;
21+
22+
impl ClientRequest for ListEventTypesRequest {
23+
const URL_PATH: &'static str = "/api/v1/read-event-types";
24+
const METHOD: Method = Method::POST;
25+
26+
fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
27+
Some(Ok(self))
28+
}
29+
}
30+
impl StreamingRequest for ListEventTypesRequest {
31+
type ItemType = EventType;
32+
33+
fn build_stream(
34+
self,
35+
response: reqwest::Response,
36+
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
37+
#[derive(Deserialize, Debug)]
38+
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
39+
enum LineItem {
40+
Error { error: String },
41+
EventType(EventType),
42+
}
43+
44+
impl From<LineItem> for Result<EventType, ClientError> {
45+
fn from(item: LineItem) -> Self {
46+
match item {
47+
LineItem::Error { error } => Err(ClientError::DBError(error)),
48+
LineItem::EventType(event_type) => Ok(event_type),
49+
}
50+
}
51+
}
52+
53+
Self::lines_stream(response).map(|line| {
54+
let line = line?;
55+
let item: LineItem = serde_json::from_str(line.as_str())?;
56+
item.into()
57+
})
58+
}
59+
}

0 commit comments

Comments
 (0)