Skip to content

Commit 211792f

Browse files
authored
Refactor redundant header logic to a shared lib (#146)
* Refactor redundant header logic to a shared lib * Remove some unused code * Share macro between actix and warp, eliminating actix header mod Fixes #145 Signed-off-by: Jim Crossley <[email protected]>
1 parent 2cae3f0 commit 211792f

19 files changed

+154
-385
lines changed

Cargo.toml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ categories = ["web-programming", "encoding", "data-structures"]
1717
name = "cloudevents"
1818

1919
[features]
20-
actix = ["actix-web", "async-trait", "lazy_static", "bytes", "futures"]
21-
reqwest = ["reqwest-lib", "async-trait", "lazy_static", "bytes"]
22-
rdkafka = ["rdkafka-lib", "lazy_static", "bytes", "futures"]
23-
warp = ["warp-lib", "lazy_static", "bytes", "http", "hyper"]
20+
actix = ["actix-web", "async-trait", "bytes", "futures", "http"]
21+
reqwest = ["reqwest-lib", "async-trait", "bytes"]
22+
rdkafka = ["rdkafka-lib", "bytes", "futures"]
23+
warp = ["warp-lib", "bytes", "http", "hyper"]
2424

2525
[dependencies]
2626
serde = { version = "^1.0", features = ["derive"] }
@@ -38,7 +38,6 @@ reqwest-lib = { version = "^0.11", default-features = false, features = ["rustls
3838
rdkafka-lib = { version = "^0.25", features = ["cmake-build"], optional = true, package = "rdkafka" }
3939
warp-lib = { version = "^0.3", optional = true, package = "warp" }
4040
async-trait = { version = "^0.1.33", optional = true }
41-
lazy_static = { version = "1.4.0", optional = true }
4241
bytes = { version = "^1.0", optional = true }
4342
futures = { version = "^0.3", optional = true }
4443
http = { version = "0.2", optional = true }

src/binding/actix/headers.rs

Lines changed: 0 additions & 68 deletions
This file was deleted.

src/binding/actix/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@
4141
4242
#![deny(broken_intra_doc_links)]
4343

44-
#[macro_use]
45-
mod headers;
4644
mod server_request;
4745
mod server_response;
4846

src/binding/actix/server_request.rs

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
use super::headers;
1+
use crate::binding::http::SPEC_VERSION_HEADER;
22
use crate::event::SpecVersion;
33
use crate::message::{
44
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
55
Result, StructuredDeserializer, StructuredSerializer,
66
};
7-
use crate::{message, Event};
8-
use actix_web::http::HeaderName;
7+
use crate::{header_value_to_str, message, Event};
98
use actix_web::web::{Bytes, BytesMut};
109
use actix_web::{web, HttpMessage, HttpRequest};
1110
use async_trait::async_trait;
@@ -32,18 +31,21 @@ impl<'a> BinaryDeserializer for HttpRequestDeserializer<'a> {
3231
}
3332

3433
let spec_version = SpecVersion::try_from(
35-
unwrap_optional_header!(self.req.headers(), headers::SPEC_VERSION_HEADER).unwrap()?,
34+
self.req
35+
.headers()
36+
.get(SPEC_VERSION_HEADER)
37+
.map(|a| header_value_to_str!(a))
38+
.unwrap()?,
3639
)?;
3740

3841
visitor = visitor.set_spec_version(spec_version.clone())?;
3942

4043
let attributes = spec_version.attribute_names();
4144

42-
for (hn, hv) in
43-
self.req.headers().iter().filter(|(hn, _)| {
44-
headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-")
45-
})
46-
{
45+
for (hn, hv) in self.req.headers().iter().filter(|(hn, _)| {
46+
let key = hn.as_str();
47+
SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
48+
}) {
4749
let name = &hn.as_str()["ce-".len()..];
4850

4951
if attributes.contains(&name) {
@@ -87,12 +89,7 @@ impl<'a> MessageDeserializer for HttpRequestDeserializer<'a> {
8789
fn encoding(&self) -> Encoding {
8890
if self.req.content_type() == "application/cloudevents+json" {
8991
Encoding::STRUCTURED
90-
} else if self
91-
.req
92-
.headers()
93-
.get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER)
94-
.is_some()
95-
{
92+
} else if self.req.headers().get(SPEC_VERSION_HEADER).is_some() {
9693
Encoding::BINARY
9794
} else {
9895
Encoding::UNKNOWN

src/binding/actix/server_response.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
1-
use super::headers;
21
use crate::event::SpecVersion;
32
use crate::message::{
43
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
54
};
65
use crate::Event;
6+
use crate::{
7+
binding::{
8+
http::{header_prefix, SPEC_VERSION_HEADER},
9+
CLOUDEVENTS_JSON_HEADER,
10+
},
11+
str_to_header_value,
12+
};
713
use actix_web::dev::HttpResponseBuilder;
8-
use actix_web::http::{HeaderName, HeaderValue, StatusCode};
14+
use actix_web::http::StatusCode;
915
use actix_web::HttpResponse;
1016
use async_trait::async_trait;
1117
use futures::future::LocalBoxFuture;
1218
use futures::FutureExt;
13-
use std::str::FromStr;
1419

1520
/// Wrapper for [`HttpResponseBuilder`] that implements [`StructuredSerializer`] and [`BinarySerializer`].
1621
pub struct HttpResponseSerializer {
@@ -25,26 +30,20 @@ impl HttpResponseSerializer {
2530

2631
impl BinarySerializer<HttpResponse> for HttpResponseSerializer {
2732
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
28-
self.builder.set_header(
29-
headers::SPEC_VERSION_HEADER.clone(),
30-
str_to_header_value!(spec_version.as_str())?,
31-
);
33+
self.builder
34+
.set_header(SPEC_VERSION_HEADER, str_to_header_value!(spec_version)?);
3235
Ok(self)
3336
}
3437

3538
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
36-
self.builder.set_header(
37-
headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
38-
str_to_header_value!(value.to_string().as_str())?,
39-
);
39+
self.builder
40+
.set_header(&header_prefix(name), str_to_header_value!(value)?);
4041
Ok(self)
4142
}
4243

4344
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
44-
self.builder.set_header(
45-
attribute_name_to_header!(name)?,
46-
str_to_header_value!(value.to_string().as_str())?,
47-
);
45+
self.builder
46+
.set_header(&header_prefix(name), str_to_header_value!(value)?);
4847
Ok(self)
4948
}
5049

@@ -63,7 +62,7 @@ impl StructuredSerializer<HttpResponse> for HttpResponseSerializer {
6362
.builder
6463
.set_header(
6564
actix_web::http::header::CONTENT_TYPE,
66-
headers::CLOUDEVENTS_JSON_HEADER.clone(),
65+
CLOUDEVENTS_JSON_HEADER,
6766
)
6867
.body(bytes))
6968
}

src/binding/mod.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,53 @@ pub mod rdkafka;
88
pub mod reqwest;
99
#[cfg(feature = "warp")]
1010
pub mod warp;
11+
12+
#[cfg(feature = "rdkafka")]
13+
pub(crate) mod kafka {
14+
pub static SPEC_VERSION_HEADER: &str = "ce_specversion";
15+
pub fn header_prefix(name: &str) -> String {
16+
super::header_prefix("ce_", name)
17+
}
18+
}
19+
20+
#[cfg(any(feature = "actix", feature = "warp", feature = "reqwest"))]
21+
pub(crate) mod http {
22+
pub static SPEC_VERSION_HEADER: &str = "ce-specversion";
23+
pub fn header_prefix(name: &str) -> String {
24+
super::header_prefix("ce-", name)
25+
}
26+
}
27+
28+
#[cfg(any(feature = "actix", feature = "warp"))]
29+
#[macro_export]
30+
macro_rules! str_to_header_value {
31+
($header_value:expr) => {
32+
http::header::HeaderValue::from_str(&$header_value.to_string()).map_err(|e| {
33+
crate::message::Error::Other {
34+
source: Box::new(e),
35+
}
36+
})
37+
};
38+
}
39+
40+
pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json";
41+
pub(crate) static CONTENT_TYPE: &str = "content-type";
42+
43+
fn header_prefix(prefix: &str, name: &str) -> String {
44+
if name == "datacontenttype" {
45+
CONTENT_TYPE.to_string()
46+
} else {
47+
[prefix, name].concat()
48+
}
49+
}
50+
51+
#[macro_export]
52+
macro_rules! header_value_to_str {
53+
($header_value:expr) => {
54+
$header_value
55+
.to_str()
56+
.map_err(|e| crate::message::Error::Other {
57+
source: Box::new(e),
58+
})
59+
};
60+
}

src/binding/rdkafka/headers.rs

Lines changed: 0 additions & 29 deletions
This file was deleted.

src/binding/rdkafka/kafka_consumer_record.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use rdkafka_lib as rdkafka;
22

3-
use super::headers;
3+
use crate::binding::{kafka::SPEC_VERSION_HEADER, CLOUDEVENTS_JSON_HEADER, CONTENT_TYPE};
44
use crate::event::SpecVersion;
55
use crate::message::{
66
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
@@ -47,17 +47,18 @@ impl BinaryDeserializer for ConsumerRecordDeserializer {
4747
}
4848

4949
let spec_version = SpecVersion::try_from(
50-
str::from_utf8(&self.headers.remove(headers::SPEC_VERSION_HEADER).unwrap()[..])
51-
.map_err(|e| crate::message::Error::Other {
50+
str::from_utf8(&self.headers.remove(SPEC_VERSION_HEADER).unwrap()).map_err(|e| {
51+
crate::message::Error::Other {
5252
source: Box::new(e),
53-
})?,
53+
}
54+
})?,
5455
)?;
5556

5657
visitor = visitor.set_spec_version(spec_version.clone())?;
5758

5859
let attributes = spec_version.attribute_names();
5960

60-
if let Some(hv) = self.headers.remove(headers::CONTENT_TYPE) {
61+
if let Some(hv) = self.headers.remove(CONTENT_TYPE) {
6162
visitor = visitor.set_attribute(
6263
"datacontenttype",
6364
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
@@ -71,7 +72,7 @@ impl BinaryDeserializer for ConsumerRecordDeserializer {
7172
for (hn, hv) in self
7273
.headers
7374
.into_iter()
74-
.filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_"))
75+
.filter(|(hn, _)| SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_"))
7576
{
7677
let name = &hn["ce_".len()..];
7778

@@ -120,9 +121,9 @@ impl MessageDeserializer for ConsumerRecordDeserializer {
120121
.get("content-type")
121122
.map(|s| String::from_utf8(s.to_vec()).ok())
122123
.flatten()
123-
.map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER))
124+
.map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER))
124125
.unwrap_or(false),
125-
self.headers.get(headers::SPEC_VERSION_HEADER),
126+
self.headers.get(SPEC_VERSION_HEADER),
126127
) {
127128
(true, _) => Encoding::STRUCTURED,
128129
(_, Some(_)) => Encoding::BINARY,

0 commit comments

Comments
 (0)