Skip to content

Commit b308223

Browse files
feat(client): add Observing-events feature (#11)
* feat(client): add metadata and discovery compliance matching This implementation is incomplete and lacking tests Signed-off-by: Raphael Höser <[email protected]> * fix(client): correct default for listing subjects Signed-off-by: Raphael Höser <[email protected]> * chore(client): run cargo fmt * chore(client): further formatting and added documentation * chore(formatting): undo some formatting to make PR smaller Signed-off-by: Raphael Höser <[email protected]> * chore(linting): add reasoning to allow clippy::needless_lifetimes Signed-off-by: Raphael Höser <[email protected]> * feat(client): add reading and streaming events Tests are still missing here Signed-off-by: Raphael Höser <[email protected]> * fix(test): correct compile error in doc test Signed-off-by: Raphael Höser <[email protected]> * feat(client): add observing events feature to client Signed-off-by: Raphael Höser <[email protected]> * fix(Client): fix parsing the read stream Signed-off-by: Raphael Höser <[email protected]> * chore(lints): fix lints and formatting * chore(lints): fix new qa lints * chore(lints): fix new qa lints and formatting * fix(client): resolve merge issues --------- Signed-off-by: Raphael Höser <[email protected]> Co-authored-by: Kuro <[email protected]> Co-authored-by: Alexander Kampf <[email protected]>
1 parent f741bf6 commit b308223

File tree

5 files changed

+81
-4
lines changed

5 files changed

+81
-4
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@ Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/c
1111
- 🚀 [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events)
1212
- 🚀 [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events)
1313
-[Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql)
14-
- [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events)
14+
- 🚀 [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events)
1515
- 🚀 [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery)
1616
- 🚀 [Testcontainers Support](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#testcontainers-support)

src/client.rs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ use crate::{
2626
event::{Event, EventCandidate, ManagementEvent},
2727
};
2828
use client_request::{
29-
ClientRequest, ListEventTypesRequest, ListSubjectsRequest, OneShotRequest, PingRequest,
30-
ReadEventsRequest, RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest,
31-
WriteEventsRequest, list_event_types::EventType,
29+
ClientRequest, ListEventTypesRequest, ListSubjectsRequest, ObserveEventsRequest,
30+
OneShotRequest, PingRequest, ReadEventsRequest, RegisterEventSchemaRequest, StreamingRequest,
31+
VerifyApiTokenRequest, WriteEventsRequest, list_event_types::EventType,
3232
};
3333
use futures::Stream;
3434
pub use precondition::Precondition;
@@ -210,6 +210,39 @@ impl Client {
210210
Ok(response)
211211
}
212212

213+
/// Observe events from the DB instance.
214+
///
215+
/// ```
216+
/// use eventsourcingdb_client_rust::event::EventCandidate;
217+
/// use futures::StreamExt;
218+
/// # use serde_json::json;
219+
/// # tokio_test::block_on(async {
220+
/// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap();
221+
/// let db_url = "http://localhost:3000/";
222+
/// let api_token = "secrettoken";
223+
/// # let db_url = container.get_base_url().await.unwrap();
224+
/// # let api_token = container.get_api_token();
225+
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
226+
/// let mut event_stream = client.observe_events("/", None).await.expect("Failed to observe events");
227+
/// while let Some(event) = event_stream.next().await {
228+
/// println!("Found Type {:?}", event.expect("Error while reading event"));
229+
/// }
230+
/// # })
231+
/// ```
232+
///
233+
/// # Errors
234+
/// This function will return an error if the request fails or if the URL is invalid.
235+
pub async fn observe_events<'a>(
236+
&self,
237+
subject: &'a str,
238+
options: Option<request_options::ObserveEventsRequestOptions<'a>>,
239+
) -> Result<impl Stream<Item = Result<Event, ClientError>>, ClientError> {
240+
let response = self
241+
.request_streaming(ObserveEventsRequest { subject, options })
242+
.await?;
243+
Ok(response)
244+
}
245+
213246
/// Verifies the API token by sending a request to the DB instance.
214247
///
215248
/// ```

src/client/client_request.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
pub mod list_event_types;
44
mod list_subjects;
5+
mod observe_events;
56
mod ping;
67
mod read_events;
78
mod register_event_schema;
@@ -10,6 +11,7 @@ mod write_events;
1011

1112
pub use list_event_types::ListEventTypesRequest;
1213
pub use list_subjects::ListSubjectsRequest;
14+
pub use observe_events::ObserveEventsRequest;
1315
pub use ping::PingRequest;
1416
pub use read_events::ReadEventsRequest;
1517
pub use register_event_schema::RegisterEventSchemaRequest;
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use reqwest::Method;
2+
use serde::Serialize;
3+
4+
use crate::{
5+
client::request_options::ObserveEventsRequestOptions, error::ClientError, event::Event,
6+
};
7+
8+
use super::{ClientRequest, StreamingRequest};
9+
10+
#[derive(Debug, Clone, Serialize)]
11+
pub struct ObserveEventsRequest<'a> {
12+
pub subject: &'a str,
13+
#[serde(skip_serializing_if = "Option::is_none")]
14+
pub options: Option<ObserveEventsRequestOptions<'a>>,
15+
}
16+
17+
impl ClientRequest for ObserveEventsRequest<'_> {
18+
const URL_PATH: &'static str = "/api/v1/read-events";
19+
const METHOD: Method = Method::POST;
20+
21+
fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
22+
Some(Ok(self))
23+
}
24+
}
25+
26+
impl StreamingRequest for ObserveEventsRequest<'_> {
27+
type ItemType = Event;
28+
}

src/client/request_options.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,20 @@ pub struct ReadEventsRequestOptions<'a> {
2222
pub upper_bound: Option<Bound<'a>>,
2323
}
2424

25+
/// Options for observing events from the database
26+
#[derive(Debug, Clone, Serialize)]
27+
#[serde(rename_all = "camelCase")]
28+
pub struct ObserveEventsRequestOptions<'a> {
29+
/// Start reading events from this start event
30+
#[serde(skip_serializing_if = "Option::is_none")]
31+
pub from_latest_event: Option<FromLatestEventOptions<'a>>,
32+
/// Lower bound of events to read
33+
#[serde(skip_serializing_if = "Option::is_none")]
34+
pub lower_bound: Option<Bound<'a>>,
35+
/// Include recursive subject's events
36+
pub recursive: bool,
37+
}
38+
2539
/// Ordering of the responses of requests
2640
#[derive(Debug, Clone, Serialize)]
2741
#[serde(rename_all = "kebab-case")]

0 commit comments

Comments
 (0)