Skip to content

Commit 99eb574

Browse files
authored
Support opening Vortex files without I/O (#1920)
Part of #1676
1 parent 0c2b023 commit 99eb574

File tree

1 file changed

+87
-55
lines changed

1 file changed

+87
-55
lines changed

vortex-file/src/v2/open/mod.rs

Lines changed: 87 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ pub struct VortexOpenOptions {
3232
layout_ctx: LayoutContextRef,
3333
/// An optional, externally provided, file layout.
3434
file_layout: Option<FileLayout>,
35-
/// An optional, externally provided, dtype.
36-
dtype: Option<DType>,
35+
/// An optional, externally provided, file size.
36+
file_size: Option<u64>,
3737
// TODO(ngates): also support a messages_middleware that can wrap a message cache to provide
3838
// additional caching, metrics, or other intercepts, etc. It should support synchronous
3939
// read + write of Map<MessageId, ByteBuffer> or similar.
@@ -51,7 +51,7 @@ impl VortexOpenOptions {
5151
ctx,
5252
layout_ctx: LayoutContextRef::default(),
5353
file_layout: None,
54-
dtype: None,
54+
file_size: None,
5555
initial_read_size: INITIAL_READ_SIZE,
5656
split_by: SplitBy::Layout,
5757
segment_cache: None,
@@ -60,6 +60,24 @@ impl VortexOpenOptions {
6060
}
6161
}
6262

63+
/// Configure a known file layout.
64+
///
65+
/// If this is provided, then the Vortex file can be opened without performing any I/O.
66+
/// Once open, the [`FileLayout`] can be accessed via [`VortexFile::file_layout`].
67+
pub fn with_file_layout(mut self, file_layout: FileLayout) -> Self {
68+
self.file_layout = Some(file_layout);
69+
self
70+
}
71+
72+
/// Configure a known file size.
73+
///
74+
/// This avoids the I/O request otherwise needed to discover the size of the file.
75+
/// The size must be exact: the initial read is offset from the end of the file.
76+
pub fn with_file_size(mut self, file_size: u64) -> Self {
77+
self.file_size = Some(file_size);
78+
self
79+
}
80+
6381
/// Configure the initial read size for the Vortex file.
6482
pub fn with_initial_read_size(mut self, initial_read_size: u64) -> VortexResult<Self> {
6583
if self.initial_read_size < u16::MAX as u64 {
@@ -91,9 +109,60 @@ impl VortexOpenOptions {
91109

92110
impl VortexOpenOptions {
93111
/// Open the Vortex file using asynchronous IO.
94-
pub async fn open<R: VortexReadAt>(self, read: R) -> VortexResult<VortexFile<FileIoDriver<R>>> {
112+
pub async fn open<R: VortexReadAt>(
113+
mut self,
114+
read: R,
115+
) -> VortexResult<VortexFile<FileIoDriver<R>>> {
116+
// Set up our segment cache.
117+
let segment_cache = self
118+
.segment_cache
119+
.as_ref()
120+
.cloned()
121+
.unwrap_or_else(|| Arc::new(InMemorySegmentCache::default()));
122+
123+
// If we need to read the file layout, then do so.
124+
let file_layout = match self.file_layout.take() {
125+
None => self.read_file_layout(&read, segment_cache.as_ref()).await?,
126+
Some(file_layout) => file_layout,
127+
};
128+
129+
// Set up the I/O driver.
130+
let io_driver = FileIoDriver {
131+
read,
132+
file_layout: file_layout.clone(),
133+
concurrency: self.io_concurrency,
134+
};
135+
136+
// Set up the execution driver.
137+
let exec_driver = self
138+
.execution_mode
139+
.unwrap_or(ExecutionMode::Inline)
140+
.into_driver();
141+
142+
// Compute the splits of the file.
143+
let splits = self.split_by.splits(&file_layout.root_layout)?.into();
144+
145+
// Finally, create the VortexFile.
146+
Ok(VortexFile {
147+
ctx: self.ctx.clone(),
148+
file_layout,
149+
io_driver,
150+
exec_driver,
151+
splits,
152+
})
153+
}
154+
155+
/// Read the [`FileLayout`] from the file.
156+
async fn read_file_layout<R: VortexReadAt>(
157+
&self,
158+
read: &R,
159+
segment_cache: &dyn SegmentCache,
160+
) -> VortexResult<FileLayout> {
95161
// Fetch the file size and perform the initial read.
96-
let file_size = read.size().await?;
162+
let file_size = match self.file_size {
163+
None => read.size().await?,
164+
Some(file_size) => file_size,
165+
};
97166
let initial_read_size = self.initial_read_size.min(file_size);
98167
let initial_offset = file_size - initial_read_size;
99168
let initial_read: ByteBuffer = read
@@ -105,11 +174,10 @@ impl VortexOpenOptions {
105174
let postscript = self.parse_postscript(&initial_read)?;
106175

107176
// Check if we need to read more bytes.
108-
// NOTE(ngates): for now, we assume the dtype and layout segments are adjacent.
109-
let (initial_offset, initial_read) = if (self.dtype.is_none()
110-
&& postscript.dtype.offset < initial_offset)
111-
|| (self.file_layout.is_none() && postscript.file_layout.offset < initial_offset)
177+
let (initial_offset, initial_read) = if (postscript.dtype.offset < initial_offset)
178+
|| (postscript.file_layout.offset < initial_offset)
112179
{
180+
// NOTE(ngates): for now, we assume the dtype and layout segments are adjacent.
113181
let offset = postscript.dtype.offset.min(postscript.file_layout.offset);
114182
let mut new_initial_read =
115183
ByteBufferMut::with_capacity(usize::try_from(file_size - offset)?);
@@ -125,10 +193,9 @@ impl VortexOpenOptions {
125193
};
126194

127195
// Now we try to read the DType and Layout segments.
128-
let dtype = self.dtype.clone().unwrap_or_else(|| {
129-
self.parse_dtype(initial_offset, &initial_read, postscript.dtype)
130-
.vortex_expect("Failed to parse dtype")
131-
});
196+
let dtype = self
197+
.parse_dtype(initial_offset, &initial_read, postscript.dtype)
198+
.vortex_expect("Failed to parse dtype");
132199
let file_layout = self.file_layout.clone().unwrap_or_else(|| {
133200
self.parse_file_layout(
134201
initial_offset,
@@ -139,44 +206,12 @@ impl VortexOpenOptions {
139206
.vortex_expect("Failed to parse file layout")
140207
});
141208

142-
// Set up our segment cache and for good measure, we populate any segments that were
143-
// covered by the initial read.
144-
let segment_cache = self
145-
.segment_cache
146-
.as_ref()
147-
.cloned()
148-
.unwrap_or_else(|| Arc::new(InMemorySegmentCache::default()));
149-
self.populate_segments(
150-
initial_offset,
151-
&initial_read,
152-
&file_layout,
153-
segment_cache.as_ref(),
154-
)?;
155-
156-
// Set up the I/O driver.
157-
let io_driver = FileIoDriver {
158-
read,
159-
file_layout: file_layout.clone(),
160-
concurrency: self.io_concurrency,
161-
};
162-
163-
// Set up the execution driver.
164-
let exec_driver = self
165-
.execution_mode
166-
.unwrap_or(ExecutionMode::Inline)
167-
.into_driver();
209+
// If the initial read happened to cover any segments, then we can populate the
210+
// segment cache.
211+
self.populate_segments(initial_offset, &initial_read, &file_layout, segment_cache)
212+
.await?;
168213

169-
// Compute the splits of the file.
170-
let splits = self.split_by.splits(&file_layout.root_layout)?.into();
171-
172-
// Finally, create the VortexFile.
173-
Ok(VortexFile {
174-
ctx: self.ctx.clone(),
175-
file_layout,
176-
io_driver,
177-
exec_driver,
178-
splits,
179-
})
214+
Ok(file_layout)
180215
}
181216

182217
/// Parse the postscript from the initial read.
@@ -268,8 +303,7 @@ impl VortexOpenOptions {
268303
}
269304

270305
/// Populate segments in the cache that were covered by the initial read.
271-
#[allow(unused_variables)]
272-
fn populate_segments(
306+
async fn populate_segments(
273307
&self,
274308
initial_offset: u64,
275309
initial_read: &ByteBuffer,
@@ -285,9 +319,7 @@ impl VortexOpenOptions {
285319
let offset = usize::try_from(segment.offset - initial_offset)?;
286320
let buffer = initial_read.slice(offset..offset + (segment.length as usize));
287321

288-
// FIXME(ngates): how should we write into the segment cache? Feels like it should be
289-
// non-blocking and on some other thread?
290-
// segments.put(segment_id, buffer)?;
322+
segments.put(segment_id, buffer).await?;
291323
}
292324
Ok(())
293325
}

0 commit comments

Comments
 (0)