Skip to content

Commit 4cf2e8a

Browse files
authored
Merge branch 'main' into dependabot/cargo/reqwest-0.12.18
2 parents 9d1d528 + 4c2fbc4 commit 4cf2e8a

File tree

3 files changed

+91
-2
lines changed

3 files changed

+91
-2
lines changed

src/client.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ use crate::{
2727
};
2828
use client_request::{
2929
ClientRequest, ListEventTypesRequest, ListSubjectsRequest, ObserveEventsRequest,
30-
OneShotRequest, PingRequest, ReadEventsRequest, RegisterEventSchemaRequest, StreamingRequest,
31-
VerifyApiTokenRequest, WriteEventsRequest, list_event_types::EventType,
30+
OneShotRequest, PingRequest, ReadEventsRequest, RegisterEventSchemaRequest,
31+
RunEventqlQueryRequest, StreamingRequest, VerifyApiTokenRequest, WriteEventsRequest,
32+
list_event_types::EventType,
3233
};
3334
use futures::Stream;
3435
pub use precondition::Precondition;
@@ -425,4 +426,37 @@ impl Client {
425426
})
426427
.await
427428
}
429+
430+
/// Run an eventql query against the DB.
431+
///
432+
/// ```
433+
/// use eventsourcingdb_client_rust::event::EventCandidate;
434+
/// use futures::StreamExt;
435+
/// # use serde_json::json;
436+
/// # tokio_test::block_on(async {
437+
/// # let container = eventsourcingdb_client_rust::container::Container::start_default().await.unwrap();
438+
/// let db_url = "http://localhost:3000/";
439+
/// let api_token = "secrettoken";
440+
/// # let db_url = container.get_base_url().await.unwrap();
441+
/// # let api_token = container.get_api_token();
442+
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
443+
/// let query = "FROM e IN events ORDER BY e.time DESC TOP 100 PROJECT INTO e";
444+
/// let mut row_stream = client.run_eventql_query(query).await.expect("Failed to list event types");
445+
/// while let Some(row) = row_stream.next().await {
446+
/// println!("Found row {:?}", row.expect("Error while reading row"));
447+
/// }
448+
/// # })
449+
/// ```
450+
///
451+
/// # Errors
452+
/// This function will return an error if the request fails or if the URL is invalid.
453+
pub async fn run_eventql_query(
454+
&self,
455+
query: &str,
456+
) -> Result<impl Stream<Item = Result<serde_json::Value, ClientError>>, ClientError> {
457+
let response = self
458+
.request_streaming(RunEventqlQueryRequest { query })
459+
.await?;
460+
Ok(response)
461+
}
428462
}

src/client/client_request.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ mod observe_events;
66
mod ping;
77
mod read_events;
88
mod register_event_schema;
9+
mod run_eventql_query;
910
mod verify_api_token;
1011
mod write_events;
1112

@@ -15,6 +16,7 @@ pub use observe_events::ObserveEventsRequest;
1516
pub use ping::PingRequest;
1617
pub use read_events::ReadEventsRequest;
1718
pub use register_event_schema::RegisterEventSchemaRequest;
19+
pub use run_eventql_query::RunEventqlQueryRequest;
1820
pub use verify_api_token::VerifyApiTokenRequest;
1921
pub use write_events::WriteEventsRequest;
2022

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
use futures::{Stream, StreamExt};
2+
use reqwest::Method;
3+
use serde::{Deserialize, Serialize};
4+
5+
use crate::error::ClientError;
6+
7+
use super::{ClientRequest, StreamingRequest};
8+
9+
type EventqlRow = serde_json::Value;
10+
11+
#[derive(Debug, Clone, Serialize)]
12+
pub struct RunEventqlQueryRequest<'a> {
13+
pub query: &'a str,
14+
}
15+
16+
impl ClientRequest for RunEventqlQueryRequest<'_> {
17+
const URL_PATH: &'static str = "/api/v1/run-eventql-query";
18+
const METHOD: Method = Method::POST;
19+
20+
fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
21+
Some(Ok(self))
22+
}
23+
}
24+
25+
impl StreamingRequest for RunEventqlQueryRequest<'_> {
26+
type ItemType = EventqlRow;
27+
28+
fn build_stream(
29+
response: reqwest::Response,
30+
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
31+
#[derive(Deserialize, Debug)]
32+
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
33+
enum LineItem {
34+
Error { error: String },
35+
Row(EventqlRow),
36+
}
37+
38+
impl From<LineItem> for Result<EventqlRow, ClientError> {
39+
fn from(item: LineItem) -> Self {
40+
match item {
41+
LineItem::Error { error } => Err(ClientError::DBError(error)),
42+
LineItem::Row(row) => Ok(row),
43+
}
44+
}
45+
}
46+
47+
Self::lines_stream(response).map(|line| {
48+
let line = line?;
49+
let item = serde_json::from_str(line.as_str())?;
50+
Ok(item)
51+
})
52+
}
53+
}

0 commit comments

Comments
 (0)