Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
6cab303
feat(client): add metadata and discovery compliance matching
Snapstromegon May 13, 2025
6c86db2
fix(client): correct default for listing subjects
Snapstromegon May 13, 2025
548d6dc
Merge branch 'main' into metadata-and-discovery
Snapstromegon May 15, 2025
e70b6ee
chore(client): run cargo fmt
Snapstromegon May 15, 2025
a6c814f
chore(client): further formatting and added documentation
Snapstromegon May 15, 2025
9d6d4fd
chore(formatting): undo some formatting to make PR smaller
Snapstromegon May 15, 2025
31b6bb4
chore(linting): add reasoning to allow clippy::needless_lifetimes
Snapstromegon May 15, 2025
e0026e8
feat(client): add reading and streaming events
Snapstromegon May 15, 2025
5c45884
fix(test): correct compile error in doc test
Snapstromegon May 16, 2025
a4016f6
feat(client): add observing events feature to client
Snapstromegon May 16, 2025
3bbeb7b
fix(Client): fix parsing the read stream
Snapstromegon May 18, 2025
92d3f38
Merge branch 'read-events' into observing-events
Snapstromegon May 18, 2025
9cbde95
Merge branch 'main' into metadata-and-discovery
dotKuro May 20, 2025
c1aac65
chore(lints): fix lints and formatting
Snapstromegon May 20, 2025
9019d49
Merge branch 'metadata-and-discovery' into read-events
Snapstromegon May 20, 2025
587a2de
chore(lints): fix new qa lints
Snapstromegon May 20, 2025
096e52a
Merge branch 'read-events' into observing-events
Snapstromegon May 20, 2025
c97ccd7
chore(lints): fix new qa lints and formatting
Snapstromegon May 20, 2025
2f04b6b
Merge branch 'main' into read-events
Snapstromegon May 22, 2025
a047d21
fix(client): resolve merge issues
Snapstromegon May 22, 2025
ad1ce4f
Merge branch 'read-events' into observing-events
Snapstromegon May 22, 2025
9573c22
Merge branch 'main' into observing-events
dotKuro May 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/c
- 🚀 [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events)
- 🚀 [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events)
- ❌ [Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql)
- [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events)
- 🚀 [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events)
- 🚀 [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery)
- 🚀 [Testcontainers Support](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#testcontainers-support)
39 changes: 36 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ use crate::{
event::{Event, EventCandidate, ManagementEvent},
};
use client_request::{
ClientRequest, ListEventTypesRequest, ListSubjectsRequest, OneShotRequest, PingRequest,
ReadEventsRequest, RegisterEventSchemaRequest, StreamingRequest, VerifyApiTokenRequest,
WriteEventsRequest, list_event_types::EventType,
ClientRequest, ListEventTypesRequest, ListSubjectsRequest, ObserveEventsRequest,
OneShotRequest, PingRequest, ReadEventsRequest, RegisterEventSchemaRequest, StreamingRequest,
VerifyApiTokenRequest, WriteEventsRequest, list_event_types::EventType,
};
use futures::Stream;
pub use precondition::Precondition;
Expand Down Expand Up @@ -210,6 +210,39 @@ impl Client {
Ok(response)
}

/// Observe events from the DB instance.
///
/// ```
/// use eventsourcingdb_client_rust::event::EventCandidate;
/// use futures::StreamExt;
/// # use serde_json::json;
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
/// # let api_token = container.get_api_token();
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
/// let mut event_stream = client.observe_events("/", None).await.expect("Failed to observe events");
/// while let Some(event) = event_stream.next().await {
/// println!("Found Type {:?}", event.expect("Error while reading event"));
/// }
/// # })
/// ```
///
/// # Errors
/// This function will return an error if the request fails or if the URL is invalid.
pub async fn observe_events<'a>(
&self,
subject: &'a str,
options: Option<request_options::ObserveEventsRequestOptions<'a>>,
) -> Result<impl Stream<Item = Result<Event, ClientError>>, ClientError> {
let response = self
.request_streaming(ObserveEventsRequest { subject, options })
.await?;
Ok(response)
}

/// Verifies the API token by sending a request to the DB instance.
///
/// ```
Expand Down
2 changes: 2 additions & 0 deletions src/client/client_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

pub mod list_event_types;
mod list_subjects;
mod observe_events;
mod ping;
mod read_events;
mod register_event_schema;
Expand All @@ -10,6 +11,7 @@ mod write_events;

pub use list_event_types::ListEventTypesRequest;
pub use list_subjects::ListSubjectsRequest;
pub use observe_events::ObserveEventsRequest;
pub use ping::PingRequest;
pub use read_events::ReadEventsRequest;
pub use register_event_schema::RegisterEventSchemaRequest;
Expand Down
28 changes: 28 additions & 0 deletions src/client/client_request/observe_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use reqwest::Method;
use serde::Serialize;

use crate::{
client::request_options::ObserveEventsRequestOptions, error::ClientError, event::Event,
};

use super::{ClientRequest, StreamingRequest};

#[derive(Debug, Clone, Serialize)]
pub struct ObserveEventsRequest<'a> {
pub subject: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
pub options: Option<ObserveEventsRequestOptions<'a>>,
}

impl ClientRequest for ObserveEventsRequest<'_> {
const URL_PATH: &'static str = "/api/v1/read-events";
const METHOD: Method = Method::POST;

fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
Some(Ok(self))
}
}

impl StreamingRequest for ObserveEventsRequest<'_> {
type ItemType = Event;
}
14 changes: 14 additions & 0 deletions src/client/request_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ pub struct ReadEventsRequestOptions<'a> {
pub upper_bound: Option<Bound<'a>>,
}

/// Options for observing events from the database
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ObserveEventsRequestOptions<'a> {
/// Start reading events from this start event
#[serde(skip_serializing_if = "Option::is_none")]
pub from_latest_event: Option<FromLatestEventOptions<'a>>,
/// Lower bound of events to read
#[serde(skip_serializing_if = "Option::is_none")]
pub lower_bound: Option<Bound<'a>>,
/// Include recursive subject's events
pub recursive: bool,
}

/// Ordering of the responses of requests
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "kebab-case")]
Expand Down