Skip to content

Commit 73b61a6

Browse files
committed
Connect ASGI types to http_handler types
1 parent 202b153 commit 73b61a6

File tree

11 files changed

+573
-156
lines changed

11 files changed

+573
-156
lines changed

src/asgi/http.rs

Lines changed: 114 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use pyo3::exceptions::PyValueError;
22
use pyo3::prelude::*;
33
use pyo3::types::{PyAny, PyDict};
4+
use http_handler::{Request, RequestExt, Version};
5+
use std::net::SocketAddr;
46

57
use crate::asgi::{AsgiInfo, HttpMethod, HttpVersion};
68

@@ -62,6 +64,88 @@ pub struct HttpConnectionScope {
6264
state: Option<Py<PyDict>>,
6365
}
6466

67+
impl HttpConnectionScope {
68+
/// Create a new HttpConnectionScope from an http::Request
69+
pub fn from_request(request: &Request) -> Self {
70+
// Extract HTTP version
71+
let http_version = match request.version() {
72+
Version::HTTP_09 => HttpVersion::V1_0, // fallback for HTTP/0.9
73+
Version::HTTP_10 => HttpVersion::V1_0,
74+
Version::HTTP_11 => HttpVersion::V1_1,
75+
Version::HTTP_2 => HttpVersion::V2_0,
76+
Version::HTTP_3 => HttpVersion::V2_0, // treat HTTP/3 as HTTP/2 for ASGI
77+
_ => HttpVersion::V1_1, // default fallback
78+
};
79+
80+
// Extract method
81+
let method = HttpMethod::from(request.method().as_str());
82+
83+
// Extract scheme from URI or default to http
84+
let scheme = request
85+
.uri()
86+
.scheme_str()
87+
.unwrap_or("http")
88+
.to_string();
89+
90+
// Extract path
91+
let path = request.uri().path().to_string();
92+
93+
// Extract raw path (same as path for now, as we don't have the raw bytes)
94+
let raw_path = path.clone();
95+
96+
// Extract query string
97+
let query_string = request
98+
.uri()
99+
.query()
100+
.unwrap_or("")
101+
.to_string();
102+
103+
// Extract root path (default to empty)
104+
let root_path = String::new();
105+
106+
// Convert headers
107+
let headers: Vec<(String, String)> = request
108+
.headers()
109+
.iter()
110+
.map(|(name, value)| {
111+
(
112+
name.as_str().to_lowercase(),
113+
value.to_str().unwrap_or("").to_string(),
114+
)
115+
})
116+
.collect();
117+
118+
// Extract client and server from socket info if available
119+
let (client, server) = if let Some(socket_info) = request.socket_info() {
120+
let client = socket_info.remote.map(|addr| match addr {
121+
SocketAddr::V4(v4) => (v4.ip().to_string(), v4.port()),
122+
SocketAddr::V6(v6) => (v6.ip().to_string(), v6.port()),
123+
});
124+
let server = socket_info.local.map(|addr| match addr {
125+
SocketAddr::V4(v4) => (v4.ip().to_string(), v4.port()),
126+
SocketAddr::V6(v6) => (v6.ip().to_string(), v6.port()),
127+
});
128+
(client, server)
129+
} else {
130+
(None, None)
131+
};
132+
133+
HttpConnectionScope {
134+
http_version,
135+
method,
136+
scheme,
137+
path,
138+
raw_path,
139+
query_string,
140+
root_path,
141+
headers,
142+
client,
143+
server,
144+
state: None,
145+
}
146+
}
147+
}
148+
65149
impl<'py> IntoPyObject<'py> for HttpConnectionScope {
66150
type Target = PyDict;
67151
type Output = Bound<'py, Self::Target>;
@@ -233,12 +317,39 @@ impl<'py> FromPyObject<'py> for HttpSendMessage {
233317
})?
234318
.extract()?;
235319

236-
let headers: Vec<(String, String)> = dict
320+
let headers_py = dict
237321
.get_item("headers")?
238322
.ok_or_else(|| {
239323
PyValueError::new_err("Missing 'headers' key in HTTP response start message")
240-
})?
241-
.extract()?;
324+
})?;
325+
326+
// Convert headers from list of lists to vec of tuples
327+
let mut headers: Vec<(String, String)> = Vec::new();
328+
if let Ok(headers_list) = headers_py.downcast::<pyo3::types::PyList>() {
329+
for item in headers_list.iter() {
330+
if let Ok(header_pair) = item.downcast::<pyo3::types::PyList>() {
331+
if header_pair.len() == 2 {
332+
let name = header_pair.get_item(0)?;
333+
let value = header_pair.get_item(1)?;
334+
335+
// Convert bytes to string
336+
let name_str = if let Ok(bytes) = name.downcast::<pyo3::types::PyBytes>() {
337+
String::from_utf8_lossy(bytes.as_bytes()).to_string()
338+
} else {
339+
name.extract::<String>()?
340+
};
341+
342+
let value_str = if let Ok(bytes) = value.downcast::<pyo3::types::PyBytes>() {
343+
String::from_utf8_lossy(bytes.as_bytes()).to_string()
344+
} else {
345+
value.extract::<String>()?
346+
};
347+
348+
headers.push((name_str, value_str));
349+
}
350+
}
351+
}
352+
}
242353

243354
let trailers: bool = dict
244355
.get_item("trailers")?

src/asgi/http_method.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,23 @@ impl From<HttpMethod> for String {
114114
}
115115
}
116116

117+
impl From<&str> for HttpMethod {
118+
fn from(method: &str) -> Self {
119+
match method.to_uppercase().as_str() {
120+
"GET" => HttpMethod::Get,
121+
"POST" => HttpMethod::Post,
122+
"PUT" => HttpMethod::Put,
123+
"DELETE" => HttpMethod::Delete,
124+
"PATCH" => HttpMethod::Patch,
125+
"HEAD" => HttpMethod::Head,
126+
"OPTIONS" => HttpMethod::Options,
127+
"TRACE" => HttpMethod::Trace,
128+
"CONNECT" => HttpMethod::Connect,
129+
_ => HttpMethod::Get, // Default to GET for unknown methods
130+
}
131+
}
132+
}
133+
117134
#[cfg(test)]
118135
mod tests {
119136
use super::*;

src/asgi/mod.rs

Lines changed: 147 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
use pyo3::prelude::*;
2+
use http_handler::{Request, Response};
3+
use bytes::BytesMut;
4+
use tokio::sync::oneshot;
25

36
mod http;
47
mod http_method;
@@ -23,45 +26,168 @@ pub use websocket::{
2326
WebSocketConnectionScope, WebSocketReceiveMessage, WebSocketSendException, WebSocketSendMessage,
2427
};
2528

26-
fn execute_asgi_http_scope(
29+
pub async fn execute_asgi_http_scope(
2730
py_func: PyObject,
28-
scope: HttpConnectionScope,
29-
py: Python,
30-
) -> PyResult<()> {
31+
request: Request,
32+
) -> PyResult<Response> {
33+
// Create the ASGI scope from the HTTP request
34+
let scope = HttpConnectionScope::from_request(&request);
35+
36+
// Create channels for ASGI communication
3137
let (rx_receiver, rx) = Receiver::http();
3238
let (tx_sender, mut tx) = Sender::http();
3339

34-
tokio::spawn(async move {
40+
// Channel to receive the response data
41+
let (response_tx, response_rx) = oneshot::channel::<(u16, Vec<(String, String)>, Vec<u8>)>();
42+
43+
// Channel to signal Python execution completion
44+
let (python_done_tx, python_done_rx) = oneshot::channel::<Result<(), String>>();
45+
46+
// Task to handle messages from the ASGI app
47+
let response_task = tokio::spawn(async move {
48+
let mut status = 500u16;
49+
let mut headers = Vec::new();
50+
let mut body = Vec::new();
51+
let mut response_started = false;
52+
3553
loop {
3654
if let Some(msg) = tx.recv().await {
37-
println!("received ASGI message: {msg:#?}")
55+
match msg {
56+
HttpSendMessage::HttpResponseStart {
57+
status: s,
58+
headers: h,
59+
trailers: _
60+
} => {
61+
status = s;
62+
headers = h;
63+
response_started = true;
64+
}
65+
HttpSendMessage::HttpResponseBody {
66+
body: b,
67+
more_body
68+
} => {
69+
if response_started {
70+
body.extend_from_slice(&b);
71+
if !more_body {
72+
// Response is complete
73+
let _ = response_tx.send((status, headers, body));
74+
break;
75+
}
76+
}
77+
}
78+
}
79+
} else {
80+
// Channel closed
81+
break;
3882
}
3983
}
4084
});
4185

86+
// Send the request body to the ASGI app
87+
let request_body = request.body().clone();
4288
tokio::spawn(async move {
4389
let request_message = HttpReceiveMessage::Request {
44-
body: b"Hello, world!".to_vec(),
90+
body: request_body.to_vec(),
4591
more_body: false,
4692
};
4793

48-
rx.send(request_message).unwrap();
49-
50-
// Simulate disconnection after sending the request
51-
let disconnect_message = HttpReceiveMessage::Disconnect;
52-
rx.send(disconnect_message).unwrap();
94+
if rx.send(request_message).is_err() {
95+
eprintln!("Failed to send request message");
96+
}
5397
});
5498

55-
let py_func = py_func.clone_ref(py);
99+
// Run the ASGI app in a Python thread
100+
tokio::task::spawn_blocking(move || {
101+
Python::with_gil(|py| {
102+
let scope_py = scope.into_pyobject(py).unwrap();
56103

57-
Python::with_gil(|py| {
58-
if let Err(_e) = py_func.call1(
59-
py,
60-
(scope.into_pyobject(py).unwrap(), rx_receiver, tx_sender),
61-
) {
62-
// Log or handle error
63-
}
104+
// Call the ASGI app to get a coroutine
105+
let coroutine = match py_func.call1(py, (scope_py, rx_receiver, tx_sender)) {
106+
Ok(coro) => coro,
107+
Err(e) => {
108+
eprintln!("Failed to call ASGI app: {}", e);
109+
let _ = python_done_tx.send(Err(format!("Failed to call ASGI app: {}", e)));
110+
return;
111+
}
112+
};
113+
114+
// Run the coroutine using asyncio
115+
let asyncio = match py.import("asyncio") {
116+
Ok(module) => module,
117+
Err(e) => {
118+
eprintln!("Failed to import asyncio: {}", e);
119+
let _ = python_done_tx.send(Err(format!("Failed to import asyncio: {}", e)));
120+
return;
121+
}
122+
};
123+
124+
// Create a new event loop for this request
125+
let loop_ = match asyncio.call_method0("new_event_loop") {
126+
Ok(loop_) => loop_,
127+
Err(e) => {
128+
eprintln!("Failed to create event loop: {}", e);
129+
let _ = python_done_tx.send(Err(format!("Failed to create event loop: {}", e)));
130+
return;
131+
}
132+
};
133+
134+
// Set it as the current event loop
135+
if let Err(e) = asyncio.call_method1("set_event_loop", (&loop_,)) {
136+
eprintln!("Failed to set event loop: {}", e);
137+
let _ = python_done_tx.send(Err(format!("Failed to set event loop: {}", e)));
138+
return;
139+
}
140+
141+
// Run the coroutine
142+
let result = loop_.call_method1("run_until_complete", (coroutine,));
143+
144+
// Close the loop
145+
let _ = loop_.call_method0("close");
146+
147+
// Send the result
148+
let _ = python_done_tx.send(result.map(|_| ()).map_err(|e| {
149+
format!("Failed to run ASGI coroutine: {}", e)
150+
}));
151+
});
64152
});
65153

66-
Ok(())
154+
// Wait for either the response or Python completion
155+
let result = tokio::select! {
156+
response = response_rx => {
157+
response.map_err(|_| pyo3::exceptions::PyRuntimeError::new_err("Failed to receive response"))
158+
}
159+
python_result = python_done_rx => {
160+
match python_result {
161+
Ok(Ok(())) => {
162+
// Python completed but no response was sent
163+
Err(pyo3::exceptions::PyRuntimeError::new_err("ASGI app completed without sending response"))
164+
}
165+
Ok(Err(e)) => {
166+
// Python failed with error
167+
Err(pyo3::exceptions::PyRuntimeError::new_err(e))
168+
}
169+
Err(_) => {
170+
// Channel closed unexpectedly
171+
Err(pyo3::exceptions::PyRuntimeError::new_err("Python execution failed unexpectedly"))
172+
}
173+
}
174+
}
175+
};
176+
177+
let (status, headers, body) = result?;
178+
179+
// Clean up the response task
180+
response_task.abort();
181+
182+
// Build the HTTP response
183+
let mut builder = http_handler::response::Builder::new()
184+
.status(status);
185+
186+
for (name, value) in headers {
187+
builder = builder.header(&name, &value);
188+
}
189+
190+
builder
191+
.body(BytesMut::from(&body[..]))
192+
.map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("Failed to build response: {}", e)))
67193
}

0 commit comments

Comments
 (0)