Skip to content

Commit 096e52a

Browse files
committed
Merge branch 'read-events' into observing-events
2 parents 92d3f38 + 587a2de commit 096e52a

File tree

14 files changed

+83
-102
lines changed

14 files changed

+83
-102
lines changed

Makefile

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@ qa: analyze test
22

33
analyze:
44
@cargo clippy
5+
@cargo fmt --check
56

67
test:
78
@cargo test
89

9-
.PHONY: analyze qa test
10+
format:
11+
@cargo fmt
12+
13+
.PHONY: analyze format qa test

src/client.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use url::Url;
3838
pub struct Client {
3939
base_url: Url,
4040
api_token: String,
41-
client: reqwest::Client,
41+
reqwest: reqwest::Client,
4242
}
4343

4444
impl Client {
@@ -47,7 +47,7 @@ impl Client {
4747
Client {
4848
base_url,
4949
api_token: api_token.into(),
50-
client: reqwest::Client::new(),
50+
reqwest: reqwest::Client::new(),
5151
}
5252
}
5353

@@ -93,8 +93,8 @@ impl Client {
9393
.map_err(ClientError::URLParseError)?;
9494

9595
let request = match endpoint.method() {
96-
reqwest::Method::GET => self.client.get(url),
97-
reqwest::Method::POST => self.client.post(url),
96+
reqwest::Method::GET => self.reqwest.get(url),
97+
reqwest::Method::POST => self.reqwest.post(url),
9898
_ => return Err(ClientError::InvalidRequestMethod),
9999
}
100100
.header("Authorization", format!("Bearer {}", self.api_token));
@@ -376,7 +376,7 @@ impl Client {
376376
/// }
377377
/// # })
378378
/// ```
379-
///
379+
///
380380
/// # Errors
381381
/// This function will return an error if the request fails or if the URL is invalid.
382382
pub async fn list_event_types(

src/client/client_request.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,9 @@ pub trait StreamingRequest: ClientRequest {
7979
fn lines_stream(
8080
response: reqwest::Response,
8181
) -> impl Stream<Item = Result<String, ClientError>> {
82-
let bytes = response.bytes_stream().map_err(|err| {
83-
io::Error::new(
84-
io::ErrorKind::Other,
85-
format!("Failed to read response stream: {err}"),
86-
)
87-
});
82+
let bytes = response
83+
.bytes_stream()
84+
.map_err(|err| io::Error::other(format!("Failed to read response stream: {err}")));
8885
let stream_reader = StreamReader::new(bytes);
8986
LinesStream::new(BufReader::new(stream_reader).lines()).map_err(ClientError::from)
9087
}

src/client/client_request/list_subjects.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ pub struct ListSubjectsRequest<'a> {
1212
pub base_subject: &'a str,
1313
}
1414

15-
impl<'a> ClientRequest for ListSubjectsRequest<'a> {
15+
impl ClientRequest for ListSubjectsRequest<'_> {
1616
const URL_PATH: &'static str = "/api/v1/read-subjects";
1717
const METHOD: Method = Method::POST;
1818

1919
fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
2020
Some(Ok(self))
2121
}
2222
}
23-
impl<'a> StreamingRequest for ListSubjectsRequest<'a> {
23+
impl StreamingRequest for ListSubjectsRequest<'_> {
2424
type ItemType = String;
2525

2626
fn build_stream(

src/client/client_request/read_events.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ pub struct ReadEventsRequest<'a> {
1313
pub options: Option<ReadEventsRequestOptions<'a>>,
1414
}
1515

16-
impl<'a> ClientRequest for ReadEventsRequest<'a> {
16+
impl ClientRequest for ReadEventsRequest<'_> {
1717
const URL_PATH: &'static str = "/api/v1/read-events";
1818
const METHOD: Method = Method::POST;
1919

@@ -22,7 +22,7 @@ impl<'a> ClientRequest for ReadEventsRequest<'a> {
2222
}
2323
}
2424

25-
impl<'a> StreamingRequest for ReadEventsRequest<'a> {
25+
impl StreamingRequest for ReadEventsRequest<'_> {
2626
type ItemType = Event;
2727

2828
fn build_stream(
@@ -32,14 +32,14 @@ impl<'a> StreamingRequest for ReadEventsRequest<'a> {
3232
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
3333
enum LineItem {
3434
Error { error: String },
35-
Event(Event),
35+
Event(Box<Event>),
3636
}
3737

3838
impl From<LineItem> for Result<Event, ClientError> {
3939
fn from(item: LineItem) -> Self {
4040
match item {
4141
LineItem::Error { error } => Err(ClientError::DBError(error)),
42-
LineItem::Event(event_type) => Ok(event_type),
42+
LineItem::Event(event_type) => Ok(*event_type),
4343
}
4444
}
4545
}

src/client/client_request/register_event_schema.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,15 @@ impl<'a> RegisterEventSchemaRequest<'a> {
2323
}
2424
}
2525

26-
impl<'a> ClientRequest for RegisterEventSchemaRequest<'a> {
26+
impl ClientRequest for RegisterEventSchemaRequest<'_> {
2727
const URL_PATH: &'static str = "/api/v1/register-event-schema";
2828
const METHOD: Method = Method::POST;
2929

3030
fn body(&self) -> Option<Result<impl Serialize, ClientError>> {
3131
Some(Ok(self))
3232
}
3333
}
34-
impl<'a> OneShotRequest for RegisterEventSchemaRequest<'a> {
34+
impl OneShotRequest for RegisterEventSchemaRequest<'_> {
3535
type Response = ManagementEvent;
3636

3737
fn validate_response(&self, response: &Self::Response) -> Result<(), ClientError> {

src/container.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ impl ContainerBuilder {
114114
Ok(Container {
115115
internal_port: self.internal_port,
116116
api_token: self.api_token.clone(),
117-
container: GenericImage::new(self.image_name, self.image_tag)
117+
instance: GenericImage::new(self.image_name, self.image_tag)
118118
.with_exposed_port(self.internal_port)
119119
.with_wait_for(WaitFor::Http(Box::new(
120120
HttpWaitStrategy::new("/api/v1/ping")
@@ -137,10 +137,10 @@ impl ContainerBuilder {
137137
}
138138

139139
/// A running test container for the [EventSourcingDB](https://www.eventsourcingdb.io/).
140-
///
140+
///
141141
/// Aside from managing the container, this struct also provides methods to get the data needed to connect to
142142
/// the database or even a fully configured client.
143-
///
143+
///
144144
/// You'll most likely want to use the [`Container::start_default`] method to create a new container instance for your tests.
145145
/// For more details, see the [`crate::container`] module documentation.
146146
/// ```
@@ -152,7 +152,7 @@ impl ContainerBuilder {
152152
/// ```
153153
#[derive(Debug)]
154154
pub struct Container {
155-
container: ContainerAsync<GenericImage>,
155+
instance: ContainerAsync<GenericImage>,
156156
internal_port: ContainerPort,
157157
api_token: String,
158158
}
@@ -167,7 +167,7 @@ impl Container {
167167
}
168168

169169
/// Shortcut method to start the container with default settings.
170-
///
170+
///
171171
/// This is the same as calling [`Container::builder`] and then [`ContainerBuilder::start`].
172172
/// In most cases this will create a contaienr with the latest image tag and a working configuration.
173173
///
@@ -178,26 +178,23 @@ impl Container {
178178
}
179179

180180
/// Get the host of the container.
181-
///
181+
///
182182
/// This is the host that you can use to connect to the database. In most cases this will be `localhost`.
183183
///
184184
/// # Errors
185185
/// This function will return an error if the container is not running (e.g. because it crashed) or if the host could not be retrieved
186186
pub async fn get_host(&self) -> Result<Host, ContainerError> {
187-
Ok(self.container.get_host().await?)
187+
Ok(self.instance.get_host().await?)
188188
}
189189

190190
/// Get the mapped port for the database.
191-
///
191+
///
192192
/// This is the port that you can use to connect to the database. This will be a random port that is mapped to the internal port configured via [`ContainerBuilder::with_port`].
193193
///
194194
/// # Errors
195195
/// This function will return an error if the container is not running (e.g. because it crashed) or if the host could not be retrieved
196196
pub async fn get_mapped_port(&self) -> Result<u16, ContainerError> {
197-
Ok(self
198-
.container
199-
.get_host_port_ipv4(self.internal_port)
200-
.await?)
197+
Ok(self.instance.get_host_port_ipv4(self.internal_port).await?)
201198
}
202199

203200
/// Get the complete http base URL for the database.
@@ -227,7 +224,7 @@ impl Container {
227224
/// # Errors
228225
/// This function will return an error if the container could not be stopped.
229226
pub async fn stop(self) -> Result<(), ContainerError> {
230-
self.container.stop().await?;
227+
self.instance.stop().await?;
231228
Ok(())
232229
}
233230

src/event.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ pub use event_types::event_candidate::EventCandidate;
1010
pub use event_types::management_event::ManagementEvent;
1111
pub use trace_info::TraceInfo;
1212

13-
#[cfg(feature="cloudevents")]
13+
#[cfg(feature = "cloudevents")]
1414
use crate::error::EventError;

src/event/event_types/event.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ use chrono::{DateTime, Utc};
22
use serde::{Deserialize, Serialize};
33
use serde_json::Value;
44

5-
use crate::event::{trace_info::TraceInfo, EventCandidate};
6-
5+
use crate::event::{EventCandidate, trace_info::TraceInfo};
76

87
/// Represents an event that has been received from the DB.
98
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]

src/event/event_types/event_candidate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1+
use crate::event::trace_info::TraceInfo;
12
use serde::{Deserialize, Serialize};
23
use serde_json::Value;
34
use typed_builder::TypedBuilder;
4-
use crate::event::trace_info::TraceInfo;
55

66
#[cfg(feature = "cloudevents")]
77
use crate::error::EventError;

0 commit comments

Comments
 (0)