Skip to content
This repository was archived by the owner on Jul 14, 2025. It is now read-only.

Commit fd92a9e

Browse files
committed
Reinstate HTTP headers
1 parent b3b3f89 commit fd92a9e

File tree

3 files changed

+97
-47
lines changed

3 files changed

+97
-47
lines changed

src/dispatcher.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ impl RoutingTable {
5858
let request_context = RequestContext {
5959
client_addr,
6060
};
61-
let response = rte.handle_request(&parts, data, &request_context, &self.global_context);
61+
let response = rte.handle_request(&parts, data, &request_context, &self.global_context).await;
6262
Ok(response)
6363
},
6464
Err(_) => Ok(not_found()),
@@ -149,7 +149,7 @@ impl RoutingTableEntry {
149149
// TODO: I don't think this rightly belongs here. But
150150
// reasonable place to at least understand the decomposition and
151151
// dependencies.
152-
pub fn handle_request(
152+
pub async fn handle_request(
153153
&self,
154154
req: &Parts,
155155
body: Vec<u8>,
@@ -159,7 +159,7 @@ impl RoutingTableEntry {
159159
match &self.handler_info {
160160
RouteHandler::HealthCheck => Response::new(Body::from("OK")),
161161
RouteHandler::Wasm(w) => {
162-
let response = w.handle_request(&self.route_pattern, req, body, request_context, global_context, self.unique_key());
162+
let response = w.handle_request(&self.route_pattern, req, body, request_context, global_context, self.unique_key()).await;
163163
match response {
164164
Ok(res) => res,
165165
Err(e) => {

src/handlers.rs

Lines changed: 32 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use std::{collections::HashMap};
2-
use std::sync::{Arc, RwLock};
32

43
use cap_std::fs::Dir;
54
use hyper::{
@@ -16,6 +15,7 @@ use crate::dispatcher::RoutePattern;
1615
use crate::http_util::{internal_error, parse_cgi_headers};
1716
use crate::request::{RequestContext, RequestGlobalContext};
1817

18+
use crate::stream_writer::StreamWriter;
1919
use crate::wasm_module::WasmModuleSource;
2020
use crate::wasm_runner::{prepare_stdio_streams_for_http, prepare_wasm_instance, run_prepared_wasm_instance};
2121

@@ -36,7 +36,7 @@ pub struct WasmRouteHandler {
3636
}
3737

3838
impl WasmRouteHandler {
39-
pub fn handle_request(
39+
pub async fn handle_request(
4040
&self,
4141
matched_route: &RoutePattern,
4242
req: &Parts,
@@ -45,7 +45,25 @@ impl WasmRouteHandler {
4545
global_context: &RequestGlobalContext,
4646
logging_key: String,
4747
) -> Result<Response<Body>, anyhow::Error> {
48+
49+
// These broken-out functions are slightly artificial but help solve some lifetime
50+
// issues (where otherwise you get errors about things not being Send across an
51+
// await).
52+
let (stream_writer, instance, store) =
53+
self.set_up_runtime_environment(matched_route, req, request_body, request_context, global_context, logging_key)?;
54+
self.spawn_wasm_instance(instance, store, stream_writer.clone());
55+
56+
let response = compose_response(stream_writer).await?; // TODO: handle errors
57+
58+
// TODO: c'mon man
59+
tokio::time::sleep(tokio::time::Duration::from_micros(1)).await;
60+
61+
Ok(response)
62+
}
63+
64+
fn set_up_runtime_environment(&self, matched_route: &RoutePattern, req: &Parts, request_body: Vec<u8>, request_context: &RequestContext, global_context: &RequestGlobalContext, logging_key: String) -> anyhow::Result<(crate::stream_writer::StreamWriter, Instance, Store<WasiCtx>)> {
4865
let startup_span = tracing::info_span!("module instantiation").entered();
66+
4967
let headers = crate::http_util::build_headers(
5068
matched_route,
5169
req,
@@ -57,33 +75,25 @@ impl WasmRouteHandler {
5775
);
5876

5977
let stream_writer = crate::stream_writer::StreamWriter::new();
60-
6178
let redirects = prepare_stdio_streams_for_http(request_body, stream_writer.clone(), global_context, logging_key)?;
62-
6379
let ctx = self.build_wasi_context_for_request(req, headers, redirects.streams)?;
64-
6580
let (store, instance) = self.prepare_wasm_instance(global_context, ctx)?;
66-
67-
// Drop manually to get instantiation time
81+
6882
drop(startup_span);
83+
84+
Ok((stream_writer, instance, store))
85+
}
6986

87+
fn spawn_wasm_instance(&self, instance: Instance, store: Store<WasiCtx>, mut stream_writer: StreamWriter) {
7088
let entrypoint = self.entrypoint.clone();
7189
let wasm_module_name = self.wasm_module_name.clone();
72-
let mut sw = stream_writer.clone();
90+
7391
tokio::spawn(async move {
7492
match run_prepared_wasm_instance(instance, store, &entrypoint, &wasm_module_name) {
75-
Ok(()) => sw.done().unwrap(), // TODO: <--
93+
Ok(()) => stream_writer.done().unwrap(), // TODO: <--
7694
Err(e) => tracing::error!("oh no {}", e), // TODO: behaviour? message? MESSAGE, IVAN?!
7795
};
7896
});
79-
80-
let response = Response::new(Body::wrap_stream(stream_writer.as_stream()));
81-
// TODO: c'mon man
82-
std::thread::sleep(std::time::Duration::from_millis(10));
83-
84-
// TODO: headers headers headers
85-
86-
Ok(response)
8797
}
8898

8999
fn build_wasi_context_for_request(&self, req: &Parts, headers: HashMap<String, String>, redirects: crate::wasm_module::IOStreamRedirects<crate::stream_writer::StreamWriter>) -> Result<WasiCtx, Error> {
@@ -132,34 +142,12 @@ impl WasmRouteHandler {
132142
}
133143
}
134144

135-
pub fn compose_response(stdout_mutex: Arc<RwLock<Vec<u8>>>) -> Result<Response<Body>, Error> {
136-
// Okay, once we get here, all the information we need to send back in the response
137-
// should be written to the STDOUT buffer. We fetch that, format it, and send
138-
// it back. In the process, we might need to alter the status code of the result.
139-
//
140-
// This is a little janky, but basically we are looping through the output once,
141-
// looking for the double-newline that distinguishes the headers from the body.
142-
// The headers can then be parsed separately, while the body can be sent back
143-
// to the client.
144-
145-
let out = stdout_mutex.read().unwrap();
146-
let mut last = 0;
147-
let mut scan_headers = true;
148-
let mut buffer: Vec<u8> = Vec::new();
149-
let mut out_headers: Vec<u8> = Vec::new();
150-
out.iter().for_each(|i| {
151-
if scan_headers && *i == 10 && last == 10 {
152-
out_headers.append(&mut buffer);
153-
buffer = Vec::new();
154-
scan_headers = false;
155-
return; // Consume the linefeed
156-
}
157-
last = *i;
158-
buffer.push(*i)
159-
});
160-
let mut res = Response::new(Body::from(buffer));
145+
pub async fn compose_response(mut stream_writer: StreamWriter) -> anyhow::Result<Response<Body>> {
146+
let header_block = stream_writer.header_block().await?;
147+
let mut res = Response::new(Body::wrap_stream(stream_writer.as_stream()));
148+
161149
let mut sufficient_response = false;
162-
parse_cgi_headers(String::from_utf8(out_headers)?)
150+
parse_cgi_headers(String::from_utf8(header_block)?)
163151
.iter()
164152
.for_each(|h| {
165153
use hyper::header::{CONTENT_TYPE, LOCATION};

src/stream_writer.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,24 @@ impl StreamWriter {
5454
}
5555
}
5656

57+
pub async fn header_block(&mut self) -> anyhow::Result<Vec<u8>> {
58+
loop {
59+
match self.pending.write().as_deref_mut() {
60+
Ok(pending) => match split_at_two_newlines(&pending) {
61+
None => (),
62+
Some((header_block, rest)) => {
63+
*pending = rest;
64+
return Ok(header_block);
65+
}
66+
},
67+
Err(e) => {
68+
return Err(anyhow::anyhow!("Internal error: StreamWriter::header_block can't take lock: {}", e));
69+
},
70+
}
71+
tokio::time::sleep(tokio::time::Duration::from_micros(1)).await;
72+
}
73+
}
74+
5775
pub fn as_stream(mut self) -> impl futures_core::stream::Stream<Item = anyhow::Result<Vec<u8>>> {
5876
stream! {
5977
loop {
@@ -132,3 +150,47 @@ impl Write for StreamWriter {
132150
Ok(())
133151
}
134152
}
153+
154+
fn split_at_two_newlines(source: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
155+
let mut buffer = vec![];
156+
let mut last: u8 = 0;
157+
for value in source {
158+
if *value == 10 && last == 10 {
159+
let rest_slice = &source[(buffer.len() + 1)..];
160+
let rest = Vec::from(rest_slice);
161+
return Some((buffer, rest));
162+
} else {
163+
buffer.push(*value);
164+
last = *value;
165+
}
166+
}
167+
None
168+
}
169+
170+
#[cfg(test)]
171+
mod test {
172+
use super::*;
173+
174+
#[test]
175+
fn splits_at_two_newlines_if_pair_only() {
176+
let source: Vec<u8> = vec![0x41, 0x42, 0x0a, 0x0a, 0x43, 0x44];
177+
let result = split_at_two_newlines(&source).expect("did not split at all");
178+
assert_eq!(vec![0x41, 0x42, 0x0a], result.0);
179+
assert_eq!(vec![0x43, 0x44], result.1);
180+
}
181+
182+
#[test]
183+
fn doesnt_splits_at_two_newlines_if_no_pair() {
184+
let source: Vec<u8> = vec![0x41, 0x42, 0x0a, 0x43, 0x44, 0x0a, 0x45, 0x46];
185+
let result = split_at_two_newlines(&source);
186+
assert_eq!(None, result);
187+
}
188+
189+
#[test]
190+
fn splits_at_two_newlines_empty_rest_if_at_end() {
191+
let source: Vec<u8> = vec![0x41, 0x42, 0x0a, 0x43, 0x44, 0x0a, 0x0a];
192+
let result = split_at_two_newlines(&source).expect("did not split at all");
193+
assert_eq!(vec![0x41, 0x42, 0x0a, 0x43, 0x44, 0x0a], result.0);
194+
assert!(result.1.is_empty());
195+
}
196+
}

0 commit comments

Comments
 (0)