Skip to content

Commit cfc8eb9

Browse files
authored
Add execution concurrency == 2 * io concurrency (#2023)
1 parent 93d658f commit cfc8eb9

File tree

4 files changed

+60
-31
lines changed

4 files changed

+60
-31
lines changed

vortex-file/src/exec/inline.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,32 @@
1+
use std::future::ready;
2+
13
use futures_util::future::BoxFuture;
24
use futures_util::stream::BoxStream;
3-
use futures_util::{FutureExt, StreamExt};
5+
use futures_util::StreamExt;
46
use vortex_array::ArrayData;
57
use vortex_error::VortexResult;
68

79
use crate::exec::ExecDriver;
810

911
/// An [`ExecDriver`] implementation that awaits the futures inline using the caller's runtime.
10-
pub struct InlineDriver;
12+
pub struct InlineDriver {
13+
concurrency: usize,
14+
}
15+
16+
impl InlineDriver {
17+
pub fn with_concurrency(concurrency: usize) -> Self {
18+
Self { concurrency }
19+
}
20+
}
1121

1222
impl ExecDriver for InlineDriver {
1323
fn drive(
1424
&self,
1525
stream: BoxStream<'static, BoxFuture<'static, VortexResult<Option<ArrayData>>>>,
1626
) -> BoxStream<'static, VortexResult<ArrayData>> {
1727
stream
18-
.filter_map(|future| future.map(|result| result.transpose()))
28+
.buffered(self.concurrency)
29+
.filter_map(|result| ready(result.transpose()))
1930
.boxed()
2031
}
2132
}

vortex-file/src/exec/tokio.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,21 @@ use vortex_error::{vortex_err, VortexResult};
1010
use crate::exec::ExecDriver;
1111

1212
/// An [`ExecDriver`] implementation that spawns the futures onto a Tokio runtime.
13-
pub struct TokioDriver(pub Handle);
13+
pub struct TokioDriver {
14+
pub handle: Handle,
15+
pub concurrency: usize,
16+
}
1417

1518
impl ExecDriver for TokioDriver {
1619
fn drive(
1720
&self,
1821
stream: BoxStream<'static, BoxFuture<'static, VortexResult<Option<ArrayData>>>>,
1922
) -> BoxStream<'static, VortexResult<ArrayData>> {
20-
let handle = self.0.clone();
21-
22-
// This is how many file splits to make progress on at once. While I/O is resolving for
23-
// the first, we may as well find out the segments required by the next.
24-
// TODO(ngates): I picked this number somewhat arbitrarily :)
25-
let concurrency = 2 * handle.metrics().num_workers();
23+
let handle = self.handle.clone();
2624

2725
stream
2826
.map(move |future| handle.spawn(future))
29-
.buffered(concurrency)
27+
.buffered(self.concurrency)
3028
.map(|result| match result {
3129
Ok(result) => result,
3230
Err(e) => Err(vortex_err!("Failed to join Tokio result {}", e)),

vortex-file/src/open/exec.rs

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
use std::sync::Arc;
22

3-
#[cfg(feature = "tokio")]
4-
use tokio::runtime::Handle;
5-
63
use crate::exec::inline::InlineDriver;
74
#[cfg(feature = "tokio")]
85
use crate::exec::tokio::TokioDriver;
@@ -19,29 +16,35 @@ pub enum ExecutionMode {
1916
RayonThreadPool(Arc<rayon::ThreadPool>),
2017
/// Spawns the tasks onto a provided Tokio runtime.
2118
#[cfg(feature = "tokio")]
22-
TokioRuntime(Handle),
19+
TokioRuntime(tokio::runtime::Handle),
20+
}
21+
22+
#[allow(clippy::derivable_impls)]
23+
impl Default for ExecutionMode {
24+
fn default() -> Self {
25+
// Default to tokio-specific behavior if its enabled and there's a runtime running.
26+
#[cfg(feature = "tokio")]
27+
if let Ok(h) = tokio::runtime::Handle::try_current() {
28+
return ExecutionMode::TokioRuntime(h);
29+
}
30+
31+
ExecutionMode::Inline
32+
}
2333
}
2434

2535
impl ExecutionMode {
26-
pub fn into_driver(self) -> Arc<dyn ExecDriver> {
36+
pub fn into_driver(self, concurrency: usize) -> Arc<dyn ExecDriver> {
2737
match self {
28-
ExecutionMode::Inline => {
29-
// Default to tokio-specific behavior if its enabled and there's a runtime running.
30-
#[cfg(feature = "tokio")]
31-
match Handle::try_current() {
32-
Ok(h) => Arc::new(TokioDriver(h)),
33-
Err(_) => Arc::new(InlineDriver),
34-
}
35-
36-
#[cfg(not(feature = "tokio"))]
37-
Arc::new(InlineDriver)
38-
}
38+
ExecutionMode::Inline => Arc::new(InlineDriver::with_concurrency(concurrency)),
3939
#[cfg(feature = "rayon")]
4040
ExecutionMode::RayonThreadPool(_) => {
4141
todo!()
4242
}
4343
#[cfg(feature = "tokio")]
44-
ExecutionMode::TokioRuntime(handle) => Arc::new(TokioDriver(handle)),
44+
ExecutionMode::TokioRuntime(handle) => Arc::new(TokioDriver {
45+
handle,
46+
concurrency,
47+
}),
4548
}
4649
}
4750
}

vortex-file/src/open/mod.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub struct VortexOpenOptions {
4242
execution_mode: Option<ExecutionMode>,
4343
// TODO(ngates): allow fully configurable I/O driver.
4444
io_concurrency: usize,
45+
exec_concurrency: Option<usize>,
4546
}
4647

4748
impl VortexOpenOptions {
@@ -56,7 +57,8 @@ impl VortexOpenOptions {
5657
segment_cache: None,
5758
execution_mode: None,
5859
// TODO(ngates): pick some numbers...
59-
io_concurrency: 16,
60+
io_concurrency: 10,
61+
exec_concurrency: None,
6062
}
6163
}
6264

@@ -111,6 +113,21 @@ impl VortexOpenOptions {
111113
self.execution_mode = Some(execution_mode);
112114
self
113115
}
116+
117+
/// Configure the number of concurrent I/O requests.
118+
pub fn with_io_concurrency(mut self, io_concurrency: usize) -> Self {
119+
self.io_concurrency = io_concurrency;
120+
self
121+
}
122+
123+
/// Override the default split-by concurrency.
124+
///
125+
/// It is recommended to use more split-by concurrency than I/O concurrency to ensure there
126+
/// are always I/O operations enqueued.
127+
pub fn with_exec_concurrency(mut self, exec_concurrency: usize) -> Self {
128+
self.exec_concurrency = Some(exec_concurrency);
129+
self
130+
}
114131
}
115132

116133
impl VortexOpenOptions {
@@ -143,8 +160,8 @@ impl VortexOpenOptions {
143160
// Set up the execution driver.
144161
let exec_driver = self
145162
.execution_mode
146-
.unwrap_or(ExecutionMode::Inline)
147-
.into_driver();
163+
.unwrap_or_default()
164+
.into_driver(self.exec_concurrency.unwrap_or(self.io_concurrency * 2));
148165

149166
// Finally, create the VortexFile.
150167
Ok(VortexFile {

0 commit comments

Comments
 (0)