Skip to content

Commit 38469b2

Browse files
markmandeljcrossley3
authored andcommitted
Batch Event implementation for reqwest bindings
Added `events(Vec<Event>)` to `RequestBuilderExt` to provide a batched set of Events to send to an HTTP endpoint, and `into_events() -> Result<Vec<Event>>` to ResponseExt to parse a batched Event response. I deliberately kept things simple, as I thought this would be a good place to start with Batch support throughout the SDK, and the implementation was simple enough, that there didn't seem to be much opportunity for reusable libraries across the SDK. That could be changed as more Batch support is provided across the SDK, and opportunities for code reuse present themselves. Signed-off-by: Mark Mandel <[email protected]>
1 parent 20fd82a commit 38469b2

File tree

3 files changed

+96
-3
lines changed

3 files changed

+96
-3
lines changed

src/binding/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ pub(crate) mod kafka {
5252
}
5353

5454
pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json";
55+
pub(crate) static CLOUDEVENTS_BATCH_JSON_HEADER: &str = "application/cloudevents-batch+json";
5556
pub(crate) static CONTENT_TYPE: &str = "content-type";
5657

5758
fn header_prefix(prefix: &str, name: &str) -> String {

src/binding/reqwest/client_request.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use reqwest_lib as reqwest;
22

33
use crate::binding::{
44
http::{header_prefix, SPEC_VERSION_HEADER},
5-
CLOUDEVENTS_JSON_HEADER,
5+
CLOUDEVENTS_BATCH_JSON_HEADER, CLOUDEVENTS_JSON_HEADER,
66
};
77
use crate::event::SpecVersion;
88
use crate::message::{
@@ -72,18 +72,35 @@ pub fn event_to_request(event: Event, request_builder: RequestBuilder) -> Result
7272
BinaryDeserializer::deserialize_binary(event, RequestSerializer::new(request_builder))
7373
}
7474

75+
/// Method to fill a [`RequestBuilder`] with a batched [`Vec<Event>`].
76+
pub fn events_to_request(
77+
events: Vec<Event>,
78+
request_builder: RequestBuilder,
79+
) -> Result<RequestBuilder> {
80+
let bytes = serde_json::to_vec(&events)?;
81+
Ok(request_builder
82+
.header(reqwest::header::CONTENT_TYPE, CLOUDEVENTS_BATCH_JSON_HEADER)
83+
.body(bytes))
84+
}
85+
7586
/// Extension Trait for [`RequestBuilder`] which acts as a wrapper for the function [`event_to_request()`].
7687
///
7788
/// This trait is sealed and cannot be implemented for types outside of this crate.
7889
pub trait RequestBuilderExt: private::Sealed {
7990
/// Write in this [`RequestBuilder`] the provided [`Event`]. Similar to invoking [`Event`].
8091
fn event(self, event: Event) -> Result<RequestBuilder>;
92+
/// Write in this [`RequestBuilder`] the provided batched [`Vec<Event>`].
93+
fn events(self, events: Vec<Event>) -> Result<RequestBuilder>;
8194
}
8295

8396
impl RequestBuilderExt for RequestBuilder {
8497
fn event(self, event: Event) -> Result<RequestBuilder> {
8598
event_to_request(event, self)
8699
}
100+
101+
fn events(self, events: Vec<Event>) -> Result<RequestBuilder> {
102+
events_to_request(events, self)
103+
}
87104
}
88105

89106
// Sealing the RequestBuilderExt
@@ -183,4 +200,25 @@ mod tests {
183200

184201
m.assert();
185202
}
203+
204+
#[tokio::test]
205+
async fn test_batched_request() {
206+
let input = vec![fixtures::v10::full_json_data_string_extension()];
207+
let url = mockito::server_url();
208+
let m = mock("POST", "/")
209+
.match_header("content-type", "application/cloudevents-batch+json")
210+
.match_body(Matcher::Exact(serde_json::to_string(&input).unwrap()))
211+
.create();
212+
213+
let client = reqwest::Client::new();
214+
client
215+
.post(&url)
216+
.events(input)
217+
.unwrap()
218+
.send()
219+
.await
220+
.unwrap();
221+
222+
m.assert();
223+
}
186224
}

src/binding/reqwest/client_response.rs

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use reqwest_lib as reqwest;
22

3-
use crate::binding::http;
3+
use crate::binding;
44
use crate::message::{Error, Result};
55
use crate::Event;
66
use async_trait::async_trait;
7+
use http::header;
78
use reqwest::Response;
89

910
/// Method to transform an incoming [`Response`] to [`Event`].
@@ -12,7 +13,26 @@ pub async fn response_to_event(res: Response) -> Result<Event> {
1213
let b = res.bytes().await.map_err(|e| Error::Other {
1314
source: Box::new(e),
1415
})?;
15-
http::to_event(&h, b.to_vec())
16+
binding::http::to_event(&h, b.to_vec())
17+
}
18+
19+
/// Method to transform an incoming [`Response`] to a batched [`Vec<Event>`]
20+
pub async fn response_to_events(res: Response) -> Result<Vec<Event>> {
21+
if res
22+
.headers()
23+
.get(header::CONTENT_TYPE)
24+
.and_then(|v| v.to_str().ok())
25+
.filter(|&v| v.starts_with(binding::CLOUDEVENTS_BATCH_JSON_HEADER))
26+
.is_none()
27+
{
28+
return Err(Error::WrongEncoding {});
29+
}
30+
31+
let bytes = res.bytes().await.map_err(|e| Error::Other {
32+
source: Box::new(e),
33+
})?;
34+
35+
Ok(serde_json::from_slice(&bytes)?)
1636
}
1737

1838
/// Extension Trait for [`Response`] which acts as a wrapper for the function [`response_to_event()`].
@@ -22,13 +42,19 @@ pub async fn response_to_event(res: Response) -> Result<Event> {
2242
pub trait ResponseExt: private::Sealed {
2343
/// Convert this [`Response`] to [`Event`].
2444
async fn into_event(self) -> Result<Event>;
45+
/// Convert this [`Response`] to a batched [`Vec<Event>`].
46+
async fn into_events(self) -> Result<Vec<Event>>;
2547
}
2648

2749
#[async_trait(?Send)]
2850
impl ResponseExt for Response {
2951
async fn into_event(self) -> Result<Event> {
3052
response_to_event(self).await
3153
}
54+
55+
async fn into_events(self) -> Result<Vec<Event>> {
56+
response_to_events(self).await
57+
}
3258
}
3359

3460
// Sealing the ResponseExt
@@ -44,6 +70,7 @@ mod tests {
4470
use super::*;
4571
use mockito::mock;
4672
use reqwest_lib as reqwest;
73+
use std::vec;
4774

4875
use crate::test::fixtures;
4976

@@ -133,4 +160,31 @@ mod tests {
133160

134161
assert_eq!(expected, res);
135162
}
163+
164+
#[tokio::test]
165+
async fn test_batched_response() {
166+
let expected = vec![fixtures::v10::full_json_data_string_extension()];
167+
168+
let url = mockito::server_url();
169+
let _m = mock("GET", "/")
170+
.with_status(200)
171+
.with_header(
172+
"content-type",
173+
"application/cloudevents-batch+json; charset=utf-8",
174+
)
175+
.with_body(serde_json::to_string(&expected).unwrap())
176+
.create();
177+
178+
let client = reqwest::Client::new();
179+
let res = client
180+
.get(&url)
181+
.send()
182+
.await
183+
.unwrap()
184+
.into_events()
185+
.await
186+
.unwrap();
187+
188+
assert_eq!(expected, res);
189+
}
136190
}

0 commit comments

Comments
 (0)