diff --git a/README.md b/README.md index 9d26a12..0149f5e 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,6 @@ Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/c - 🚀 [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events) - 🚀 [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events) - ❌ [Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql) -- ❌ [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events) +- 🚀 [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events) - 🚀 [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery) - 🚀 [Testcontainers Support](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#testcontainers-support) diff --git a/src/client.rs b/src/client.rs index 113e125..674a00f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -26,9 +26,9 @@ use crate::{ event::{Event, EventCandidate, ManagementEvent}, }; use client_request::{ - ClientRequest, ListEventTypesRequest, ListSubjectsRequest, OneShotRequest, PingRequest, - ReadEventsRequest, RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest, - WriteEventsRequest, list_event_types::EventType, + ClientRequest, ListEventTypesRequest, ListSubjectsRequest, ObserveEventsRequest, + OneShotRequest, PingRequest, ReadEventsRequest, RegisterEventSchemaRequest, StreamingRequest, + VerifyApiTokenRequest, WriteEventsRequest, list_event_types::EventType, }; use futures::Stream; pub use precondition::Precondition; @@ -210,6 +210,39 @@ impl Client { Ok(response) } + /// Observe events from the DB instance. + /// + /// ``` + /// use eventsourcingdb_client_rust::event::EventCandidate; + /// use futures::StreamExt; + /// # use serde_json::json; + /// # tokio_test::block_on(async { + /// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap(); + /// let db_url = "http://localhost:3000/"; + /// let api_token = "secrettoken"; + /// # let db_url = container.get_base_url().await.unwrap(); + /// # let api_token = container.get_api_token(); + /// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token); + /// let mut event_stream = client.observe_events("/", None).await.expect("Failed to observe events"); + /// while let Some(event) = event_stream.next().await { + /// println!("Found Type {:?}", event.expect("Error while reading event")); + /// } + /// # }) + /// ``` + /// + /// # Errors + /// This function will return an error if the request fails or if the URL is invalid. + pub async fn observe_events<'a>( + &self, + subject: &'a str, + options: Option>, + ) -> Result>, ClientError> { + let response = self + .request_streaming(ObserveEventsRequest { subject, options }) + .await?; + Ok(response) + } + /// Verifies the API token by sending a request to the DB instance. /// /// ``` diff --git a/src/client/client_request.rs b/src/client/client_request.rs index a5366eb..7dba5ec 100644 --- a/src/client/client_request.rs +++ b/src/client/client_request.rs @@ -2,6 +2,7 @@ pub mod list_event_types; mod list_subjects; +mod observe_events; mod ping; mod read_events; mod register_event_schema; @@ -10,6 +11,7 @@ mod write_events; pub use list_event_types::ListEventTypesRequest; pub use list_subjects::ListSubjectsRequest; +pub use observe_events::ObserveEventsRequest; pub use ping::PingRequest; pub use read_events::ReadEventsRequest; pub use register_event_schema::RegisterEventSchemaRequest; diff --git a/src/client/client_request/observe_events.rs b/src/client/client_request/observe_events.rs new file mode 100644 index 0000000..f57f463 --- /dev/null +++ b/src/client/client_request/observe_events.rs @@ -0,0 +1,28 @@ +use reqwest::Method; +use serde::Serialize; + +use crate::{ + client::request_options::ObserveEventsRequestOptions, error::ClientError, event::Event, +}; + +use super::{ClientRequest, StreamingRequest}; + +#[derive(Debug, Clone, Serialize)] +pub struct ObserveEventsRequest<'a> { + pub subject: &'a str, + #[serde(skip_serializing_if = "Option::is_none")] + pub options: Option>, +} + +impl ClientRequest for ObserveEventsRequest<'_> { + const URL_PATH: &'static str = "/api/v1/read-events"; + const METHOD: Method = Method::POST; + + fn body(&self) -> Option> { + Some(Ok(self)) + } +} + +impl StreamingRequest for ObserveEventsRequest<'_> { + type ItemType = Event; +} diff --git a/src/client/request_options.rs b/src/client/request_options.rs index 40647ce..c9aecae 100644 --- a/src/client/request_options.rs +++ b/src/client/request_options.rs @@ -22,6 +22,20 @@ pub struct ReadEventsRequestOptions<'a> { pub upper_bound: Option>, } +/// Options for observing events from the database +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ObserveEventsRequestOptions<'a> { + /// Start reading events from this start event + #[serde(skip_serializing_if = "Option::is_none")] + pub from_latest_event: Option>, + /// Lower bound of events to read + #[serde(skip_serializing_if = "Option::is_none")] + pub lower_bound: Option>, + /// Include recursive subject's events + pub recursive: bool, +} + /// Ordering of the responses of requests #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "kebab-case")]