diff --git a/Cargo.toml b/Cargo.toml index 2165323..be6ad32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,13 +16,14 @@ categories = ["web-programming::http-server"] edition = "2018" [dependencies] +bytes = "0.5.3" chrono = "0.4.10" futures-util = "0.3.1" http = "0.2.0" hyper = "0.13.0" mime_guess = "2.0.1" percent-encoding = "2.1.0" -tokio = { version = "0.2.4", features = ["fs", "macros"] } +tokio = { version = "0.2.4", features = ["macros"] } url = "2.1.0" [dev-dependencies] diff --git a/src/lib.rs b/src/lib.rs index c851493..25745d4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -71,13 +71,13 @@ //! behavior using `ResponseBuilder` if necessary. //! //! The `ResponseBuilder` in turn uses `FileResponseBuilder` to serve files that are found. The -//! `FileResponseBuilder` can also be used directly if you have an existing open `tokio::fs::File` +//! `FileResponseBuilder` can also be used directly if you have an existing open `std::fs::File` //! and want to serve it. It takes care of basic headers, 'not modified' responses, and streaming //! the file in the body. //! //! Finally, there's `FileBytesStream`, which is used by `FileResponseBuilder` to stream the file. -//! This is a struct wrapping a `tokio::fs::File` and implementing a `futures::Stream` that -//! produces `Bytes`s. It can be used for streaming a file in custom response. +//! This is a struct wrapping a `std::fs::File` and implementing a `futures::Stream` that produces +//! `Bytes`s. It can be used for streaming a file in custom response. mod resolve; mod response_builder; diff --git a/src/resolve.rs b/src/resolve.rs index 1583c18..cde3c81 100644 --- a/src/resolve.rs +++ b/src/resolve.rs @@ -1,10 +1,9 @@ use crate::util::{open_with_metadata, RequestedPath}; use http::{Method, Request}; use mime_guess::{Mime, MimeGuess}; -use std::fs::Metadata; +use std::fs::{File, Metadata}; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::path::PathBuf; -use tokio::fs::File; /// The result of `resolve`. /// diff --git a/src/util/file_bytes_stream.rs b/src/util/file_bytes_stream.rs index efd3612..99cd2d0 100644 --- a/src/util/file_bytes_stream.rs +++ b/src/util/file_bytes_stream.rs @@ -1,40 +1,89 @@ +use bytes::{BufMut, BytesMut}; use futures_util::stream::Stream; use hyper::body::{Body, Bytes}; -use std::io::Error as IoError; +use std::fs::File; +use std::future::Future; +use std::io::{self, Read}; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; -use tokio::fs::File; -use tokio::prelude::AsyncRead; +use tokio::task::{spawn_blocking, JoinHandle}; const BUF_SIZE: usize = 8 * 1024; -/// Wraps a `tokio::fs::File`, and implements a stream of `Bytes`s. +/// Wraps a `std::fs::File`, and implements an async stream of `Bytes`s. pub struct FileBytesStream { - file: File, - buf: Box<[u8; BUF_SIZE]>, + file: Arc, + state: State, +} + +enum State { + Invalid, + Idle(BytesMut), + Busy(JoinHandle<(io::Result, BytesMut)>), } impl FileBytesStream { /// Create a new stream from the given file. pub fn new(file: File) -> FileBytesStream { - let buf = Box::new([0; BUF_SIZE]); - FileBytesStream { file, buf } + let file = Arc::new(file); + let state = State::Idle(BytesMut::new()); + FileBytesStream { file, state } } } impl Stream for FileBytesStream { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let Self { - ref mut file, - ref mut buf, - } = *self; - match Pin::new(file).poll_read(cx, &mut buf[..]) { - Poll::Ready(Ok(0)) => Poll::Ready(None), - Poll::Ready(Ok(size)) => Poll::Ready(Some(Ok(self.buf[..size].to_owned().into()))), - Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))), - Poll::Pending => Poll::Pending, + loop { + let mut state = State::Invalid; + std::mem::swap(&mut state, &mut self.state); + match state { + State::Invalid => { + unreachable!(); + } + State::Idle(mut buf) => { + let file = self.file.clone(); + buf.reserve(BUF_SIZE); + self.state = State::Busy(spawn_blocking(move || { + let slice = buf.bytes_mut(); + let slice = unsafe { + std::slice::from_raw_parts_mut( + slice.as_mut_ptr() as *mut u8, + slice.len(), + ) + }; + let res = (&*file).read(slice); + (res, buf) + })); + } + State::Busy(mut op) => { + let (res, mut buf) = match Pin::new(&mut op).poll(cx) { + Poll::Ready(res) => res?, + Poll::Pending => { + self.state = State::Busy(op); + return Poll::Pending; + } + }; + match res { + Ok(0) => { + self.state = State::Idle(buf); + return Poll::Ready(None); + } + Ok(size) => { + unsafe { buf.advance_mut(size) }; + let retval = buf.split().freeze(); + self.state = State::Idle(buf); + return Poll::Ready(Some(Ok(retval))); + } + Err(e) => { + self.state = State::Idle(buf); + return Poll::Ready(Some(Err(e))); + } + } + } + } } } } diff --git a/src/util/file_response_builder.rs b/src/util/file_response_builder.rs index 4930fe3..1db9181 100644 --- a/src/util/file_response_builder.rs +++ b/src/util/file_response_builder.rs @@ -3,10 +3,9 @@ use chrono::{offset::Local as LocalTz, DateTime, SubsecRound}; use http::response::Builder as ResponseBuilder; use http::{header, HeaderMap, Method, Request, Response, Result, StatusCode}; use hyper::Body; -use std::fs::Metadata; -use tokio::fs::File; +use std::fs::{File, Metadata}; -/// Utility to build responses for serving a `tokio::fs::File`. +/// Utility to build responses for serving a `std::fs::File`. /// /// This struct allows direct access to its fields, but these fields are typically initialized by /// the accessors, using the builder pattern. The fields are basically a bunch of settings that diff --git a/src/util/open_with_metadata.rs b/src/util/open_with_metadata.rs index 0cc9c62..9e2e380 100644 --- a/src/util/open_with_metadata.rs +++ b/src/util/open_with_metadata.rs @@ -1,7 +1,7 @@ -use std::fs::{Metadata, OpenOptions as StdOpenOptions}; +use std::fs::{File, Metadata, OpenOptions}; use std::io::Error as IoError; use std::path::Path; -use tokio::fs::{File, OpenOptions}; +use tokio::task::spawn_blocking; #[cfg(windows)] use std::os::windows::fs::OpenOptionsExt; @@ -10,14 +10,17 @@ use winapi::um::winbase::FILE_FLAG_BACKUP_SEMANTICS; /// Open a file and get metadata. pub async fn open_with_metadata(path: impl AsRef) -> Result<(File, Metadata), IoError> { - let mut opts = StdOpenOptions::new(); - opts.read(true); + let path = path.as_ref().to_owned(); + spawn_blocking(move || { + let mut opts = OpenOptions::new(); + opts.read(true); - // On Windows, we need to set this flag to be able to open directories. - #[cfg(windows)] - opts.custom_flags(FILE_FLAG_BACKUP_SEMANTICS); + // On Windows, we need to set this flag to be able to open directories. + #[cfg(windows)] + opts.custom_flags(FILE_FLAG_BACKUP_SEMANTICS); - let file = OpenOptions::from(opts).open(path).await?; - let metadata = file.metadata().await?; - Ok((file, metadata)) + let file = opts.open(path)?; + let metadata = file.metadata()?; + Ok((file, metadata)) + }).await? }