Skip to content

Commit 89c4906

Browse files
committed
vortex-file: don't run separate IO dispatch pool
Signed-off-by: Alfonso Subiotto Marques <[email protected]>
1 parent 1fbc3a3 commit 89c4906

File tree

3 files changed

+40
-34
lines changed

3 files changed

+40
-34
lines changed

Cargo.lock

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-file/src/generic.rs

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use dashmap::DashMap;
88
use futures::{StreamExt, pin_mut};
99
use vortex_buffer::{Alignment, ByteBuffer, ByteBufferMut};
1010
use vortex_error::{VortexExpect, VortexResult, vortex_err};
11-
use vortex_io::{Dispatch, InstrumentedReadAt, IoDispatcher, VortexReadAt};
11+
use vortex_io::{InstrumentedReadAt, VortexReadAt};
1212
use vortex_layout::segments::{SegmentEvents, SegmentId};
1313

1414
use crate::driver::CoalescedDriver;
@@ -18,10 +18,6 @@ use crate::segments::{
1818
};
1919
use crate::{EOF_SIZE, FileType, Footer, MAX_FOOTER_SIZE, VortexFile, VortexOpenOptions};
2020

21-
#[cfg(feature = "tokio")]
22-
static TOKIO_DISPATCHER: std::sync::LazyLock<IoDispatcher> =
23-
std::sync::LazyLock::new(|| IoDispatcher::new_tokio(1));
24-
2521
/// A type of Vortex file that supports any [`VortexReadAt`] implementation.
2622
///
2723
/// This is a reasonable choice for files backed by a network since it performs I/O coalescing.
@@ -76,8 +72,7 @@ impl VortexOpenOptions<GenericVortexFile> {
7672

7773
/// Open a Vortex file using the provided [`std::path::Path`].
7874
#[cfg(feature = "tokio")]
79-
pub async fn open(mut self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
80-
self.options.io_dispatcher = TOKIO_DISPATCHER.clone();
75+
pub async fn open(self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
8176
self.open_read_at(vortex_io::TokioFile::open(read)?).await
8277
}
8378

@@ -119,7 +114,7 @@ impl VortexOpenOptions<GenericVortexFile> {
119114
);
120115

121116
// Spawn an I/O driver onto the dispatcher.
122-
let io_concurrency = self.options.io_concurrency;
117+
/*let io_concurrency = self.options.io_concurrency;
123118
self.options
124119
.io_dispatcher
125120
.dispatch(move || {
@@ -134,7 +129,19 @@ impl VortexOpenOptions<GenericVortexFile> {
134129
stream.collect::<()>().await
135130
}
136131
})
137-
.vortex_expect("Failed to spawn I/O driver");
132+
.vortex_expect("Failed to spawn I/O driver");*/
133+
134+
let io_concurrency = self.options.io_concurrency;
135+
tokio::task::spawn_local(async move {
136+
// Drive the segment event stream.
137+
let stream = driver
138+
.map(|coalesced_req| coalesced_req.launch(&read))
139+
.buffer_unordered(io_concurrency);
140+
pin_mut!(stream);
141+
142+
// Drive the stream to completion.
143+
stream.collect::<()>().await;
144+
});
138145

139146
Ok(VortexFile {
140147
footer,
@@ -239,11 +246,12 @@ impl VortexOpenOptions<GenericVortexFile> {
239246
&self,
240247
read: Arc<R>,
241248
) -> VortexResult<u64> {
242-
Ok(self
243-
.options
244-
.io_dispatcher
245-
.dispatch(move || async move { read.size().await })?
246-
.await??)
249+
Ok(read.size().await?)
250+
/*Ok(self
251+
.options
252+
.io_dispatcher
253+
.dispatch(move || async move { read.size().await })?
254+
.await??)*/
247255
}
248256

249257
/// Dispatch a read onto the configured I/O dispatcher.
@@ -252,11 +260,12 @@ impl VortexOpenOptions<GenericVortexFile> {
252260
read: Arc<R>,
253261
range: Range<u64>,
254262
) -> VortexResult<ByteBuffer> {
255-
Ok(self
256-
.options
257-
.io_dispatcher
258-
.dispatch(move || async move { read.read_byte_range(range, Alignment::none()).await })?
259-
.await??)
263+
Ok(read.read_byte_range(range, Alignment::none()).await?)
264+
/*Ok(self
265+
.options
266+
.io_dispatcher
267+
.dispatch(move || async move { read.read_byte_range(range, Alignment::none()).await })?
268+
.await??)*/
260269
}
261270

262271
/// Populate segments in the cache that were covered by the initial read.
@@ -289,17 +298,14 @@ impl VortexOpenOptions<GenericVortexFile> {
289298
#[cfg(feature = "object_store")]
290299
impl VortexOpenOptions<GenericVortexFile> {
291300
pub async fn open_object_store(
292-
mut self,
301+
self,
293302
object_store: &Arc<dyn object_store::ObjectStore>,
294303
path: &str,
295304
) -> VortexResult<VortexFile> {
296305
use std::path::Path;
297306

298307
use vortex_io::ObjectStoreReadAt;
299308

300-
// Object store _must_ use tokio for I/O.
301-
self.options.io_dispatcher = TOKIO_DISPATCHER.clone();
302-
303309
// If the file is local, we much prefer to use TokioFile since object store re-opens the
304310
// file on every read. This check is a little naive... but we hope that ObjectStore will
305311
// soon expose the scheme in a way that we can check more thoroughly.
@@ -326,8 +332,6 @@ pub struct GenericFileOptions {
326332
/// The number of concurrent I/O requests to spawn.
327333
/// This should be smaller than execution concurrency for coalescing to occur.
328334
io_concurrency: usize,
329-
/// The dispatcher to use for I/O requests.
330-
io_dispatcher: IoDispatcher,
331335
}
332336

333337
impl Default for GenericFileOptions {
@@ -337,7 +341,6 @@ impl Default for GenericFileOptions {
337341
initial_read_size: 0,
338342
initial_read_segments: Default::default(),
339343
io_concurrency: 8,
340-
io_dispatcher: IoDispatcher::shared(),
341344
}
342345
}
343346
}

vortex-io/src/dispatcher/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::task::Poll;
1515
use cfg_if::cfg_if;
1616
use futures::FutureExt;
1717
use futures::channel::oneshot;
18-
use vortex_error::{VortexResult, vortex_err};
18+
use vortex_error::{VortexResult, vortex_err, vortex_panic};
1919

2020
static SHARED: LazyLock<IoDispatcher> = LazyLock::new(IoDispatcher::new);
2121

@@ -157,14 +157,17 @@ impl Dispatch for IoDispatcher {
157157
Fut: Future<Output = R> + 'static,
158158
R: Send + 'static,
159159
{
160-
match self.0.as_ref() {
160+
vortex_panic!(
161+
"IoDispatcher::dispatch is disabled by Polar Signals due to performance reasons."
162+
);
163+
/*match self.0.as_ref() {
161164
#[cfg(not(target_arch = "wasm32"))]
162165
Inner::Tokio(tokio_dispatch) => tokio_dispatch.dispatch(task),
163166
#[cfg(feature = "compio")]
164167
Inner::Compio(compio_dispatch) => compio_dispatch.dispatch(task),
165168
#[cfg(target_arch = "wasm32")]
166169
Inner::Wasm(wasm_dispatch) => wasm_dispatch.dispatch(task),
167-
}
170+
}*/
168171
}
169172

170173
fn shutdown(self) -> VortexResult<()> {

0 commit comments

Comments
 (0)