Skip to content

Commit 19cc2c1

Browse files
authored
Per VortexReadAt coalescing window (#2241)
Introduces a new `PerformanceHint` struct that can have more knobs in the future, and configure it for each `VortexReadAt` implementation we have.
1 parent f97dfc5 commit 19cc2c1

File tree

4 files changed

+56
-11
lines changed

4 files changed

+56
-11
lines changed

vortex-file/src/io/file.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ impl<R: VortexReadAt> IoDriver for FileIoDriver<R> {
6363
stream: impl Stream<Item = SegmentRequest> + 'static,
6464
) -> impl Stream<Item = VortexResult<()>> + 'static {
6565
// We map the segment requests to their respective locations within the file.
66+
let coalescing_window = self.read.performance_hint().coalescing_window();
6667
let segment_map = self.file_layout.segment_map().clone();
6768
let stream = stream.filter_map(move |request| {
6869
let segment_map = segment_map.clone();
@@ -127,7 +128,9 @@ impl<R: VortexReadAt> IoDriver for FileIoDriver<R> {
127128
.inspect(|requests| log::debug!("Processing {} segment requests", requests.len()));
128129

129130
// Coalesce the segment requests to minimize the number of I/O operations.
130-
let stream = stream.map(coalesce).flat_map(stream::iter);
131+
let stream = stream
132+
.map(move |r| coalesce(r, coalescing_window))
133+
.flat_map(stream::iter);
131134

132135
// Submit the coalesced requests to the I/O.
133136
let read = self.read.clone();
@@ -221,13 +224,15 @@ async fn evaluate<R: VortexReadAt>(
221224
}
222225

223226
/// TODO(ngates): outsource coalescing to a trait
224-
fn coalesce(requests: Vec<FileSegmentRequest>) -> Vec<CoalescedSegmentRequest> {
225-
const COALESCE: u64 = 1024 * 1024; // 1MB
227+
fn coalesce(
228+
requests: Vec<FileSegmentRequest>,
229+
coalescing_window: u64,
230+
) -> Vec<CoalescedSegmentRequest> {
226231
let fetch_ranges = merge_ranges(
227232
requests
228233
.iter()
229234
.map(|r| r.location.offset..r.location.offset + r.location.length as u64),
230-
COALESCE,
235+
coalescing_window,
231236
);
232237
let mut coalesced = fetch_ranges
233238
.iter()
@@ -239,7 +244,7 @@ fn coalesce(requests: Vec<FileSegmentRequest>) -> Vec<CoalescedSegmentRequest> {
239244

240245
for req in requests {
241246
let idx = fetch_ranges.partition_point(|v| v.start <= req.location.offset) - 1;
242-
coalesced.as_mut_slice()[idx].requests.push(req);
247+
coalesced[idx].requests.push(req);
243248
}
244249

245250
// Ensure we sort the requests by segment ID within the coalesced request.

vortex-io/src/offset.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::io;
44
use bytes::Bytes;
55
use futures::FutureExt;
66

7-
use crate::VortexReadAt;
7+
use crate::{PerformanceHint, VortexReadAt};
88

99
/// An adapter that offsets all reads by a fixed amount.
1010
pub struct OffsetReadAt<R> {
@@ -39,7 +39,7 @@ impl<R: VortexReadAt> VortexReadAt for OffsetReadAt<R> {
3939
self.read.read_byte_range(pos + self.offset, len)
4040
}
4141

42-
fn performance_hint(&self) -> usize {
42+
fn performance_hint(&self) -> PerformanceHint {
4343
self.read.performance_hint()
4444
}
4545

vortex-io/src/read.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ pub trait VortexReadAt: Send + Sync + Clone + 'static {
3030

3131
// TODO(ngates): the read implementation should be able to hint at its latency/throughput
3232
// allowing the caller to make better decisions about how to coalesce reads.
33-
fn performance_hint(&self) -> usize {
34-
0
33+
fn performance_hint(&self) -> PerformanceHint {
34+
PerformanceHint::default()
3535
}
3636

3737
/// Asynchronously get the number of bytes of data readable.
@@ -41,6 +41,34 @@ pub trait VortexReadAt: Send + Sync + Clone + 'static {
4141
fn size(&self) -> impl Future<Output = io::Result<u64>> + 'static;
4242
}
4343

44+
pub struct PerformanceHint {
45+
coalescing_window: u64,
46+
}
47+
48+
impl Default for PerformanceHint {
49+
fn default() -> Self {
50+
Self {
51+
coalescing_window: 2 << 20, //1MB,
52+
}
53+
}
54+
}
55+
56+
impl PerformanceHint {
57+
pub fn new(coalescing_window: u64) -> Self {
58+
Self { coalescing_window }
59+
}
60+
61+
/// Creates a new instance with a profile appropriate for fast local storage, like memory or files on NVMe devices.
62+
pub fn local() -> Self {
63+
Self::new(0)
64+
}
65+
66+
/// The maximum distance between two reads that should coalesced into a single operation.
67+
pub fn coalescing_window(&self) -> u64 {
68+
self.coalescing_window
69+
}
70+
}
71+
4472
impl<T: VortexReadAt> VortexReadAt for Arc<T> {
4573
fn read_byte_range(
4674
&self,
@@ -50,7 +78,7 @@ impl<T: VortexReadAt> VortexReadAt for Arc<T> {
5078
T::read_byte_range(self, pos, len)
5179
}
5280

53-
fn performance_hint(&self) -> usize {
81+
fn performance_hint(&self) -> PerformanceHint {
5482
T::performance_hint(self)
5583
}
5684

@@ -79,6 +107,10 @@ impl VortexReadAt for Bytes {
79107
fn size(&self) -> impl Future<Output = io::Result<u64>> + 'static {
80108
ready(Ok(self.len() as u64))
81109
}
110+
111+
fn performance_hint(&self) -> PerformanceHint {
112+
PerformanceHint::local()
113+
}
82114
}
83115

84116
impl VortexReadAt for ByteBuffer {
@@ -98,6 +130,10 @@ impl VortexReadAt for ByteBuffer {
98130
ready(Ok(self.slice(read_start..read_end).into_inner()))
99131
}
100132

133+
fn performance_hint(&self) -> PerformanceHint {
134+
PerformanceHint::local()
135+
}
136+
101137
fn size(&self) -> impl Future<Output = io::Result<u64>> + 'static {
102138
ready(Ok(self.len() as u64))
103139
}

vortex-io/src/tokio.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use bytes::{Bytes, BytesMut};
1010
use tokio::io::{AsyncWrite, AsyncWriteExt};
1111
use vortex_error::VortexUnwrap;
1212

13-
use crate::{IoBuf, VortexReadAt, VortexWrite};
13+
use crate::{IoBuf, PerformanceHint, VortexReadAt, VortexWrite};
1414

1515
pub struct TokioAdapter<IO>(pub IO);
1616

@@ -81,6 +81,10 @@ impl VortexReadAt for TokioFile {
8181

8282
async move { this.metadata().map(|metadata| metadata.len()) }
8383
}
84+
85+
fn performance_hint(&self) -> PerformanceHint {
86+
PerformanceHint::local()
87+
}
8488
}
8589

8690
impl VortexWrite for tokio::fs::File {

0 commit comments

Comments
 (0)