Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 39 additions & 12 deletions src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::{error, fmt, io, result};

use bytes::Bytes;
use derive_more::Display;
use time::OffsetDateTime;
use url::Url;

use crate::*;
Expand All @@ -45,6 +46,7 @@ pub mod s3;
/// support streaming or partial reads and writes.
#[derive(Clone)]
pub struct Transport {
/// The concrete protocol implementation: local, S3, etc.
protocol: Arc<dyn Protocol + 'static>,
}

Expand Down Expand Up @@ -84,7 +86,7 @@ impl Transport {
_other => {
return Err(Error {
kind: ErrorKind::UrlScheme,
path: Some(url.as_str().to_owned()),
url: Some(url.clone()),
source: None,
})
}
Expand Down Expand Up @@ -174,9 +176,18 @@ pub enum WriteMode {

trait Protocol: Send + Sync {
fn read_file(&self, path: &str) -> Result<Bytes>;

/// Write a complete file.
///
/// Depending on the [WriteMode] this may either overwrite existing files, or error.
///
/// As much as possible, the file should be written atomically so that it is only visible with
/// the complete content.
fn write_file(&self, relpath: &str, content: &[u8], mode: WriteMode) -> Result<()>;
fn list_dir(&self, relpath: &str) -> Result<ListDir>;
fn create_dir(&self, relpath: &str) -> Result<()>;

/// Get metadata about a file.
fn metadata(&self, relpath: &str) -> Result<Metadata>;

/// Delete a file.
Expand Down Expand Up @@ -212,6 +223,9 @@ pub struct Metadata {

/// Kind of file.
pub kind: Kind,

/// Last modified time.
pub modified: OffsetDateTime,
}

/// A list of all the files and directories in a directory.
Expand All @@ -225,11 +239,11 @@ pub struct ListDir {
#[derive(Debug)]
pub struct Error {
/// What type of generally known error?
kind: ErrorKind,
pub kind: ErrorKind,
/// The underlying error: for example an IO or S3 error.
source: Option<Box<dyn error::Error + Send + Sync>>,
/// The affected path, possibly relative to the transport.
path: Option<String>,
pub source: Option<Box<dyn error::Error + Send + Sync>>,
/// The affected URL, if known.
pub url: Option<Url>,
}

/// General categories of transport errors.
Expand All @@ -244,6 +258,12 @@ pub enum ErrorKind {
#[display(fmt = "Permission denied")]
PermissionDenied,

#[display(fmt = "Create transport error")]
CreateTransport,

#[display(fmt = "Connect error")]
Connect,

#[display(fmt = "Unsupported URL scheme")]
UrlScheme,

Expand All @@ -268,28 +288,35 @@ impl Error {
}

pub(self) fn io_error(path: &Path, source: io::Error) -> Error {
let kind = match source.kind() {
io::ErrorKind::NotFound => ErrorKind::NotFound,
io::ErrorKind::AlreadyExists => ErrorKind::AlreadyExists,
io::ErrorKind::PermissionDenied => ErrorKind::PermissionDenied,
_ => ErrorKind::Other,
};

Error {
kind: source.kind().into(),
source: Some(Box::new(source)),
path: Some(path.to_string_lossy().to_string()),
url: Url::from_file_path(path).ok(),
kind,
}
}

pub fn is_not_found(&self) -> bool {
self.kind == ErrorKind::NotFound
}

/// The transport-relative path where this error occurred, if known.
pub fn path(&self) -> Option<&str> {
self.path.as_deref()
/// The URL where this error occurred, if known.
pub fn url(&self) -> Option<&Url> {
self.url.as_ref()
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.kind)?;
if let Some(ref path) = self.path {
write!(f, ": {}", path)?;
if let Some(ref url) = self.url {
write!(f, ": {url}")?;
}
if let Some(source) = &self.source {
// I'm not sure we should write this here; it might be repetitive.
Expand Down
27 changes: 19 additions & 8 deletions src/transport/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,14 @@ impl super::Protocol for Protocol {
fn metadata(&self, relpath: &str) -> Result<Metadata> {
let path = self.full_path(relpath);
let fsmeta = path.metadata().map_err(|err| Error::io_error(&path, err))?;
let modified = fsmeta
.modified()
.map_err(|err| Error::io_error(&path, err))?
.into();
Ok(Metadata {
len: fsmeta.len(),
kind: fsmeta.file_type().into(),
modified,
})
}

Expand Down Expand Up @@ -164,9 +169,11 @@ impl super::Protocol for Protocol {
#[cfg(test)]
mod test {
use std::error::Error;
use std::time::Duration;

use assert_fs::prelude::*;
use predicates::prelude::*;
use time::OffsetDateTime;

use super::*;
use crate::kind::Kind;
Expand Down Expand Up @@ -200,7 +207,12 @@ mod test {
assert!(message.contains("Not found"));
assert!(message.contains("nonexistent.json"));

assert!(err.path().expect("path").ends_with("nonexistent.json"));
assert!(err
.url
.as_ref()
.expect("url")
.path()
.ends_with("/nonexistent.json"));
assert_eq!(err.kind(), transport::ErrorKind::NotFound);
assert!(err.is_not_found());

Expand All @@ -218,13 +230,12 @@ mod test {

let transport = Transport::local(temp.path());

assert_eq!(
transport.metadata(filename).unwrap(),
Metadata {
len: 24,
kind: Kind::File
}
);
let metadata = transport.metadata(filename).unwrap();
dbg!(&metadata);

assert_eq!(metadata.len, 24);
assert_eq!(metadata.kind, Kind::File);
assert!(metadata.modified + Duration::from_secs(60) > OffsetDateTime::now_utc());
assert!(transport.metadata("nopoem").unwrap_err().is_not_found());
}

Expand Down
67 changes: 38 additions & 29 deletions src/transport/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
// cargo mutants -f s3.rs --no-config -C --features=s3,s3-integration-test

use std::fmt;
use std::path::Path;
use std::sync::Arc;
use std::time::SystemTime;

use aws_config::{AppName, BehaviorVersion};
use aws_sdk_s3::error::SdkError;
Expand Down Expand Up @@ -73,7 +73,11 @@ impl Protocol {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|err| Error::io_error(Path::new(""), err))?;
.map_err(|source| Error {
kind: ErrorKind::CreateTransport,
url: Some(url.to_owned()),
source: Some(Box::new(source)),
})?;

let bucket = url.authority().to_owned();
assert!(!bucket.is_empty(), "S3 bucket name is empty in {url:?}");
Expand Down Expand Up @@ -121,6 +125,24 @@ impl Protocol {
fn join_path(&self, relpath: &str) -> String {
join_paths(&self.base_path, relpath)
}

fn s3_error<E, R>(&self, key: &str, source: SdkError<E, R>) -> Error
where
E: std::error::Error + Send + Sync + 'static,
R: std::fmt::Debug + Send + Sync + 'static,
ErrorKind: for<'a> From<&'a E>,
{
debug!(s3_error = ?source);
let kind = match &source {
SdkError::ServiceError(service_err) => ErrorKind::from(service_err.err()),
_ => ErrorKind::Other,
};
Error {
kind,
url: self.url.join(key).ok(),
source: Some(source.into()),
}
}
}

impl fmt::Debug for Protocol {
Expand Down Expand Up @@ -221,7 +243,7 @@ impl super::Protocol for Protocol {
result.files.push(name.to_owned());
}
}
Some(Err(err)) => return Err(s3_error(prefix, err)),
Some(Err(err)) => return Err(self.s3_error(&prefix, err)),
None => break,
}
}
Expand All @@ -240,13 +262,13 @@ impl super::Protocol for Protocol {
let response = self
.runtime
.block_on(request.send())
.map_err(|source| s3_error(key.clone(), source))?;
.map_err(|source| self.s3_error(&key, source))?;
let body_bytes = self
.runtime
.block_on(response.body.collect())
.map_err(|source| Error {
kind: ErrorKind::Other,
path: Some(key.clone()),
url: self.url.join(relpath).ok(),
source: Some(Box::new(source)),
})?
.into_bytes();
Expand Down Expand Up @@ -279,7 +301,7 @@ impl super::Protocol for Protocol {
}
let response = self.runtime.block_on(request.send());
// trace!(?response);
response.map_err(|err| s3_error(key, err))?;
response.map_err(|err| self.s3_error(&key, err))?;
trace!(body_len = content.len(), "wrote file");
Ok(())
}
Expand All @@ -290,7 +312,7 @@ impl super::Protocol for Protocol {
let request = self.client.delete_object().bucket(&self.bucket).key(&key);
let response = self.runtime.block_on(request.send());
trace!(?response);
response.map_err(|err| s3_error(key, err))?;
response.map_err(|err| self.s3_error(&key, err))?;
trace!("deleted file");
Ok(())
}
Expand All @@ -311,7 +333,7 @@ impl super::Protocol for Protocol {
let mut n_files = 0;
while let Some(response) = self.runtime.block_on(stream.next()) {
for object in response
.map_err(|err| s3_error(prefix.clone(), err))?
.map_err(|err| self.s3_error(&prefix, err))?
.contents
.expect("ListObjectsV2Response has contents")
{
Expand All @@ -324,7 +346,7 @@ impl super::Protocol for Protocol {
.key(&key)
.send(),
)
.map_err(|err| s3_error(key, err))?;
.map_err(|err| self.s3_error(&key, err))?;
n_files += 1;
}
}
Expand All @@ -346,14 +368,20 @@ impl super::Protocol for Protocol {
.expect("S3 HeadObject response should have a content_length")
.try_into()
.expect("Content length non-negative");
let modified: SystemTime = response
.last_modified
.expect("S3 HeadObject response should have a last_modified")
.try_into()
.expect("S3 last_modified is valid SystemTime");
trace!(?len, "File exists");
Ok(Metadata {
kind: Kind::File,
len,
modified: modified.into(),
})
}
Err(err) => {
let translated = s3_error(key, err);
let translated = self.s3_error(&key, err);
if translated.is_not_found() {
trace!("file does not exist");
} else {
Expand All @@ -380,25 +408,6 @@ impl super::Protocol for Protocol {
}
}

fn s3_error<K, E, R>(key: K, source: SdkError<E, R>) -> Error
where
K: ToOwned<Owned = String>,
E: std::error::Error + Send + Sync + 'static,
R: std::fmt::Debug + Send + Sync + 'static,
ErrorKind: for<'a> From<&'a E>,
{
debug!(s3_error = ?source);
let kind = match &source {
SdkError::ServiceError(service_err) => ErrorKind::from(service_err.err()),
_ => ErrorKind::Other,
};
Error {
kind,
path: Some(key.to_owned()),
source: Some(source.into()),
}
}

impl From<&GetObjectError> for ErrorKind {
fn from(source: &GetObjectError) -> Self {
match source {
Expand Down
Loading
Loading