Commit 90b87e4

Added concat support for s3 and hybrid-s3.
1 parent 6a75cb4 commit 90b87e4

File tree

7 files changed: 250 additions & 21 deletions

Cargo.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -25,6 +25,7 @@ url = "^2.5.4"
 prometheus = "^0.13.4"
 actix-web-prom = "^0.9.0"
 dyn-clone = "^1.0.17"
+tempdir = "0.3.7"
 actix-cors = "0.7.0"
 wildmatch = "2.4.0"
 md-5 = "^0.10.6"
@@ -69,7 +70,6 @@ test_rmq = []
 integration_tests = ["test_redis", "test_rmq"]

 [dev-dependencies]
-tempdir = "0.3.7"
 actix-rt = "2.10.0"
 httptest = "0.16.1"
 actix-http = "3.9.0"
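
Note that `tempdir` moves from `[dev-dependencies]` into the regular dependencies: the hybrid-s3 concatenation path below now calls `tempdir::TempDir::new` at runtime to stage downloaded parts.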

docs/configuration.md

Lines changed: 10 additions & 2 deletions
@@ -182,6 +182,7 @@ Parameters:
 * `--s3-profile` - Name of the section from `~/.aws/credentials` file;
 * `--s3-headers` - JSON object with additional headers for every S3 request (useful for setting ACLs);
 * `--s3-force-path-style` - use path style URL. It appends bucket name at the end of the URL;
+* `--s3-concat-concurrent-downloads` - Number of concurrent downloads of partial files from S3. When performing concatenation, Rustus downloads all partial files from S3 and concatenates them into a single file. This parameter controls the number of concurrent downloads. Default value is 10.

 Required parameters are only `--s3-url` and `--s3-bucket`.

@@ -203,7 +204,8 @@ Required parameters are only `--s3-url` and `--s3-bucket`.
 --s3-headers '{"x-amz-acl": "public-read"}' \
 --force-fsync \
 --data-dir "./data/" \
---dir-structure "{year}/{month}/{day}"
+--dir-structure "{year}/{month}/{day}" \
+--s3-concat-concurrent-downloads 10
 ```

 === "ENV"
@@ -225,6 +227,7 @@ Required parameters are only `--s3-url` and `--s3-bucket`.
 export RUSTUS_DIR_STRUCTURE="{year}/{month}/{day}"
 export RUSTUS_FORCE_FSYNC="true"
 export RUSTUS_S3_FORCE_PATH_STYLE="true"
+export RUSTUS_S3_CONCAT_CONCURRENT_DOWNLOADS="10"

 rustus
 ```
@@ -259,6 +262,8 @@ Parameters:
 * `--s3-headers` - JSON object with additional headers for every S3 request (useful for setting ACLs);
 * `--s3-force-path-style` - use path style URL. It appends bucket name at the end of the URL;
 * `--dir-structure` - pattern of a directory structure on s3;
+* `--s3-concat-concurrent-downloads` - Number of concurrent downloads of partial files from S3. When performing concatenation, Rustus downloads all partial files from S3 and concatenates them into a single file. This parameter controls the number of concurrent downloads. Default value is 10.
+

 Required parameters are only `--s3-url` and `--s3-bucket`.

@@ -278,7 +283,9 @@ Required parameters are only `--s3-url` and `--s3-bucket`.
 --s3-session-token "token" \
 --s3-force-path-style \
 --s3-headers '{"x-amz-acl": "public-read"}' \
---dir-structure "{year}/{month}/{day}"
+--dir-structure "{year}/{month}/{day}" \
+--s3-concat-concurrent-downloads 10
+
 ```

 === "ENV"
@@ -298,6 +305,7 @@ Required parameters are only `--s3-url` and `--s3-bucket`.
 export RUSTUS_S3_HEADERS='{"x-amz-acl": "public-read"}'
 export RUSTUS_S3_FORCE_PATH_STYLE="true"
 export RUSTUS_DIR_STRUCTURE="{year}/{month}/{day}"
+export RUSTUS_S3_CONCAT_CONCURRENT_DOWNLOADS="10"

 rustus
 ```

src/config.rs

Lines changed: 15 additions & 0 deletions
@@ -118,6 +118,21 @@ pub struct DataStorageOptions {
     /// This parameter is required for s3-based storages.
     #[arg(long, env = "RUSTUS_S3_HEADERS")]
     pub s3_headers: Option<String>,
+
+    /// Number of concurrent downloads of partial files
+    /// from S3.
+    /// When performing concatenation, Rustus downloads
+    /// all partial files from S3 and concatenates them
+    /// into a single file.
+    ///
+    /// This parameter controls the number of concurrent
+    /// downloads.
+    #[arg(
+        long,
+        env = "RUSTUS_S3_CONCAT_CONCURRENT_DOWNLOADS",
+        default_value = "10"
+    )]
+    pub s3_concat_concurrent_downloads: usize,
 }

 #[derive(Parser, Debug, Clone)]
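
As a side note on the option's semantics, here is a minimal, self-contained sketch of how a clap argument declared this way resolves its value, assuming clap with the `derive` and `env` features enabled; the `Opts` struct is an illustrative stand-in, not Rustus code:

```rust
use clap::Parser;

/// Minimal stand-in for the `DataStorageOptions` struct above.
#[derive(Parser, Debug)]
struct Opts {
    /// Number of concurrent downloads of partial files from S3.
    #[arg(
        long,
        env = "RUSTUS_S3_CONCAT_CONCURRENT_DOWNLOADS",
        default_value = "10"
    )]
    s3_concat_concurrent_downloads: usize,
}

fn main() {
    // Value resolution: the `--s3-concat-concurrent-downloads` flag wins,
    // then the env var, then the default of 10. A non-numeric value
    // (e.g. "true") fails to parse into `usize` and aborts startup.
    let opts = Opts::parse();
    println!("concurrency = {}", opts.s3_concat_concurrent_downloads);
}
```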

src/data_storage/impls/s3_hybrid.rs

Lines changed: 80 additions & 6 deletions
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, path::PathBuf};
+use std::{collections::HashMap, io::Write, path::PathBuf};

 use crate::{
     data_storage::base::DataStorage,
@@ -12,11 +12,13 @@ use crate::utils::dir_struct::substr_time;
 use actix_web::{HttpRequest, HttpResponse, HttpResponseBuilder};
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
+use futures::{StreamExt, TryStreamExt};
 use s3::{
     command::Command,
     request::{tokio_backend::HyperRequest, Request as S3Request},
     Bucket,
 };
+use tokio::io::AsyncWriteExt;

 use super::file_storage::FileDataStorage;

@@ -32,6 +34,7 @@ pub struct S3HybridDataStorage {
     bucket: Bucket,
     local_storage: FileDataStorage,
     dir_struct: String,
+    concurrent_concat_downloads: usize,
 }

 impl S3HybridDataStorage {
@@ -50,6 +53,7 @@ impl S3HybridDataStorage {
         data_dir: PathBuf,
         dir_struct: String,
         force_fsync: bool,
+        concurrent_concat_downloads: usize,
     ) -> Self {
         let local_storage = FileDataStorage::new(data_dir, dir_struct.clone(), force_fsync);
         let creds = s3::creds::Credentials::new(
@@ -91,6 +95,7 @@ impl S3HybridDataStorage {
             bucket: *bucket,
             local_storage,
             dir_struct,
+            concurrent_concat_downloads,
         }
     }

@@ -170,12 +175,81 @@ impl DataStorage for S3HybridDataStorage {

     async fn concat_files(
         &self,
-        _file_info: &FileInfo,
-        _parts_info: Vec<FileInfo>,
+        file_info: &FileInfo,
+        parts_info: Vec<FileInfo>,
     ) -> RustusResult<()> {
-        Err(RustusError::Unimplemented(
-            "Hybrid s3 cannot concat files.".into(),
-        ))
+        let dir = tempdir::TempDir::new(&file_info.id)?;
+        let mut download_futures = vec![];
+
+        // At first we need to download all parts.
+        for part_info in &parts_info {
+            let part_key = self.get_s3_key(&part_info.id, part_info.created_at);
+            let part_out = dir.path().join(&part_info.id);
+            // Here we create a future which downloads the part
+            // into a temporary file.
+            download_futures.push(async move {
+                let part_file = tokio::fs::File::create(&part_out).await?;
+                let mut writer = tokio::io::BufWriter::new(part_file);
+                let mut reader = self.bucket.get_object_stream(&part_key).await?;
+                while let Some(chunk) = reader.bytes().next().await {
+                    let mut chunk = chunk?;
+                    writer.write_all_buf(&mut chunk).await.map_err(|err| {
+                        log::error!("{:?}", err);
+                        RustusError::UnableToWrite(err.to_string())
+                    })?;
+                }
+                writer.flush().await?;
+                writer.get_ref().sync_data().await?;
+                Ok::<_, RustusError>(())
+            });
+        }
+        // Here we await all download futures.
+        // We use buffer_unordered to limit the number of concurrent downloads.
+        futures::stream::iter(download_futures)
+            // Number of concurrent downloads.
+            .buffer_unordered(self.concurrent_concat_downloads)
+            // We use try_collect to collect all results
+            // and return an error if any of the futures returned an error.
+            .try_collect::<Vec<_>>()
+            .await?;
+
+        let output_path = dir.path().join(&file_info.id);
+        let output_path_cloned = output_path.clone();
+        let parts_files = parts_info
+            .iter()
+            .map(|info| dir.path().join(&info.id))
+            .collect::<Vec<_>>();
+        tokio::task::spawn_blocking(move || {
+            let file = std::fs::OpenOptions::new()
+                .append(true)
+                .create(true)
+                .open(output_path_cloned)
+                .map_err(|err| {
+                    log::error!("{:?}", err);
+                    RustusError::UnableToWrite(err.to_string())
+                })?;
+            let mut writer = std::io::BufWriter::new(file);
+            for part in &parts_files {
+                let part_file = std::fs::OpenOptions::new().read(true).open(part)?;
+                let mut reader = std::io::BufReader::new(part_file);
+                std::io::copy(&mut reader, &mut writer)?;
+            }
+            writer.flush()?;
+            writer.get_ref().sync_data()?;
+            Ok::<_, RustusError>(())
+        })
+        .await??;
+
+        // We reopen the file to upload it to S3.
+        // This is needed because we need to open the file in read mode.
+        let output_file = tokio::fs::File::open(&output_path).await?;
+        let mut reader = tokio::io::BufReader::new(output_file);
+        let key = self.get_s3_key(&file_info.id, file_info.created_at);
+        self.bucket.put_object_stream(&mut reader, key).await?;
+
+        tokio::fs::remove_file(output_path).await?;
+
+        Ok(())
     }

     async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()> {