Skip to content

Commit 3bbeb7b

Browse files
committed
fix(Client): fix parsing the read stream
Signed-off-by: Raphael Höser <[email protected]>
1 parent 5c45884 commit 3bbeb7b

File tree

4 files changed

+366
-3
lines changed

4 files changed

+366
-3
lines changed

src/client/client_request/read_events.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
use futures::{Stream, stream::StreamExt};
12
use reqwest::Method;
2-
use serde::Serialize;
3+
use serde::{Deserialize, Serialize};
34

45
use crate::{client::request_options::ReadEventsRequestOptions, error::ClientError, event::Event};
56

@@ -23,4 +24,30 @@ impl<'a> ClientRequest for ReadEventsRequest<'a> {
2324

2425
impl<'a> StreamingRequest for ReadEventsRequest<'a> {
2526
type ItemType = Event;
27+
28+
fn build_stream(
29+
response: reqwest::Response,
30+
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
31+
#[derive(Deserialize, Debug)]
32+
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
33+
enum LineItem {
34+
Error { error: String },
35+
Event(Event),
36+
}
37+
38+
impl From<LineItem> for Result<Event, ClientError> {
39+
fn from(item: LineItem) -> Self {
40+
match item {
41+
LineItem::Error { error } => Err(ClientError::DBError(error)),
42+
LineItem::Event(event_type) => Ok(event_type),
43+
}
44+
}
45+
}
46+
47+
Self::lines_stream(response).map(|line| {
48+
let line = line?;
49+
let item: LineItem = serde_json::from_str(line.as_str())?;
50+
item.into()
51+
})
52+
}
2653
}

src/client/request_options.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use serde::Serialize;
44

55
/// Options for reading events from the database
6-
#[derive(Debug, Clone, Serialize)]
6+
#[derive(Debug, Default, Clone, Serialize)]
77
#[serde(rename_all = "camelCase")]
88
pub struct ReadEventsRequestOptions<'a> {
99
/// Start reading events from this start event

tests/read_events.rs

Lines changed: 336 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,336 @@
1+
mod utils;
2+
3+
use eventsourcingdb_client_rust::{
4+
client::request_options::{
5+
EventMissingStrategy, FromLatestEventOptions, Ordering, ReadEventsRequestOptions,
6+
},
7+
container::Container,
8+
};
9+
use futures::TryStreamExt;
10+
use serde_json::json;
11+
use utils::{
12+
assert_event_match_eventcandidate, create_numbered_eventcandidates, create_test_eventcandidate,
13+
};
14+
15+
#[tokio::test]
16+
async fn make_read_call() {
17+
let container = Container::start_default().await.unwrap();
18+
let client = container.get_client().await.unwrap();
19+
let events_stream = client
20+
.read_events("/", None)
21+
.await
22+
.expect("Failed to read events");
23+
let events: Result<Vec<_>, _> = events_stream.try_collect().await;
24+
assert!(events.is_ok(), "Failed to write events: {:?}", events);
25+
let events = events.expect("Failed to read events");
26+
assert_eq!(events.len(), 0);
27+
}
28+
29+
#[tokio::test]
30+
async fn make_read_call_with_event() {
31+
let container = Container::start_default().await.unwrap();
32+
let client = container.get_client().await.unwrap();
33+
let event_candidate = create_test_eventcandidate("/test", json!({"value": 1}));
34+
let written = client
35+
.write_events(vec![event_candidate.clone()], vec![])
36+
.await
37+
.expect("Unable to write event");
38+
39+
let events_stream = client
40+
.read_events("/test", None)
41+
.await
42+
.expect("Failed to request events");
43+
let events: Vec<_> = events_stream
44+
.try_collect()
45+
.await
46+
.expect("Failed to read events");
47+
48+
assert_eq!(events, written);
49+
}
50+
51+
#[tokio::test]
52+
async fn make_read_call_with_multiple_events() {
53+
let container = Container::start_default().await.unwrap();
54+
let client = container.get_client().await.unwrap();
55+
let event_candidates = create_numbered_eventcandidates(10);
56+
let written = client
57+
.write_events(event_candidates.clone(), vec![])
58+
.await
59+
.expect("Failed to write events");
60+
61+
let events_stream = client
62+
.read_events("/test", None)
63+
.await
64+
.expect("Failed to request events");
65+
let events: Vec<_> = events_stream
66+
.try_collect()
67+
.await
68+
.expect("Failed to read events");
69+
70+
assert_eq!(events, written);
71+
}
72+
73+
#[tokio::test]
74+
async fn read_from_exact_topic() {
75+
let container = Container::start_default().await.unwrap();
76+
let client = container.get_client().await.unwrap();
77+
let event_candidate = create_test_eventcandidate("/test", json!({"value": 1}));
78+
client
79+
.write_events(vec![event_candidate.clone()], vec![])
80+
.await
81+
.expect("Unable to write event");
82+
client
83+
.write_events(
84+
vec![create_test_eventcandidate("/wrong", json!({"value": 1}))],
85+
vec![],
86+
)
87+
.await
88+
.expect("Unable to write event");
89+
90+
let events_stream = client
91+
.read_events("/test", None)
92+
.await
93+
.expect("Failed to request events");
94+
let events: Vec<_> = events_stream
95+
.try_collect()
96+
.await
97+
.expect("Failed to read events");
98+
99+
assert_eq!(events.len(), 1);
100+
assert_event_match_eventcandidate(&events[0], &event_candidate, None, None);
101+
}
102+
103+
#[tokio::test]
104+
async fn read_recursive() {
105+
let container = Container::start_default().await.unwrap();
106+
let client = container.get_client().await.unwrap();
107+
let event_candidate_parent = create_test_eventcandidate("/test", json!({"value": 1}));
108+
let event_candidate_child = create_test_eventcandidate("/test/sub", json!({"value": 2}));
109+
let written = client
110+
.write_events(
111+
vec![
112+
event_candidate_parent.clone(),
113+
event_candidate_child.clone(),
114+
],
115+
vec![],
116+
)
117+
.await
118+
.expect("Unable to write event");
119+
120+
let events_stream = client
121+
.read_events(
122+
"/test",
123+
Some(ReadEventsRequestOptions {
124+
recursive: true,
125+
..Default::default()
126+
}),
127+
)
128+
.await
129+
.expect("Failed to request events");
130+
let events: Vec<_> = events_stream
131+
.try_collect()
132+
.await
133+
.expect("Failed to read events");
134+
135+
assert_eq!(events, written);
136+
}
137+
138+
#[tokio::test]
139+
async fn read_not_recursive() {
140+
let container = Container::start_default().await.unwrap();
141+
let client = container.get_client().await.unwrap();
142+
let event_candidate_parent = create_test_eventcandidate("/test", json!({"value": 1}));
143+
let event_candidate_child = create_test_eventcandidate("/test/sub", json!({"value": 2}));
144+
client
145+
.write_events(
146+
vec![
147+
event_candidate_parent.clone(),
148+
event_candidate_child.clone(),
149+
],
150+
vec![],
151+
)
152+
.await
153+
.expect("Unable to write event");
154+
155+
let events_stream = client
156+
.read_events(
157+
"/test",
158+
Some(ReadEventsRequestOptions {
159+
recursive: false,
160+
..Default::default()
161+
}),
162+
)
163+
.await
164+
.expect("Failed to request events");
165+
let events: Vec<_> = events_stream
166+
.try_collect()
167+
.await
168+
.expect("Failed to read events");
169+
assert_eq!(events.len(), 1);
170+
assert_event_match_eventcandidate(&events[0], &event_candidate_parent, None, None);
171+
}
172+
173+
#[tokio::test]
174+
async fn read_chronological() {
175+
let container = Container::start_default().await.unwrap();
176+
let client = container.get_client().await.unwrap();
177+
let event_candidates = create_numbered_eventcandidates(10);
178+
let written = client
179+
.write_events(event_candidates.clone(), vec![])
180+
.await
181+
.expect("Failed to write events");
182+
183+
let events_stream = client
184+
.read_events(
185+
"/test",
186+
Some(ReadEventsRequestOptions {
187+
order: Some(Ordering::Chronological),
188+
..Default::default()
189+
}),
190+
)
191+
.await
192+
.expect("Failed to request events");
193+
let events: Vec<_> = events_stream
194+
.try_collect()
195+
.await
196+
.expect("Failed to read events");
197+
198+
assert_eq!(events, written);
199+
}
200+
201+
#[tokio::test]
202+
async fn read_antichronological() {
203+
let container = Container::start_default().await.unwrap();
204+
let client = container.get_client().await.unwrap();
205+
let event_candidates = create_numbered_eventcandidates(10);
206+
let written = client
207+
.write_events(event_candidates.clone(), vec![])
208+
.await
209+
.expect("Failed to write events");
210+
211+
let events_stream = client
212+
.read_events(
213+
"/test",
214+
Some(ReadEventsRequestOptions {
215+
order: Some(Ordering::Antichronological),
216+
..Default::default()
217+
}),
218+
)
219+
.await
220+
.expect("Failed to request events");
221+
let events: Vec<_> = events_stream
222+
.try_collect()
223+
.await
224+
.expect("Failed to read events");
225+
226+
// Reverse the reversed results from DB should result in the original order
227+
let reversed_events: Vec<_> = events.iter().rev().cloned().collect();
228+
assert_eq!(reversed_events, written);
229+
}
230+
231+
#[tokio::test]
232+
async fn read_everything_from_missing_latest_event() {
233+
let container = Container::start_default().await.unwrap();
234+
let client = container.get_client().await.unwrap();
235+
let event_candidates = create_numbered_eventcandidates(10);
236+
let written = client
237+
.write_events(event_candidates.clone(), vec![])
238+
.await
239+
.expect("Failed to write events");
240+
241+
let events_stream = client
242+
.read_events(
243+
"/test",
244+
Some(ReadEventsRequestOptions {
245+
from_latest_event: Some(FromLatestEventOptions {
246+
subject: "/",
247+
ty: "io.eventsourcingdb.test.does-not-exist",
248+
if_event_is_missing: EventMissingStrategy::ReadEverything,
249+
}),
250+
..Default::default()
251+
}),
252+
)
253+
.await
254+
.expect("Failed to request events");
255+
let events: Vec<_> = events_stream
256+
.try_collect()
257+
.await
258+
.expect("Failed to read events");
259+
260+
assert_eq!(events, written);
261+
}
262+
263+
#[tokio::test]
264+
async fn read_nothing_from_missing_latest_event() {
265+
let container = Container::start_default().await.unwrap();
266+
let client = container.get_client().await.unwrap();
267+
let event_candidates = create_numbered_eventcandidates(10);
268+
client
269+
.write_events(event_candidates.clone(), vec![])
270+
.await
271+
.expect("Failed to write events");
272+
273+
let events_stream = client
274+
.read_events(
275+
"/test",
276+
Some(ReadEventsRequestOptions {
277+
from_latest_event: Some(FromLatestEventOptions {
278+
subject: "/",
279+
ty: "io.eventsourcingdb.test.does-not-exist",
280+
if_event_is_missing: EventMissingStrategy::ReadNothing,
281+
}),
282+
..Default::default()
283+
}),
284+
)
285+
.await
286+
.expect("Failed to request events");
287+
let events: Vec<_> = events_stream
288+
.try_collect()
289+
.await
290+
.expect("Failed to read events");
291+
292+
assert_eq!(events.len(), 0);
293+
}
294+
295+
#[tokio::test]
296+
async fn read_from_latest_event() {
297+
let container = Container::start_default().await.unwrap();
298+
let client = container.get_client().await.unwrap();
299+
let event_candidates = create_numbered_eventcandidates(10);
300+
client
301+
.write_events(event_candidates.clone(), vec![])
302+
.await
303+
.expect("Failed to write events");
304+
client
305+
.write_events(
306+
vec![create_test_eventcandidate("/marker", json!({"value": 1}))],
307+
vec![],
308+
)
309+
.await
310+
.expect("Failed to write events");
311+
let written = client
312+
.write_events(event_candidates.clone(), vec![])
313+
.await
314+
.expect("Failed to write events");
315+
316+
let events_stream = client
317+
.read_events(
318+
"/test",
319+
Some(ReadEventsRequestOptions {
320+
from_latest_event: Some(FromLatestEventOptions {
321+
subject: "/marker",
322+
ty: "io.eventsourcingdb.test",
323+
if_event_is_missing: EventMissingStrategy::ReadNothing,
324+
}),
325+
..Default::default()
326+
}),
327+
)
328+
.await
329+
.expect("Failed to request events");
330+
let events: Vec<_> = events_stream
331+
.try_collect()
332+
.await
333+
.expect("Failed to read events");
334+
335+
assert_eq!(events, written);
336+
}

tests/utils/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ pub fn assert_event_match_eventcandidate(
5757
event.predecessorhash(),
5858
previous_event_hash
5959
.unwrap_or("0000000000000000000000000000000000000000000000000000000000000000"),
60-
"Time should be present"
60+
"Previous hash should be present"
6161
);
6262
assert_eq!(event.specversion(), "1.0", "Spec version should be 1.0");
6363
assert!(

0 commit comments

Comments
 (0)