Skip to content

Commit 8484445

Browse files
authored
Reuse the IoDispatcher across DataFusion instances (#1299)
1 parent 5d72d84 commit 8484445

File tree

7 files changed

+51
-28
lines changed

7 files changed

+51
-28
lines changed

Cargo.lock

Lines changed: 22 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bench-vortex/benches/tpch.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ use criterion::{criterion_group, criterion_main, Criterion};
44
use tokio::runtime::Builder;
55

66
fn benchmark(c: &mut Criterion) {
7-
let runtime = Builder::new_current_thread().enable_all().build().unwrap();
7+
let runtime = Builder::new_current_thread()
8+
.thread_name("benchmark-tpch")
9+
.enable_all()
10+
.build()
11+
.unwrap();
812

913
// Run TPC-H data gen.
1014
let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();

bench-vortex/src/reader.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::fs::File;
22
use std::ops::Range;
33
use std::path::{Path, PathBuf};
44
use std::process::Command;
5-
use std::sync::Arc;
5+
use std::sync::{Arc, LazyLock};
66

77
use arrow_array::types::Int64Type;
88
use arrow_array::{
@@ -27,11 +27,16 @@ use vortex::arrow::FromArrowType;
2727
use vortex::compress::CompressionStrategy;
2828
use vortex::dtype::DType;
2929
use vortex::error::VortexResult;
30-
use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
30+
use vortex::file::{
31+
IoDispatcher, LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder,
32+
};
3133
use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite};
3234
use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
3335
use vortex::{Array, IntoArray, IntoCanonical};
3436

37+
static DISPATCHER: LazyLock<Arc<IoDispatcher>> =
38+
LazyLock::new(|| Arc::new(IoDispatcher::new_tokio(1)));
39+
3540
pub const BATCH_SIZE: usize = 65_536;
3641

3742
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -51,6 +56,7 @@ pub async fn open_vortex(path: &Path) -> VortexResult<Array> {
5156
LayoutContext::default().into(),
5257
),
5358
)
59+
.with_io_dispatcher(DISPATCHER.clone())
5460
.build()
5561
.await?
5662
.read_all()

vortex-datafusion/src/persistent/opener.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::Arc;
1+
use std::sync::{Arc, LazyLock};
22

33
use arrow_array::RecordBatch;
44
use arrow_schema::SchemaRef;
@@ -9,9 +9,15 @@ use futures::{FutureExt as _, StreamExt, TryStreamExt};
99
use object_store::ObjectStore;
1010
use vortex_array::Context;
1111
use vortex_expr::datafusion::convert_expr_to_vortex;
12-
use vortex_file::{LayoutContext, LayoutDeserializer, Projection, RowFilter, VortexReadBuilder};
12+
use vortex_file::{
13+
IoDispatcher, LayoutContext, LayoutDeserializer, Projection, RowFilter, VortexReadBuilder,
14+
};
1315
use vortex_io::ObjectStoreReadAt;
1416

17+
/// Share an IO dispatcher across all DataFusion instances.
18+
static IO_DISPATCHER: LazyLock<Arc<IoDispatcher>> =
19+
LazyLock::new(|| Arc::new(IoDispatcher::new_tokio(1)));
20+
1521
pub struct VortexFileOpener {
1622
pub ctx: Arc<Context>,
1723
pub object_store: Arc<dyn ObjectStore>,
@@ -28,7 +34,8 @@ impl FileOpener for VortexFileOpener {
2834
let mut builder = VortexReadBuilder::new(
2935
read_at,
3036
LayoutDeserializer::new(self.ctx.clone(), Arc::new(LayoutContext::default())),
31-
);
37+
)
38+
.with_io_dispatcher(IO_DISPATCHER.clone());
3239

3340
let row_filter = self
3441
.predicate

vortex-file/src/dispatcher/tokio.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ impl TokioDispatcher {
2626
let threads: Vec<_> = (0..num_threads)
2727
.map(|tid| {
2828
let worker_thread = std::thread::Builder::new();
29-
let worker_thread = worker_thread.name(format!("tokio-dispatch-{tid}"));
3029
let rx: flume::Receiver<Box<dyn TokioSpawn + Send>> = rx.clone();
3130

3231
worker_thread
3332
.spawn(move || {
3433
// Create a runtime-per-thread
3534
let rt = tokio::runtime::Builder::new_current_thread()
35+
.thread_name(format!("tokio-dispatch-{tid}"))
3636
.enable_all()
3737
.build()
3838
.unwrap_or_else(|e| {

vortex-file/src/read/builder/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ pub struct VortexReadBuilder<R> {
7171
size: Option<u64>,
7272
row_mask: Option<Array>,
7373
row_filter: Option<RowFilter>,
74-
io_dispatcher: Option<IoDispatcher>,
74+
io_dispatcher: Option<Arc<IoDispatcher>>,
7575
}
7676

7777
impl<R: VortexReadAt> VortexReadBuilder<R> {
@@ -112,7 +112,7 @@ impl<R: VortexReadAt> VortexReadBuilder<R> {
112112
self
113113
}
114114

115-
pub fn with_io_dispatcher(mut self, dispatcher: IoDispatcher) -> Self {
115+
pub fn with_io_dispatcher(mut self, dispatcher: Arc<IoDispatcher>) -> Self {
116116
self.io_dispatcher = Some(dispatcher);
117117
self
118118
}
@@ -176,7 +176,7 @@ impl<R: VortexReadAt> VortexReadBuilder<R> {
176176
// Default: fallback to single-threaded tokio dispatcher.
177177
let io_dispatcher = self
178178
.io_dispatcher
179-
.unwrap_or_else(|| IoDispatcher::new_tokio(1));
179+
.unwrap_or_else(|| Arc::new(IoDispatcher::new_tokio(1)));
180180

181181
Ok(VortexFileArrayStream::new(
182182
self.read_at,

vortex-file/src/read/stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub struct VortexFileArrayStream<R> {
3535
messages_cache: Arc<RwLock<LayoutMessageCache>>,
3636
state: Option<StreamingState>,
3737
input: R,
38-
dispatcher: IoDispatcher,
38+
dispatcher: Arc<IoDispatcher>,
3939
}
4040

4141
impl<R: VortexReadAt> VortexFileArrayStream<R> {
@@ -48,7 +48,7 @@ impl<R: VortexReadAt> VortexFileArrayStream<R> {
4848
dtype: DType,
4949
row_count: u64,
5050
row_mask: Option<RowMask>,
51-
dispatcher: IoDispatcher,
51+
dispatcher: Arc<IoDispatcher>,
5252
) -> Self {
5353
let mask_iterator = if let Some(fr) = filter_reader {
5454
Box::new(FilteringRowSplitIterator::new(fr, row_count, row_mask)) as MaskIteratorRef

0 commit comments

Comments
 (0)