Skip to content

Commit 65a4782

Browse files
authored
Merge pull request #159 from sunli829/master
New feature: cloudevents-poem
2 parents 82f08f8 + 127d0ce commit 65a4782

File tree

9 files changed

+444
-4
lines changed

9 files changed

+444
-4
lines changed

.github/workflows/rust_tests.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,3 +137,12 @@ jobs:
137137
command: build
138138
toolchain: ${{ matrix.toolchain }}
139139
args: --target ${{ matrix.target }} --manifest-path ./example-projects/axum-example/Cargo.toml
140+
141+
- uses: actions-rs/cargo@v1
142+
name: "Build poem-example"
143+
if: matrix.target == 'x86_64-unknown-linux-gnu' && matrix.toolchain == 'stable'
144+
with:
145+
command: build
146+
toolchain: ${{ matrix.toolchain }}
147+
args: --target ${{ matrix.target }} --manifest-path ./example-projects/poem-example/Cargo.toml
148+

Cargo.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ actix = ["actix-web", "async-trait", "bytes", "futures", "http"]
2222
reqwest = ["reqwest-lib", "async-trait", "bytes", "http"]
2323
rdkafka = ["rdkafka-lib", "bytes", "futures"]
2424
warp = ["warp-lib", "bytes", "http", "hyper"]
25-
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
25+
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
26+
poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"]
2627

2728
[dependencies]
2829
serde = { version = "^1.0", features = ["derive"] }
@@ -44,8 +45,9 @@ bytes = { version = "^1.0", optional = true }
4445
futures = { version = "^0.3", optional = true }
4546
http = { version = "0.2", optional = true }
4647
hyper = { version = "^0.14", optional = true }
47-
axum-lib = { version = "^0.2", optional = true, package="axum"}
48-
http-body = { version = "^0.4", optional = true}
48+
axum-lib = { version = "^0.2", optional = true, package = "axum" }
49+
http-body = { version = "^0.4", optional = true }
50+
poem-lib = { version = "1.0.23", optional = true, package = "poem" }
4951

5052
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
5153
hostname = "^0.3"
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
[package]
2+
name = "poem-example"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
cloudevents-sdk = { path = "../..", features = ["poem"] }
8+
tokio = { version = "1.13", features = ["macros", "rt-multi-thread"] }
9+
tracing = "0.1"
10+
poem = { version = "1.0.23" }
11+
tracing-subscriber = "0.2"
12+
serde_json = "1.0"
13+
14+
[dev-dependencies]
15+
chrono = { version = "0.4", features = ["serde"] }
16+
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
To run the server:
2+
3+
```console
4+
cargo run
5+
```
6+
7+
To test a GET:
8+
9+
```console
10+
curl http://localhost:8080
11+
```
12+
13+
To test a POST:
14+
15+
```console
16+
curl -d '{"hello": "world"}' \
17+
-H'content-type: application/json' \
18+
-H'ce-specversion: 1.0' \
19+
-H'ce-id: 1' \
20+
-H'ce-source: http://cloudevents.io' \
21+
-H'ce-type: dev.knative.example' \
22+
http://localhost:8080
23+
```
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
use cloudevents::{Event, EventBuilder, EventBuilderV10};
2+
use poem::error::InternalServerError;
3+
use poem::listener::TcpListener;
4+
use poem::middleware::Tracing;
5+
use poem::{get, handler, Endpoint, EndpointExt, Response, Result, Route, Server};
6+
use serde_json::json;
7+
8+
#[handler]
9+
async fn get_event() -> Result<Event> {
10+
let event = EventBuilderV10::new()
11+
.id("1")
12+
.source("url://example_response/")
13+
.ty("example.ce")
14+
.data(
15+
"application/json",
16+
json!({
17+
"name": "John Doe",
18+
"age": 43,
19+
"phones": [
20+
"+44 1234567",
21+
"+44 2345678"
22+
]
23+
}),
24+
)
25+
.build()
26+
.map_err(InternalServerError)?;
27+
Ok(event)
28+
}
29+
30+
#[handler]
31+
async fn post_event(event: Event) -> Event {
32+
tracing::debug!("received cloudevent {}", &event);
33+
event
34+
}
35+
36+
fn echo_app() -> impl Endpoint<Output = Response> {
37+
Route::new()
38+
.at("/", get(get_event).post(post_event))
39+
.with(Tracing)
40+
}
41+
42+
#[tokio::main]
43+
async fn main() -> Result<(), std::io::Error> {
44+
if std::env::var("RUST_LOG").is_err() {
45+
std::env::set_var("RUST_LOG", "poem=debug")
46+
}
47+
tracing_subscriber::fmt::init();
48+
49+
let server = Server::new(TcpListener::bind("127.0.0.1:8080")).await?;
50+
server.run(echo_app()).await
51+
}
52+
53+
#[cfg(test)]
54+
mod tests {
55+
use super::*;
56+
use chrono::Utc;
57+
use poem::http::Method;
58+
use poem::{Body, Request};
59+
use serde_json::json;
60+
61+
#[tokio::test]
62+
async fn poem_test() {
63+
if std::env::var("RUST_LOG").is_err() {
64+
std::env::set_var("RUST_LOG", "poem_example=debug")
65+
}
66+
tracing_subscriber::fmt::init();
67+
68+
let app = echo_app();
69+
let time = Utc::now();
70+
let j = json!({"hello": "world"});
71+
let request = Request::builder()
72+
.method(Method::POST)
73+
.header("ce-specversion", "1.0")
74+
.header("ce-id", "0001")
75+
.header("ce-type", "example.test")
76+
.header("ce-source", "http://localhost/")
77+
.header("ce-someint", "10")
78+
.header("ce-time", time.to_rfc3339())
79+
.header("content-type", "application/json")
80+
.body(Body::from_json(&j).unwrap());
81+
82+
let resp: Response = app.call(request).await;
83+
assert_eq!(
84+
resp.headers()
85+
.get("ce-specversion")
86+
.unwrap()
87+
.to_str()
88+
.unwrap(),
89+
"1.0"
90+
);
91+
assert_eq!(
92+
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
93+
"0001"
94+
);
95+
assert_eq!(
96+
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
97+
"example.test"
98+
);
99+
assert_eq!(
100+
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
101+
"http://localhost/"
102+
);
103+
assert_eq!(
104+
resp.headers()
105+
.get("content-type")
106+
.unwrap()
107+
.to_str()
108+
.unwrap(),
109+
"application/json"
110+
);
111+
assert_eq!(
112+
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
113+
"10"
114+
);
115+
116+
assert_eq!(
117+
j.to_string().as_bytes(),
118+
resp.into_body().into_vec().await.unwrap()
119+
);
120+
}
121+
}

src/binding/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@ pub mod axum;
99
feature = "actix",
1010
feature = "warp",
1111
feature = "reqwest",
12-
feature = "axum"
12+
feature = "axum",
13+
feature = "poem"
1314
))]
1415
pub mod http;
16+
#[cfg(feature = "poem")]
17+
pub mod poem;
1518
#[cfg(feature = "rdkafka")]
1619
pub mod rdkafka;
1720
#[cfg(feature = "reqwest")]

src/binding/poem/extractor.rs

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
use async_trait::async_trait;
2+
use poem_lib::error::ReadBodyError;
3+
use poem_lib::http::StatusCode;
4+
use poem_lib::{FromRequest, IntoResponse, Request, RequestBody, Response};
5+
6+
use crate::binding::http::to_event;
7+
use crate::Event;
8+
9+
#[derive(Debug)]
10+
pub enum ParseEventError {
11+
ReadBody(ReadBodyError),
12+
ParseEvent(crate::message::Error),
13+
}
14+
15+
impl From<ReadBodyError> for ParseEventError {
16+
fn from(err: ReadBodyError) -> Self {
17+
ParseEventError::ReadBody(err)
18+
}
19+
}
20+
21+
impl From<crate::message::Error> for ParseEventError {
22+
fn from(err: crate::message::Error) -> Self {
23+
ParseEventError::ParseEvent(err)
24+
}
25+
}
26+
27+
impl IntoResponse for ParseEventError {
28+
fn into_response(self) -> Response {
29+
match self {
30+
ParseEventError::ReadBody(err) => err.into_response(),
31+
ParseEventError::ParseEvent(err) => (StatusCode::BAD_REQUEST, err.to_string()).into(),
32+
}
33+
}
34+
}
35+
36+
#[async_trait]
37+
impl<'a> FromRequest<'a> for Event {
38+
type Error = ParseEventError;
39+
40+
async fn from_request(req: &'a Request, body: &mut RequestBody) -> Result<Self, Self::Error> {
41+
Ok(to_event(req.headers(), body.take()?.into_vec().await?)?)
42+
}
43+
}
44+
45+
#[cfg(test)]
46+
mod tests {
47+
use super::*;
48+
use crate::test::fixtures;
49+
use poem_lib::http::Method;
50+
51+
#[tokio::test]
52+
async fn test_request() {
53+
let expected = fixtures::v10::minimal_string_extension();
54+
55+
let req = Request::builder()
56+
.method(Method::POST)
57+
.header("ce-specversion", "1.0")
58+
.header("ce-id", "0001")
59+
.header("ce-type", "test_event.test_application")
60+
.header("ce-source", "http://localhost/")
61+
.header("ce-someint", "10")
62+
.finish();
63+
let (req, mut body) = req.split();
64+
let result = Event::from_request(&req, &mut body).await.unwrap();
65+
66+
assert_eq!(expected, result);
67+
}
68+
69+
#[tokio::test]
70+
async fn test_bad_request() {
71+
let req = Request::builder()
72+
.method(Method::POST)
73+
.header("ce-specversion", "BAD SPECIFICATION")
74+
.header("ce-id", "0001")
75+
.header("ce-type", "example.test")
76+
.header("ce-source", "http://localhost/")
77+
.header("ce-someint", "10")
78+
.header("ce-time", fixtures::time().to_rfc3339())
79+
.finish();
80+
81+
let (req, mut body) = req.split();
82+
let resp = Event::from_request(&req, &mut body).await.into_response();
83+
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
84+
assert_eq!(
85+
resp.into_body().into_string().await.unwrap(),
86+
"Invalid specversion BAD SPECIFICATION"
87+
);
88+
}
89+
90+
#[tokio::test]
91+
async fn test_request_with_full_data() {
92+
let expected = fixtures::v10::full_binary_json_data_string_extension();
93+
94+
let req = Request::builder()
95+
.method(Method::POST)
96+
.header("ce-specversion", "1.0")
97+
.header("ce-id", "0001")
98+
.header("ce-type", "test_event.test_application")
99+
.header("ce-source", "http://localhost/")
100+
.header("ce-subject", "cloudevents-sdk")
101+
.header("content-type", "application/json")
102+
.header("ce-string_ex", "val")
103+
.header("ce-int_ex", "10")
104+
.header("ce-bool_ex", "true")
105+
.header("ce-time", &fixtures::time().to_rfc3339())
106+
.body(fixtures::json_data_binary());
107+
let (req, mut body) = req.split();
108+
let result = Event::from_request(&req, &mut body).await.unwrap();
109+
110+
assert_eq!(expected, result);
111+
}
112+
}

src/binding/poem/mod.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
//! This module integrates the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with
2+
//! [Poem](https://docs.rs/poem/) to easily send and receive CloudEvents.
3+
//!
4+
//! To deserialize an HTTP request as CloudEvent
5+
//!
6+
//! To echo events:
7+
//!
8+
//! ```rust
9+
//! use cloudevents::Event;
10+
//! use poem_lib as poem;
11+
//! use poem::{handler, Route, post};
12+
//!
13+
//! #[handler]
14+
//! async fn index(event: Event) -> Event {
15+
//! println!("received cloudevent {}", &event);
16+
//! event
17+
//! }
18+
//!
19+
//! let app = Route::new().at("/", post(index));
20+
//! ```
21+
//!
22+
//! To create event inside request handlers and send them as responses:
23+
//!
24+
//! ```rust
25+
//! use cloudevents::{Event, EventBuilder, EventBuilderV10};
26+
//! use poem_lib as poem;
27+
//! use poem::{handler, Route, post, Result};
28+
//! use poem::error::InternalServerError;
29+
//! use serde_json::json;
30+
//!
31+
//! #[handler]
32+
//! async fn index() -> Result<Event> {
33+
//! let event = EventBuilderV10::new()
34+
//! .id("1")
35+
//! .source("url://example_response/")
36+
//! .ty("example.ce")
37+
//! .data(
38+
//! mime::APPLICATION_JSON.to_string(),
39+
//! json!({
40+
//! "name": "John Doe",
41+
//! "age": 43,
42+
//! "phones": [
43+
//! "+44 1234567",
44+
//! "+44 2345678"
45+
//! ]
46+
//! }),
47+
//! )
48+
//! .build()
49+
//! .map_err(InternalServerError)?;
50+
//! Ok(event)
51+
//! }
52+
//!
53+
//! let app = Route::new().at("/", post(index));
54+
//! ```
55+
56+
mod extractor;
57+
mod response;
58+
59+
pub use extractor::ParseEventError;

0 commit comments

Comments
 (0)