Skip to content

Commit a4016f6

Browse files
committed
feat(client): add observing events feature to client
Signed-off-by: Raphael Höser <[email protected]>
1 parent 5c45884 commit a4016f6

File tree

5 files changed

+77
-4
lines changed

5 files changed

+77
-4
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@ Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/c
1111
- 🚀 [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events)
1212
- 🚀 [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events)
1313
-[Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql)
14-
- [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events)
14+
- 🚀 [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events)
1515
- 🚀 [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery)
1616
- 🚀 [Testcontainers Support](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#testcontainers-support)

src/client.rs

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@ use crate::{
2626
event::{Event, EventCandidate, ManagementEvent},
2727
};
2828
use client_request::{
29-
ClientRequest, ListEventTypesRequest, ListSubjectsRequest, OneShotRequest, PingRequest,
30-
ReadEventsRequest, RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest,
31-
WriteEventsRequest, list_event_types::EventType,
29+
list_event_types::EventType, ClientRequest, ListEventTypesRequest, ListSubjectsRequest, ObserveEventsRequest, OneShotRequest, PingRequest, ReadEventsRequest, RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest, WriteEventsRequest
3230
};
3331
use futures::Stream;
3432
pub use precondition::Precondition;
@@ -210,6 +208,39 @@ impl Client {
210208
Ok(response)
211209
}
212210

211+
/// Observe events from the DB instance.
212+
///
213+
/// ```
214+
/// use eventsourcingdb_client_rust::event::EventCandidate;
215+
/// use futures::StreamExt;
216+
/// # use serde_json::json;
217+
/// # tokio_test::block_on(async {
218+
/// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap();
219+
/// let db_url = "http://localhost:3000/";
220+
/// let api_token = "secrettoken";
221+
/// # let db_url = container.get_base_url().await.unwrap();
222+
/// # let api_token = container.get_api_token();
223+
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
224+
/// let mut event_stream = client.observe_events("/", None).await.expect("Failed to observe events");
225+
/// while let Some(event) = event_stream.next().await {
226+
/// println!("Found Type {:?}", event.expect("Error while reading event"));
227+
/// }
228+
/// # })
229+
/// ```
230+
///
231+
/// # Errors
232+
/// This function will return an error if the request fails or if the URL is invalid.
233+
pub async fn observe_events<'a>(
234+
&self,
235+
subject: &'a str,
236+
options: Option<request_options::ObserveEventsRequestOptions<'a>>,
237+
) -> Result<impl Stream<Item = Result<Event, ClientError>>, ClientError> {
238+
let response = self
239+
.request_streaming(ObserveEventsRequest { subject, options })
240+
.await?;
241+
Ok(response)
242+
}
243+
213244
/// Verifies the API token by sending a request to the DB instance.
214245
///
215246
/// ```

src/client/client_request.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
pub mod list_event_types;
44
mod list_subjects;
5+
mod observe_events;
56
mod ping;
67
mod read_events;
78
mod register_event_schema;
@@ -10,6 +11,7 @@ mod write_events;
1011

1112
pub use list_event_types::ListEventTypesRequest;
1213
pub use list_subjects::ListSubjectsRequest;
14+
pub use observe_events::ObserveEventsRequest;
1315
pub use ping::PingRequest;
1416
pub use read_events::ReadEventsRequest;
1517
pub use register_event_schema::RegisterEventSchemaRequest;
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use reqwest::Method;
2+
use serde::Serialize;
3+
4+
use crate::{client::request_options::ObserveEventsRequestOptions, error::ClientError, event::Event};
5+
6+
use super::{ClientRequest, StreamingRequest};
7+
8+
#[derive(Debug, Clone, Serialize)]
9+
pub struct ObserveEventsRequest<'a> {
10+
pub subject: &'a str,
11+
#[serde(skip_serializing_if = "Option::is_none")]
12+
pub options: Option<ObserveEventsRequestOptions<'a>>,
13+
}
14+
15+
impl<'a> ClientRequest for ObserveEventsRequest<'a> {
16+
const URL_PATH: &'static str = "/api/v1/read-events";
17+
const METHOD: Method = Method::POST;
18+
19+
fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
20+
Some(Ok(self))
21+
}
22+
}
23+
24+
impl<'a> StreamingRequest for ObserveEventsRequest<'a> {
25+
type ItemType = Event;
26+
}

src/client/request_options.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,20 @@ pub struct ReadEventsRequestOptions<'a> {
2222
pub upper_bound: Option<Bound<'a>>,
2323
}
2424

25+
/// Options for observing events from the database
26+
#[derive(Debug, Clone, Serialize)]
27+
#[serde(rename_all = "camelCase")]
28+
pub struct ObserveEventsRequestOptions<'a> {
29+
/// Start reading events from this start event
30+
#[serde(skip_serializing_if = "Option::is_none")]
31+
pub from_latest_event: Option<FromLatestEventOptions<'a>>,
32+
/// Lower bound of events to read
33+
#[serde(skip_serializing_if = "Option::is_none")]
34+
pub lower_bound: Option<Bound<'a>>,
35+
/// Include recursive subject's events
36+
pub recursive: bool,
37+
}
38+
2539
/// Ordering of the responses of requests
2640
#[derive(Debug, Clone, Serialize)]
2741
#[serde(rename_all = "kebab-case")]

0 commit comments

Comments
 (0)