Skip to content

Commit 452b02b

Browse files
authored
Use FuturesUnordered when order of try_join_all is not relevant (#2026)
1 parent cfc8eb9 commit 452b02b

File tree

4 files changed

+30
-17
lines changed

4 files changed

+30
-17
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ arrow-ord = "53.0.0"
5959
arrow-schema = "53.0.0"
6060
arrow-select = "53.0.0"
6161
arrow-string = "53.0.0"
62+
async-once-cell = "0.5.4"
6263
async-trait = "0.1"
6364
backtrace = "0.3.74"
6465
bytes = "1.6.0"
@@ -107,7 +108,6 @@ parquet = "53.0.0"
107108
paste = "1.0.14"
108109
pin-project = "1.1.5"
109110
pin-project-lite = "0.2.15"
110-
tabled = { version = "0.17.0", default-features = false }
111111
prost = "0.13.0"
112112
prost-build = "0.13.0"
113113
prost-types = "0.13.0"
@@ -123,6 +123,7 @@ serde_json = "1.0.116"
123123
serde_test = "1.0.176"
124124
simplelog = { version = "0.12.2", features = ["paris"] }
125125
static_assertions = "1"
126+
tabled = { version = "0.17.0", default-features = false }
126127
tar = "0.4"
127128
tempfile = "3"
128129
thiserror = "2.0.0"

vortex-file/src/io/file.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use std::sync::Arc;
44

55
use futures::channel::oneshot;
66
use futures::Stream;
7-
use futures_util::future::try_join_all;
8-
use futures_util::{stream, StreamExt};
7+
use futures_util::stream::FuturesUnordered;
8+
use futures_util::{stream, StreamExt, TryStreamExt};
99
use vortex_buffer::ByteBuffer;
1010
use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};
1111
use vortex_io::VortexReadAt;
@@ -213,7 +213,9 @@ async fn evaluate<R: VortexReadAt>(
213213
}
214214

215215
// Populate the cache
216-
try_join_all(cache_futures).await?;
216+
FuturesUnordered::from_iter(cache_futures)
217+
.try_collect::<()>()
218+
.await?;
217219

218220
Ok(())
219221
}

vortex-file/src/open/mod.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use std::sync::Arc;
55

66
pub use exec::*;
77
use flatbuffers::root;
8+
use futures_util::stream::FuturesUnordered;
9+
use futures_util::{stream, StreamExt, TryStreamExt};
810
use itertools::Itertools;
911
pub use split_by::*;
1012
use vortex_array::ContextRef;
@@ -331,17 +333,25 @@ impl VortexOpenOptions {
331333
file_layout: &FileLayout,
332334
segments: &dyn SegmentCache,
333335
) -> VortexResult<()> {
334-
for (idx, segment) in file_layout.segment_map().iter().enumerate() {
335-
if segment.offset < initial_offset {
336-
// Skip segments that aren't in the initial read.
337-
continue;
338-
}
339-
let segment_id = SegmentId::from(u32::try_from(idx)?);
340-
let offset = usize::try_from(segment.offset - initial_offset)?;
341-
let buffer = initial_read.slice(offset..offset + (segment.length as usize));
342-
343-
segments.put(segment_id, buffer).await?;
344-
}
345-
Ok(())
336+
stream::iter(
337+
file_layout
338+
.segment_map()
339+
.iter()
340+
.enumerate()
341+
.filter(|(_, segment)| segment.offset > initial_offset)
342+
.map(|(idx, segment)| async move {
343+
let segment_id = SegmentId::from(u32::try_from(idx)?);
344+
let offset = usize::try_from(segment.offset - initial_offset)?;
345+
let buffer = initial_read
346+
.slice(offset..offset + (segment.length as usize))
347+
.aligned(segment.alignment);
348+
349+
segments.put(segment_id, buffer).await
350+
}),
351+
)
352+
.collect::<FuturesUnordered<_>>()
353+
.await
354+
.try_collect::<()>()
355+
.await
346356
}
347357
}

vortex-layout/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ categories.workspace = true
1515

1616
[dependencies]
1717
arrow-buffer = { workspace = true }
18-
async-once-cell = "0.5.4"
18+
async-once-cell = { workspace = true }
1919
async-trait = { workspace = true }
2020
bytes = { workspace = true }
2121
flatbuffers = { workspace = true }

0 commit comments

Comments
 (0)