Skip to content

Commit 0756a68

Browse files
authored
Merge pull request cloudevents#33 from slinkydeveloper/fetch_implementation
Reqwest integration
2 parents 9bf10fd + d46f11c commit 0756a68

File tree

19 files changed

+1175
-198
lines changed

19 files changed

+1175
-198
lines changed

Cargo.lock

Lines changed: 538 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,6 @@ name = "cloudevents"
3333
[workspace]
3434
members = [
3535
".",
36-
"cloudevents-sdk-actix-web"
36+
"cloudevents-sdk-actix-web",
37+
"cloudevents-sdk-reqwest"
3738
]

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Work in progress SDK for [CloudEvents](https://github.com/cloudevents/spec)
2020

2121
* `cloudevents-sdk`: Provides Event data structure, JSON Event format implementation
2222
* `cloudevents-sdk-actix-web`: Integration with [Actix Web](https://github.com/actix/actix-web)
23+
* `cloudevents-sdk-reqwest`: Integration with [reqwest](https://github.com/seanmonstar/reqwest)
2324

2425
## Development & Contributing
2526

cloudevents-sdk-actix-web/src/server_request.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use actix_web::web::{Bytes, BytesMut};
44
use actix_web::{web, HttpMessage, HttpRequest};
55
use cloudevents::event::SpecVersion;
66
use cloudevents::message::{
7-
BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
8-
MessageDeserializer, StructuredDeserializer, StructuredSerializer,
7+
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
8+
Result, StructuredDeserializer, StructuredSerializer,
99
};
1010
use cloudevents::{message, Event};
1111
use futures::StreamExt;
@@ -24,10 +24,7 @@ impl HttpRequestDeserializer<'_> {
2424
}
2525

2626
impl<'a> BinaryDeserializer for HttpRequestDeserializer<'a> {
27-
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(
28-
self,
29-
mut visitor: V,
30-
) -> Result<R, Error> {
27+
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
3128
if self.encoding() != Encoding::BINARY {
3229
return Err(message::Error::WrongEncoding {});
3330
}
@@ -36,7 +33,7 @@ impl<'a> BinaryDeserializer for HttpRequestDeserializer<'a> {
3633
unwrap_optional_header!(self.req.headers(), headers::SPEC_VERSION_HEADER).unwrap()?,
3734
)?;
3835

39-
visitor.set_spec_version(spec_version.clone())?;
36+
visitor = visitor.set_spec_version(spec_version.clone())?;
4037

4138
let attributes = cloudevents::event::spec_version::ATTRIBUTE_NAMES
4239
.get(&spec_version)
@@ -50,20 +47,20 @@ impl<'a> BinaryDeserializer for HttpRequestDeserializer<'a> {
5047
let name = &hn.as_str()["ce-".len()..];
5148

5249
if attributes.contains(&name) {
53-
visitor.set_attribute(
50+
visitor = visitor.set_attribute(
5451
name,
5552
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
5653
)?
5754
} else {
58-
visitor.set_extension(
55+
visitor = visitor.set_extension(
5956
name,
6057
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
6158
)?
6259
}
6360
}
6461

6562
if let Some(hv) = self.req.headers().get("content-type") {
66-
visitor.set_attribute(
63+
visitor = visitor.set_attribute(
6764
"datacontenttype",
6865
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
6966
)?
@@ -78,10 +75,7 @@ impl<'a> BinaryDeserializer for HttpRequestDeserializer<'a> {
7875
}
7976

8077
impl<'a> StructuredDeserializer for HttpRequestDeserializer<'a> {
81-
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(
82-
self,
83-
visitor: V,
84-
) -> Result<R, Error> {
78+
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
8579
if self.encoding() != Encoding::STRUCTURED {
8680
return Err(message::Error::WrongEncoding {});
8781
}
@@ -110,7 +104,7 @@ impl<'a> MessageDeserializer for HttpRequestDeserializer<'a> {
110104
pub async fn request_to_event(
111105
req: &HttpRequest,
112106
mut payload: web::Payload,
113-
) -> Result<Event, actix_web::error::Error> {
107+
) -> std::result::Result<Event, actix_web::error::Error> {
114108
let mut bytes = BytesMut::new();
115109
while let Some(item) = payload.next().await {
116110
bytes.extend_from_slice(&item?);

cloudevents-sdk-actix-web/src/server_response.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ use actix_web::http::{HeaderName, HeaderValue};
44
use actix_web::HttpResponse;
55
use cloudevents::event::SpecVersion;
66
use cloudevents::message::{
7-
BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, SerializationResult,
8-
StructuredSerializer,
7+
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
98
};
109
use cloudevents::Event;
1110
use std::str::FromStr;
@@ -21,41 +20,41 @@ impl HttpResponseSerializer {
2120
}
2221

2322
impl BinarySerializer<HttpResponse> for HttpResponseSerializer {
24-
fn set_spec_version(&mut self, spec_version: SpecVersion) -> SerializationResult {
23+
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
2524
self.builder.set_header(
2625
headers::SPEC_VERSION_HEADER.clone(),
2726
str_to_header_value!(spec_version.as_str())?,
2827
);
29-
SerializationResult::Ok(())
28+
Ok(self)
3029
}
3130

32-
fn set_attribute(&mut self, name: &str, value: MessageAttributeValue) -> SerializationResult {
31+
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
3332
self.builder.set_header(
3433
headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
3534
str_to_header_value!(value.to_string().as_str())?,
3635
);
37-
SerializationResult::Ok(())
36+
Ok(self)
3837
}
3938

40-
fn set_extension(&mut self, name: &str, value: MessageAttributeValue) -> SerializationResult {
39+
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
4140
self.builder.set_header(
4241
attribute_name_to_header!(name)?,
4342
str_to_header_value!(value.to_string().as_str())?,
4443
);
45-
SerializationResult::Ok(())
44+
Ok(self)
4645
}
4746

48-
fn end_with_data(mut self, bytes: Vec<u8>) -> Result<HttpResponse, Error> {
47+
fn end_with_data(mut self, bytes: Vec<u8>) -> Result<HttpResponse> {
4948
Ok(self.builder.body(bytes))
5049
}
5150

52-
fn end(mut self) -> Result<HttpResponse, Error> {
51+
fn end(mut self) -> Result<HttpResponse> {
5352
Ok(self.builder.finish())
5453
}
5554
}
5655

5756
impl StructuredSerializer<HttpResponse> for HttpResponseSerializer {
58-
fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<HttpResponse, Error> {
57+
fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<HttpResponse> {
5958
Ok(self
6059
.builder
6160
.set_header(
@@ -70,7 +69,7 @@ impl StructuredSerializer<HttpResponse> for HttpResponseSerializer {
7069
pub async fn event_to_response(
7170
event: Event,
7271
response: HttpResponseBuilder,
73-
) -> Result<HttpResponse, actix_web::error::Error> {
72+
) -> std::result::Result<HttpResponse, actix_web::error::Error> {
7473
BinaryDeserializer::deserialize_binary(event, HttpResponseSerializer::new(response))
7574
.map_err(actix_web::error::ErrorBadRequest)
7675
}

cloudevents-sdk-reqwest/Cargo.toml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
[package]
2+
name = "cloudevents-sdk-reqwest"
3+
version = "0.1.0"
4+
authors = ["Francesco Guardiani <[email protected]>"]
5+
edition = "2018"
6+
7+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
8+
9+
[dependencies]
10+
cloudevents-sdk = { path = ".." }
11+
reqwest = { version = "0.10" }
12+
lazy_static = "1.4.0"
13+
bytes = "^0.5"
14+
serde_json = "^1.0"
15+
url = { version = "^2.1", features = ["serde"] }
16+
17+
[dev-dependencies]
18+
mockito = "0.25.1"
19+
tokio = { version = "^0.2", features = ["full"] }
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
use super::headers;
2+
use cloudevents::event::SpecVersion;
3+
use cloudevents::message::{
4+
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
5+
};
6+
use cloudevents::Event;
7+
use reqwest::RequestBuilder;
8+
use std::str::FromStr;
9+
10+
/// Wrapper for [`RequestBuilder`] that implements [`StructuredSerializer`] & [`BinarySerializer`] traits
11+
pub struct RequestSerializer {
12+
req: RequestBuilder,
13+
}
14+
15+
impl RequestSerializer {
16+
pub fn new(req: RequestBuilder) -> RequestSerializer {
17+
RequestSerializer { req }
18+
}
19+
}
20+
21+
impl BinarySerializer<RequestBuilder> for RequestSerializer {
22+
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
23+
self.req = self
24+
.req
25+
.header(headers::SPEC_VERSION_HEADER.clone(), spec_version.as_str());
26+
Ok(self)
27+
}
28+
29+
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
30+
self.req = self.req.header(
31+
headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
32+
value.to_string(),
33+
);
34+
Ok(self)
35+
}
36+
37+
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
38+
self.req = self
39+
.req
40+
.header(attribute_name_to_header!(name)?, value.to_string());
41+
Ok(self)
42+
}
43+
44+
fn end_with_data(self, bytes: Vec<u8>) -> Result<RequestBuilder> {
45+
Ok(self.req.body(bytes))
46+
}
47+
48+
fn end(self) -> Result<RequestBuilder> {
49+
Ok(self.req)
50+
}
51+
}
52+
53+
impl StructuredSerializer<RequestBuilder> for RequestSerializer {
54+
fn set_structured_event(self, bytes: Vec<u8>) -> Result<RequestBuilder> {
55+
Ok(self
56+
.req
57+
.header(
58+
reqwest::header::CONTENT_TYPE,
59+
headers::CLOUDEVENTS_JSON_HEADER.clone(),
60+
)
61+
.body(bytes))
62+
}
63+
}
64+
65+
/// Method to transform an incoming [`HttpRequest`] to [`Event`]
66+
pub fn event_to_request(event: Event, request_builder: RequestBuilder) -> Result<RequestBuilder> {
67+
BinaryDeserializer::deserialize_binary(event, RequestSerializer::new(request_builder))
68+
}
69+
70+
#[cfg(test)]
71+
mod tests {
72+
use super::*;
73+
use mockito::{mock, Matcher};
74+
75+
use cloudevents::message::StructuredDeserializer;
76+
use cloudevents::EventBuilder;
77+
use serde_json::json;
78+
use url::Url;
79+
80+
#[tokio::test]
81+
async fn test_request() {
82+
let url = mockito::server_url();
83+
let m = mock("POST", "/")
84+
.match_header("ce-specversion", "1.0")
85+
.match_header("ce-id", "0001")
86+
.match_header("ce-type", "example.test")
87+
.match_header("ce-source", "http://localhost/")
88+
.match_header("ce-someint", "10")
89+
.match_body(Matcher::Missing)
90+
.create();
91+
92+
let input = EventBuilder::new()
93+
.id("0001")
94+
.ty("example.test")
95+
.source(Url::from_str("http://localhost/").unwrap())
96+
.extension("someint", "10")
97+
.build();
98+
99+
let client = reqwest::Client::new();
100+
event_to_request(input, client.post(&url))
101+
.unwrap()
102+
.send()
103+
.await
104+
.unwrap();
105+
106+
m.assert();
107+
}
108+
109+
#[tokio::test]
110+
async fn test_request_with_full_data() {
111+
let j = json!({"hello": "world"});
112+
113+
let url = mockito::server_url();
114+
let m = mock("POST", "/")
115+
.match_header("ce-specversion", "1.0")
116+
.match_header("ce-id", "0001")
117+
.match_header("ce-type", "example.test")
118+
.match_header("ce-source", "http://localhost/")
119+
.match_header("content-type", "application/json")
120+
.match_header("ce-someint", "10")
121+
.match_body(Matcher::Exact(j.to_string()))
122+
.create();
123+
124+
let input = EventBuilder::new()
125+
.id("0001")
126+
.ty("example.test")
127+
.source(Url::from_str("http://localhost").unwrap())
128+
.data("application/json", j.clone())
129+
.extension("someint", "10")
130+
.build();
131+
132+
let client = reqwest::Client::new();
133+
event_to_request(input, client.post(&url))
134+
.unwrap()
135+
.send()
136+
.await
137+
.unwrap();
138+
139+
m.assert();
140+
}
141+
142+
#[tokio::test]
143+
async fn test_structured_request_with_full_data() {
144+
let j = json!({"hello": "world"});
145+
146+
let input = EventBuilder::new()
147+
.id("0001")
148+
.ty("example.test")
149+
.source(Url::from_str("http://localhost").unwrap())
150+
.data("application/json", j.clone())
151+
.extension("someint", "10")
152+
.build();
153+
154+
let url = mockito::server_url();
155+
let m = mock("POST", "/")
156+
.match_header("content-type", "application/cloudevents+json")
157+
.match_body(Matcher::Exact(serde_json::to_string(&input).unwrap()))
158+
.create();
159+
160+
let client = reqwest::Client::new();
161+
StructuredDeserializer::deserialize_structured(
162+
input,
163+
RequestSerializer::new(client.post(&url)),
164+
)
165+
.unwrap()
166+
.send()
167+
.await
168+
.unwrap();
169+
170+
m.assert();
171+
}
172+
}

0 commit comments

Comments
 (0)