Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/c
- 🚀 [Essentials](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#essentials)
- 🚀 [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events)
- 🚀 [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events)
- [Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql)
- 🚀 [Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql)
- 🚀 [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events)
- 🚀 [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery)
- 🚀 [Testcontainers Support](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#testcontainers-support)
21 changes: 17 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl Client {
reqwest::Method::POST => self.reqwest.post(url),
_ => return Err(ClientError::InvalidRequestMethod),
}
.header("Authorization", format!("Bearer {}", self.api_token));
.bearer_auth(&self.api_token);
let request = if let Some(body) = endpoint.body() {
request
.header("Content-Type", "application/json")
Expand Down Expand Up @@ -224,9 +224,22 @@ impl Client {
/// # let db_url = container.get_base_url().await.unwrap();
/// # let api_token = container.get_api_token();
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
/// let mut event_stream = client.observe_events("/", None).await.expect("Failed to observe events");
/// while let Some(event) = event_stream.next().await {
/// println!("Found Type {:?}", event.expect("Error while reading event"));
/// # client.write_events(
/// # vec![
/// # EventCandidate::builder()
/// # .source("https://www.eventsourcingdb.io".to_string())
/// # .data(json!({"value": 1}))
/// # .subject("/test".to_string())
/// # .r#type("io.eventsourcingdb.test".to_string())
/// # .build()
/// # ],
/// # vec![]
/// # ).await.expect("Failed to write events");
/// let mut event_stream = client.observe_events("/test", None).await.expect("Failed to observe events");
/// match event_stream.next().await {
/// Some(Ok(event)) => println!("Found Event {:?}", event),
/// Some(Err(e)) => eprintln!("Error while reading event: {:?}", e),
/// None => println!("No more events."),
/// }
/// # })
/// ```
Expand Down
5 changes: 3 additions & 2 deletions src/client/client_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub use ping::PingRequest;
pub use read_events::ReadEventsRequest;
pub use register_event_schema::RegisterEventSchemaRequest;
pub use run_eventql_query::RunEventqlQueryRequest;
use serde_json::Value;
pub use verify_api_token::VerifyApiTokenRequest;
pub use write_events::WriteEventsRequest;

Expand Down Expand Up @@ -72,7 +73,7 @@ enum StreamLineItem<T> {
Error { error: String },
/// A heardbeat message was sent to keep the connection alive.
/// This is only used when observing events, but it does not hurt to have it everywhere.
Heartbeat,
Heartbeat(Value),
/// A successful response from the database
/// Since the exact type of the payload is not known at this point, we use this as a fallback case.
/// Every request item gets put in here and the type can be checked later on.
Expand Down Expand Up @@ -114,7 +115,7 @@ pub trait StreamingRequest: ClientRequest {
Some(Err(ClientError::DBError(error)))
}
// A heartbeat message was sent, which we ignore.
Ok(StreamLineItem::Heartbeat) => None,
Ok(StreamLineItem::Heartbeat(_value)) => None,
// A successful response was sent with the correct type.
Ok(StreamLineItem::Ok { payload, ty }) if ty == Self::ITEM_TYPE_NAME => {
Some(Ok(payload))
Expand Down
2 changes: 1 addition & 1 deletion src/client/client_request/observe_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct ObserveEventsRequest<'a> {
}

impl ClientRequest for ObserveEventsRequest<'_> {
const URL_PATH: &'static str = "/api/v1/read-events";
const URL_PATH: &'static str = "/api/v1/observe-events";
const METHOD: Method = Method::POST;

fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
Expand Down
2 changes: 1 addition & 1 deletion src/event/event_types/event_candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use typed_builder::TypedBuilder;
use crate::error::EventError;

/// Represents an event candidate that can be sent to the DB.
/// This is a simplified version of the [`super::Event`] type.
/// This is a simplified version of the [`super::event::Event`] type.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)]
pub struct EventCandidate {
/// The data of the event, serialized as JSON
Expand Down
53 changes: 53 additions & 0 deletions tests/observe_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
mod utils;

use eventsourcingdb_client_rust::container::Container;
use futures::stream::StreamExt;
use serde_json::json;
use utils::create_test_eventcandidate;

#[tokio::test]
async fn observe_existing_events() {
let container = Container::start_default().await.unwrap();
let client = container.get_client().await.unwrap();
let event_candidate = create_test_eventcandidate("/test", json!({"value": 1}));
let written = client
.write_events(vec![event_candidate.clone()], vec![])
.await
.expect("Unable to write event");

let mut events_stream = client
.observe_events("/test", None)
.await
.expect("Failed to request events");
let events = events_stream
.next()
.await
.expect("Failed to read events")
.expect("Expected an event, but got an error");

assert_eq!(vec![events], written);
}

#[tokio::test]
async fn keep_observing_events() {
let container = Container::start_default().await.unwrap();
let client = container.get_client().await.unwrap();

let mut events_stream = client
.observe_events("/test", None)
.await
.expect("Failed to observe events");
let event_candidate = create_test_eventcandidate("/test", json!({"value": 1}));
let written = client
.write_events(vec![event_candidate.clone()], vec![])
.await
.expect("Unable to write event");

let event = events_stream
.next()
.await
.expect("Failed to read events")
.expect("Expected an event, but got an error");

assert_eq!(vec![event], written);
}
16 changes: 16 additions & 0 deletions tests/run_eventql_query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use eventsourcingdb_client_rust::container::Container;
use futures::stream::TryStreamExt;

#[tokio::test]
async fn run_empty_query() {
let container = Container::start_default().await.unwrap();
let client = container.get_client().await.unwrap();
let rows = client
.run_eventql_query("FROM e IN events ORDER BY e.time DESC TOP 100 PROJECT INTO e")
.await
.expect("Unable to run query");
let rows: Result<Vec<_>, _> = rows.try_collect().await;
assert!(rows.is_ok(), "Failed to run query: {:?}", rows);
let rows = rows.expect("Failed to read rows");
assert_eq!(rows.len(), 0);
}