Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ categories = ["web-programming::http-server"]
edition = "2018"

[dependencies]
bytes = "0.5.3"
chrono = "0.4.10"
futures-util = "0.3.1"
http = "0.2.0"
hyper = "0.13.0"
mime_guess = "2.0.1"
percent-encoding = "2.1.0"
tokio = { version = "0.2.4", features = ["fs", "macros"] }
tokio = { version = "0.2.4", features = ["macros"] }
url = "2.1.0"

[dev-dependencies]
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@
//! behavior using `ResponseBuilder` if necessary.
//!
//! The `ResponseBuilder` in turn uses `FileResponseBuilder` to serve files that are found. The
//! `FileResponseBuilder` can also be used directly if you have an existing open `tokio::fs::File`
//! `FileResponseBuilder` can also be used directly if you have an existing open `std::fs::File`
//! and want to serve it. It takes care of basic headers, 'not modified' responses, and streaming
//! the file in the body.
//!
//! Finally, there's `FileBytesStream`, which is used by `FileResponseBuilder` to stream the file.
//! This is a struct wrapping a `tokio::fs::File` and implementing a `futures::Stream` that
//! produces `Bytes`s. It can be used for streaming a file in custom response.
//! This is a struct wrapping a `std::fs::File` and implementing a `futures::Stream` that produces
//! `Bytes`s. It can be used for streaming a file in custom response.

mod resolve;
mod response_builder;
Expand Down
3 changes: 1 addition & 2 deletions src/resolve.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::util::{open_with_metadata, RequestedPath};
use http::{Method, Request};
use mime_guess::{Mime, MimeGuess};
use std::fs::Metadata;
use std::fs::{File, Metadata};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::path::PathBuf;
use tokio::fs::File;

/// The result of `resolve`.
///
Expand Down
85 changes: 67 additions & 18 deletions src/util/file_bytes_stream.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,89 @@
use bytes::{BufMut, BytesMut};
use futures_util::stream::Stream;
use hyper::body::{Body, Bytes};
use std::io::Error as IoError;
use std::fs::File;
use std::future::Future;
use std::io::{self, Read};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::fs::File;
use tokio::prelude::AsyncRead;
use tokio::task::{spawn_blocking, JoinHandle};

const BUF_SIZE: usize = 8 * 1024;

/// Wraps a `tokio::fs::File`, and implements a stream of `Bytes`s.
/// Wraps a `std::fs::File`, and implements an async stream of `Bytes`s.
pub struct FileBytesStream {
file: File,
buf: Box<[u8; BUF_SIZE]>,
file: Arc<File>,
state: State,
}

enum State {
Invalid,
Idle(BytesMut),
Busy(JoinHandle<(io::Result<usize>, BytesMut)>),
}

impl FileBytesStream {
/// Create a new stream from the given file.
pub fn new(file: File) -> FileBytesStream {
let buf = Box::new([0; BUF_SIZE]);
FileBytesStream { file, buf }
let file = Arc::new(file);
let state = State::Idle(BytesMut::new());
FileBytesStream { file, state }
}
}

impl Stream for FileBytesStream {
type Item = Result<Bytes, IoError>;
type Item = Result<Bytes, io::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let Self {
ref mut file,
ref mut buf,
} = *self;
match Pin::new(file).poll_read(cx, &mut buf[..]) {
Poll::Ready(Ok(0)) => Poll::Ready(None),
Poll::Ready(Ok(size)) => Poll::Ready(Some(Ok(self.buf[..size].to_owned().into()))),
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
Poll::Pending => Poll::Pending,
loop {
let mut state = State::Invalid;
std::mem::swap(&mut state, &mut self.state);
match state {
State::Invalid => {
unreachable!();
}
State::Idle(mut buf) => {
let file = self.file.clone();
buf.reserve(BUF_SIZE);
self.state = State::Busy(spawn_blocking(move || {
let slice = buf.bytes_mut();
let slice = unsafe {
std::slice::from_raw_parts_mut(
slice.as_mut_ptr() as *mut u8,
slice.len(),
)
};
let res = (&*file).read(slice);
(res, buf)
}));
}
State::Busy(mut op) => {
let (res, mut buf) = match Pin::new(&mut op).poll(cx) {
Poll::Ready(res) => res?,
Poll::Pending => {
self.state = State::Busy(op);
return Poll::Pending;
}
};
match res {
Ok(0) => {
self.state = State::Idle(buf);
return Poll::Ready(None);
}
Ok(size) => {
unsafe { buf.advance_mut(size) };
let retval = buf.split().freeze();
self.state = State::Idle(buf);
return Poll::Ready(Some(Ok(retval)));
}
Err(e) => {
self.state = State::Idle(buf);
return Poll::Ready(Some(Err(e)));
}
}
}
}
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/util/file_response_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ use chrono::{offset::Local as LocalTz, DateTime, SubsecRound};
use http::response::Builder as ResponseBuilder;
use http::{header, HeaderMap, Method, Request, Response, Result, StatusCode};
use hyper::Body;
use std::fs::Metadata;
use tokio::fs::File;
use std::fs::{File, Metadata};

/// Utility to build responses for serving a `tokio::fs::File`.
/// Utility to build responses for serving a `std::fs::File`.
///
/// This struct allows direct access to its fields, but these fields are typically initialized by
/// the accessors, using the builder pattern. The fields are basically a bunch of settings that
Expand Down
23 changes: 13 additions & 10 deletions src/util/open_with_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fs::{Metadata, OpenOptions as StdOpenOptions};
use std::fs::{File, Metadata, OpenOptions};
use std::io::Error as IoError;
use std::path::Path;
use tokio::fs::{File, OpenOptions};
use tokio::task::spawn_blocking;

#[cfg(windows)]
use std::os::windows::fs::OpenOptionsExt;
Expand All @@ -10,14 +10,17 @@ use winapi::um::winbase::FILE_FLAG_BACKUP_SEMANTICS;

/// Open a file and get metadata.
pub async fn open_with_metadata(path: impl AsRef<Path>) -> Result<(File, Metadata), IoError> {
let mut opts = StdOpenOptions::new();
opts.read(true);
let path = path.as_ref().to_owned();
spawn_blocking(move || {
let mut opts = OpenOptions::new();
opts.read(true);

// On Windows, we need to set this flag to be able to open directories.
#[cfg(windows)]
opts.custom_flags(FILE_FLAG_BACKUP_SEMANTICS);
// On Windows, we need to set this flag to be able to open directories.
#[cfg(windows)]
opts.custom_flags(FILE_FLAG_BACKUP_SEMANTICS);

let file = OpenOptions::from(opts).open(path).await?;
let metadata = file.metadata().await?;
Ok((file, metadata))
let file = opts.open(path)?;
let metadata = file.metadata()?;
Ok((file, metadata))
}).await?
}