Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
441 changes: 430 additions & 11 deletions README.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
doc-valid-idents = ["EventSourcingDB", "EventQL", "CloudEvents", ".."]
21 changes: 21 additions & 0 deletions examples/event_types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use eventsourcingdb::{client::Client, container::Container};
use futures::StreamExt;

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);

let result = client.list_event_types().await;

match result {
Err(err) => panic!("{}", err),
Ok(mut event_types) => {
while let Some(Ok(event_type)) = event_types.next().await {
println!("{:?}", event_type)
}
}
}
}
21 changes: 21 additions & 0 deletions examples/listing_subjects.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use eventsourcingdb::{client::Client, container::Container};
use futures::StreamExt;

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);

let result = client.list_subjects(Some("/")).await;

match result {
Err(err) => panic!("{}", err),
Ok(mut subjects) => {
while let Some(Ok(subject)) = subjects.next().await {
println!("{:?}", subject)
}
}
}
}
33 changes: 33 additions & 0 deletions examples/observing_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use eventsourcingdb::{
client::{Client, request_options::ObserveEventsOptions},
container::Container,
};
use futures::StreamExt;

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);

let result = client
.observe_events(
"/books/42",
Some(ObserveEventsOptions {
recursive: false,
from_latest_event: None,
lower_bound: None,
}),
)
.await;

match result {
Err(err) => panic!("{}", err),
Ok(mut stream) => {
while let Some(Ok(event)) = stream.next().await {
println!("{:?}", event)
}
}
}
}
15 changes: 15 additions & 0 deletions examples/ping.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use eventsourcingdb::{client::Client, container::Container};

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);

let result = client.ping().await;
if let Err(err) = result {
// handle error
panic!("{}", err)
}
}
35 changes: 35 additions & 0 deletions examples/reading_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use eventsourcingdb::{
client::{Client, request_options::ReadEventsOptions},
container::Container,
};
use futures::StreamExt;

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);

let result = client
.read_events(
"/books/42",
Some(ReadEventsOptions {
recursive: false,
from_latest_event: None,
order: None,
lower_bound: None,
upper_bound: None,
}),
)
.await;

match result {
Err(err) => panic!("{}", err),
Ok(mut stream) => {
while let Some(Ok(event)) = stream.next().await {
println!("{:?}", event)
}
}
}
}
34 changes: 34 additions & 0 deletions examples/registering_event_schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use eventsourcingdb::{client::Client, container::Container};
use serde_json::json;

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);

let result = client
.register_event_schema(
"io.eventsourcingdb.library.book-acquired",
&json!({
"type": "object",
"properties": {
"title": { "type": "string" },
"author": { "type": "string" },
"isbn": { "type": "string" },
},
"required": [
"title",
"author",
"isbn",
],
"additionalProperties": false,
}),
)
.await;

if let Err(err) = result {
panic!("{}", err)
}
}
23 changes: 23 additions & 0 deletions examples/running_eventql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use eventsourcingdb::{client::Client, container::Container};
use futures::StreamExt;

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);

let result = client
.run_eventql_query("FROM e IN events PROJECT INTO e")
.await;

match result {
Err(err) => panic!("{}", err),
Ok(mut stream) => {
while let Some(Ok(row)) = stream.next().await {
println!("{:?}", row)
}
}
}
}
15 changes: 15 additions & 0 deletions examples/verify_api_token.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use eventsourcingdb::{client::Client, container::Container};

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);

let result = client.verify_api_token().await;
if let Err(err) = result {
// handle error
panic!("{}", err)
}
}
27 changes: 27 additions & 0 deletions examples/write_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use eventsourcingdb::{client::Client, container::Container, event::EventCandidate};
use serde_json::json;

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);

let event = EventCandidate::builder()
.source("https://library.eventsourcingdb.io".to_string())
.subject("/books/42".to_string())
.ty("io.eventsourcingdb.library.book-acquired")
.data(json!({
"title": "2001 - A Space Odyssey",
"author": "Arthur C. Clarke",
"isbn": "978-0756906788",
}))
.build();

let result = client.write_events(vec![event.clone()], vec![]).await;
match result {
Ok(written_events) => println!("{:?}", written_events),
Err(err) => panic!("{}", err),
}
}
4 changes: 2 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl Client {
pub async fn read_events<'a>(
&self,
subject: &'a str,
options: Option<request_options::ReadEventsRequestOptions<'a>>,
options: Option<request_options::ReadEventsOptions<'a>>,
) -> Result<impl Stream<Item = Result<Event, ClientError>>, ClientError> {
let response = self
.request_streaming(ReadEventsRequest { subject, options })
Expand Down Expand Up @@ -249,7 +249,7 @@ impl Client {
pub async fn observe_events<'a>(
&self,
subject: &'a str,
options: Option<request_options::ObserveEventsRequestOptions<'a>>,
options: Option<request_options::ObserveEventsOptions<'a>>,
) -> Result<impl Stream<Item = Result<Event, ClientError>>, ClientError> {
let response = self
.request_streaming(ObserveEventsRequest { subject, options })
Expand Down
6 changes: 2 additions & 4 deletions src/client/client_request/observe_events.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use reqwest::Method;
use serde::Serialize;

use crate::{
client::request_options::ObserveEventsRequestOptions, error::ClientError, event::Event,
};
use crate::{client::request_options::ObserveEventsOptions, error::ClientError, event::Event};

use super::{ClientRequest, StreamingRequest};

#[derive(Debug, Clone, Serialize)]
pub struct ObserveEventsRequest<'a> {
pub subject: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
pub options: Option<ObserveEventsRequestOptions<'a>>,
pub options: Option<ObserveEventsOptions<'a>>,
}

impl ClientRequest for ObserveEventsRequest<'_> {
Expand Down
4 changes: 2 additions & 2 deletions src/client/client_request/read_events.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use reqwest::Method;
use serde::Serialize;

use crate::{client::request_options::ReadEventsRequestOptions, error::ClientError, event::Event};
use crate::{client::request_options::ReadEventsOptions, error::ClientError, event::Event};

use super::{ClientRequest, StreamingRequest};

#[derive(Debug, Clone, Serialize)]
pub struct ReadEventsRequest<'a> {
pub subject: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
pub options: Option<ReadEventsRequestOptions<'a>>,
pub options: Option<ReadEventsOptions<'a>>,
}

impl ClientRequest for ReadEventsRequest<'_> {
Expand Down
41 changes: 32 additions & 9 deletions src/client/request_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use serde::Serialize;
/// Options for reading events from the database
#[derive(Debug, Default, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReadEventsRequestOptions<'a> {
pub struct ReadEventsOptions<'a> {
/// Start reading events from this start event
#[serde(skip_serializing_if = "Option::is_none")]
pub from_latest_event: Option<FromLatestEventOptions<'a>>,
pub from_latest_event: Option<ReadFromLatestEventOptions<'a>>,
/// Lower bound of events to read
#[serde(skip_serializing_if = "Option::is_none")]
pub lower_bound: Option<Bound<'a>>,
Expand All @@ -25,10 +25,10 @@ pub struct ReadEventsRequestOptions<'a> {
/// Options for observing events from the database
#[derive(Debug, Default, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ObserveEventsRequestOptions<'a> {
pub struct ObserveEventsOptions<'a> {
/// Start reading events from this start event
#[serde(skip_serializing_if = "Option::is_none")]
pub from_latest_event: Option<FromLatestEventOptions<'a>>,
pub from_latest_event: Option<ObserveFromLatestEventOptions<'a>>,
/// Lower bound of events to read
#[serde(skip_serializing_if = "Option::is_none")]
pub lower_bound: Option<Bound<'a>>,
Expand Down Expand Up @@ -67,25 +67,48 @@ pub struct Bound<'a> {
pub id: &'a str,
}

/// The strategy for handling missing events
/// The strategy for handling missing events while reading
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum EventMissingStrategy {
pub enum ReadEventMissingStrategy {
/// Read all events if the required one is missing
ReadEverything,
/// Read no events if the required one is missing
ReadNothing,
}

/// Options for reading events from the start reading at
/// The strategy for handling missing events while observing
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum ObserveEventMissingStrategy {
/// Observe all events if the required one is missing
ObserveEverything,
/// Wait for the event until observing
WaitForEvent,
}

/// Options for reading events from the latest event of certain type or subject
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FromLatestEventOptions<'a> {
pub struct ReadFromLatestEventOptions<'a> {
/// The strategy for handling missing events
pub if_event_is_missing: EventMissingStrategy,
pub if_event_is_missing: ReadEventMissingStrategy,
/// The subject the event should be on
pub subject: &'a str,
/// The type of the event to read from
#[serde(rename = "type")]
pub ty: &'a str,
}

/// Options for observe events from the latest event of certain type or subject
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ObserveFromLatestEventOptions<'a> {
/// The strategy for handling missing events
pub if_event_is_missing: ObserveEventMissingStrategy,
/// The subject the event should be on
pub subject: &'a str,
/// The type of the event to observe from
#[serde(rename = "type")]
pub ty: &'a str,
}
Loading