Skip to content

Commit 93b2ba5

Browse files
committed
perf(cubestore): Make S3RemoteFs::list use less intermediate memory
1 parent c60b756 commit 93b2ba5

File tree

1 file changed

+48
-33
lines changed
  • rust/cubestore/cubestore/src/remotefs

1 file changed

+48
-33
lines changed

rust/cubestore/cubestore/src/remotefs/s3.rs

Lines changed: 48 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -306,46 +306,26 @@ impl RemoteFs for S3RemoteFs {
306306
}
307307

308308
/// Lists the remote paths of the objects found under `remote_prefix`.
///
/// The added lines delegate to `list_with_metadata_and_map`, turning each S3
/// object's key into a remote path with `object_key_to_remote_path`; the
/// removed lines instead went through `list_with_metadata` and then kept only
/// `remote_path` from every returned entry.
async fn list(&self, remote_prefix: String) -> Result<Vec<String>, CubeError> {
309-
Ok(self
310-
.list_with_metadata(remote_prefix)
311-
.await?
312-
.into_iter()
313-
.map(|f| f.remote_path)
314-
.collect::<Vec<_>>())
309+
let leading_subpath = self.leading_subpath_regex();
310+
self.list_with_metadata_and_map(remote_prefix, |o: s3::serde_types::Object| {
311+
Ok(Self::object_key_to_remote_path(&leading_subpath, &o.key))
312+
})
313+
.await
315314
}
316315

317316
/// Lists the objects found under `remote_prefix` as `RemoteFile` entries.
///
/// Every entry carries the object key with the leading sub-path stripped, the
/// object's `last_modified` value parsed as RFC 3339 and converted to UTC, and
/// the object's size. A timestamp that fails to parse is propagated to the
/// caller through `?`.
///
/// The added lines delegate the bucket listing to `list_with_metadata_and_map`;
/// the removed lines performed the listing, metrics reporting and mapping inline.
async fn list_with_metadata(
318317
&self,
319318
remote_prefix: String,
320319
) -> Result<Vec<RemoteFile>, CubeError> {
321-
let path = self.s3_path(&remote_prefix);
322-
let bucket = self.bucket.load();
323-
let list = bucket.list(path, None).await?;
324-
let pages_count = list.len();
325-
app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags(
326-
pages_count as i64,
327-
Some(&vec!["operation:list".to_string(), "driver:s3".to_string()]),
328-
);
329-
if pages_count > 100 {
330-
log::warn!("S3 list returned more than 100 pages: {}", pages_count);
331-
}
332-
let leading_slash = Regex::new(format!("^{}", self.s3_path("")).as_str()).unwrap();
333-
let result = list
334-
.iter()
335-
.flat_map(|res| {
336-
res.contents
337-
.iter()
338-
.map(|o| -> Result<RemoteFile, CubeError> {
339-
Ok(RemoteFile {
340-
remote_path: leading_slash.replace(&o.key, NoExpand("")).to_string(),
341-
updated: DateTime::parse_from_rfc3339(&o.last_modified)?
342-
.with_timezone(&Utc),
343-
file_size: o.size,
344-
})
345-
})
320+
let leading_subpath = self.leading_subpath_regex();
321+
self.list_with_metadata_and_map(remote_prefix, |o: s3::serde_types::Object| {
322+
Ok(RemoteFile {
323+
remote_path: Self::object_key_to_remote_path(&leading_subpath, &o.key),
324+
updated: DateTime::parse_from_rfc3339(&o.last_modified)?.with_timezone(&Utc),
325+
file_size: o.size,
346326
})
347-
.collect::<Result<Vec<_>, _>>()?;
348-
Ok(result)
327+
})
328+
.await
349329
}
350330

351331
async fn local_path(&self) -> Result<String, CubeError> {
@@ -359,7 +339,42 @@ impl RemoteFs for S3RemoteFs {
359339
}
360340
}
361341

342+
/// Newtype around the compiled `Regex` used to strip this file system's own
/// S3 prefix from the start of an object key.
struct LeadingSubpath(Regex);
343+
362344
impl S3RemoteFs {
345+
fn leading_subpath_regex(&self) -> LeadingSubpath {
346+
LeadingSubpath(Regex::new(format!("^{}", self.s3_path("")).as_str()).unwrap())
347+
}
348+
349+
fn object_key_to_remote_path(leading_subpath: &LeadingSubpath, o_key: &String) -> String {
350+
leading_subpath.0.replace(o_key, NoExpand("")).to_string()
351+
}
352+
353+
async fn list_with_metadata_and_map<T, F>(
354+
&self,
355+
remote_prefix: String,
356+
f: F,
357+
) -> Result<Vec<T>, CubeError>
358+
where
359+
F: FnMut(s3::serde_types::Object) -> Result<T, CubeError> + Copy,
360+
{
361+
let path = self.s3_path(&remote_prefix);
362+
let bucket = self.bucket.load();
363+
let list = bucket.list(path, None).await?;
364+
let pages_count = list.len();
365+
app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags(
366+
pages_count as i64,
367+
Some(&vec!["operation:list".to_string(), "driver:s3".to_string()]),
368+
);
369+
if pages_count > 100 {
370+
log::warn!("S3 list returned more than 100 pages: {}", pages_count);
371+
}
372+
let result = list
373+
.into_iter()
374+
.flat_map(|res| res.contents.into_iter().map(f))
375+
.collect::<Result<Vec<_>, _>>()?;
376+
Ok(result)
377+
}
363378
fn s3_path(&self, remote_path: &str) -> String {
364379
format!(
365380
"{}{}",

0 commit comments

Comments
 (0)