Skip to content

Commit 1847138

Browse files
authored
Add flush after writing file and add some docs to file writepath (#2922)
1. Make `VortexWriteOptions::write` flush the internal writer before returning it (especially important for tokio and `ObjectStore` writes, as pointed out by @a10y in #2921). 2. Remove the unused `TokioAdapter` wrapper struct. 3. Adds some docs
1 parent 61340d3 commit 1847138

File tree

3 files changed

+9
-21
lines changed

3 files changed

+9
-21
lines changed

vortex-datafusion/src/persistent/sink.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ impl FileSink for VortexSink {
106106
.write(writer, stream_adapter)
107107
.await?;
108108

109-
writer.flush().await?;
110109
writer.shutdown().await?;
111110
}
112111

vortex-file/src/writer.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ use crate::segments::writer::BufferedSegmentWriter;
1313
use crate::strategy::VortexLayoutStrategy;
1414
use crate::{EOF_SIZE, MAGIC_BYTES, MAX_FOOTER_SIZE, VERSION};
1515

16+
/// Configure a new writer, which can eventually be used to write an [`ArrayStream`] into a sink that implements [`VortexWrite`].
17+
///
18+
/// By default, the [`LayoutStrategy`] will be the [`VortexLayoutStrategy`], which includes re-chunking and will also
19+
/// uncompress all data back to its canonical form before compressing it using the [`BtrBlocksCompressor`](vortex_btrblocks::BtrBlocksCompressor).
1620
pub struct VortexWriteOptions {
1721
strategy: Box<dyn LayoutStrategy>,
1822
exclude_dtype: bool,
@@ -147,6 +151,8 @@ impl VortexWriteOptions {
147151
eof[4..8].copy_from_slice(&MAGIC_BYTES);
148152
write.write_all(eof).await?;
149153

154+
write.flush().await?;
155+
150156
Ok(write.into_inner())
151157
}
152158

vortex-io/src/tokio.rs

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,35 +5,18 @@ use std::os::unix::fs::FileExt;
55
use std::path::Path;
66
use std::sync::Arc;
77

8-
use tokio::io::{AsyncWrite, AsyncWriteExt};
8+
use tokio::io::AsyncWriteExt;
99
use tokio::task::spawn_blocking;
1010
use vortex_buffer::{Alignment, ByteBuffer, ByteBufferMut};
1111
use vortex_error::VortexExpect;
1212

1313
use crate::{IoBuf, PerformanceHint, VortexReadAt, VortexWrite};
1414

15-
pub struct TokioAdapter<IO>(pub IO);
16-
17-
impl<W: AsyncWrite + Unpin> VortexWrite for TokioAdapter<W> {
18-
async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
19-
self.0.write_all(buffer.as_slice()).await?;
20-
Ok(buffer)
21-
}
22-
23-
async fn flush(&mut self) -> io::Result<()> {
24-
self.0.flush().await
25-
}
26-
27-
async fn shutdown(&mut self) -> io::Result<()> {
28-
self.0.shutdown().await
29-
}
30-
}
31-
3215
/// A cheaply cloneable, readonly file that executes operations
3316
/// on a tokio blocking threadpool.
3417
///
35-
/// We use this because the builtin tokio `File` type is not `Clone` and
36-
/// also does actually implement a `read_exact_at` operation.
18+
/// We use this because tokio's [`File`](tokio::fs::File) type is not `Clone` and
19+
/// also does not implement a `read_exact_at` operation.
3720
#[derive(Debug, Clone)]
3821
pub struct TokioFile(Arc<File>);
3922

0 commit comments

Comments
 (0)