Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cratetorrent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ hex = "0.4"
lazy_static = "1.4"
log = "0.4"
lru = "0.6"
nix = "0.19"
percent-encoding = "2.1"
reqwest = "0.10"
serde = "1.0"
Expand All @@ -32,6 +31,10 @@ sha-1 = "0.9"
tokio = { version = "0.2", features = ["blocking", "macros", "rt-threaded", "stream", "sync", "tcp", "time"] }
tokio-util = { version = "0.3", features = ["codec"] }
url = "2.2"
cfg-if = "1.0"

[target.'cfg(target_os = "linux")'.dependencies]
nix = "0.19"

[dev-dependencies]
mockito = "0.28"
Expand Down
12 changes: 12 additions & 0 deletions cratetorrent/src/disk/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ pub(crate) enum WriteError {
Io(std::io::Error),
}

impl From<std::io::Error> for WriteError {
fn from(e: std::io::Error) -> Self {
Self::Io(e)
}
}

impl fmt::Display for WriteError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Expand All @@ -72,6 +78,12 @@ pub(crate) enum ReadError {
Io(std::io::Error),
}

impl From<std::io::Error> for ReadError {
fn from(e: std::io::Error) -> Self {
Self::Io(e)
}
}

impl fmt::Display for ReadError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Expand Down
211 changes: 184 additions & 27 deletions cratetorrent/src/disk/io/file.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,62 @@
use std::{
fs::{File, OpenOptions},
os::unix::io::AsRawFd,
path::Path,
io::ErrorKind
};

use nix::sys::uio::{preadv, pwritev};

use crate::{
disk::error::*,
iovecs,
iovecs::{IoVec, IoVecs},
storage_info::FileSlice,
FileInfo,
};

use cfg_if::cfg_if;

#[cfg(target_os = "linux")]
use crate::iovecs::{self, IoVec, IoVecs};


// a convenience macro for handling ErrorKind::Interrupted,
// zero-byte reads and the like
macro_rules! unwrap_read_res {
($res:expr) => {
match $res {
Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Err(ReadError::MissingData),
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(e.into()),

// If there was nothing to read from file, it means we tried to
// read a piece from a portion of a file not yet downloaded or
// otherwise missing
Ok(0) => return Err(ReadError::MissingData),

Ok(written) => written,
}
}
}

// a convenience macro for handling ErrorKind::Interrupted,
// zero-byte writes and the like
macro_rules! unwrap_write_res {
($res:expr) => {
match $res {
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(e.into()),

// Write syscalls never return 0 these days (unless buf.len == 0).
// If that happened, something went terribly wrong;
// io::ErrorKind even has a dedicated variant for it!
// Anyway, we don't expect them to happen.
Ok(0) => {
return Err(std::io::Error::new(ErrorKind::WriteZero, "failed to write whole buffer").into());
}

Ok(written) => written,
}
}
}


pub(crate) struct TorrentFile {
pub info: FileInfo,
pub handle: File,
Expand Down Expand Up @@ -46,7 +89,7 @@ impl TorrentFile {
}

/// Writes to file at most the slice length number of bytes of blocks at the
/// file slice's offset, using pwritev, called repeteadly until all blocks are
/// file slice's offset, using pwritev, called repeatedly until all blocks are
/// written to disk.
///
/// It returns the slice of blocks that weren't written to disk. That is, it
Expand All @@ -58,11 +101,15 @@ impl TorrentFile {
///
/// Since the syscall may be invoked repeatedly to perform disk IO, this
/// means that this operation is not guaranteed to be atomic.
pub fn write<'a>(
#[cfg(target_os = "linux")]
pub fn write_vectored<'a>(
&self,
file_slice: FileSlice,
blocks: &'a mut [IoVec<&'a [u8]>],
) -> Result<&'a mut [IoVec<&'a [u8]>], WriteError> {
use std::os::unix::io::AsRawFd;
use nix::sys::uio::{pwritev};

let mut iovecs = IoVecs::bounded(blocks, file_slice.len as usize);
// the write buffer cannot be larger than the file slice we want to
// write to
Expand All @@ -80,16 +127,18 @@ impl TorrentFile {
// transferred to disk (or an error occurs)
let mut total_write_count = 0;
while !iovecs.as_slice().is_empty() {
let write_count = pwritev(
let write_res = pwritev(
self.handle.as_raw_fd(),
iovecs.as_slice(),
file_slice.offset as i64,
)
.map_err(|e| {
log::warn!("File {:?} write error: {}", self.info.path, e);
// FIXME: convert actual error here
WriteError::Io(std::io::Error::last_os_error())
})?;
.map_err(|sys_err| {
let err = convert_nix_error(sys_err);
log::warn!("File {:?} write error: {}", self.info.path, err);
err
});

let write_count = unwrap_write_res!(write_res);

// tally up the total write count
total_write_count += write_count;
Expand All @@ -109,8 +158,21 @@ impl TorrentFile {
Ok(iovecs.into_tail())
}

pub fn write<'a>(&self, file_slice: FileSlice, blocks: &'a [u8]) -> Result<&'a [u8], WriteError> {
let len = file_slice.len as usize;
let content = &blocks[..len];

write_all_at(&self.handle, content, file_slice.offset)
.map_err(|err| {
log::warn!("File {:?} write error: {}", self.info.path, err);
err
})?;

Ok(&blocks[len..])
}

/// Reads from file at most the slice length number of bytes of blocks at
/// the file slice's offset, using preadv, called repeteadly until all
/// the file slice's offset, using preadv, called repeatedly until all
/// blocks are read from disk.
///
/// It returns the slice of block buffers that weren't filled by the
Expand All @@ -122,11 +184,15 @@ impl TorrentFile {
///
/// Since the syscall may be invoked repeatedly to perform disk IO, this
/// means that this operation is not guaranteed to be atomic.
pub fn read<'a>(
#[cfg(target_os = "linux")]
pub fn read_vectored<'a>(
&self,
file_slice: FileSlice,
mut iovecs: &'a mut [IoVec<&'a mut [u8]>],
) -> Result<&'a mut [IoVec<&'a mut [u8]>], ReadError> {
use std::os::unix::io::AsRawFd;
use nix::sys::uio::preadv;

// This is simpler than the write implementation as the preadv method
// stops reading in from the file if reaching EOF. We do need to advance
// the iovecs read buffer cursor after a read as we may want to read
Expand All @@ -138,23 +204,18 @@ impl TorrentFile {
// transferred to disk (or an error occurs)
let mut total_read_count = 0;
while !iovecs.is_empty() && (total_read_count as u64) < file_slice.len {
let read_count = preadv(
let read_res = preadv(
self.handle.as_raw_fd(),
iovecs,
file_slice.offset as i64,
)
.map_err(|e| {
log::warn!("File {:?} read error: {}", self.info.path, e);
// FIXME: convert actual error here
ReadError::Io(std::io::Error::last_os_error())
})?;
.map_err(|sys_err| {
let err = convert_nix_error(sys_err);
log::warn!("File {:?} read error: {}", self.info.path, err);
err
});

// if there was nothing to read from file it means we tried to
// read a piece from a portion of a file not yet downloaded or
// otherwise missing
if read_count == 0 {
return Err(ReadError::MissingData);
}
let read_count = unwrap_read_res!(read_res);

// tally up the total read count
total_read_count += read_count;
Expand All @@ -166,4 +227,100 @@ impl TorrentFile {

Ok(iovecs)
}

pub fn read<'a>(&self, file_slice: FileSlice, blocks: &'a mut [u8]) -> Result<&'a mut [u8], ReadError> {
let len = file_slice.len as usize;

read_exact_at(&self.handle, &mut blocks[..len], file_slice.offset)
.map_err(|err| {
log::warn!("File {:?} read error: {}", self.info.path, err);
err
})?;

Ok(&mut blocks[len..])
}
}

#[cfg(target_os = "linux")]
fn convert_nix_error(err: nix::Error) -> std::io::Error {
match err {
nix::Error::Sys(errno) => std::io::Error::from_raw_os_error(errno as i32),

// preadv/pwrite never return them, but better safe than sorry
nix::Error::InvalidPath |
nix::Error::InvalidUtf8 |
nix::Error::UnsupportedOperation => {
log::warn!("Unexpected nix::Error kind ({}), falling back to last_os_error", err);
std::io::Error::last_os_error()
},
}
}

// Here's where the most effective implementation of `write_at/read_at` is chosen.
// The trick is, linux-specific APIs do not modify the "seek cursor", while Windows' and others' do.
// This discrepancy doesn't matter because nothing in this crate relies on this cursor
// (the read/write offset is always provided explicitly).
//
// FIXME: On the other hand, if `TorrentInfo.handle` will ever become exposed in our public API,
// this "jumping cursor" can potentially be observed by the outside world. It shouldn't be
// a problem either because, well, I fail to see why someone would want to interact with
// the file in a way where the cursor's positioning matters. But, at least, this should be
// clearly documented and the user must be warned to perform his own seeks if need for
// unaccounted reads/writes presents itself...
cfg_if! {
if #[cfg(any(unix, target_os = "redox", target_os = "vxworks", target_os = "hermit"))] {
use std::os::unix::fs::FileExt;

#[inline]
fn write_all_at(file: &File, buf: &[u8], offset: u64) -> Result<(), WriteError> {
file.write_all_at(buf, offset).map_err(Into::into)
}

#[inline]
fn read_exact_at(file: &File, buf: &mut [u8], offset: u64) -> Result<(), ReadError> {
file.read_exact_at(buf, offset)
.map_err(|e| match e.kind() {
ErrorKind::UnexpectedEof => ReadError::MissingData,
_ => e.into()
})

}
} else if #[cfg(windows)] {
use std::os::windows::fs::FileExt;

fn write_all_at(file: &File, mut buf: &[u8], mut offset: u64) -> Result<(), WriteError> {
while !buf.is_empty() {
let written = unwrap_write_res!(file.seek_write(buf, offset));
offset += written;
buf = &buf[written..];
}

Ok(())
}

fn read_exact_at(file: &File, mut buf: &mut [u8], mut offset: u64) -> Result<(), ReadError> {
while !buf.is_empty() {
let read = unwrap_read_res!(file.seek_read(buf, offset));
offset += read;
buf = &mut buf[read..];
}

Ok(())
}
} else {
fn write_all_at(file: &File, buf: &[u8], offset: u64) -> Result<(), WriteError> {
file.seek(SeekFrom(offset))?;
Copy link
Owner

@vimpunk vimpunk Jan 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There might be a potential problem here for a future optimization.

While currently files are write locked when written, I think we can actually parallelize writes: because each piece maps to a disjunct portion of a file, there would be no clashes among writes.

If different pieces were written to the same file at the same time, there would be multiple calls to seek which I believe would be a race condition.

I think it's ok to leave as is for now with a note in the TorrentFile::write doc cautioning against the above.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized that reads to the same file are already done concurrently, so concurrent seeks could already be an issue there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True. I'll be thinking what to do, probably ditching the "generic" version and using platform specific APIs for each of the supported platforms..

file.write_all(buf).map_err(Into::into)
}

fn read_exact_at(file: &File, buf: &mut [u8], offset: u64) -> Result<(), ReadError> {
file.seek(SeekFrom(offset))?;
file.read_exact(buf).map_err(|e| match e.kind() {
ErrorKind::UnexpectedEof => ReadError::MissingData,
_ => e.into()
})
}
}
}


Loading