Skip to content

Commit a1d35d2

Browse files
fix(client): correct observe-events issues shown by tests (#18)
Signed-off-by: Raphael Höser <[email protected]> Co-authored-by: Kuro <[email protected]>
1 parent 02d1b96 commit a1d35d2

File tree

7 files changed

+92
-9
lines changed

7 files changed

+92
-9
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ Based on the [compliance criteria](https://docs.eventsourcingdb.io/client-sdks/c
1010
- 🚀 [Essentials](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#essentials)
1111
- 🚀 [Writing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#writing-events)
1212
- 🚀 [Reading Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#reading-events)
13-
- [Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql)
13+
- 🚀 [Using EventQL](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#using-eventql)
1414
- 🚀 [Observing Events](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#observing-events)
1515
- 🚀 [Metadata and Discovery](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#metadata-and-discovery)
1616
- 🚀 [Testcontainers Support](https://docs.eventsourcingdb.io/client-sdks/compliance-criteria/#testcontainers-support)

src/client.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ impl Client {
100100
reqwest::Method::POST => self.reqwest.post(url),
101101
_ => return Err(ClientError::InvalidRequestMethod),
102102
}
103-
.header("Authorization", format!("Bearer {}", self.api_token));
103+
.bearer_auth(&self.api_token);
104104
let request = if let Some(body) = endpoint.body() {
105105
request
106106
.header("Content-Type", "application/json")
@@ -224,9 +224,22 @@ impl Client {
224224
/// # let db_url = container.get_base_url().await.unwrap();
225225
/// # let api_token = container.get_api_token();
226226
/// let client = eventsourcingdb_client_rust::client::Client::new(db_url, api_token);
227-
/// let mut event_stream = client.observe_events("/", None).await.expect("Failed to observe events");
228-
/// while let Some(event) = event_stream.next().await {
229-
/// println!("Found Type {:?}", event.expect("Error while reading event"));
227+
/// # client.write_events(
228+
/// # vec![
229+
/// # EventCandidate::builder()
230+
/// # .source("https://www.eventsourcingdb.io".to_string())
231+
/// # .data(json!({"value": 1}))
232+
/// # .subject("/test".to_string())
233+
/// # .r#type("io.eventsourcingdb.test".to_string())
234+
/// # .build()
235+
/// # ],
236+
/// # vec![]
237+
/// # ).await.expect("Failed to write events");
238+
/// let mut event_stream = client.observe_events("/test", None).await.expect("Failed to observe events");
239+
/// match event_stream.next().await {
240+
/// Some(Ok(event)) => println!("Found Event {:?}", event),
241+
/// Some(Err(e)) => eprintln!("Error while reading event: {:?}", e),
242+
/// None => println!("No more events."),
230243
/// }
231244
/// # })
232245
/// ```

src/client/client_request.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub use ping::PingRequest;
1717
pub use read_events::ReadEventsRequest;
1818
pub use register_event_schema::RegisterEventSchemaRequest;
1919
pub use run_eventql_query::RunEventqlQueryRequest;
20+
use serde_json::Value;
2021
pub use verify_api_token::VerifyApiTokenRequest;
2122
pub use write_events::WriteEventsRequest;
2223

@@ -72,7 +73,7 @@ enum StreamLineItem<T> {
7273
Error { error: String },
7374
/// A heardbeat message was sent to keep the connection alive.
7475
/// This is only used when observing events, but it does not hurt to have it everywhere.
75-
Heartbeat,
76+
Heartbeat(Value),
7677
/// A successful response from the database
7778
/// Since the exact type of the payload is not known at this point, we use this as a fallback case.
7879
/// Every request item gets put in here and the type can be checked later on.
@@ -114,7 +115,7 @@ pub trait StreamingRequest: ClientRequest {
114115
Some(Err(ClientError::DBError(error)))
115116
}
116117
// A heartbeat message was sent, which we ignore.
117-
Ok(StreamLineItem::Heartbeat) => None,
118+
Ok(StreamLineItem::Heartbeat(_value)) => None,
118119
// A successful response was sent with the correct type.
119120
Ok(StreamLineItem::Ok { payload, ty }) if ty == Self::ITEM_TYPE_NAME => {
120121
Some(Ok(payload))

src/client/client_request/observe_events.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub struct ObserveEventsRequest<'a> {
1515
}
1616

1717
impl ClientRequest for ObserveEventsRequest<'_> {
18-
const URL_PATH: &'static str = "/api/v1/read-events";
18+
const URL_PATH: &'static str = "/api/v1/observe-events";
1919
const METHOD: Method = Method::POST;
2020

2121
fn body(&self) -> Option<Result<impl Serialize, ClientError>> {

src/event/event_types/event_candidate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use typed_builder::TypedBuilder;
77
use crate::error::EventError;
88

99
/// Represents an event candidate that can be sent to the DB.
10-
/// This is a simplified version of the [`super::Event`] type.
10+
/// This is a simplified version of the [`super::event::Event`] type.
1111
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, TypedBuilder)]
1212
pub struct EventCandidate {
1313
/// The data of the event, serialized as JSON

tests/observe_events.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
mod utils;
2+
3+
use eventsourcingdb_client_rust::container::Container;
4+
use futures::stream::StreamExt;
5+
use serde_json::json;
6+
use utils::create_test_eventcandidate;
7+
8+
#[tokio::test]
9+
async fn observe_existing_events() {
10+
let container = Container::start_default().await.unwrap();
11+
let client = container.get_client().await.unwrap();
12+
let event_candidate = create_test_eventcandidate("/test", json!({"value": 1}));
13+
let written = client
14+
.write_events(vec![event_candidate.clone()], vec![])
15+
.await
16+
.expect("Unable to write event");
17+
18+
let mut events_stream = client
19+
.observe_events("/test", None)
20+
.await
21+
.expect("Failed to request events");
22+
let events = events_stream
23+
.next()
24+
.await
25+
.expect("Failed to read events")
26+
.expect("Expected an event, but got an error");
27+
28+
assert_eq!(vec![events], written);
29+
}
30+
31+
#[tokio::test]
32+
async fn keep_observing_events() {
33+
let container = Container::start_default().await.unwrap();
34+
let client = container.get_client().await.unwrap();
35+
36+
let mut events_stream = client
37+
.observe_events("/test", None)
38+
.await
39+
.expect("Failed to observe events");
40+
let event_candidate = create_test_eventcandidate("/test", json!({"value": 1}));
41+
let written = client
42+
.write_events(vec![event_candidate.clone()], vec![])
43+
.await
44+
.expect("Unable to write event");
45+
46+
let event = events_stream
47+
.next()
48+
.await
49+
.expect("Failed to read events")
50+
.expect("Expected an event, but got an error");
51+
52+
assert_eq!(vec![event], written);
53+
}

tests/run_eventql_query.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
use eventsourcingdb_client_rust::container::Container;
2+
use futures::stream::TryStreamExt;
3+
4+
#[tokio::test]
5+
async fn run_empty_query() {
6+
let container = Container::start_default().await.unwrap();
7+
let client = container.get_client().await.unwrap();
8+
let rows = client
9+
.run_eventql_query("FROM e IN events ORDER BY e.time DESC TOP 100 PROJECT INTO e")
10+
.await
11+
.expect("Unable to run query");
12+
let rows: Result<Vec<_>, _> = rows.try_collect().await;
13+
assert!(rows.is_ok(), "Failed to run query: {:?}", rows);
14+
let rows = rows.expect("Failed to read rows");
15+
assert_eq!(rows.len(), 0);
16+
}

0 commit comments

Comments
 (0)