Skip to content

Commit 8ac663d

Browse files
authored
Merge pull request #2442 from aarroyoc/fix-http-at-end-of-stream
Fix at_end_of_stream/1 for http read stream
2 parents 9027126 + 08a9f7c commit 8ac663d

File tree

3 files changed

+17
-15
lines changed

3 files changed

+17
-15
lines changed

src/http.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::io::BufRead;
1+
use bytes::{Bytes, buf::Reader};
22
use std::sync::{Arc, Condvar, Mutex};
33

44
use warp::http;
@@ -19,5 +19,5 @@ pub struct HttpRequestData {
1919
pub headers: http::HeaderMap,
2020
pub path: String,
2121
pub query: String,
22-
pub body: Box<dyn BufRead + Send>,
22+
pub body: Reader<Bytes>,
2323
}

src/machine/streams.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::machine::machine_indices::*;
1212
use crate::machine::machine_state::*;
1313
use crate::types::*;
1414

15+
use bytes::Buf;
1516
pub use scryer_modular_bitfield::prelude::*;
1617

1718
use std::cmp::Ordering;
@@ -22,7 +23,7 @@ use std::fs::{File, OpenOptions};
2223
use std::hash::Hash;
2324
use std::io;
2425
#[cfg(feature = "http")]
25-
use std::io::BufRead;
26+
use bytes::{buf::Reader as BufReader, Bytes};
2627
use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
2728
use std::net::{Shutdown, TcpStream};
2829
use std::ops::{Deref, DerefMut};
@@ -274,7 +275,7 @@ impl Write for NamedTlsStream {
274275
#[cfg(feature = "http")]
275276
pub struct HttpReadStream {
276277
url: Atom,
277-
body_reader: Box<dyn BufRead>,
278+
body_reader: BufReader<Bytes>,
278279
}
279280

280281
#[cfg(feature = "http")]
@@ -1115,6 +1116,13 @@ impl Stream {
11151116
}
11161117
}
11171118
}
1119+
Stream::HttpRead(stream_layout) => {
1120+
if stream_layout.stream.get_ref().body_reader.get_ref().has_remaining() {
1121+
AtEndOfStream::Not
1122+
} else {
1123+
AtEndOfStream::Past
1124+
}
1125+
}
11181126
_ => AtEndOfStream::Not,
11191127
}
11201128
}
@@ -1203,7 +1211,7 @@ impl Stream {
12031211
#[inline]
12041212
pub(crate) fn from_http_stream(
12051213
url: Atom,
1206-
http_stream: Box<dyn BufRead>,
1214+
http_stream: BufReader<Bytes>,
12071215
arena: &mut Arena,
12081216
) -> Self {
12091217
Stream::HttpRead(arena_alloc!(

src/machine/system_calls.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ use std::env;
5050
use std::ffi::CString;
5151
use std::fs;
5252
use std::hash::{BuildHasher, BuildHasherDefault};
53-
#[cfg(feature = "http")]
54-
use std::io::BufRead;
5553
use std::io::{ErrorKind, Read, Write};
5654
use std::iter::{once, FromIterator};
5755
use std::mem;
@@ -4352,7 +4350,7 @@ impl Machine {
43524350

43534351
let mut stream = Stream::from_http_stream(
43544352
AtomTable::build_with(&self.machine_st.atom_tbl, &address_string),
4355-
Box::new(reader),
4353+
reader,
43564354
&mut self.machine_st.arena,
43574355
);
43584356
*stream.options_mut() = StreamOptions::default();
@@ -4447,11 +4445,7 @@ impl Machine {
44474445
let runtime = tokio::runtime::Handle::current();
44484446
let _guard = runtime.enter();
44494447

4450-
fn get_reader(body: impl Buf + Send + 'static) -> Box<dyn BufRead + Send> {
4451-
Box::new(body.reader())
4452-
}
4453-
4454-
let serve = warp::body::aggregate()
4448+
let serve = warp::body::bytes()
44554449
.and(warp::header::optional::<u64>(
44564450
warp::http::header::CONTENT_LENGTH.as_str(),
44574451
))
@@ -4462,7 +4456,7 @@ impl Machine {
44624456
future::ready(Ok::<(String,), warp::Rejection>(("".to_string(),)))
44634457
}))
44644458
.map(
4465-
move |body,
4459+
move |body: bytes::Bytes,
44664460
content_length,
44674461
method,
44684462
headers: warp::http::HeaderMap,
@@ -4482,7 +4476,7 @@ impl Machine {
44824476
headers,
44834477
path: path.as_str().to_string(),
44844478
query,
4485-
body: get_reader(body),
4479+
body: body.reader(),
44864480
};
44874481
let response =
44884482
Arc::new((Mutex::new(false), Mutex::new(None), Condvar::new()));

0 commit comments

Comments
 (0)