Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,23 @@ match result {

*Note that according to the CloudEvents standard, event IDs must be of type string.*

#### Using the `IsEventQLTrue` precondition

If you want to write events depending on an EventQL query, use the `IsEventQLTrue` precondition to create a precondition and pass it in a vector as the second argument:

```rust
let result = client.write_events(
vec![event.clone()],
vec![Precondition::IsEventQLTrue {
query: "FROM e IN events WHERE e.type == 'io.eventsourcingdb.library.book-borrowed' PROJECT INTO COUNT() < 10".to_string(),
}],
).await;
match result {
Ok(written_events) => // ...
Err(err) => // ...
}
```

### Reading Events

To read all events of a subject, call the `read_events` function with the subject and an options object. Set the `recursive` option to `false`. This ensures that only events of the given subject are returned, not events of nested subjects.
Expand Down
6 changes: 6 additions & 0 deletions src/client/precondition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,10 @@ pub enum Precondition {
#[serde(rename = "eventId")]
event_id: String,
},
/// Check if an EventQL query returns true
#[serde(rename = "isEventQlTrue")]
IsEventQLTrue {
/// The EventQL query to check
query: String,
},
}
16 changes: 9 additions & 7 deletions tests/essentials.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use eventsourcingdb::{Client, container::Container};
mod utils;
use eventsourcingdb::Client;
use utils::create_test_container;

#[tokio::test]
async fn ping() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
client.ping().await.expect("Failed to ping");
}
Expand All @@ -11,12 +13,12 @@ async fn ping() {
async fn ping_unavailable_server_errors() {
let client = Client::new("http://localhost:12345".parse().unwrap(), "secrettoken");
let result = client.ping().await;
assert!(result.is_err(), "Expected an error, but got: {:?}", result);
assert!(result.is_err(), "Expected an error, but got: {result:?}");
}

#[tokio::test]
async fn verify_api_token() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
client
.verify_api_token()
Expand All @@ -28,14 +30,14 @@ async fn verify_api_token() {
async fn verify_api_token_unavailable_server_errors() {
let client = Client::new("http://localhost:12345".parse().unwrap(), "secrettoken");
let result = client.verify_api_token().await;
assert!(result.is_err(), "Expected an error, but got: {:?}", result);
assert!(result.is_err(), "Expected an error, but got: {result:?}");
}

#[tokio::test]
async fn verify_api_token_invalid_token_errors() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
let invalid_client = Client::new(client.get_base_url().clone(), "invalid_token");
let result = invalid_client.verify_api_token().await;
assert!(result.is_err(), "Expected an error, but got: {:?}", result);
assert!(result.is_err(), "Expected an error, but got: {result:?}");
}
23 changes: 11 additions & 12 deletions tests/metadata_and_discovery.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use eventsourcingdb::container::Container;
mod utils;
use futures::StreamExt;
use serde_json::json;
use utils::create_test_container;

#[tokio::test]
async fn register_event_schema() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
client
.register_event_schema(
Expand All @@ -28,7 +29,7 @@ async fn register_event_schema() {

#[tokio::test]
async fn register_invalid_event_schema() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
let res = client
.register_event_schema(
Expand All @@ -38,24 +39,23 @@ async fn register_invalid_event_schema() {
}),
)
.await;
assert!(res.is_err(), "Expected an error, but got: {:?}", res);
assert!(res.is_err(), "Expected an error, but got: {res:?}");
}

#[tokio::test]
async fn list_all_subjects() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
let res = client.list_subjects(None).await;
match res {
Ok(subjects) => {
let subjects = subjects.collect::<Vec<_>>().await;
assert!(
subjects.is_empty(),
"Expected no subjects, but got: {:?}",
subjects
"Expected no subjects, but got: {subjects:?}"
);
}
Err(err) => panic!("Failed to list subjects: {:?}", err),
Err(err) => panic!("Failed to list subjects: {err:?}"),
}
}

Expand All @@ -65,7 +65,7 @@ async fn list_all_subjects() {

#[tokio::test]
async fn list_all_event_types() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
let test_event_type = "io.eventsourcingdb.test";
let schema = json!({
Expand All @@ -90,8 +90,7 @@ async fn list_all_event_types() {
let mut event_types = event_types.collect::<Vec<_>>().await;
assert!(
event_types.len() == 1,
"Expected one event types, but got: {:?}",
event_types
"Expected one event types, but got: {event_types:?}"
);
assert!(event_types[0].is_ok(), "Expected event type to be ok");
let response_event_type = event_types.pop().unwrap().unwrap();
Expand All @@ -113,7 +112,7 @@ async fn list_all_event_types() {
response_event_type.is_phantom
);
}
Err(err) => panic!("Failed to list event types: {:?}", err),
Err(err) => panic!("Failed to list event types: {err:?}"),
}
}

Expand Down
7 changes: 3 additions & 4 deletions tests/observe_events.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
mod utils;

use eventsourcingdb::container::Container;
use futures::stream::StreamExt;
use serde_json::json;
use utils::create_test_container;
use utils::create_test_eventcandidate;

#[tokio::test]
async fn observe_existing_events() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
let event_candidate = create_test_eventcandidate("/test", json!({"value": 1}));
let written = client
Expand All @@ -30,7 +29,7 @@ async fn observe_existing_events() {

#[tokio::test]
async fn keep_observing_events() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();

let mut events_stream = client
Expand Down
32 changes: 15 additions & 17 deletions tests/read_events.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,32 @@
mod utils;

use eventsourcingdb::{
container::Container,
request_options::{
Ordering, ReadEventMissingStrategy, ReadEventsOptions, ReadFromLatestEventOptions,
},
use eventsourcingdb::request_options::{
Ordering, ReadEventMissingStrategy, ReadEventsOptions, ReadFromLatestEventOptions,
};
use futures::TryStreamExt;
use serde_json::json;
use utils::create_test_container;
use utils::{
assert_event_match_eventcandidate, create_numbered_eventcandidates, create_test_eventcandidate,
};

#[tokio::test]
async fn make_read_call() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
let events_stream = client
.read_events("/", None)
.await
.expect("Failed to read events");
let events: Result<Vec<_>, _> = events_stream.try_collect().await;
assert!(events.is_ok(), "Failed to write events: {:?}", events);
assert!(events.is_ok(), "Failed to write events: {events:?}");
let events = events.expect("Failed to read events");
assert_eq!(events.len(), 0);
}

#[tokio::test]
async fn make_read_call_with_event() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
let event_candidate = create_test_eventcandidate("/test", json!({"value": 1}));
let written = client
Expand All @@ -50,7 +48,7 @@ async fn make_read_call_with_event() {

#[tokio::test]
async fn make_read_call_with_multiple_events() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
let event_candidates = create_numbered_eventcandidates(10);
let written = client
Expand All @@ -72,7 +70,7 @@ async fn make_read_call_with_multiple_events() {

#[tokio::test]
async fn read_from_exact_topic() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
let event_candidate = create_test_eventcandidate("/test", json!({"value": 1}));
client
Expand Down Expand Up @@ -102,7 +100,7 @@ async fn read_from_exact_topic() {

#[tokio::test]
async fn read_recursive() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
let event_candidate_parent = create_test_eventcandidate("/test", json!({"value": 1}));
let event_candidate_child = create_test_eventcandidate("/test/sub", json!({"value": 2}));
Expand Down Expand Up @@ -137,7 +135,7 @@ async fn read_recursive() {

#[tokio::test]
async fn read_not_recursive() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
let event_candidate_parent = create_test_eventcandidate("/test", json!({"value": 1}));
let event_candidate_child = create_test_eventcandidate("/test/sub", json!({"value": 2}));
Expand Down Expand Up @@ -172,7 +170,7 @@ async fn read_not_recursive() {

#[tokio::test]
async fn read_chronological() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
let event_candidates = create_numbered_eventcandidates(10);
let written = client
Expand Down Expand Up @@ -200,7 +198,7 @@ async fn read_chronological() {

#[tokio::test]
async fn read_antichronological() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
let event_candidates = create_numbered_eventcandidates(10);
let written = client
Expand Down Expand Up @@ -230,7 +228,7 @@ async fn read_antichronological() {

#[tokio::test]
async fn read_everything_from_missing_latest_event() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
let event_candidates = create_numbered_eventcandidates(10);
let written = client
Expand Down Expand Up @@ -262,7 +260,7 @@ async fn read_everything_from_missing_latest_event() {

#[tokio::test]
async fn read_nothing_from_missing_latest_event() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
let event_candidates = create_numbered_eventcandidates(10);
client
Expand Down Expand Up @@ -294,7 +292,7 @@ async fn read_nothing_from_missing_latest_event() {

#[tokio::test]
async fn read_from_latest_event() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
let event_candidates = create_numbered_eventcandidates(10);
client
Expand Down
7 changes: 4 additions & 3 deletions tests/run_eventql_query.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use eventsourcingdb::container::Container;
mod utils;
use futures::stream::TryStreamExt;
use utils::create_test_container;

#[tokio::test]
async fn run_empty_query() {
let container = Container::start_default().await.unwrap();
let container = create_test_container().await;
let client = container.get_client().await.unwrap();
let rows = client
.run_eventql_query("FROM e IN events ORDER BY e.time DESC TOP 100 PROJECT INTO e")
.await
.expect("Unable to run query");
let rows: Result<Vec<_>, _> = rows.try_collect().await;
assert!(rows.is_ok(), "Failed to run query: {:?}", rows);
assert!(rows.is_ok(), "Failed to run query: {rows:?}");
let rows = rows.expect("Failed to read rows");
assert_eq!(rows.len(), 0);
}
10 changes: 9 additions & 1 deletion tests/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
use chrono::{TimeDelta, Utc};
use eventsourcingdb::{Event, EventCandidate};
use eventsourcingdb::{Event, EventCandidate, container::Container};
use serde_json::{Value, json};

pub async fn create_test_container() -> Container {
Container::builder()
.with_image_tag("preview")
.start()
.await
.expect("Failed to start test container")
}

pub fn create_test_eventcandidate(
subject: impl ToString,
data: impl Into<Value>,
Expand Down
Loading