Skip to content

Commit 538b647

Browse files
committed
New feature: cloudevents-warp
Conditionally compile warp module when enabled Signed-off-by: Jim Crossley <[email protected]>
1 parent 9055d71 commit 538b647

File tree

10 files changed

+609
-3
lines changed

10 files changed

+609
-3
lines changed

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ name = "cloudevents"
2020
cloudevents-actix = ["actix-web", "async-trait", "lazy_static", "bytes", "futures"]
2121
cloudevents-reqwest = ["reqwest", "async-trait", "lazy_static", "bytes"]
2222
cloudevents-rdkafka = ["rdkafka", "lazy_static", "bytes"]
23+
cloudevents-warp = ["warp", "lazy_static", "bytes", "http", "hyper"]
2324

2425
[dependencies]
2526
serde = { version = "^1.0", features = ["derive"] }
@@ -35,10 +36,13 @@ bitflags = "^1.2"
3536
actix-web = { version = "^3", default-features = false, optional = true }
3637
reqwest = { version = "^0.11", default-features = false, features = ["rustls-tls"], optional = true }
3738
rdkafka = { version = "^0.25", features = ["cmake-build"], optional = true }
39+
warp = { version = "^0.3", optional = true }
3840
async-trait = { version = "^0.1.33", optional = true }
3941
lazy_static = { version = "1.4.0", optional = true }
4042
bytes = { version = "^1.0", optional = true }
4143
futures = { version = "^0.3", optional = true }
44+
http = { version = "0.2", optional = true }
45+
hyper = { version = "^0.14", optional = true }
4246

4347
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
4448
hostname = "^0.3"
@@ -61,6 +65,7 @@ serde_json = { version = "^1.0" }
6165
chrono = { version = "^0.4", features = ["serde"] }
6266
mockito = "0.25.1"
6367
tokio = { version = "^1.0", features = ["full"] }
68+
mime = "0.3"
6469

6570
[workspace]
6671
members = [

example-projects/warp-example/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ categories = ["web-programming", "encoding"]
77
license-file = "../LICENSE"
88

99
[dependencies]
10-
cloudevents-sdk = { path = "../.." }
11-
cloudevents-sdk-warp = { path = "../../cloudevents-sdk-warp"}
10+
cloudevents-sdk = { path = "../..", features = ["cloudevents-warp"] }
1211
warp = "^0.3"
1312
tokio = { version = "^1.0", features = ["full"] }
1413

example-projects/warp-example/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use cloudevents_sdk_warp::{filter, reply};
1+
use cloudevents::warp::{filter, reply};
22
use warp::Filter;
33

44
#[tokio::main]

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ pub mod actix;
4646
pub mod rdkafka;
4747
#[cfg(feature = "cloudevents-reqwest")]
4848
pub mod reqwest;
49+
#[cfg(feature = "cloudevents-warp")]
50+
pub mod warp;
4951

5052
pub mod event;
5153
pub mod message;

src/warp/filter.rs

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
use super::server_request::request_to_event;
2+
3+
use crate::Event;
4+
use warp::http::HeaderMap;
5+
use warp::Filter;
6+
use warp::Rejection;
7+
8+
#[derive(Debug)]
9+
pub struct EventFilterError {
10+
error: crate::message::Error,
11+
}
12+
13+
impl warp::reject::Reject for EventFilterError {}
14+
15+
///
16+
/// # Extracts [`cloudevents::Event`] from incoming request
17+
///
18+
/// ```
19+
/// use cloudevents::warp::filter::to_event;
20+
/// use warp::Filter;
21+
/// use warp::Reply;
22+
///
23+
/// let routes = warp::any()
24+
/// .and(to_event())
25+
/// .map(|event| {
26+
/// // do something with the event
27+
/// }
28+
/// );
29+
/// ```
30+
///
31+
pub fn to_event() -> impl Filter<Extract = (Event,), Error = Rejection> + Copy {
32+
warp::header::headers_cloned()
33+
.and(warp::body::bytes())
34+
.and_then(create_event)
35+
}
36+
37+
async fn create_event(headers: HeaderMap, body: bytes::Bytes) -> Result<Event, Rejection> {
38+
request_to_event(headers, body)
39+
.map_err(|error| warp::reject::custom(EventFilterError { error }))
40+
}
41+
42+
#[cfg(test)]
43+
mod tests {
44+
use super::to_event;
45+
use warp::test;
46+
47+
use crate::{EventBuilder, EventBuilderV10};
48+
use chrono::Utc;
49+
use serde_json::json;
50+
51+
#[tokio::test]
52+
async fn test_request() {
53+
let time = Utc::now();
54+
let expected = EventBuilderV10::new()
55+
.id("0001")
56+
.ty("example.test")
57+
.source("http://localhost/")
58+
.time(time)
59+
.extension("someint", "10")
60+
.build()
61+
.unwrap();
62+
63+
let result = test::request()
64+
.method("POST")
65+
.header("ce-specversion", "1.0")
66+
.header("ce-id", "0001")
67+
.header("ce-type", "example.test")
68+
.header("ce-source", "http://localhost/")
69+
.header("ce-someint", "10")
70+
.header("ce-time", time.to_rfc3339())
71+
.filter(&to_event())
72+
.await
73+
.unwrap();
74+
75+
assert_eq!(expected, result);
76+
}
77+
78+
#[tokio::test]
79+
async fn test_bad_request() {
80+
let time = Utc::now();
81+
82+
let result = test::request()
83+
.method("POST")
84+
.header("ce-specversion", "BAD SPECIFICATION")
85+
.header("ce-id", "0001")
86+
.header("ce-type", "example.test")
87+
.header("ce-source", "http://localhost/")
88+
.header("ce-someint", "10")
89+
.header("ce-time", time.to_rfc3339())
90+
.filter(&to_event())
91+
.await;
92+
93+
assert!(result.is_err());
94+
let rejection = result.unwrap_err();
95+
96+
let reason = rejection.find::<super::EventFilterError>().unwrap();
97+
assert_eq!(
98+
reason.error.to_string(),
99+
"Invalid specversion BAD SPECIFICATION"
100+
)
101+
}
102+
103+
#[tokio::test]
104+
async fn test_request_with_full_data() {
105+
let time = Utc::now();
106+
let j = json!({"hello": "world"});
107+
108+
let expected = EventBuilderV10::new()
109+
.id("0001")
110+
.ty("example.test")
111+
.source("http://localhost")
112+
.time(time)
113+
.data("application/json", j.to_string().into_bytes())
114+
.extension("someint", "10")
115+
.build()
116+
.unwrap();
117+
118+
let result = test::request()
119+
.method("POST")
120+
.header("ce-specversion", "1.0")
121+
.header("ce-id", "0001")
122+
.header("ce-type", "example.test")
123+
.header("ce-source", "http://localhost")
124+
.header("ce-someint", "10")
125+
.header("ce-time", time.to_rfc3339())
126+
.header("content-type", "application/json")
127+
.json(&j)
128+
.filter(&to_event())
129+
.await
130+
.unwrap();
131+
132+
assert_eq!(expected, result);
133+
}
134+
}

src/warp/headers.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use crate::event::SpecVersion;
2+
use http::header::HeaderName;
3+
use lazy_static::lazy_static;
4+
use warp::http::HeaderValue;
5+
6+
use std::collections::HashMap;
7+
use std::str::FromStr;
8+
9+
macro_rules! unwrap_optional_header {
10+
($headers:expr, $name:expr) => {
11+
$headers
12+
.get::<&'static HeaderName>(&$name)
13+
.map(|a| header_value_to_str!(a))
14+
};
15+
}
16+
17+
macro_rules! header_value_to_str {
18+
($header_value:expr) => {
19+
$header_value
20+
.to_str()
21+
.map_err(|e| crate::message::Error::Other {
22+
source: Box::new(e),
23+
})
24+
};
25+
}
26+
27+
macro_rules! str_name_to_header {
28+
($attribute:expr) => {
29+
HeaderName::from_str($attribute).map_err(|e| crate::message::Error::Other {
30+
source: Box::new(e),
31+
})
32+
};
33+
}
34+
35+
macro_rules! attribute_name_to_header {
36+
($attribute:expr) => {
37+
str_name_to_header!(&["ce-", $attribute].concat())
38+
};
39+
}
40+
41+
fn attributes_to_headers(
42+
it: impl Iterator<Item = &'static str>,
43+
) -> HashMap<&'static str, HeaderName> {
44+
it.map(|s| {
45+
if s == "datacontenttype" {
46+
(s, http::header::CONTENT_TYPE)
47+
} else {
48+
(s, attribute_name_to_header!(s).unwrap())
49+
}
50+
})
51+
.collect()
52+
}
53+
54+
lazy_static! {
55+
pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, HeaderName> =
56+
attributes_to_headers(SpecVersion::all_attribute_names());
57+
pub(crate) static ref SPEC_VERSION_HEADER: HeaderName =
58+
HeaderName::from_static("ce-specversion");
59+
pub(crate) static ref CLOUDEVENTS_JSON_HEADER: HeaderValue =
60+
HeaderValue::from_static("application/cloudevents+json");
61+
}

src/warp/mod.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
//! This crate integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with [Warp web service framework](https://docs.rs/warp/)
2+
//! to easily send and receive CloudEvents.
3+
//!
4+
//! To deserialize an HTTP request as CloudEvent
5+
//!
6+
//! To echo events:
7+
//!
8+
//! ```
9+
//! use warp::{Filter, Reply};
10+
//! use cloudevents::warp::reply::from_event;
11+
//! use cloudevents::warp::filter::to_event;
12+
//!
13+
//! let routes = warp::any()
14+
//! // extracting event from request
15+
//! .and(to_event())
16+
//! // returning event back
17+
//! .map(|event| from_event(event));
18+
//!
19+
//! warp::serve(routes).run(([127, 0, 0, 1], 3030));
20+
//! ```
21+
//!
22+
//! To create event inside request handlers and send them as responses:
23+
//!
24+
//! ```
25+
//! use cloudevents::{Event, EventBuilder, EventBuilderV10};
26+
//! use http::StatusCode;
27+
//! use serde_json::json;
28+
//! use warp::{Filter, Reply};
29+
//! use cloudevents::warp::reply::from_event;
30+
//!
31+
//! let routes = warp::any().map(|| {
32+
//! let event = EventBuilderV10::new()
33+
//! .id("1")
34+
//! .source("url://example_response/")
35+
//! .ty("example.ce")
36+
//! .data(
37+
//! mime::APPLICATION_JSON.to_string(),
38+
//! json!({
39+
//! "name": "John Doe",
40+
//! "age": 43,
41+
//! "phones": [
42+
//! "+44 1234567",
43+
//! "+44 2345678"
44+
//! ]
45+
//! }),
46+
//! )
47+
//! .build();
48+
//!
49+
//! match event {
50+
//! Ok(event) => Ok(from_event(event)),
51+
//! Err(e) => Ok(warp::reply::with_status(
52+
//! e.to_string(),
53+
//! StatusCode::INTERNAL_SERVER_ERROR,
54+
//! )
55+
//! .into_response()),
56+
//! }
57+
//! });
58+
//! warp::serve(routes).run(([127, 0, 0, 1], 3030));
59+
//! ```
60+
//!
61+
//! Check out the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) docs for more details on how to use [`cloudevents::Event`]
62+
63+
#[macro_use]
64+
mod headers;
65+
66+
mod server_request;
67+
mod server_response;
68+
69+
pub mod filter;
70+
pub mod reply;

0 commit comments

Comments
 (0)