Skip to content

Commit d987002

Browse files
committed
New feature: cloudevents-poem
Signed-off-by: Sunli <[email protected]>
1 parent 1e89203 commit d987002

File tree

6 files changed

+276
-4
lines changed

6 files changed

+276
-4
lines changed

Cargo.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ actix = ["actix-web", "async-trait", "bytes", "futures", "http"]
2222
reqwest = ["reqwest-lib", "async-trait", "bytes", "http"]
2323
rdkafka = ["rdkafka-lib", "bytes", "futures"]
2424
warp = ["warp-lib", "bytes", "http", "hyper"]
25-
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
25+
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
26+
poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"]
2627

2728
[dependencies]
2829
serde = { version = "^1.0", features = ["derive"] }
@@ -44,8 +45,9 @@ bytes = { version = "^1.0", optional = true }
4445
futures = { version = "^0.3", optional = true }
4546
http = { version = "0.2", optional = true }
4647
hyper = { version = "^0.14", optional = true }
47-
axum-lib = { version = "^0.2", optional = true, package="axum"}
48-
http-body = { version = "^0.4", optional = true}
48+
axum-lib = { version = "^0.2", optional = true, package = "axum" }
49+
http-body = { version = "^0.4", optional = true }
50+
poem-lib = { version = "1.0.21", optional = true, package = "poem" }
4951

5052
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
5153
hostname = "^0.3"

src/binding/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@ pub mod axum;
99
feature = "actix",
1010
feature = "warp",
1111
feature = "reqwest",
12-
feature = "axum"
12+
feature = "axum",
13+
feature = "poem"
1314
))]
1415
pub mod http;
16+
#[cfg(feature = "poem")]
17+
pub mod poem;
1518
#[cfg(feature = "rdkafka")]
1619
pub mod rdkafka;
1720
#[cfg(feature = "reqwest")]

src/binding/poem/extractor.rs

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
use async_trait::async_trait;
2+
use poem_lib::error::ReadBodyError;
3+
use poem_lib::http::StatusCode;
4+
use poem_lib::{FromRequest, IntoResponse, Request, RequestBody, Response};
5+
6+
use crate::binding::http::to_event;
7+
use crate::Event;
8+
9+
#[derive(Debug)]
10+
pub enum ParseEventError {
11+
ReadBody(ReadBodyError),
12+
ParseEvent(crate::message::Error),
13+
}
14+
15+
impl From<ReadBodyError> for ParseEventError {
16+
fn from(err: ReadBodyError) -> Self {
17+
ParseEventError::ReadBody(err)
18+
}
19+
}
20+
21+
impl From<crate::message::Error> for ParseEventError {
22+
fn from(err: crate::message::Error) -> Self {
23+
ParseEventError::ParseEvent(err)
24+
}
25+
}
26+
27+
impl IntoResponse for ParseEventError {
28+
fn into_response(self) -> Response {
29+
match self {
30+
ParseEventError::ReadBody(err) => err.into_response(),
31+
ParseEventError::ParseEvent(err) => (StatusCode::BAD_REQUEST, err.to_string()).into(),
32+
}
33+
}
34+
}
35+
36+
#[async_trait]
37+
impl<'a> FromRequest<'a> for Event {
38+
type Error = ParseEventError;
39+
40+
async fn from_request(req: &'a Request, body: &mut RequestBody) -> Result<Self, Self::Error> {
41+
Ok(to_event(req.headers(), body.take()?.into_vec().await?)?)
42+
}
43+
}
44+
45+
#[cfg(test)]
46+
mod tests {
47+
use super::*;
48+
use crate::test::fixtures;
49+
use poem_lib::http::Method;
50+
51+
#[tokio::test]
52+
async fn test_request() {
53+
let expected = fixtures::v10::minimal_string_extension();
54+
55+
let req = Request::builder()
56+
.method(Method::POST)
57+
.header("ce-specversion", "1.0")
58+
.header("ce-id", "0001")
59+
.header("ce-type", "test_event.test_application")
60+
.header("ce-source", "http://localhost/")
61+
.header("ce-someint", "10")
62+
.finish();
63+
let (req, mut body) = req.split();
64+
let result = Event::from_request(&req, &mut body).await.unwrap();
65+
66+
assert_eq!(expected, result);
67+
}
68+
69+
#[tokio::test]
70+
async fn test_bad_request() {
71+
let req = Request::builder()
72+
.method(Method::POST)
73+
.header("ce-specversion", "BAD SPECIFICATION")
74+
.header("ce-id", "0001")
75+
.header("ce-type", "example.test")
76+
.header("ce-source", "http://localhost/")
77+
.header("ce-someint", "10")
78+
.header("ce-time", fixtures::time().to_rfc3339())
79+
.finish();
80+
81+
let (req, mut body) = req.split();
82+
let resp = Event::from_request(&req, &mut body).await.into_response();
83+
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
84+
assert_eq!(
85+
resp.into_body().into_string().await.unwrap(),
86+
"Invalid specversion BAD SPECIFICATION"
87+
);
88+
}
89+
90+
#[tokio::test]
91+
async fn test_request_with_full_data() {
92+
let expected = fixtures::v10::full_binary_json_data_string_extension();
93+
94+
let req = Request::builder()
95+
.method(Method::POST)
96+
.header("ce-specversion", "1.0")
97+
.header("ce-id", "0001")
98+
.header("ce-type", "test_event.test_application")
99+
.header("ce-source", "http://localhost/")
100+
.header("ce-subject", "cloudevents-sdk")
101+
.header("content-type", "application/json")
102+
.header("ce-string_ex", "val")
103+
.header("ce-int_ex", "10")
104+
.header("ce-bool_ex", "true")
105+
.header("ce-time", &fixtures::time().to_rfc3339())
106+
.body(fixtures::json_data_binary());
107+
let (req, mut body) = req.split();
108+
let result = Event::from_request(&req, &mut body).await.unwrap();
109+
110+
assert_eq!(expected, result);
111+
}
112+
}

src/binding/poem/mod.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
//! This module integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with
2+
//! [Poem](https://docs.rs/poem/) to easily send and receive CloudEvents.
3+
//!
4+
//! To deserialize an HTTP request as CloudEvent
5+
//!
6+
//! To echo events:
7+
//!
8+
//! ```rust
9+
//! use cloudevents::Event;
10+
//! use poem_lib as poem;
11+
//! use poem::{handler, Route, post};
12+
//!
13+
//! #[handler]
14+
//! async fn index(event: Event) -> Event {
15+
//! println!("received cloudevent {}", &event);
16+
//! event
17+
//! }
18+
//!
19+
//! let app = Route::new().at("/", post(index));
20+
//! ```
21+
//!
22+
//! To create event inside request handlers and send them as responses:
23+
//!
24+
//! ```rust
25+
//! use cloudevents::{Event, EventBuilder, EventBuilderV10};
26+
//! use poem_lib as poem;
27+
//! use poem::{handler, Route, post, Result};
28+
//! use poem::error::InternalServerError;
29+
//! use serde_json::json;
30+
//!
31+
//! #[handler]
32+
//! async fn index() -> Result<Event> {
33+
//! let event = EventBuilderV10::new()
34+
//! .id("1")
35+
//! .source("url://example_response/")
36+
//! .ty("example.ce")
37+
//! .data(
38+
//! mime::APPLICATION_JSON.to_string(),
39+
//! json!({
40+
//! "name": "John Doe",
41+
//! "age": 43,
42+
//! "phones": [
43+
//! "+44 1234567",
44+
//! "+44 2345678"
45+
//! ]
46+
//! }),
47+
//! )
48+
//! .build()
49+
//! .map_err(InternalServerError)?;
50+
//! Ok(event)
51+
//! }
52+
//!
53+
//! let app = Route::new().at("/", post(index));
54+
//! ```
55+
56+
mod extractor;
57+
mod response;
58+
59+
pub use extractor::ParseEventError;

src/binding/poem/response.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use poem_lib::http::StatusCode;
2+
use poem_lib::{IntoResponse, Response};
3+
4+
use crate::binding::http::builder::adapter::to_response;
5+
use crate::Event;
6+
7+
impl IntoResponse for Event {
8+
fn into_response(self) -> Response {
9+
match to_response(self) {
10+
Ok(resp) => resp.into(),
11+
Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into(),
12+
}
13+
}
14+
}
15+
16+
#[cfg(test)]
17+
mod tests {
18+
use crate::test::fixtures;
19+
use poem_lib::IntoResponse;
20+
21+
#[test]
22+
fn test_response() {
23+
let input = fixtures::v10::minimal_string_extension();
24+
25+
let resp = input.into_response();
26+
27+
assert_eq!(
28+
resp.headers()
29+
.get("ce-specversion")
30+
.unwrap()
31+
.to_str()
32+
.unwrap(),
33+
"1.0"
34+
);
35+
assert_eq!(
36+
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
37+
"0001"
38+
);
39+
assert_eq!(
40+
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
41+
"test_event.test_application"
42+
);
43+
assert_eq!(
44+
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
45+
"http://localhost/"
46+
);
47+
assert_eq!(
48+
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
49+
"10"
50+
);
51+
}
52+
53+
#[tokio::test]
54+
async fn test_response_with_full_data() {
55+
let input = fixtures::v10::full_binary_json_data_string_extension();
56+
57+
let resp = input.into_response();
58+
59+
assert_eq!(
60+
resp.headers()
61+
.get("ce-specversion")
62+
.unwrap()
63+
.to_str()
64+
.unwrap(),
65+
"1.0"
66+
);
67+
assert_eq!(
68+
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
69+
"0001"
70+
);
71+
assert_eq!(
72+
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
73+
"test_event.test_application"
74+
);
75+
assert_eq!(
76+
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
77+
"http://localhost/"
78+
);
79+
assert_eq!(
80+
resp.headers()
81+
.get("content-type")
82+
.unwrap()
83+
.to_str()
84+
.unwrap(),
85+
"application/json"
86+
);
87+
assert_eq!(
88+
resp.headers().get("ce-int_ex").unwrap().to_str().unwrap(),
89+
"10"
90+
);
91+
92+
let body = resp.into_body().into_vec().await.unwrap();
93+
assert_eq!(fixtures::json_data_binary(), body);
94+
}
95+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#![feature(in_band_lifetimes)]
12
//! This crate implements the [CloudEvents](https://cloudevents.io/) Spec for Rust.
23
//!
34
//! ```

0 commit comments

Comments
 (0)