Skip to content

Commit cf4b6b3

Browse files
committed
perf(cubestore): Make S3RemoteFs consume less memory when making listings
1 parent 858e5eb commit cf4b6b3

File tree

1 file changed

+27
-11
lines changed
  • rust/cubestore/cubestore/src/remotefs

1 file changed

+27
-11
lines changed

rust/cubestore/cubestore/src/remotefs/s3.rs

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ impl RemoteFs for S3RemoteFs {
307307

308308
async fn list(&self, remote_prefix: String) -> Result<Vec<String>, CubeError> {
309309
let leading_subpath = self.leading_subpath_regex();
310-
self.list_with_metadata_and_map(remote_prefix, |o: s3::serde_types::Object| {
310+
self.list_objects_and_map(remote_prefix, |o: s3::serde_types::Object| {
311311
Ok(Self::object_key_to_remote_path(&leading_subpath, &o.key))
312312
})
313313
.await
@@ -318,7 +318,7 @@ impl RemoteFs for S3RemoteFs {
318318
remote_prefix: String,
319319
) -> Result<Vec<RemoteFile>, CubeError> {
320320
let leading_subpath = self.leading_subpath_regex();
321-
self.list_with_metadata_and_map(remote_prefix, |o: s3::serde_types::Object| {
321+
self.list_objects_and_map(remote_prefix, |o: s3::serde_types::Object| {
322322
Ok(RemoteFile {
323323
remote_path: Self::object_key_to_remote_path(&leading_subpath, &o.key),
324324
updated: DateTime::parse_from_rfc3339(&o.last_modified)?.with_timezone(&Utc),
@@ -350,31 +350,47 @@ impl S3RemoteFs {
350350
leading_subpath.0.replace(o_key, NoExpand("")).to_string()
351351
}
352352

353-
async fn list_with_metadata_and_map<T, F>(
353+
async fn list_objects_and_map<T, F>(
354354
&self,
355355
remote_prefix: String,
356-
f: F,
356+
mut f: F,
357357
) -> Result<Vec<T>, CubeError>
358358
where
359359
F: FnMut(s3::serde_types::Object) -> Result<T, CubeError> + Copy,
360360
{
361361
let path = self.s3_path(&remote_prefix);
362362
let bucket = self.bucket.load();
363-
let list = bucket.list(path, None).await?;
364-
let pages_count = list.len();
363+
let mut mapped_results = Vec::new();
364+
let mut continuation_token = None;
365+
let mut pages_count: i64 = 0;
366+
367+
loop {
368+
let (result, _) = bucket
369+
.list_page(path.clone(), None, continuation_token, None, None)
370+
.await?;
371+
372+
pages_count += 1;
373+
374+
for obj in result.contents.into_iter() {
375+
mapped_results.push(f(obj)?);
376+
}
377+
378+
continuation_token = result.next_continuation_token;
379+
if continuation_token.is_none() {
380+
break;
381+
}
382+
}
383+
365384
app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags(
366385
pages_count as i64,
367386
Some(&vec!["operation:list".to_string(), "driver:s3".to_string()]),
368387
);
369388
if pages_count > 100 {
370389
log::warn!("S3 list returned more than 100 pages: {}", pages_count);
371390
}
372-
let result = list
373-
.into_iter()
374-
.flat_map(|res| res.contents.into_iter().map(f))
375-
.collect::<Result<Vec<_>, _>>()?;
376-
Ok(result)
391+
Ok(mapped_results)
377392
}
393+
378394
fn s3_path(&self, remote_path: &str) -> String {
379395
format!(
380396
"{}{}",

0 commit comments

Comments
 (0)