Skip to content

Commit d1ff945

Browse files
authored
wasi-http: make the buffer and budget capacity of the OutgoingBody writer configurable (#9670)
1 parent abcd6ac commit d1ff945

File tree

5 files changed

+77
-26
lines changed

5 files changed

+77
-26
lines changed

crates/test-programs/src/bin/http_outbound_request_large_post.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ use std::io::{self, Read};
33
use test_programs::wasi::http::types::{Method, Scheme};
44

55
fn main() {
6-
// TODO: ensure more than 700 bytes is allowed without error
7-
const LEN: usize = 700;
6+
// Make sure the final body is larger than 1024*1024, but we cannot allocate
7+
// so much memory directly in the wasm program, so we use the `repeat`
8+
// method to increase the body size.
9+
const LEN: usize = 1024;
10+
const REPEAT: usize = 1025;
811
let mut buffer = [0; LEN];
912
let addr = std::env::var("HTTP_SERVER").unwrap();
1013
io::repeat(0b001).read_exact(&mut buffer).unwrap();
@@ -13,7 +16,7 @@ fn main() {
1316
Scheme::Http,
1417
&addr,
1518
"/post",
16-
Some(&buffer),
19+
Some(&buffer.repeat(REPEAT)),
1720
None,
1821
None,
1922
None,
@@ -26,5 +29,5 @@ fn main() {
2629
assert_eq!(res.status, 200);
2730
let method = res.header("x-wasmtime-test-method").unwrap();
2831
assert_eq!(std::str::from_utf8(method).unwrap(), "POST");
29-
assert_eq!(res.body.len(), LEN);
32+
assert_eq!(res.body.len(), LEN * REPEAT);
3033
}

crates/test-programs/src/http.rs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,20 @@ pub fn request(
7474
.body()
7575
.map_err(|_| anyhow!("outgoing request write failed"))?;
7676

77+
let options = http_types::RequestOptions::new();
78+
options
79+
.set_connect_timeout(connect_timeout)
80+
.map_err(|()| anyhow!("failed to set connect_timeout"))?;
81+
options
82+
.set_first_byte_timeout(first_by_timeout)
83+
.map_err(|()| anyhow!("failed to set first_byte_timeout"))?;
84+
options
85+
.set_between_bytes_timeout(between_bytes_timeout)
86+
.map_err(|()| anyhow!("failed to set between_bytes_timeout"))?;
87+
let options = Some(options);
88+
89+
let future_response = outgoing_handler::handle(request, options)?;
90+
7791
if let Some(mut buf) = body {
7892
let request_body = outgoing_body
7993
.write()
@@ -110,21 +124,6 @@ pub fn request(
110124
Err(_) => anyhow::bail!("output stream error"),
111125
};
112126
}
113-
114-
let options = http_types::RequestOptions::new();
115-
options
116-
.set_connect_timeout(connect_timeout)
117-
.map_err(|()| anyhow!("failed to set connect_timeout"))?;
118-
options
119-
.set_first_byte_timeout(first_by_timeout)
120-
.map_err(|()| anyhow!("failed to set first_byte_timeout"))?;
121-
options
122-
.set_between_bytes_timeout(between_bytes_timeout)
123-
.map_err(|()| anyhow!("failed to set between_bytes_timeout"))?;
124-
let options = Some(options);
125-
126-
let future_response = outgoing_handler::handle(request, options)?;
127-
128127
http_types::OutgoingBody::finish(outgoing_body, None)?;
129128

130129
let incoming_response = match future_response.get() {

crates/wasi-http/src/body.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,14 @@ pub struct HostOutgoingBody {
423423

424424
impl HostOutgoingBody {
425425
/// Create a new `HostOutgoingBody`
426-
pub fn new(context: StreamContext, size: Option<u64>) -> (Self, HyperOutgoingBody) {
426+
pub fn new(
427+
context: StreamContext,
428+
size: Option<u64>,
429+
buffer_chunks: usize,
430+
chunk_size: usize,
431+
) -> (Self, HyperOutgoingBody) {
432+
assert!(buffer_chunks >= 1);
433+
427434
let written = size.map(WrittenState::new);
428435

429436
use tokio::sync::oneshot::error::RecvError;
@@ -469,17 +476,16 @@ impl HostOutgoingBody {
469476
}
470477
}
471478

472-
let (body_sender, body_receiver) = mpsc::channel(2);
479+
// always add 1 buffer here because one empty slot is required
480+
let (body_sender, body_receiver) = mpsc::channel(buffer_chunks + 1);
473481
let (finish_sender, finish_receiver) = oneshot::channel();
474482
let body_impl = BodyImpl {
475483
body_receiver,
476484
finish_receiver: Some(finish_receiver),
477485
}
478486
.boxed();
479487

480-
// TODO: this capacity constant is arbitrary, and should be configurable
481-
let output_stream =
482-
BodyWriteStream::new(context, 1024 * 1024, body_sender, written.clone());
488+
let output_stream = BodyWriteStream::new(context, chunk_size, body_sender, written.clone());
483489

484490
(
485491
Self {

crates/wasi-http/src/types.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,19 @@ pub trait WasiHttpView: Send {
125125
fn is_forbidden_header(&mut self, _name: &HeaderName) -> bool {
126126
false
127127
}
128+
129+
/// Number of distinct write calls to the outgoing body's output-stream
130+
/// that the implementation will buffer.
131+
/// Default: 1.
132+
fn outgoing_body_buffer_chunks(&mut self) -> usize {
133+
1
134+
}
135+
136+
/// Maximum size allowed in a write call to the outgoing body's output-stream.
137+
/// Default: 1024 * 1024.
138+
fn outgoing_body_chunk_size(&mut self) -> usize {
139+
1024 * 1024
140+
}
128141
}
129142

130143
impl<T: ?Sized + WasiHttpView> WasiHttpView for &mut T {
@@ -156,6 +169,14 @@ impl<T: ?Sized + WasiHttpView> WasiHttpView for &mut T {
156169
fn is_forbidden_header(&mut self, name: &HeaderName) -> bool {
157170
T::is_forbidden_header(self, name)
158171
}
172+
173+
fn outgoing_body_buffer_chunks(&mut self) -> usize {
174+
T::outgoing_body_buffer_chunks(self)
175+
}
176+
177+
fn outgoing_body_chunk_size(&mut self) -> usize {
178+
T::outgoing_body_chunk_size(self)
179+
}
159180
}
160181

161182
impl<T: ?Sized + WasiHttpView> WasiHttpView for Box<T> {
@@ -187,6 +208,14 @@ impl<T: ?Sized + WasiHttpView> WasiHttpView for Box<T> {
187208
fn is_forbidden_header(&mut self, name: &HeaderName) -> bool {
188209
T::is_forbidden_header(self, name)
189210
}
211+
212+
fn outgoing_body_buffer_chunks(&mut self) -> usize {
213+
T::outgoing_body_buffer_chunks(self)
214+
}
215+
216+
fn outgoing_body_chunk_size(&mut self) -> usize {
217+
T::outgoing_body_chunk_size(self)
218+
}
190219
}
191220

192221
/// A concrete structure that all generated `Host` traits are implemented for.
@@ -233,6 +262,14 @@ impl<T: WasiHttpView> WasiHttpView for WasiHttpImpl<T> {
233262
fn is_forbidden_header(&mut self, name: &HeaderName) -> bool {
234263
self.0.is_forbidden_header(name)
235264
}
265+
266+
fn outgoing_body_buffer_chunks(&mut self) -> usize {
267+
self.0.outgoing_body_buffer_chunks()
268+
}
269+
270+
fn outgoing_body_chunk_size(&mut self) -> usize {
271+
self.0.outgoing_body_chunk_size()
272+
}
236273
}
237274

238275
/// Returns `true` when the header is forbidden according to this [`WasiHttpView`] implementation.

crates/wasi-http/src/types_impl.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,8 @@ where
391391
&mut self,
392392
request: Resource<HostOutgoingRequest>,
393393
) -> wasmtime::Result<Result<Resource<HostOutgoingBody>, ()>> {
394+
let buffer_chunks = self.outgoing_body_buffer_chunks();
395+
let chunk_size = self.outgoing_body_chunk_size();
394396
let req = self
395397
.table()
396398
.get_mut(&request)
@@ -405,7 +407,8 @@ where
405407
Err(e) => return Ok(Err(e)),
406408
};
407409

408-
let (host_body, hyper_body) = HostOutgoingBody::new(StreamContext::Request, size);
410+
let (host_body, hyper_body) =
411+
HostOutgoingBody::new(StreamContext::Request, size, buffer_chunks, chunk_size);
409412

410413
req.body = Some(hyper_body);
411414

@@ -751,6 +754,8 @@ where
751754
&mut self,
752755
id: Resource<HostOutgoingResponse>,
753756
) -> wasmtime::Result<Result<Resource<HostOutgoingBody>, ()>> {
757+
let buffer_chunks = self.outgoing_body_buffer_chunks();
758+
let chunk_size = self.outgoing_body_chunk_size();
754759
let resp = self.table().get_mut(&id)?;
755760

756761
if resp.body.is_some() {
@@ -762,7 +767,8 @@ where
762767
Err(e) => return Ok(Err(e)),
763768
};
764769

765-
let (host, body) = HostOutgoingBody::new(StreamContext::Response, size);
770+
let (host, body) =
771+
HostOutgoingBody::new(StreamContext::Response, size, buffer_chunks, chunk_size);
766772

767773
resp.body.replace(body);
768774

0 commit comments

Comments
 (0)