Skip to content

Commit 1aaeeb7

Browse files
authored
Dispatch each I/O request (#4464)
An attempt at short-term fix for #4400 The coalescing driver was spawned as a single task, and so landed on a single thread of the I/O dispatcher regardless of how many threads you gave it. This PR spawns each read request to allow other threads a chance to play ball. The reason the Polars patch doesn't work is that we need to continue to dispatch work in order to provide a Tokio runtime within non-Tokio clients, such as DuckDB. This is a bigger change as part of #4406
1 parent d135f4a commit 1aaeeb7

File tree

3 files changed

+12
-5
lines changed

3 files changed

+12
-5
lines changed

vortex-file/src/driver.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ impl CoalescedSegmentRequest {
421421
}
422422

423423
/// Launch the request, reading the byte range from the provided reader.
424-
pub async fn launch<R: VortexReadAt>(self, read: &R) {
424+
pub async fn launch<R: VortexReadAt>(self, read: R) {
425425
let alignment = self.segment_map[*self.requests[0].id() as usize].alignment;
426426
let byte_range = self.byte_range.clone();
427427
let buffer = read

vortex-file/src/generic.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,14 +120,21 @@ impl VortexOpenOptions<GenericVortexFile> {
120120

121121
// Spawn an I/O driver onto the dispatcher.
122122
let io_concurrency = self.options.io_concurrency;
123+
let io_dispatcher = self.options.io_dispatcher.clone();
123124
self.options
124125
.io_dispatcher
125126
.dispatch(move || {
126127
async move {
127128
// Drive the segment event stream.
128129
let stream = driver
129-
.map(|coalesced_req| coalesced_req.launch(&read))
130-
.buffer_unordered(io_concurrency);
130+
.map(|coalesced_req| {
131+
let read = read.clone();
132+
io_dispatcher
133+
.dispatch(move || coalesced_req.launch(read))
134+
.vortex_expect("Failed to dispatch I/O request")
135+
})
136+
.buffer_unordered(io_concurrency)
137+
.map(|result| result.vortex_expect("infallible"));
131138
pin_mut!(stream);
132139

133140
// Drive the stream to completion.

vortex-io/src/dispatcher/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@ impl IoDispatcher {
8181
if #[cfg(target_arch = "wasm32")] {
8282
Self(Arc::new(Inner::Wasm(WasmDispatcher::new())))
8383
} else if #[cfg(not(feature = "compio"))] {
84-
Self(Arc::new(Inner::Tokio(TokioDispatcher::new(1))))
84+
Self(Arc::new(Inner::Tokio(TokioDispatcher::new(4))))
8585
} else {
86-
Self(Arc::new(Inner::Compio(CompioDispatcher::new(1))))
86+
Self(Arc::new(Inner::Compio(CompioDispatcher::new(4))))
8787
}
8888
}
8989
}

0 commit comments

Comments
 (0)