Skip to content

Commit e4ba1d5

Browse files
committed
vortex-file: don't run separate IO dispatch pool
Signed-off-by: Alfonso Subiotto Marques <[email protected]>
1 parent 341e879 commit e4ba1d5

File tree

4 files changed

+43
-60
lines changed

4 files changed

+43
-60
lines changed

vortex-file/src/driver.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ impl CoalescedSegmentRequest {
421421
}
422422

423423
/// Launch the request, reading the byte range from the provided reader.
424-
pub async fn launch<R: VortexReadAt>(self, read: R) {
424+
pub async fn launch<R: VortexReadAt + Send + Sync>(self, read: R) {
425425
let alignment = self.segment_map[*self.requests[0].id() as usize].alignment;
426426
let byte_range = self.byte_range.clone();
427427
let buffer = read

vortex-file/src/generic.rs

Lines changed: 27 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::sync::Arc;
77
use futures::{StreamExt, pin_mut};
88
use vortex_buffer::{Alignment, ByteBuffer, ByteBufferMut};
99
use vortex_error::{VortexExpect, VortexResult, vortex_err};
10-
use vortex_io::{Dispatch, InstrumentedReadAt, IoDispatcher, VortexReadAt};
10+
use vortex_io::{InstrumentedReadAt, VortexReadAt};
1111
use vortex_layout::segments::{SegmentEvents, SegmentId};
1212
use vortex_utils::aliases::dash_map::DashMap;
1313

@@ -18,10 +18,6 @@ use crate::segments::{
1818
};
1919
use crate::{EOF_SIZE, FileType, Footer, MAX_FOOTER_SIZE, VortexFile, VortexOpenOptions};
2020

21-
#[cfg(feature = "tokio")]
22-
static TOKIO_DISPATCHER: std::sync::LazyLock<IoDispatcher> =
23-
std::sync::LazyLock::new(|| IoDispatcher::new_tokio(1));
24-
2521
/// A type of Vortex file that supports any [`VortexReadAt`] implementation.
2622
///
2723
/// This is a reasonable choice for files backed by a network since it performs I/O coalescing.
@@ -76,8 +72,7 @@ impl VortexOpenOptions<GenericVortexFile> {
7672

7773
/// Open a Vortex file using the provided [`std::path::Path`].
7874
#[cfg(feature = "tokio")]
79-
pub async fn open(mut self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
80-
self.options.io_dispatcher = TOKIO_DISPATCHER.clone();
75+
pub async fn open(self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
8176
self.open_read_at(vortex_io::TokioFile::open(read)?).await
8277
}
8378

@@ -120,28 +115,18 @@ impl VortexOpenOptions<GenericVortexFile> {
120115

121116
// Spawn the I/O driver as a background tokio task.
122117
let io_concurrency = self.options.io_concurrency;
123-
let io_dispatcher = self.options.io_dispatcher.clone();
124-
self.options
125-
.io_dispatcher
126-
.dispatch(move || {
127-
async move {
128-
// Drive the segment event stream.
129-
let stream = driver
130-
.map(|coalesced_req| {
131-
let read = read.clone();
132-
io_dispatcher
133-
.dispatch(move || coalesced_req.launch(read))
134-
.vortex_expect("Failed to dispatch I/O request")
135-
})
136-
.buffer_unordered(io_concurrency)
137-
.map(|result| result.vortex_expect("infallible"));
138-
pin_mut!(stream);
139-
140-
// Drive the stream to completion.
141-
stream.collect::<()>().await
142-
}
143-
})
144-
.vortex_expect("Failed to spawn I/O driver");
118+
tokio::task::spawn(async move {
119+
// Drive the segment event stream.
120+
// TODO(asubiotto): Experiment with dispatching each coalesced request as
121+
// its own tokio task.
122+
let stream = driver
123+
.map(|coalesced_req| coalesced_req.launch(read.clone()))
124+
.buffer_unordered(io_concurrency);
125+
pin_mut!(stream);
126+
127+
// Drive the stream to completion.
128+
stream.collect::<()>().await
129+
});
145130

146131
Ok(VortexFile {
147132
footer,
@@ -246,11 +231,12 @@ impl VortexOpenOptions<GenericVortexFile> {
246231
&self,
247232
read: Arc<R>,
248233
) -> VortexResult<u64> {
249-
Ok(self
250-
.options
251-
.io_dispatcher
252-
.dispatch(move || async move { read.size().await })?
253-
.await??)
234+
Ok(read.size().await?)
235+
/*Ok(self
236+
.options
237+
.io_dispatcher
238+
.dispatch(move || async move { read.size().await })?
239+
.await??)*/
254240
}
255241

256242
/// Read the given byte range directly from the provided reader.
@@ -259,11 +245,12 @@ impl VortexOpenOptions<GenericVortexFile> {
259245
read: Arc<R>,
260246
range: Range<u64>,
261247
) -> VortexResult<ByteBuffer> {
262-
Ok(self
263-
.options
264-
.io_dispatcher
265-
.dispatch(move || async move { read.read_byte_range(range, Alignment::none()).await })?
266-
.await??)
248+
Ok(read.read_byte_range(range, Alignment::none()).await?)
249+
/*Ok(self
250+
.options
251+
.io_dispatcher
252+
.dispatch(move || async move { read.read_byte_range(range, Alignment::none()).await })?
253+
.await??)*/
267254
}
268255

269256
/// Populate segments in the cache that were covered by the initial read.
@@ -296,17 +283,14 @@ impl VortexOpenOptions<GenericVortexFile> {
296283
#[cfg(feature = "object_store")]
297284
impl VortexOpenOptions<GenericVortexFile> {
298285
pub async fn open_object_store(
299-
mut self,
286+
self,
300287
object_store: &Arc<dyn object_store::ObjectStore>,
301288
path: &str,
302289
) -> VortexResult<VortexFile> {
303290
use std::path::Path;
304291

305292
use vortex_io::ObjectStoreReadAt;
306293

307-
// Object store _must_ use tokio for I/O.
308-
self.options.io_dispatcher = TOKIO_DISPATCHER.clone();
309-
310294
// If the file is local, we much prefer to use TokioFile since object store re-opens the
311295
// file on every read. This check is a little naive... but we hope that ObjectStore will
312296
// soon expose the scheme in a way that we can check more thoroughly.
@@ -333,8 +317,6 @@ pub struct GenericFileOptions {
333317
/// The number of concurrent I/O requests to spawn.
334318
/// This should be smaller than execution concurrency for coalescing to occur.
335319
io_concurrency: usize,
336-
/// The dispatcher to use for I/O requests.
337-
io_dispatcher: IoDispatcher,
338320
}
339321

340322
impl Default for GenericFileOptions {
@@ -344,7 +326,6 @@ impl Default for GenericFileOptions {
344326
initial_read_size: 0,
345327
initial_read_segments: Default::default(),
346328
io_concurrency: 8,
347-
io_dispatcher: IoDispatcher::shared(),
348329
}
349330
}
350331
}

vortex-io/src/dispatcher/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::task::Poll;
1515
use cfg_if::cfg_if;
1616
use futures::FutureExt;
1717
use futures::channel::oneshot;
18-
use vortex_error::{VortexResult, vortex_err};
18+
use vortex_error::{VortexResult, vortex_err, vortex_panic};
1919

2020
static SHARED: LazyLock<IoDispatcher> = LazyLock::new(IoDispatcher::new);
2121

@@ -157,14 +157,17 @@ impl Dispatch for IoDispatcher {
157157
Fut: Future<Output = R> + 'static,
158158
R: Send + 'static,
159159
{
160-
match self.0.as_ref() {
160+
vortex_panic!(
161+
"IoDispatcher::dispatch is disabled by Polar Signals due to performance reasons."
162+
);
163+
/*match self.0.as_ref() {
161164
#[cfg(not(target_arch = "wasm32"))]
162165
Inner::Tokio(tokio_dispatch) => tokio_dispatch.dispatch(task),
163166
#[cfg(feature = "compio")]
164167
Inner::Compio(compio_dispatch) => compio_dispatch.dispatch(task),
165168
#[cfg(target_arch = "wasm32")]
166169
Inner::Wasm(wasm_dispatch) => wasm_dispatch.dispatch(task),
167-
}
170+
}*/
168171
}
169172

170173
fn shutdown(self) -> VortexResult<()> {

vortex-io/src/read.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,24 @@ use vortex_metrics::{Histogram, Timer, VortexMetrics};
1212

1313
/// A trait for types that support asynchronous reads.
1414
///
15-
/// References to the type must be safe to [share across threads][Send], but spawned
16-
/// futures may be `!Send` to support thread-per-core implementations.
15+
/// The type must be [`Send`] and [`Sync`] so it can be moved to and shared across threads, and spawned
16+
/// futures must also be [`Send`] to support spawning on multi-threaded executors.
1717
///
1818
/// Readers must be cheaply cloneable to allow for easy sharing across tasks or threads.
19-
pub trait VortexReadAt: 'static {
19+
pub trait VortexReadAt: 'static + Send + Sync {
2020
/// Request an asynchronous positional read. Results will be returned as a [`ByteBuffer`].
2121
///
2222
/// If the reader does not have the requested number of bytes, the returned Future will complete
2323
/// with an [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof].
2424
///
2525
/// ## Thread Safety
2626
///
27-
/// The resultant Future need not be [`Send`], allowing implementations that use thread-per-core
28-
/// executors.
27+
/// The resultant Future must be [`Send`] to allow spawning on multi-threaded executors.
2928
fn read_byte_range(
3029
&self,
3130
range: Range<u64>,
3231
alignment: Alignment,
33-
) -> impl Future<Output = io::Result<ByteBuffer>>;
32+
) -> impl Future<Output = io::Result<ByteBuffer>> + Send;
3433

3534
// TODO(ngates): the read implementation should be able to hint at its latency/throughput
3635
// allowing the caller to make better decisions about how to coalesce reads.
@@ -42,7 +41,7 @@ pub trait VortexReadAt: 'static {
4241
///
4342
/// For a file it will be the size in bytes, for an object in an
4443
/// `ObjectStore` it will be the `ObjectMeta::size`.
45-
fn size(&self) -> impl Future<Output = io::Result<u64>>;
44+
fn size(&self) -> impl Future<Output = io::Result<u64>> + Send;
4645
}
4746

4847
#[derive(Debug, Clone)]
@@ -83,7 +82,7 @@ impl PerformanceHint {
8382
}
8483
}
8584

86-
impl<T: VortexReadAt> VortexReadAt for Arc<T> {
85+
impl<T: VortexReadAt + Send + Sync> VortexReadAt for Arc<T> {
8786
async fn read_byte_range(
8887
&self,
8988
range: Range<u64>,
@@ -134,7 +133,7 @@ pub struct InstrumentedReadAt<T: VortexReadAt> {
134133
durations: Arc<Timer>,
135134
}
136135

137-
impl<T: VortexReadAt> InstrumentedReadAt<T> {
136+
impl<T: VortexReadAt + Send + Sync> InstrumentedReadAt<T> {
138137
pub fn new(read: T, metrics: &VortexMetrics) -> Self {
139138
Self {
140139
read,
@@ -144,7 +143,7 @@ impl<T: VortexReadAt> InstrumentedReadAt<T> {
144143
}
145144
}
146145

147-
impl<T: VortexReadAt> VortexReadAt for InstrumentedReadAt<T> {
146+
impl<T: VortexReadAt + Send + Sync> VortexReadAt for InstrumentedReadAt<T> {
148147
async fn read_byte_range(
149148
&self,
150149
range: Range<u64>,

0 commit comments

Comments
 (0)