From 543754a83b3c77637d88bcf68b19d4e90f6b3a57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Mon, 2 Jun 2025 21:30:27 +0200 Subject: [PATCH] fix(client): correct observe-events issues shown by tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Raphael HΓΆser --- README.md | 2 +- src/client.rs | 21 ++++++-- src/client/client_request.rs | 5 +- src/client/client_request/observe_events.rs | 2 +- src/event/event_types/event_candidate.rs | 2 +- tests/observe_events.rs | 53 +++++++++++++++++++++ tests/run_eventql_query.rs | 16 +++++++ 7 files changed, 92 insertions(+), 9 deletions(-) create mode 100644 tests/observe_events.rs create mode 100644 tests/run_eventql_query.rs diff --git a/README.md b/README.md index 0149f5e..8e7d9da 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/c - πŸš€ [Essentials](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#essentials) - πŸš€ [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events) - πŸš€ [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events) -- ❌ [Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql) +- πŸš€ [Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql) - πŸš€ [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events) - πŸš€ [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery) - πŸš€ [Testcontainers Support](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#testcontainers-support) diff --git a/src/client.rs b/src/client.rs index 981b5c3..592bec4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -100,7 +100,7 @@ impl Client { reqwest::Method::POST => self.reqwest.post(url), _ => return Err(ClientError::InvalidRequestMethod), } - .header("Authorization", format!("Bearer {}", self.api_token)); + .bearer_auth(&self.api_token); let request = if let Some(body) = endpoint.body() { request .header("Content-Type", "application/json") @@ -224,9 +224,22 @@ impl Client { /// # let db_url = container.get_base_url().await.unwrap(); /// # let api_token = container.get_api_token(); /// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token); - /// let mut event_stream = client.observe_events("/", None).await.expect("Failed to observe events"); - /// while let Some(event) = event_stream.next().await { - /// println!("Found Type {:?}", event.expect("Error while reading event")); + /// # client.write_events( + /// # vec![ + /// # EventCandidate::builder() + /// # .source("https://www.eventsourcingdb.io".to_string()) + /// # .data(json!({"value": 1})) + /// # .subject("/test".to_string()) + /// # .r#type("io.eventsourcingdb.test".to_string()) + /// # .build() + /// # ], + /// # vec![] + /// # ).await.expect("Failed to write events"); + /// let mut event_stream = client.observe_events("/test", None).await.expect("Failed to observe events"); + /// match event_stream.next().await { + /// Some(Ok(event)) => println!("Found Event {:?}", event), + /// Some(Err(e)) => eprintln!("Error while reading event: {:?}", e), + /// None => println!("No more events."), /// } /// # }) /// ``` diff --git a/src/client/client_request.rs b/src/client/client_request.rs index 46b480a..cf5a713 100644 --- a/src/client/client_request.rs +++ b/src/client/client_request.rs @@ -17,6 +17,7 @@ pub use ping::PingRequest; pub use read_events::ReadEventsRequest; pub use register_event_schema::RegisterEventSchemaRequest; pub use run_eventql_query::RunEventqlQueryRequest; +use serde_json::Value; pub use verify_api_token::VerifyApiTokenRequest; pub use write_events::WriteEventsRequest; @@ -72,7 +73,7 @@ enum StreamLineItem { Error { error: String }, /// A heardbeat message was sent to keep the connection alive. /// This is only used when observing events, but it does not hurt to have it everywhere. - Heartbeat, + Heartbeat(Value), /// A successful response from the database /// Since the exact type of the payload is not known at this point, we use this as a fallback case. /// Every request item gets put in here and the type can be checked later on. @@ -114,7 +115,7 @@ pub trait StreamingRequest: ClientRequest { Some(Err(ClientError::DBError(error))) } // A heartbeat message was sent, which we ignore. - Ok(StreamLineItem::Heartbeat) => None, + Ok(StreamLineItem::Heartbeat(_value)) => None, // A successful response was sent with the correct type. Ok(StreamLineItem::Ok { payload, ty }) if ty == Self::ITEM_TYPE_NAME => { Some(Ok(payload)) diff --git a/src/client/client_request/observe_events.rs b/src/client/client_request/observe_events.rs index 706a444..489f1b1 100644 --- a/src/client/client_request/observe_events.rs +++ b/src/client/client_request/observe_events.rs @@ -15,7 +15,7 @@ pub struct ObserveEventsRequest<'a> { } impl ClientRequest for ObserveEventsRequest<'_> { - const URL_PATH: &'static str = "/api/v1/read-events"; + const URL_PATH: &'static str = "/api/v1/observe-events"; const METHOD: Method = Method::POST; fn body(&self) -> Option> { diff --git a/src/event/event_types/event_candidate.rs b/src/event/event_types/event_candidate.rs index fd21a0c..d680348 100644 --- a/src/event/event_types/event_candidate.rs +++ b/src/event/event_types/event_candidate.rs @@ -7,7 +7,7 @@ use typed_builder::TypedBuilder; use crate::error::EventError; /// Represents an event candidate that can be sent to the DB. -/// This is a simplified version of the [`super::Event`] type. +/// This is a simplified version of the [`super::event::Event`] type. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)] pub struct EventCandidate { /// The data of the event, serialized as JSON diff --git a/tests/observe_events.rs b/tests/observe_events.rs new file mode 100644 index 0000000..8d3c699 --- /dev/null +++ b/tests/observe_events.rs @@ -0,0 +1,53 @@ +mod utils; + +use eventsourcingdb_client_rust::container::Container; +use futures::stream::StreamExt; +use serde_json::json; +use utils::create_test_eventcandidate; + +#[tokio::test] +async fn observe_existing_events() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let event_candidate = create_test_eventcandidate("/test", json!({"value": 1})); + let written = client + .write_events(vec![event_candidate.clone()], vec![]) + .await + .expect("Unable to write event"); + + let mut events_stream = client + .observe_events("/test", None) + .await + .expect("Failed to request events"); + let events = events_stream + .next() + .await + .expect("Failed to read events") + .expect("Expected an event, but got an error"); + + assert_eq!(vec![events], written); +} + +#[tokio::test] +async fn keep_observing_events() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + + let mut events_stream = client + .observe_events("/test", None) + .await + .expect("Failed to observe events"); + let event_candidate = create_test_eventcandidate("/test", json!({"value": 1})); + let written = client + .write_events(vec![event_candidate.clone()], vec![]) + .await + .expect("Unable to write event"); + + let event = events_stream + .next() + .await + .expect("Failed to read events") + .expect("Expected an event, but got an error"); + + assert_eq!(vec![event], written); +} diff --git a/tests/run_eventql_query.rs b/tests/run_eventql_query.rs new file mode 100644 index 0000000..d76661a --- /dev/null +++ b/tests/run_eventql_query.rs @@ -0,0 +1,16 @@ +use eventsourcingdb_client_rust::container::Container; +use futures::stream::TryStreamExt; + +#[tokio::test] +async fn run_empty_query() { + let container = Container::start_default().await.unwrap(); + let client = container.get_client().await.unwrap(); + let rows = client + .run_eventql_query("FROM e IN events ORDER BY e.time DESC TOP 100 PROJECT INTO e") + .await + .expect("Unable to run query"); + let rows: Result, _> = rows.try_collect().await; + assert!(rows.is_ok(), "Failed to run query: {:?}", rows); + let rows = rows.expect("Failed to read rows"); + assert_eq!(rows.len(), 0); +}