Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ use crate::{
};
use client_request::{
ClientRequest, ListEventTypesRequest, ListSubjectsRequest, ObserveEventsRequest,
OneShotRequest, PingRequest, ReadEventsRequest, RegisterEventSchemaRequest, StreamingRequest,
VerifyApiTokenRequest, WriteEventsRequest, list_event_types::EventType,
OneShotRequest, PingRequest, ReadEventsRequest, RegisterEventSchemaRequest,
RunEventqlQueryRequest, StreamingRequest, VerifyApiTokenRequest, WriteEventsRequest,
list_event_types::EventType,
};
use futures::Stream;
pub use precondition::Precondition;
Expand Down Expand Up @@ -425,4 +426,37 @@ impl Client {
})
.await
}

/// Run an eventql query against the DB.
///
/// ```
/// use eventsourcingdb_client_rust::event::EventCandidate;
/// use futures::StreamExt;
/// # use serde_json::json;
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
/// # let api_token = container.get_api_token();
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
/// let query = "FROM e IN events ORDER BY e.time DESC TOP 100 PROJECT INTO e";
/// let mut row_stream = client.run_eventql_query(query).await.expect("Failed to list event types");
/// while let Some(row) = row_stream.next().await {
/// println!("Found row {:?}", row.expect("Error while reading row"));
/// }
/// # })
/// ```
///
/// # Errors
/// This function will return an error if the request fails or if the URL is invalid.
pub async fn run_eventql_query(
&self,
query: &str,
) -> Result<impl Stream<Item = Result<serde_json::Value, ClientError>>, ClientError> {
let response = self
.request_streaming(RunEventqlQueryRequest { query })
.await?;
Ok(response)
}
}
2 changes: 2 additions & 0 deletions src/client/client_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod observe_events;
mod ping;
mod read_events;
mod register_event_schema;
mod run_eventql_query;
mod verify_api_token;
mod write_events;

Expand All @@ -15,6 +16,7 @@ pub use observe_events::ObserveEventsRequest;
pub use ping::PingRequest;
pub use read_events::ReadEventsRequest;
pub use register_event_schema::RegisterEventSchemaRequest;
pub use run_eventql_query::RunEventqlQueryRequest;
pub use verify_api_token::VerifyApiTokenRequest;
pub use write_events::WriteEventsRequest;

Expand Down
53 changes: 53 additions & 0 deletions src/client/client_request/run_eventql_query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use futures::{Stream, StreamExt};
use reqwest::Method;
use serde::{Deserialize, Serialize};

use crate::error::ClientError;

use super::{ClientRequest, StreamingRequest};

type EventqlRow = serde_json::Value;

#[derive(Debug, Clone, Serialize)]
pub struct RunEventqlQueryRequest<'a> {
pub query: &'a str,
}

impl ClientRequest for RunEventqlQueryRequest<'_> {
const URL_PATH: &'static str = "/api/v1/run-eventql-query";
const METHOD: Method = Method::POST;

fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
Some(Ok(self))
}
}

impl StreamingRequest for RunEventqlQueryRequest<'_> {
type ItemType = EventqlRow;

fn build_stream(
response: reqwest::Response,
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
#[derive(Deserialize, Debug)]
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
enum LineItem {
Error { error: String },
Row(EventqlRow),
}

impl From<LineItem> for Result<EventqlRow, ClientError> {
fn from(item: LineItem) -> Self {
match item {
LineItem::Error { error } => Err(ClientError::DBError(error)),
LineItem::Row(row) => Ok(row),
}
}
}

Self::lines_stream(response).map(|line| {
let line = line?;
let item = serde_json::from_str(line.as_str())?;
Ok(item)
})
}
}