Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,20 @@ match result {
Err(err) => // ...
}
```

### Listing a Specific Event Types

To list a specific event type, call the `read_event_type` function. The function returns the detailed event type, which includes the schema:

```rust
let event_type_name = "io.eventsourcingdb.library.book-acquired";
let result := client.read_event_type(event_type_name).await;
match result {
Ok(event_type) => // ...
Err(err) => // ...
}
```

### Using Testcontainers

Call the `Container::start_default()` function, get a client, and run your test code:
Expand Down
4 changes: 2 additions & 2 deletions examples/event_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use futures::StreamExt;

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let db = Container::start_preview().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);
Expand All @@ -14,7 +14,7 @@ async fn main() {
Err(err) => panic!("{}", err),
Ok(mut event_types) => {
while let Some(Ok(event_type)) = event_types.next().await {
println!("{:?}", event_type)
println!("{event_type:?}")
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions examples/listing_subjects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use futures::StreamExt;

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let db = Container::start_preview().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);
Expand All @@ -14,7 +14,7 @@ async fn main() {
Err(err) => panic!("{}", err),
Ok(mut subjects) => {
while let Some(Ok(subject)) = subjects.next().await {
println!("{:?}", subject)
println!("{subject:?}")
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions examples/observing_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures::StreamExt;

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let db = Container::start_preview().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);
Expand All @@ -26,7 +26,7 @@ async fn main() {
Err(err) => panic!("{}", err),
Ok(mut stream) => {
while let Some(Ok(event)) = stream.next().await {
println!("{:?}", event)
println!("{event:?}")
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use eventsourcingdb::{client::Client, container::Container};

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let db = Container::start_preview().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);
Expand Down
4 changes: 2 additions & 2 deletions examples/reading_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures::StreamExt;

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let db = Container::start_preview().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);
Expand All @@ -28,7 +28,7 @@ async fn main() {
Err(err) => panic!("{}", err),
Ok(mut stream) => {
while let Some(Ok(event)) = stream.next().await {
println!("{:?}", event)
println!("{event:?}")
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/registering_event_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use serde_json::json;

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let db = Container::start_preview().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);
Expand Down
4 changes: 2 additions & 2 deletions examples/running_eventql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use futures::StreamExt;

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let db = Container::start_preview().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);
Expand All @@ -16,7 +16,7 @@ async fn main() {
Err(err) => panic!("{}", err),
Ok(mut stream) => {
while let Some(Ok(row)) = stream.next().await {
println!("{:?}", row)
println!("{row:?}")
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/verify_api_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use eventsourcingdb::{client::Client, container::Container};

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let db = Container::start_preview().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);
Expand Down
4 changes: 2 additions & 2 deletions examples/write_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use serde_json::json;

#[tokio::main]
async fn main() {
let db = Container::start_default().await.unwrap();
let db = Container::start_preview().await.unwrap();
let base_url = db.get_base_url().await.unwrap();
let api_token = db.get_api_token();
let client = Client::new(base_url, api_token);
Expand All @@ -21,7 +21,7 @@ async fn main() {

let result = client.write_events(vec![event.clone()], vec![]).await;
match result {
Ok(written_events) => println!("{:?}", written_events),
Ok(written_events) => println!("{written_events:?}"),
Err(err) => panic!("{}", err),
}
}
68 changes: 56 additions & 12 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! To use the client, create it with the base URL and API token of your [EventsourcingDB](https://www.eventsourcingdb.io/) instance.
//! ```
//! # tokio_test::block_on(async {
//! # let container = eventsourcingdb::container::Container::start_default().await.unwrap();
//! # let container = eventsourcingdb::container::Container::start_preview().await.unwrap();
//! let db_url = "http://localhost:3000/";
//! let api_token = "secrettoken";
//! # let db_url = container.get_base_url().await.unwrap();
Expand All @@ -22,14 +22,15 @@ mod precondition;
pub mod request_options;

use crate::{
client::client_request::ReadEventTypeRequest,
error::ClientError,
event::{Event, EventCandidate, ManagementEvent},
request_options::EventType,
};
use client_request::{
ClientRequest, ListEventTypesRequest, ListSubjectsRequest, ObserveEventsRequest,
OneShotRequest, PingRequest, ReadEventsRequest, RegisterEventSchemaRequest,
RunEventqlQueryRequest, StreamingRequest, VerifyApiTokenRequest, WriteEventsRequest,
list_event_types::EventType,
};
use futures::Stream;
pub use precondition::Precondition;
Expand Down Expand Up @@ -161,7 +162,7 @@ impl Client {
///
/// ```
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb::container::Container::start_default().await.unwrap();
/// # let container = eventsourcingdb::container::Container::start_preview().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
Expand All @@ -185,7 +186,7 @@ impl Client {
/// use futures::StreamExt;
/// # use serde_json::json;
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb::container::Container::start_default().await.unwrap();
/// # let container = eventsourcingdb::container::Container::start_preview().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
Expand All @@ -211,14 +212,57 @@ impl Client {
Ok(response)
}

/// Reads a specific event type from the DB instance.
///
/// ```
/// use eventsourcingdb::event::EventCandidate;
/// use futures::StreamExt;
/// # use serde_json::json;
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb::container::Container::start_preview().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
/// # let api_token = container.get_api_token();
/// let client = eventsourcingdb::client::Client::new(db_url, api_token);
/// let event_type = "io.eventsourcingdb.test";
/// let schema = json!({
/// "type": "object",
/// "properties": {
/// "id": {
/// "type": "string"
/// },
/// "name": {
/// "type": "string"
/// }
/// },
/// "required": ["id", "name"]
/// });
/// client.register_event_schema(event_type, &schema).await.expect("Failed to register event types");
/// let type_info = client.read_event_type(event_type).await.expect("Failed to read event type");
/// println!("Found Type {} with schema {:?}", type_info.name, type_info.schema);
/// # })
/// ```
///
/// # Errors
/// This function will return an error if the request fails or if the URL is invalid.
pub async fn read_event_type(&self, event_type: &str) -> Result<EventType, ClientError> {
let response = self
.request_oneshot(ReadEventTypeRequest {
event_type: event_type.to_string(),
})
.await?;
Ok(response)
}

/// Observe events from the DB instance.
///
/// ```
/// use eventsourcingdb::event::EventCandidate;
/// use futures::StreamExt;
/// # use serde_json::json;
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb::container::Container::start_default().await.unwrap();
/// # let container = eventsourcingdb::container::Container::start_preview().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
Expand Down Expand Up @@ -261,7 +305,7 @@ impl Client {
///
/// ```
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb::container::Container::start_default().await.unwrap();
/// # let container = eventsourcingdb::container::Container::start_preview().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
Expand All @@ -285,7 +329,7 @@ impl Client {
/// use futures::StreamExt;
/// # use serde_json::json;
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb::container::Container::start_default().await.unwrap();
/// # let container = eventsourcingdb::container::Container::start_preview().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
Expand Down Expand Up @@ -327,7 +371,7 @@ impl Client {
/// use futures::StreamExt;
/// # use serde_json::json;
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb::container::Container::start_default().await.unwrap();
/// # let container = eventsourcingdb::container::Container::start_preview().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
Expand All @@ -346,7 +390,7 @@ impl Client {
/// use futures::StreamExt;
/// # use serde_json::json;
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb::container::Container::start_default().await.unwrap();
/// # let container = eventsourcingdb::container::Container::start_preview().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
Expand Down Expand Up @@ -380,7 +424,7 @@ impl Client {
/// use futures::StreamExt;
/// # use serde_json::json;
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb::container::Container::start_default().await.unwrap();
/// # let container = eventsourcingdb::container::Container::start_preview().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
Expand Down Expand Up @@ -408,7 +452,7 @@ impl Client {
/// use eventsourcingdb::event::EventCandidate;
/// # use serde_json::json;
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb::container::Container::start_default().await.unwrap();
/// # let container = eventsourcingdb::container::Container::start_preview().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
Expand Down Expand Up @@ -447,7 +491,7 @@ impl Client {
/// use futures::StreamExt;
/// # use serde_json::json;
/// # tokio_test::block_on(async {
/// # let container = eventsourcingdb::container::Container::start_default().await.unwrap();
/// # let container = eventsourcingdb::container::Container::start_preview().await.unwrap();
/// let db_url = "http://localhost:3000/";
/// let api_token = "secrettoken";
/// # let db_url = container.get_base_url().await.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions src/client/client_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod list_event_types;
mod list_subjects;
mod observe_events;
mod ping;
mod read_event_type;
mod read_events;
mod register_event_schema;
mod run_eventql_query;
Expand All @@ -14,6 +15,7 @@ pub use list_event_types::ListEventTypesRequest;
pub use list_subjects::ListSubjectsRequest;
pub use observe_events::ObserveEventsRequest;
pub use ping::PingRequest;
pub use read_event_type::ReadEventTypeRequest;
pub use read_events::ReadEventsRequest;
pub use register_event_schema::RegisterEventSchemaRequest;
pub use run_eventql_query::RunEventqlQueryRequest;
Expand Down
16 changes: 3 additions & 13 deletions src/client/client_request/list_event_types.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,8 @@
use super::{ClientRequest, StreamingRequest};
use reqwest::Method;
use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::error::ClientError;
use serde::Serialize;

use super::{ClientRequest, StreamingRequest};
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct EventType {
#[serde(rename = "eventType")]
pub name: String,
pub is_phantom: bool,
pub schema: Option<Value>,
}
use crate::{client::request_options::EventType, error::ClientError};

#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
Expand Down
28 changes: 28 additions & 0 deletions src/client/client_request/read_event_type.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use reqwest::Method;
use serde::Serialize;

use crate::{
client::client_request::{ClientRequest, OneShotRequest},
error::ClientError,
request_options::EventType,
};

#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReadEventTypeRequest {
#[serde(rename = "eventType")]
/// The name of the event type to read
pub event_type: String,
}

impl ClientRequest for ReadEventTypeRequest {
const URL_PATH: &'static str = "/api/v1/read-event-type";
const METHOD: Method = Method::POST;

fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
Some(Ok(self))
}
}
impl OneShotRequest for ReadEventTypeRequest {
type Response = EventType;
}
Loading