From 33c295aa453ab81e64d649ac14b3acce6221254f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Tue, 27 May 2025 11:05:42 +0200 Subject: [PATCH] feat(client): add support for running eventql queries --- src/client.rs | 38 ++++++++++++- src/client/client_request.rs | 2 + .../client_request/run_eventql_query.rs | 53 +++++++++++++++++++ 3 files changed, 91 insertions(+), 2 deletions(-) create mode 100644 src/client/client_request/run_eventql_query.rs diff --git a/src/client.rs b/src/client.rs index 674a00f..981b5c3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -27,8 +27,9 @@ use crate::{ }; use client_request::{ ClientRequest, ListEventTypesRequest, ListSubjectsRequest, ObserveEventsRequest, - OneShotRequest, PingRequest, ReadEventsRequest, RegisterEventSchemaRequest, StreamingRequest, - VerifyApiTokenRequest, WriteEventsRequest, list_event_types::EventType, + OneShotRequest, PingRequest, ReadEventsRequest, RegisterEventSchemaRequest, + RunEventqlQueryRequest, StreamingRequest, VerifyApiTokenRequest, WriteEventsRequest, + list_event_types::EventType, }; use futures::Stream; pub use precondition::Precondition; @@ -425,4 +426,37 @@ impl Client { }) .await } + + /// Run an eventql query against the DB. + /// + /// ``` + /// use eventsourcingdb_client_rust::event::EventCandidate; + /// use futures::StreamExt; + /// # use serde_json::json; + /// # tokio_test::block_on(async { + /// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap(); + /// let db_url = "http://localhost:3000/"; + /// let api_token = "secrettoken"; + /// # let db_url = container.get_base_url().await.unwrap(); + /// # let api_token = container.get_api_token(); + /// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token); + /// let query = "FROM e IN events ORDER BY e.time DESC TOP 100 PROJECT INTO e"; + /// let mut row_stream = client.run_eventql_query(query).await.expect("Failed to list event types"); + /// while let Some(row) = row_stream.next().await { + /// println!("Found row {:?}", row.expect("Error while reading row")); + /// } + /// # }) + /// ``` + /// + /// # Errors + /// This function will return an error if the request fails or if the URL is invalid. + pub async fn run_eventql_query( + &self, + query: &str, + ) -> Result>, ClientError> { + let response = self + .request_streaming(RunEventqlQueryRequest { query }) + .await?; + Ok(response) + } } diff --git a/src/client/client_request.rs b/src/client/client_request.rs index 7dba5ec..b4ed7fe 100644 --- a/src/client/client_request.rs +++ b/src/client/client_request.rs @@ -6,6 +6,7 @@ mod observe_events; mod ping; mod read_events; mod register_event_schema; +mod run_eventql_query; mod verify_api_token; mod write_events; @@ -15,6 +16,7 @@ pub use observe_events::ObserveEventsRequest; pub use ping::PingRequest; pub use read_events::ReadEventsRequest; pub use register_event_schema::RegisterEventSchemaRequest; +pub use run_eventql_query::RunEventqlQueryRequest; pub use verify_api_token::VerifyApiTokenRequest; pub use write_events::WriteEventsRequest; diff --git a/src/client/client_request/run_eventql_query.rs b/src/client/client_request/run_eventql_query.rs new file mode 100644 index 0000000..478739b --- /dev/null +++ b/src/client/client_request/run_eventql_query.rs @@ -0,0 +1,53 @@ +use futures::{Stream, StreamExt}; +use reqwest::Method; +use serde::{Deserialize, Serialize}; + +use crate::error::ClientError; + +use super::{ClientRequest, StreamingRequest}; + +type EventqlRow = serde_json::Value; + +#[derive(Debug, Clone, Serialize)] +pub struct RunEventqlQueryRequest<'a> { + pub query: &'a str, +} + +impl ClientRequest for RunEventqlQueryRequest<'_> { + const URL_PATH: &'static str = "/api/v1/run-eventql-query"; + const METHOD: Method = Method::POST; + + fn body(&self) -> Option> { + Some(Ok(self)) + } +} + +impl StreamingRequest for RunEventqlQueryRequest<'_> { + type ItemType = EventqlRow; + + fn build_stream( + response: reqwest::Response, + ) -> impl Stream> { + #[derive(Deserialize, Debug)] + #[serde(tag = "type", content = "payload", rename_all = "camelCase")] + enum LineItem { + Error { error: String }, + Row(EventqlRow), + } + + impl From for Result { + fn from(item: LineItem) -> Self { + match item { + LineItem::Error { error } => Err(ClientError::DBError(error)), + LineItem::Row(row) => Ok(row), + } + } + } + + Self::lines_stream(response).map(|line| { + let line = line?; + let item = serde_json::from_str(line.as_str())?; + Ok(item) + }) + } +}