Skip to content

Commit 13609e9

Browse files
authored
Improve write strategy (#2783)
* Introducing a separate flush stage allows us to ensure stats tables are written at the end of the file. * We buffer each column to help keep chunks from the same column next to each other, within some memory budget. Currently this is naive per-column memory, not amortized over the size of the schema. Note: this isn't a break, but we should re-generate S3 files
1 parent 345b67c commit 13609e9

File tree

9 files changed

+116
-6
lines changed

9 files changed

+116
-6
lines changed

vortex-file/src/strategy.rs

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33
use std::sync::Arc;
44

55
use vortex_array::arcref::ArcRef;
6+
use vortex_array::nbytes::NBytes;
67
use vortex_array::stats::{PRUNING_STATS, STATS_TO_WRITE};
78
use vortex_array::{Array, ArrayContext, ArrayRef};
89
use vortex_btrblocks::BtrBlocksCompressor;
910
use vortex_dtype::DType;
1011
use vortex_error::VortexResult;
1112
use vortex_layout::layouts::chunked::writer::{ChunkedLayoutOptions, ChunkedLayoutWriter};
1213
use vortex_layout::layouts::flat::FlatLayout;
13-
use vortex_layout::layouts::flat::writer::FlatLayoutOptions;
1414
use vortex_layout::layouts::stats::writer::{StatsLayoutOptions, StatsLayoutWriter};
1515
use vortex_layout::layouts::struct_::writer::StructLayoutWriter;
1616
use vortex_layout::segments::SegmentWriter;
@@ -32,13 +32,24 @@ impl LayoutStrategy for VortexLayoutStrategy {
3232
);
3333
}
3434

35-
// Otherwise, we finish with compressing the chunks
35+
// Leaf arrays are written as flat arrays, above which, we buffer up to 4MB per column to try to keep
36+
// chunks for the same column next to each other, within some reasonable write-time memory
37+
// buffering limit.
38+
let writer: ArcRef<dyn LayoutStrategy> = ArcRef::new_arc(Arc::new(BufferedStrategy {
39+
child: ArcRef::new_ref(&FlatLayout),
40+
// Buffer 4MB of compressed data per column before writing the chunks consecutively.
41+
// TODO(ngates): this should really be amortized by the number of fields? Maybe the
42+
// strategy could keep track of how many writers were created?
43+
buffer_size: 4 << 20, // 4 MB
44+
}) as _);
45+
46+
// Compress each chunk with btrblocks.
3647
let writer = BtrBlocksCompressedWriter {
3748
child: ChunkedLayoutWriter::new(
3849
ctx.clone(),
3950
&DType::Null,
4051
ChunkedLayoutOptions {
41-
chunk_strategy: ArcRef::new_arc(Arc::new(FlatLayoutOptions::default()) as _),
52+
chunk_strategy: writer,
4253
},
4354
)
4455
.boxed(),
@@ -117,6 +128,64 @@ impl LayoutWriter for BtrBlocksCompressedWriter {
117128
self.child.push_chunk(segment_writer, compressed)
118129
}
119130

131+
fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
132+
self.child.flush(segment_writer)
133+
}
134+
135+
fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
136+
self.child.finish(segment_writer)
137+
}
138+
}
139+
140+
/// A [`LayoutStrategy`] whose writers hold chunks in memory until a byte budget is reached and
/// only then forward them to a writer created by `child`.
struct BufferedStrategy {
141+
/// Strategy used to create the downstream writer that eventually receives the chunks.
child: ArcRef<dyn LayoutStrategy>,
142+
/// Number of buffered bytes (as reported by `nbytes()` on each chunk) at which a writer
/// forwards its buffered chunks to the child writer.
buffer_size: u64,
143+
}
144+
145+
impl LayoutStrategy for BufferedStrategy {
146+
fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
147+
let child = self.child.new_writer(ctx, dtype)?;
148+
Ok(BufferedWriter {
149+
chunks: Vec::new(),
150+
nbytes: 0,
151+
buffer_size: self.buffer_size,
152+
child,
153+
}
154+
.boxed())
155+
}
156+
}
157+
158+
/// A [`LayoutWriter`] that accumulates chunks in memory and forwards them to `child` once the
/// accumulated size reaches `buffer_size`, or when it is flushed.
struct BufferedWriter {
159+
/// Chunks that have been pushed to this writer but not yet forwarded to `child`.
chunks: Vec<ArrayRef>,
160+
/// Running total of `nbytes()` over the chunks currently held in `chunks`.
nbytes: u64,
161+
/// Threshold for `nbytes` at which the buffered chunks are forwarded to `child`.
buffer_size: u64,
162+
/// Downstream writer that receives the buffered chunks.
child: Box<dyn LayoutWriter>,
163+
}
164+
165+
impl LayoutWriter for BufferedWriter {
166+
fn push_chunk(
167+
&mut self,
168+
segment_writer: &mut dyn SegmentWriter,
169+
chunk: ArrayRef,
170+
) -> VortexResult<()> {
171+
self.nbytes += chunk.nbytes() as u64;
172+
self.chunks.push(chunk);
173+
if self.nbytes >= self.buffer_size {
174+
for chunk in self.chunks.drain(..) {
175+
self.child.push_chunk(segment_writer, chunk)?;
176+
}
177+
self.nbytes = 0;
178+
}
179+
Ok(())
180+
}
181+
182+
fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
    // Forward every chunk that is still buffered to the child, in arrival order.
    for chunk in self.chunks.drain(..) {
        self.child.push_chunk(segment_writer, chunk)?;
    }
    // The buffer is empty now, so the byte counter must be reset as well. Otherwise a
    // later `push_chunk` would start from a stale total and drain the buffer too early.
    self.nbytes = 0;
    // Let the child flush anything it has buffered itself.
    self.child.flush(segment_writer)
}
188+
120189
// Finishes the child writer and returns its layout. Chunks still held in `self.chunks` are
// not forwarded here, so `flush` has to be called before `finish`.
fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
121190
self.child.finish(segment_writer)
122191
}

vortex-file/src/writer.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,12 @@ impl VortexWriteOptions {
8888
}
8989

9090
// Flush any chunks still buffered in the layout writers into the file
91+
layout_writer.flush(&mut segment_writer)?;
92+
segment_writer
93+
.flush_async(&mut write, &mut segment_map)
94+
.await?;
95+
96+
// Finish the layouts and flush the finishing messages into the file
9197
let layout = layout_writer.finish(&mut segment_writer)?;
9298
segment_writer
9399
.flush_async(&mut write, &mut segment_map)

vortex-layout/src/layouts/chunked/writer.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,16 @@ impl LayoutWriter for ChunkedLayoutWriter {
6262
.chunk_strategy
6363
.new_writer(&self.ctx, chunk.dtype())?;
6464
chunk_writer.push_chunk(segment_writer, chunk)?;
65+
chunk_writer.flush(segment_writer)?;
6566
self.chunks.push(chunk_writer);
6667

6768
Ok(())
6869
}
6970

71+
fn flush(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
72+
Ok(())
73+
}
74+
7075
fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
7176
// Call finish on each chunk's writer
7277
let mut children = vec![];

vortex-layout/src/layouts/flat/writer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ impl LayoutWriter for FlatLayoutWriter {
9494
Ok(())
9595
}
9696

97+
fn flush(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
98+
Ok(())
99+
}
100+
97101
fn finish(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
98102
self.layout
99103
.take()

vortex-layout/src/layouts/stats/writer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,10 @@ impl LayoutWriter for StatsLayoutWriter {
9696
self.child_writer.push_chunk(segment_writer, chunk)
9797
}
9898

99+
fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
100+
self.child_writer.flush(segment_writer)
101+
}
102+
99103
fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
100104
let child = self.child_writer.finish(segment_writer)?;
101105

vortex-layout/src/layouts/struct_/writer.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,13 @@ impl LayoutWriter for StructLayoutWriter {
9393
Ok(())
9494
}
9595

96+
fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
97+
for writer in self.column_strategies.iter_mut() {
98+
writer.flush(segment_writer)?;
99+
}
100+
Ok(())
101+
}
102+
96103
fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
97104
let mut column_layouts = vec![];
98105
for writer in self.column_strategies.iter_mut() {

vortex-layout/src/stats.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ impl LayoutWriter for FileStatsLayoutWriter {
7676
self.inner.push_chunk(segment_writer, chunk)
7777
}
7878

79+
fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
80+
self.inner.flush(segment_writer)
81+
}
82+
7983
fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
8084
self.inner.finish(segment_writer)
8185
}

vortex-layout/src/writer.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,17 @@ use crate::segments::SegmentWriter;
77
/// A strategy for writing chunks of an array into a layout.
88
// [layout writer]
99
pub trait LayoutWriter: Send {
10+
/// Push a chunk into the layout writer.
1011
fn push_chunk(
1112
&mut self,
1213
segment_writer: &mut dyn SegmentWriter,
1314
chunk: ArrayRef,
1415
) -> VortexResult<()>;
1516

17+
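/// Flush any buffered chunks. Must be called before [`LayoutWriter::finish`], which is not required to write out chunks that are still buffered.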
18+
fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()>;
19+
20+
/// Write any final data (e.g. stats) and return the finished [`Layout`].
1621
fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout>;
1722
}
1823
// [layout writer]
@@ -33,6 +38,7 @@ pub trait LayoutWriterExt: LayoutWriter {
3338
chunk: ArrayRef,
3439
) -> VortexResult<Layout> {
3540
self.push_chunk(segment_writer, chunk)?;
41+
self.flush(segment_writer)?;
3642
self.finish(segment_writer)
3743
}
3844

@@ -46,6 +52,7 @@ pub trait LayoutWriterExt: LayoutWriter {
4652
for chunk in iter.into_iter() {
4753
self.push_chunk(segment_writer, chunk?)?
4854
}
55+
self.flush(segment_writer)?;
4956
self.finish(segment_writer)
5057
}
5158
}

vortex-layout/src/writers/repartition.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ impl RepartitionWriter {
4646
}
4747
}
4848

49-
fn flush(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<()> {
49+
fn maybe_flush_chunk(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<()> {
5050
if self.nbytes >= self.options.block_size_minimum {
5151
let nblocks = self.row_count / self.options.block_len_multiple;
5252

@@ -112,18 +112,22 @@ impl LayoutWriter for RepartitionWriter {
112112
self.chunks.push_back(c);
113113
offset = end;
114114

115-
self.flush(segment_writer)?;
115+
self.maybe_flush_chunk(segment_writer)?;
116116
}
117117

118118
Ok(())
119119
}
120120

121-
fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
121+
fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
122122
let chunk =
123123
ChunkedArray::new_unchecked(self.chunks.drain(..).collect(), self.dtype.clone())
124124
.to_canonical()?
125125
.into_array();
126126
self.writer.push_chunk(segment_writer, chunk)?;
127+
self.writer.flush(segment_writer)
128+
}
129+
130+
fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
127131
self.writer.finish(segment_writer)
128132
}
129133
}

0 commit comments

Comments
 (0)