Skip to content

Commit 6074b4d

Browse files
andrewwebberjcrossley3
authored andcommitted
Add Axum binding
Signed-off-by: andrew webber (personal) <[email protected]>
1 parent b540542 commit 6074b4d

File tree

14 files changed

+635
-6
lines changed

14 files changed

+635
-6
lines changed

.github/workflows/rust_tests.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,3 +129,11 @@ jobs:
129129
command: build
130130
toolchain: ${{ matrix.toolchain }}
131131
args: --target ${{ matrix.target }} --manifest-path ./example-projects/warp-example/Cargo.toml
132+
133+
- uses: actions-rs/cargo@v1
134+
name: "Build axum-example"
135+
if: matrix.target == 'x86_64-unknown-linux-gnu' && matrix.toolchain == 'stable'
136+
with:
137+
command: build
138+
toolchain: ${{ matrix.toolchain }}
139+
args: --target ${{ matrix.target }} --manifest-path ./example-projects/axum-example/Cargo.toml

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ actix = ["actix-web", "async-trait", "bytes", "futures", "http"]
2121
reqwest = ["reqwest-lib", "async-trait", "bytes", "http"]
2222
rdkafka = ["rdkafka-lib", "bytes", "futures"]
2323
warp = ["warp-lib", "bytes", "http", "hyper"]
24+
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
2425

2526
[dependencies]
2627
serde = { version = "^1.0", features = ["derive"] }
@@ -42,6 +43,8 @@ bytes = { version = "^1.0", optional = true }
4243
futures = { version = "^0.3", optional = true }
4344
http = { version = "0.2", optional = true }
4445
hyper = { version = "^0.14", optional = true }
46+
axum-lib = { version = "^0.2", optional = true , package="axum"}
47+
http-body = { version = "^0.4", optional = true}
4548

4649
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
4750
hostname = "^0.3"
@@ -65,3 +68,4 @@ chrono = { version = "^0.4", features = ["serde"] }
6568
mockito = "0.25.1"
6669
tokio = { version = "^1.0", features = ["full"] }
6770
mime = "0.3"
71+
tower = { version = "0.4", features = ["util"] }
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "axum-example"
3+
version = "0.3.0"
4+
authors = ["Andrew Webber <[email protected]>"]
5+
edition = "2018"
6+
7+
[dependencies]
8+
cloudevents-sdk = { path = "../..", features = ["axum"] }
9+
axum = "^0.2"
10+
http = "^0.2"
11+
tokio = { version = "^1", features = ["full"] }
12+
tracing = "^0.1"
13+
tracing-subscriber = "^0.2"
14+
tower-http = { version = "^0.1", features = ["trace"] }
15+
16+
[dev-dependencies]
17+
tower = { version = "^0.4", features = ["util"] }
18+
serde = { version = "^1.0", features = ["derive"] }
19+
serde_json = "^1.0"
20+
chrono = { version = "^0.4", features = ["serde"] }
21+
hyper = { version = "^0.14" }
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
use axum::{
2+
handler::{get, post},
3+
routing::BoxRoute,
4+
Router,
5+
};
6+
use cloudevents::Event;
7+
use http::StatusCode;
8+
use std::net::SocketAddr;
9+
use tower_http::trace::TraceLayer;
10+
11+
fn echo_app() -> Router<BoxRoute> {
12+
Router::new()
13+
.route("/", get(|| async { "hello from cloudevents server" }))
14+
.route(
15+
"/",
16+
post(|event: Event| async move {
17+
tracing::debug!("received cloudevent {}", &event);
18+
(StatusCode::OK, event)
19+
}),
20+
)
21+
.layer(TraceLayer::new_for_http())
22+
.boxed()
23+
}
24+
25+
#[tokio::main]
26+
async fn main() {
27+
if std::env::var("RUST_LOG").is_err() {
28+
std::env::set_var("RUST_LOG", "axum_example=debug,tower_http=debug")
29+
}
30+
tracing_subscriber::fmt::init();
31+
let service = echo_app();
32+
let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
33+
tracing::debug!("listening on {}", addr);
34+
axum::Server::bind(&addr)
35+
.serve(service.into_make_service())
36+
.await
37+
.unwrap();
38+
}
39+
40+
#[cfg(test)]
41+
mod tests {
42+
43+
use super::echo_app;
44+
45+
use axum::{
46+
body::Body,
47+
http::{self, Request},
48+
};
49+
use chrono::Utc;
50+
use hyper;
51+
use serde_json::json;
52+
use tower::ServiceExt; // for `app.oneshot()`
53+
54+
#[tokio::test]
55+
async fn axum_mod_test() {
56+
if std::env::var("RUST_LOG").is_err() {
57+
std::env::set_var("RUST_LOG", "axum_example=debug,tower_http=debug")
58+
}
59+
tracing_subscriber::fmt::init();
60+
61+
let app = echo_app();
62+
let time = Utc::now();
63+
let j = json!({"hello": "world"});
64+
let request = Request::builder()
65+
.method(http::Method::POST)
66+
.header("ce-specversion", "1.0")
67+
.header("ce-id", "0001")
68+
.header("ce-type", "example.test")
69+
.header("ce-source", "http://localhost/")
70+
.header("ce-someint", "10")
71+
.header("ce-time", time.to_rfc3339())
72+
.header("content-type", "application/json")
73+
.body(Body::from(serde_json::to_vec(&j).unwrap()))
74+
.unwrap();
75+
76+
let resp = app.oneshot(request).await.unwrap();
77+
assert_eq!(
78+
resp.headers()
79+
.get("ce-specversion")
80+
.unwrap()
81+
.to_str()
82+
.unwrap(),
83+
"1.0"
84+
);
85+
assert_eq!(
86+
resp.headers().get("ce-id").unwrap().to_str().unwrap(),
87+
"0001"
88+
);
89+
assert_eq!(
90+
resp.headers().get("ce-type").unwrap().to_str().unwrap(),
91+
"example.test"
92+
);
93+
assert_eq!(
94+
resp.headers().get("ce-source").unwrap().to_str().unwrap(),
95+
"http://localhost/"
96+
);
97+
assert_eq!(
98+
resp.headers()
99+
.get("content-type")
100+
.unwrap()
101+
.to_str()
102+
.unwrap(),
103+
"application/json"
104+
);
105+
assert_eq!(
106+
resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
107+
"10"
108+
);
109+
110+
let (_, body) = resp.into_parts();
111+
let body = hyper::body::to_bytes(body).await.unwrap();
112+
113+
assert_eq!(j.to_string().as_bytes(), body);
114+
}
115+
}

src/binding/axum/extract.rs

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
use axum_lib as axum;
2+
3+
use async_trait::async_trait;
4+
use axum::extract::{FromRequest, RequestParts};
5+
use http::StatusCode;
6+
use http_body::Body;
7+
use hyper::body;
8+
9+
use crate::binding::http::to_event;
10+
use crate::event::Event;
11+
12+
type BoxError = Box<dyn std::error::Error + Send + Sync>;
13+
14+
#[async_trait]
15+
impl<B> FromRequest<B> for Event
16+
where
17+
B: Body + Send,
18+
B::Data: Send,
19+
B::Error: Into<BoxError>,
20+
{
21+
type Rejection = (StatusCode, String);
22+
23+
async fn from_request(req: &mut RequestParts<B>) -> Result<Self, Self::Rejection> {
24+
let headers = req.headers().cloned().ok_or(0).map_err(|_| {
25+
(
26+
StatusCode::BAD_REQUEST,
27+
"unexpected empty headers".to_string(),
28+
)
29+
})?;
30+
31+
let req_body = req
32+
.take_body()
33+
.ok_or(0)
34+
.map_err(|_| (StatusCode::BAD_REQUEST, "unexpected empty body".to_string()))?;
35+
36+
let buf = body::to_bytes(req_body)
37+
.await
38+
.map_err(|e| (StatusCode::BAD_REQUEST, format!("{}", e.into())))?;
39+
to_event(&headers, buf.to_vec()).map_err(|e| (StatusCode::BAD_REQUEST, format!("{}", e)))
40+
}
41+
}
42+
43+
#[cfg(test)]
44+
mod tests {
45+
use axum_lib as axum;
46+
47+
use super::*;
48+
use axum::body::Body;
49+
use axum::http::{self, Request, StatusCode};
50+
use chrono::Utc;
51+
use serde_json::json;
52+
53+
use crate::{EventBuilder, EventBuilderV10};
54+
55+
#[tokio::test]
56+
async fn axum_test_request() {
57+
let time = Utc::now();
58+
let expected = EventBuilderV10::new()
59+
.id("0001")
60+
.ty("example.test")
61+
.source("http://localhost/")
62+
.time(time)
63+
.extension("someint", "10")
64+
.build()
65+
.unwrap();
66+
67+
let mut request = RequestParts::new(
68+
Request::builder()
69+
.method(http::Method::POST)
70+
.header("ce-specversion", "1.0")
71+
.header("ce-id", "0001")
72+
.header("ce-type", "example.test")
73+
.header("ce-source", "http://localhost/")
74+
.header("ce-someint", "10")
75+
.header("ce-time", time.to_rfc3339())
76+
.body(Body::empty())
77+
.unwrap(),
78+
);
79+
80+
let result = Event::from_request(&mut request).await.unwrap();
81+
82+
assert_eq!(expected, result);
83+
}
84+
85+
#[tokio::test]
86+
async fn axum_test_bad_request() {
87+
let time = Utc::now();
88+
89+
let mut request = RequestParts::new(
90+
Request::builder()
91+
.method(http::Method::POST)
92+
.header("ce-specversion", "BAD SPECIFICATION")
93+
.header("ce-id", "0001")
94+
.header("ce-type", "example.test")
95+
.header("ce-source", "http://localhost/")
96+
.header("ce-someint", "10")
97+
.header("ce-time", time.to_rfc3339())
98+
.body(Body::empty())
99+
.unwrap(),
100+
);
101+
102+
let result = Event::from_request(&mut request).await;
103+
assert!(result.is_err());
104+
let rejection = result.unwrap_err();
105+
106+
let reason = rejection.0;
107+
assert_eq!(reason, StatusCode::BAD_REQUEST)
108+
}
109+
110+
#[tokio::test]
111+
async fn axum_test_request_with_full_data() {
112+
let time = Utc::now();
113+
let j = json!({"hello": "world"});
114+
115+
let expected = EventBuilderV10::new()
116+
.id("0001")
117+
.ty("example.test")
118+
.source("http://localhost")
119+
.time(time)
120+
.data("application/json", j.to_string().into_bytes())
121+
.extension("someint", "10")
122+
.build()
123+
.unwrap();
124+
125+
let mut request = RequestParts::new(
126+
Request::builder()
127+
.method(http::Method::POST)
128+
.header("ce-specversion", "1.0")
129+
.header("ce-id", "0001")
130+
.header("ce-type", "example.test")
131+
.header("ce-source", "http://localhost")
132+
.header("ce-someint", "10")
133+
.header("ce-time", time.to_rfc3339())
134+
.header("content-type", "application/json")
135+
.body(Body::from(serde_json::to_vec(&j).unwrap()))
136+
.unwrap(),
137+
);
138+
139+
let result = Event::from_request(&mut request).await.unwrap();
140+
141+
assert_eq!(expected, result);
142+
}
143+
}

0 commit comments

Comments
 (0)