Skip to content

Commit c4e8780

Browse files
committed
Encapsulate shared event deserialization behind a Headers trait
Both warp and reqwest use the HeaderMap from the http crate. Actix has its own. Fortunately, both contain (HeaderName, HeaderValue) tuples. Further, actix uses a conflicting version of the bytes crate, so I store a Vec<u8> instead of a Bytes member in the Deserializer struct. Not sure if that's a problem, but the tests pass. :) We use an associated type in the Headers trait to facilitate static dispatch for warp/reqwest since their concrete iterator is public, but the actix Iter struct is private, so we use a Box for its impl. We're using AsHeaderName for the get() param to avoid having to call as_str() on any header constants, but of course actix uses its own AsName trait, which isn't public, so we must call as_str() for the passed header name in its impl. Signed-off-by: Jim Crossley <[email protected]>
1 parent ca3ba3b commit c4e8780

File tree

10 files changed

+85
-233
lines changed

10 files changed

+85
-233
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ name = "cloudevents"
1818

1919
[features]
2020
actix = ["actix-web", "async-trait", "bytes", "futures", "http"]
21-
reqwest = ["reqwest-lib", "async-trait", "bytes"]
21+
reqwest = ["reqwest-lib", "async-trait", "bytes", "http"]
2222
rdkafka = ["rdkafka-lib", "bytes", "futures"]
2323
warp = ["warp-lib", "bytes", "http", "hyper"]
2424

src/binding/actix/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ mod server_request;
4545
mod server_response;
4646

4747
pub use server_request::request_to_event;
48-
pub use server_request::HttpRequestDeserializer;
4948
pub use server_request::HttpRequestExt;
5049
pub use server_response::event_to_response;
5150
pub use server_response::HttpResponseBuilderExt;

src/binding/actix/server_request.rs

Lines changed: 13 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,99 +1,20 @@
1-
use crate::binding::http::SPEC_VERSION_HEADER;
2-
use crate::event::SpecVersion;
3-
use crate::message::{
4-
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
5-
Result, StructuredDeserializer, StructuredSerializer,
6-
};
7-
use crate::{header_value_to_str, message, Event};
8-
use actix_web::web::{Bytes, BytesMut};
9-
use actix_web::{web, HttpMessage, HttpRequest};
1+
use crate::binding::http::{to_event, Headers};
2+
use crate::Event;
3+
use actix_web::web::BytesMut;
4+
use actix_web::{web, HttpRequest};
105
use async_trait::async_trait;
116
use futures::future::LocalBoxFuture;
127
use futures::{FutureExt, StreamExt};
13-
use std::convert::TryFrom;
8+
use http::header::{AsHeaderName, HeaderName, HeaderValue};
149

15-
/// Wrapper for [`HttpRequest`] that implements [`MessageDeserializer`] trait.
16-
pub struct HttpRequestDeserializer<'a> {
17-
req: &'a HttpRequest,
18-
body: Bytes,
19-
}
20-
21-
impl HttpRequestDeserializer<'_> {
22-
pub fn new(req: &HttpRequest, body: Bytes) -> HttpRequestDeserializer {
23-
HttpRequestDeserializer { req, body }
24-
}
25-
}
26-
27-
impl<'a> BinaryDeserializer for HttpRequestDeserializer<'a> {
28-
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
29-
if self.encoding() != Encoding::BINARY {
30-
return Err(message::Error::WrongEncoding {});
31-
}
32-
33-
let spec_version = SpecVersion::try_from(
34-
self.req
35-
.headers()
36-
.get(SPEC_VERSION_HEADER)
37-
.map(|a| header_value_to_str!(a))
38-
.unwrap()?,
39-
)?;
40-
41-
visitor = visitor.set_spec_version(spec_version.clone())?;
42-
43-
let attributes = spec_version.attribute_names();
44-
45-
for (hn, hv) in self.req.headers().iter().filter(|(hn, _)| {
46-
let key = hn.as_str();
47-
SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
48-
}) {
49-
let name = &hn.as_str()["ce-".len()..];
50-
51-
if attributes.contains(&name) {
52-
visitor = visitor.set_attribute(
53-
name,
54-
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
55-
)?
56-
} else {
57-
visitor = visitor.set_extension(
58-
name,
59-
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
60-
)?
61-
}
62-
}
63-
64-
if let Some(hv) = self.req.headers().get("content-type") {
65-
visitor = visitor.set_attribute(
66-
"datacontenttype",
67-
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
68-
)?
69-
}
70-
71-
if !self.body.is_empty() {
72-
visitor.end_with_data(self.body.to_vec())
73-
} else {
74-
visitor.end()
75-
}
10+
/// Implement Headers for the actix HeaderMap
11+
impl<'a> Headers<'a> for actix_web::http::HeaderMap {
12+
type Iterator = Box<dyn Iterator<Item = (&'a HeaderName, &'a HeaderValue)> + 'a>;
13+
fn get<K: AsHeaderName>(&self, key: K) -> Option<&HeaderValue> {
14+
self.get(key.as_str())
7615
}
77-
}
78-
79-
impl<'a> StructuredDeserializer for HttpRequestDeserializer<'a> {
80-
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
81-
if self.encoding() != Encoding::STRUCTURED {
82-
return Err(message::Error::WrongEncoding {});
83-
}
84-
visitor.set_structured_event(self.body.to_vec())
85-
}
86-
}
87-
88-
impl<'a> MessageDeserializer for HttpRequestDeserializer<'a> {
89-
fn encoding(&self) -> Encoding {
90-
if self.req.content_type() == "application/cloudevents+json" {
91-
Encoding::STRUCTURED
92-
} else if self.req.headers().get(SPEC_VERSION_HEADER).is_some() {
93-
Encoding::BINARY
94-
} else {
95-
Encoding::UNKNOWN
96-
}
16+
fn iter(&'a self) -> Self::Iterator {
17+
Box::new(self.iter())
9718
}
9819
}
9920

@@ -106,8 +27,7 @@ pub async fn request_to_event(
10627
while let Some(item) = payload.next().await {
10728
bytes.extend_from_slice(&item?);
10829
}
109-
MessageDeserializer::into_event(HttpRequestDeserializer::new(req, bytes.freeze()))
110-
.map_err(actix_web::error::ErrorBadRequest)
30+
to_event(req.headers(), bytes.to_vec()).map_err(actix_web::error::ErrorBadRequest)
11131
}
11232

11333
/// So that an actix-web handler may take an Event parameter
Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,27 @@
1-
use bytes::Bytes;
2-
use http::HeaderMap;
3-
4-
use crate::binding::http::SPEC_VERSION_HEADER;
5-
use crate::event::SpecVersion;
6-
use crate::header_value_to_str;
7-
use crate::message::{
8-
BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
9-
MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
1+
use super::{Headers, SPEC_VERSION_HEADER};
2+
use crate::{
3+
binding::CLOUDEVENTS_JSON_HEADER,
4+
event::SpecVersion,
5+
header_value_to_str, message,
6+
message::{
7+
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
8+
Result, StructuredDeserializer, StructuredSerializer,
9+
},
1010
};
11-
use crate::{message, Event};
1211
use std::convert::TryFrom;
1312

14-
pub struct RequestDeserializer {
15-
headers: HeaderMap,
16-
body: Bytes,
13+
pub struct Deserializer<'a, T: Headers<'a>> {
14+
headers: &'a T,
15+
body: Vec<u8>,
1716
}
1817

19-
impl RequestDeserializer {
20-
pub fn new(headers: HeaderMap, body: Bytes) -> RequestDeserializer {
21-
RequestDeserializer { headers, body }
18+
impl<'a, T: Headers<'a>> Deserializer<'a, T> {
19+
pub fn new(headers: &'a T, body: Vec<u8>) -> Deserializer<'a, T> {
20+
Deserializer { headers, body }
2221
}
2322
}
2423

25-
impl BinaryDeserializer for RequestDeserializer {
24+
impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> {
2625
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
2726
if self.encoding() != Encoding::BINARY {
2827
return Err(message::Error::WrongEncoding {});
@@ -58,48 +57,44 @@ impl BinaryDeserializer for RequestDeserializer {
5857
}
5958
}
6059

61-
if let Some(hv) = self.headers.get("content-type") {
60+
if let Some(hv) = self.headers.get(http::header::CONTENT_TYPE) {
6261
visitor = visitor.set_attribute(
6362
"datacontenttype",
6463
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
6564
)?
6665
}
6766

6867
if !self.body.is_empty() {
69-
visitor.end_with_data(self.body.to_vec())
68+
visitor.end_with_data(self.body)
7069
} else {
7170
visitor.end()
7271
}
7372
}
7473
}
7574

76-
impl StructuredDeserializer for RequestDeserializer {
75+
impl<'a, T: Headers<'a>> StructuredDeserializer for Deserializer<'a, T> {
7776
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
7877
if self.encoding() != Encoding::STRUCTURED {
7978
return Err(message::Error::WrongEncoding {});
8079
}
81-
visitor.set_structured_event(self.body.to_vec())
80+
visitor.set_structured_event(self.body)
8281
}
8382
}
8483

85-
impl MessageDeserializer for RequestDeserializer {
84+
impl<'a, T: Headers<'a>> MessageDeserializer for Deserializer<'a, T> {
8685
fn encoding(&self) -> Encoding {
8786
if self
8887
.headers
89-
.get("content-type")
90-
.map(|v| v.to_str().unwrap_or(""))
91-
.unwrap_or("")
92-
== "application/cloudevents+json"
88+
.get(http::header::CONTENT_TYPE)
89+
.and_then(|v| v.to_str().ok())
90+
.filter(|&v| v.starts_with(CLOUDEVENTS_JSON_HEADER))
91+
.is_some()
9392
{
9493
Encoding::STRUCTURED
95-
} else if self.headers.contains_key(SPEC_VERSION_HEADER) {
94+
} else if self.headers.get(SPEC_VERSION_HEADER).is_some() {
9695
Encoding::BINARY
9796
} else {
9897
Encoding::UNKNOWN
9998
}
10099
}
101100
}
102-
103-
pub fn request_to_event(req: HeaderMap, bytes: bytes::Bytes) -> std::result::Result<Event, Error> {
104-
MessageDeserializer::into_event(RequestDeserializer::new(req, bytes))
105-
}

src/binding/http/headers.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use http::header::{AsHeaderName, HeaderMap, HeaderName, HeaderValue};
2+
3+
/// Any http library should be able to use the
4+
/// [`to_event`](super::to_event) function with an implementation of
5+
/// this trait.
6+
pub trait Headers<'a> {
7+
type Iterator: Iterator<Item = (&'a HeaderName, &'a HeaderValue)>;
8+
fn get<K: AsHeaderName>(&self, name: K) -> Option<&HeaderValue>;
9+
fn iter(&'a self) -> Self::Iterator;
10+
}
11+
12+
/// Implemention for the HeaderMap used by warp/reqwest
13+
impl<'a> Headers<'a> for HeaderMap<HeaderValue> {
14+
type Iterator = http::header::Iter<'a, HeaderValue>;
15+
fn get<K: AsHeaderName>(&self, name: K) -> Option<&HeaderValue> {
16+
self.get(name)
17+
}
18+
fn iter(&'a self) -> Self::Iterator {
19+
self.iter()
20+
}
21+
}

src/binding/http/mod.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,23 @@
1+
mod deserializer;
2+
mod headers;
3+
4+
use crate::{
5+
message::{Error, MessageDeserializer},
6+
Event,
7+
};
8+
use deserializer::Deserializer;
9+
pub use headers::Headers;
10+
111
pub static SPEC_VERSION_HEADER: &str = "ce-specversion";
212

13+
/// Turn a pile of HTTP headers and a body into a CloudEvent
14+
pub fn to_event<'a, T: Headers<'a>>(
15+
headers: &'a T,
16+
body: Vec<u8>,
17+
) -> std::result::Result<Event, Error> {
18+
MessageDeserializer::into_event(Deserializer::new(headers, body))
19+
}
20+
321
pub fn header_prefix(name: &str) -> String {
422
super::header_prefix("ce-", name)
523
}

0 commit comments

Comments
 (0)