Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ Required parameter are only `--s3-url` and `--s3-bucket`.
export RUSTUS_DATA_DIR="./data/"
export RUSTUS_DIR_STRUCTURE="{year}/{month}/{day}"
export RUSTUS_FORCE_FSYNC="true"
export RUSTUS_S3_FORCE_PATH_STYLE="true"

rustus
```
Expand Down
95 changes: 52 additions & 43 deletions src/data_storage/impls/file_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{fs::File, io::Write, path::PathBuf};
use actix_files::NamedFile;
use actix_web::{HttpRequest, HttpResponse};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use log::error;
use std::{
fs::{remove_file, DirBuilder, OpenOptions},
Expand All @@ -13,7 +14,7 @@ use crate::{
data_storage::base::DataStorage,
errors::{RustusError, RustusResult},
file_info::FileInfo,
utils::dir_struct::substr_now,
utils::dir_struct::substr_time,
};
use derive_more::Display;

Expand All @@ -34,7 +35,11 @@ impl FileDataStorage {
}
}

pub fn data_file_path(&self, file_id: &str) -> RustusResult<PathBuf> {
pub fn data_file_path(
&self,
file_id: &str,
created_at: DateTime<Utc>,
) -> RustusResult<PathBuf> {
let dir = self
.data_dir
// We're working wit absolute paths, because tus.io says so.
Expand All @@ -43,7 +48,7 @@ impl FileDataStorage {
error!("{}", err);
RustusError::UnableToWrite(err.to_string())
})?
.join(substr_now(self.dir_struct.as_str()));
.join(substr_time(self.dir_struct.as_str(), created_at));
DirBuilder::new()
.recursive(true)
.create(dir.as_path())
Expand Down Expand Up @@ -99,7 +104,7 @@ impl DataStorage for FileDataStorage {
if file_info.path.is_none() {
return Err(RustusError::FileNotFound);
}
let path = file_info.path.as_ref().unwrap().clone();
let path = self.data_file_path(&file_info.id, file_info.created_at)?;
let force_sync = self.force_fsync;
tokio::task::spawn_blocking(move || {
// Opening file in w+a mode.
Expand All @@ -110,7 +115,7 @@ impl DataStorage for FileDataStorage {
.create(false)
.read(false)
.truncate(false)
.open(path.as_str())
.open(path)
.map_err(|err| {
error!("{:?}", err);
RustusError::UnableToWrite(err.to_string())
Expand All @@ -130,7 +135,7 @@ impl DataStorage for FileDataStorage {

async fn create_file(&self, file_info: &FileInfo) -> RustusResult<String> {
// New path to file.
let file_path = self.data_file_path(file_info.id.as_str())?;
let file_path = self.data_file_path(file_info.id.as_str(), file_info.created_at)?;
tokio::task::spawn_blocking(move || {
// Creating new file.
OpenOptions::new()
Expand All @@ -154,7 +159,11 @@ impl DataStorage for FileDataStorage {
parts_info: Vec<FileInfo>,
) -> RustusResult<()> {
let force_fsync = self.force_fsync;
let path = file_info.path.as_ref().unwrap().clone();
let path = self.data_file_path(&file_info.id, file_info.created_at)?;
let part_paths = parts_info
.iter()
.map(|info| self.data_file_path(&info.id, info.created_at))
.collect::<Result<Vec<PathBuf>, _>>()?;
tokio::task::spawn_blocking(move || {
let file = OpenOptions::new()
.append(true)
Expand All @@ -165,13 +174,8 @@ impl DataStorage for FileDataStorage {
RustusError::UnableToWrite(err.to_string())
})?;
let mut writer = BufWriter::new(file);
for part in parts_info {
if part.path.is_none() {
return Err(RustusError::FileNotFound);
}
let part_file = OpenOptions::new()
.read(true)
.open(part.path.as_ref().unwrap())?;
for part in part_paths {
let part_file = OpenOptions::new().read(true).open(part)?;
let mut reader = BufReader::new(part_file);
copy(&mut reader, &mut writer)?;
}
Expand All @@ -186,13 +190,13 @@ impl DataStorage for FileDataStorage {

async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()> {
let info = file_info.clone();
let file_path = self.data_file_path(&info.id, info.created_at)?;
tokio::task::spawn_blocking(move || {
// Let's remove the file itself.
let data_path = PathBuf::from(info.path.as_ref().unwrap().clone());
if !data_path.exists() {
if !file_path.exists() {
return Err(RustusError::FileNotFound);
}
remove_file(data_path).map_err(|err| {
remove_file(file_path).map_err(|err| {
error!("{:?}", err);
RustusError::UnableToRemove(info.id.clone())
})?;
Expand Down Expand Up @@ -315,36 +319,41 @@ mod tests {
let storage = FileDataStorage::new(dir.into_path(), String::new(), false);

let mut parts = Vec::new();
let part1_path = storage.data_dir.as_path().join("part1");
let mut part1 = File::create(part1_path.clone()).unwrap();
let size1 = part1.write(b"hello ").unwrap();
let mut part1 = FileInfo::new("part_id1", None, None, storage.to_string(), None);
part1.path = Some(
storage
.data_file_path(&part1.id, part1.created_at)
.unwrap()
.display()
.to_string(),
);
let mut part1_file = File::create(part1.path.clone().unwrap()).unwrap();
part1.length = Some(part1_file.write(b"hello ").unwrap());

parts.push(FileInfo::new(
"part_id1",
Some(size1),
Some(part1_path.display().to_string()),
storage.to_string(),
None,
));
parts.push(part1);

let part2_path = storage.data_dir.as_path().join("part2");
let mut part2 = File::create(part2_path.clone()).unwrap();
let size2 = part2.write(b"world").unwrap();
parts.push(FileInfo::new(
"part_id2",
Some(size2),
Some(part2_path.display().to_string()),
storage.to_string(),
None,
));
let mut part2 = FileInfo::new("part_id2", None, None, storage.to_string(), None);
part2.path = Some(
storage
.data_file_path(&part2.id, part2.created_at)
.unwrap()
.display()
.to_string(),
);
let mut part2_file = File::create(part2.path.clone().unwrap()).unwrap();
part2.length = Some(part2_file.write(b"world").unwrap());

let final_info = FileInfo::new(
"final_id",
None,
Some(storage.data_dir.join("final_info").display().to_string()),
storage.to_string(),
None,
parts.push(part2);

let mut final_info = FileInfo::new("final_id", None, None, storage.to_string(), None);
final_info.path = Some(
storage
.data_file_path(&final_info.id, final_info.created_at)
.unwrap()
.display()
.to_string(),
);

storage.concat_files(&final_info, parts).await.unwrap();
let mut final_file = File::open(final_info.path.unwrap()).unwrap();
let mut buffer = String::new();
Expand Down
36 changes: 13 additions & 23 deletions src/data_storage/impls/s3_hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

use actix_web::{HttpRequest, HttpResponse, HttpResponseBuilder};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use s3::{
command::Command,
request::{tokio_backend::HyperRequest, Request as S3Request},
Expand Down Expand Up @@ -101,23 +102,26 @@
if file_info.path.is_none() {
return Err(RustusError::UnableToWrite("Cannot get upload path.".into()));
}
let s3_path = self.get_s3_key(file_info);
let local_path = self
.local_storage
.data_file_path(&file_info.id, file_info.created_at)?;
let s3_path = self.get_s3_key(&file_info.id, file_info.created_at);
log::debug!(
"Starting uploading {} to S3 with key `{}`",
file_info.id,
s3_path,
);
let file = tokio::fs::File::open(file_info.path.clone().unwrap()).await?;
let file = tokio::fs::File::open(local_path).await?;
let mut reader = tokio::io::BufReader::new(file);
self.bucket.put_object_stream(&mut reader, s3_path).await?;
Ok(())
}

// Construct an S3 key which is used to upload files.
fn get_s3_key(&self, file_info: &FileInfo) -> String {
let base_path = substr_time(self.dir_struct.as_str(), file_info.created_at);
fn get_s3_key(&self, id: &str, created_at: DateTime<Utc>) -> String {
let base_path = substr_time(self.dir_struct.as_str(), created_at);
let trimmed_path = base_path.trim_end_matches('/');
format!("{trimmed_path}/{}", file_info.id)
format!("{trimmed_path}/{}", id)

Check warning on line 124 in src/data_storage/impls/s3_hybrid.rs

View workflow job for this annotation

GitHub Actions / clippy

variables can be used directly in the `format!` string

warning: variables can be used directly in the `format!` string --> src/data_storage/impls/s3_hybrid.rs:124:9 | 124 | format!("{trimmed_path}/{}", id) | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#uninlined_format_args note: the lint level is defined here --> src/main.rs:5:5 | 5 | clippy::pedantic, | ^^^^^^^^^^^^^^^^ = note: `#[warn(clippy::uninlined_format_args)]` implied by `#[warn(clippy::pedantic)]` help: change this to | 124 - format!("{trimmed_path}/{}", id) 124 + format!("{trimmed_path}/{id}") |
}
}

Expand All @@ -129,21 +133,6 @@
Ok(())
}

// async fn get_contents(
// &self,
// file_info: &FileInfo,
// request: &HttpRequest,
// ) -> RustusResult<HttpResponse> {
// if file_info.length != Some(file_info.offset) {
// log::debug!("File isn't uploaded. Returning from local storage.");
// return self.local_storage.get_contents(file_info, request).await;
// }
// let key = self.get_s3_key(file_info);
// let command = Command::GetObject;
// let s3_request = Reqwest::new(&self.bucket, &key, command);
// let s3_response = s3_request.response().await?;
// }

async fn get_contents(
&self,
file_info: &FileInfo,
Expand All @@ -153,7 +142,7 @@
log::debug!("File isn't uploaded. Returning from local storage.");
return self.local_storage.get_contents(file_info, request).await;
}
let key = self.get_s3_key(file_info);
let key = self.get_s3_key(&file_info.id, file_info.created_at);
let command = Command::GetObject;
let s3_request = HyperRequest::new(&self.bucket, &key, command).await?;
let s3_response = s3_request.response_data_to_stream().await?;
Expand All @@ -175,7 +164,8 @@
}

async fn create_file(&self, file_info: &FileInfo) -> RustusResult<String> {
self.local_storage.create_file(file_info).await
self.local_storage.create_file(file_info).await?;
Ok(self.get_s3_key(&file_info.id, file_info.created_at))
}

async fn concat_files(
Expand All @@ -191,7 +181,7 @@
async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()> {
if Some(file_info.offset) == file_info.length {
self.bucket
.delete_object(self.get_s3_key(file_info))
.delete_object(self.get_s3_key(&file_info.id, file_info.created_at))
.await?;
} else {
self.local_storage.remove_file(file_info).await?;
Expand Down
14 changes: 5 additions & 9 deletions src/utils/dir_struct.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
use chrono::{Datelike, Timelike};

/// Generate directory name with user template.
pub fn substr_now(dir_structure: &str) -> String {
let now = chrono::Utc::now();
substr_time(dir_structure, now)
}

pub fn substr_time(dir_structure: &str, time: chrono::DateTime<chrono::Utc>) -> String {
dir_structure
.replace("{day}", time.day().to_string().as_str())
Expand All @@ -17,19 +11,21 @@ pub fn substr_time(dir_structure: &str, time: chrono::DateTime<chrono::Utc>) ->

#[cfg(test)]
mod tests {
use super::substr_now;
use chrono::Datelike;

use crate::utils::dir_struct::substr_time;

#[test]
pub fn test_time() {
let now = chrono::Utc::now();
let dir = substr_now("{day}/{month}");
let dir = substr_time("{day}/{month}", now);
assert_eq!(dir, format!("{}/{}", now.day(), now.month()));
}

#[test]
pub fn test_unknown_var() {
let dir = substr_now("test/{quake}");
let now = chrono::Utc::now();
let dir = substr_time("test/{quake}", now);
assert_eq!(dir, String::from("test/{quake}"));
}
}
Loading