Skip to content

Commit 858e5eb

Browse files
committed
perf(cubestore): Make GCSRemoteFs::list use less intermediate memory
1 parent 93b2ba5 commit 858e5eb

File tree

1 file changed

+49
-28
lines changed
  • rust/cubestore/cubestore/src/remotefs

1 file changed

+49
-28
lines changed

rust/cubestore/cubestore/src/remotefs/gcs.rs

Lines changed: 49 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -261,31 +261,64 @@ impl RemoteFs for GCSRemoteFs {
261261
}
262262

263263
async fn list(&self, remote_prefix: String) -> Result<Vec<String>, CubeError> {
264-
Ok(self
265-
.list_with_metadata(remote_prefix)
266-
.await?
267-
.into_iter()
268-
.map(|f| f.remote_path)
269-
.collect::<Vec<_>>())
264+
let leading_subpath = self.leading_subpath_regex();
265+
self.list_with_metadata_and_map(remote_prefix, |obj: Object| {
266+
Self::object_key_to_remote_path(&leading_subpath, &obj.name)
267+
})
268+
.await
270269
}
271270

272271
async fn list_with_metadata(
273272
&self,
274273
remote_prefix: String,
275274
) -> Result<Vec<RemoteFile>, CubeError> {
275+
let leading_subpath = self.leading_subpath_regex();
276+
self.list_with_metadata_and_map(remote_prefix, |obj: Object| RemoteFile {
277+
remote_path: Self::object_key_to_remote_path(&leading_subpath, &obj.name),
278+
updated: obj.updated,
279+
file_size: obj.size,
280+
})
281+
.await
282+
}
283+
284+
/// Returns `self.dir` rendered as an owned `String`.
///
/// # Panics
///
/// Panics if `self.dir.to_str()` returns `None`.
async fn local_path(&self) -> Result<String, CubeError> {
    let dir = self.dir.to_str().unwrap();
    Ok(String::from(dir))
}
287+
288+
/// Returns the path obtained by joining `remote_path` onto `self.dir`,
/// creating the parent directory of that path (and any missing ancestors)
/// first.
///
/// # Errors
///
/// Propagates the error from `fs::create_dir_all`.
///
/// # Panics
///
/// Panics if the joined path has no parent or if `to_str()` returns `None`
/// for it.
async fn local_file(&self, remote_path: String) -> Result<String, CubeError> {
    let local = self.dir.join(remote_path);
    let parent = local.parent().unwrap();
    fs::create_dir_all(parent).await?;
    let as_str = local.to_str().unwrap();
    Ok(as_str.to_owned())
}
293+
}
294+
295+
/// Newtype around the compiled regex produced by `leading_subpath_regex`.
///
/// `object_key_to_remote_path` uses the wrapped regex to remove the leading
/// sub-path from an object name.
struct LeadingSubpath(Regex);
296+
297+
impl GCSRemoteFs {
298+
fn leading_subpath_regex(&self) -> LeadingSubpath {
299+
LeadingSubpath(Regex::new(format!("^{}", self.gcs_path("")).as_str()).unwrap())
300+
}
301+
302+
fn object_key_to_remote_path(leading_subpath: &LeadingSubpath, obj_name: &String) -> String {
303+
leading_subpath
304+
.0
305+
.replace(&obj_name, NoExpand(""))
306+
.to_string()
307+
}
308+
309+
async fn list_with_metadata_and_map<T, F>(
310+
&self,
311+
remote_prefix: String,
312+
f: F,
313+
) -> Result<Vec<T>, CubeError>
314+
where
315+
F: FnMut(Object) -> T + Copy,
316+
{
276317
let prefix = self.gcs_path(&remote_prefix);
277318
let list = Object::list_prefix(self.bucket.as_str(), prefix.as_str()).await?;
278-
let leading_slash = Regex::new(format!("^{}", self.gcs_path("")).as_str()).unwrap();
279319
let result = list
280-
.map(|objects| -> Result<Vec<RemoteFile>, CubeError> {
281-
Ok(objects?
282-
.into_iter()
283-
.map(|obj| RemoteFile {
284-
remote_path: leading_slash.replace(&obj.name, NoExpand("")).to_string(),
285-
updated: obj.updated.clone(),
286-
file_size: obj.size,
287-
})
288-
.collect())
320+
.map(|objects| -> Result<Vec<T>, CubeError> {
321+
Ok(objects?.into_iter().map(f).collect())
289322
})
290323
.collect::<Vec<_>>()
291324
.await
@@ -310,18 +343,6 @@ impl RemoteFs for GCSRemoteFs {
310343
Ok(result)
311344
}
312345

313-
async fn local_path(&self) -> Result<String, CubeError> {
314-
Ok(self.dir.to_str().unwrap().to_owned())
315-
}
316-
317-
async fn local_file(&self, remote_path: String) -> Result<String, CubeError> {
318-
let buf = self.dir.join(remote_path);
319-
fs::create_dir_all(buf.parent().unwrap()).await?;
320-
Ok(buf.to_str().unwrap().to_string())
321-
}
322-
}
323-
324-
impl GCSRemoteFs {
325346
fn gcs_path(&self, remote_path: &str) -> String {
326347
format!(
327348
"{}/{}",

0 commit comments

Comments
 (0)