Skip to content

Commit 332036c

Browse files
authored
layout writer tweaks (#3435)
Signed-off-by: Onur Satici <[email protected]>
1 parent 03260ee commit 332036c

File tree

12 files changed

+186
-176
lines changed

12 files changed

+186
-176
lines changed

vortex-array/src/context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ impl ArrayRegistry {
5050
#[derive(Debug, Clone)]
5151
pub struct VTableContext<T>(Arc<RwLock<Vec<T>>>);
5252

53-
impl<T: Clone + Eq + Display> VTableContext<T> {
53+
impl<T: Clone + Eq> VTableContext<T> {
5454
pub fn empty() -> Self {
5555
Self(Arc::new(RwLock::new(Vec::new())))
5656
}

vortex-file/src/strategy.rs

Lines changed: 5 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,19 @@
11
//! This module defines the default layout strategy for a Vortex file.
22
3-
use std::collections::VecDeque;
43
use std::sync::Arc;
54

65
use arcref::ArcRef;
7-
use async_stream::try_stream;
8-
use futures::{FutureExt, StreamExt, pin_mut};
9-
use vortex_array::stats::{PRUNING_STATS, Stat};
10-
use vortex_array::{Array, ArrayContext};
11-
use vortex_btrblocks::BtrBlocksCompressor;
6+
use vortex_array::stats::PRUNING_STATS;
7+
use vortex_layout::LayoutStrategy;
8+
use vortex_layout::layouts::buffered::BufferedStrategy;
129
use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
10+
use vortex_layout::layouts::compressed::BtrBlocksCompressedStrategy;
1311
use vortex_layout::layouts::dict::writer::DictStrategy;
1412
use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
1513
use vortex_layout::layouts::repartition::{RepartitionStrategy, RepartitionWriterOptions};
1614
use vortex_layout::layouts::struct_::writer::StructStrategy;
1715
use vortex_layout::layouts::zoned::writer::{ZonedLayoutOptions, ZonedStrategy};
18-
use vortex_layout::scan::{TaskExecutor, TaskExecutorExt};
19-
use vortex_layout::segments::SequenceWriter;
20-
use vortex_layout::{
21-
LayoutStrategy, SendableLayoutWriter, SendableSequentialStream, SequentialStreamAdapter,
22-
SequentialStreamExt as _,
23-
};
16+
use vortex_layout::scan::TaskExecutor;
2417

2518
const ROW_BLOCK_SIZE: usize = 8192;
2619

@@ -93,121 +86,3 @@ impl VortexLayoutStrategy {
9386
/// Wraps a concrete [`LayoutStrategy`] in an `Arc` and returns it as a type-erased
/// `ArcRef<dyn LayoutStrategy>`.
fn arcref(item: impl LayoutStrategy) -> ArcRef<dyn LayoutStrategy> {
9487
ArcRef::new_arc(Arc::new(item))
9588
}
96-
97-
/// A layout writer that compresses chunks using a sampling compressor.
98-
struct BtrBlocksCompressedStrategy {
99-
child: ArcRef<dyn LayoutStrategy>,
100-
executor: Arc<dyn TaskExecutor>,
101-
parallelism: usize,
102-
}
103-
104-
impl BtrBlocksCompressedStrategy {
105-
pub fn new(
106-
child: ArcRef<dyn LayoutStrategy>,
107-
executor: Arc<dyn TaskExecutor>,
108-
parallelism: usize,
109-
) -> Self {
110-
Self {
111-
child,
112-
executor,
113-
parallelism,
114-
}
115-
}
116-
}
117-
118-
impl LayoutStrategy for BtrBlocksCompressedStrategy {
119-
fn write_stream(
120-
&self,
121-
ctx: &ArrayContext,
122-
sequence_writer: SequenceWriter,
123-
stream: SendableSequentialStream,
124-
) -> SendableLayoutWriter {
125-
let executor = self.executor.clone();
126-
127-
let dtype = stream.dtype().clone();
128-
let stream = stream
129-
.map(|chunk| {
130-
async {
131-
let (sequence_id, chunk) = chunk?;
132-
// Compute the stats for the chunk prior to compression
133-
chunk
134-
.statistics()
135-
.compute_all(&Stat::all().collect::<Vec<_>>())?;
136-
Ok((sequence_id, BtrBlocksCompressor.compress(&chunk)?))
137-
}
138-
.boxed()
139-
})
140-
.map(move |compress_future| executor.spawn(compress_future))
141-
.buffered(self.parallelism);
142-
143-
self.child.write_stream(
144-
ctx,
145-
sequence_writer,
146-
SequentialStreamAdapter::new(dtype, stream).sendable(),
147-
)
148-
}
149-
}
150-
151-
struct BufferedStrategy {
152-
child: ArcRef<dyn LayoutStrategy>,
153-
buffer_size: u64,
154-
}
155-
156-
impl BufferedStrategy {
157-
pub fn new(child: ArcRef<dyn LayoutStrategy>, buffer_size: u64) -> Self {
158-
Self { child, buffer_size }
159-
}
160-
}
161-
162-
impl LayoutStrategy for BufferedStrategy {
163-
fn write_stream(
164-
&self,
165-
ctx: &ArrayContext,
166-
sequence_writer: SequenceWriter,
167-
stream: SendableSequentialStream,
168-
) -> SendableLayoutWriter {
169-
let dtype = stream.dtype().clone();
170-
let buffer_size = self.buffer_size;
171-
let buffered_stream = try_stream! {
172-
let stream = stream.peekable();
173-
pin_mut!(stream);
174-
175-
let mut nbytes = 0u64;
176-
let mut chunks = VecDeque::new();
177-
178-
while let Some(chunk) = stream.as_mut().next().await {
179-
let (sequence_id, chunk) = chunk?;
180-
nbytes += chunk.nbytes() as u64;
181-
chunks.push_back(chunk);
182-
183-
// if this is the last element, flush everything
184-
if stream.as_mut().peek().await.is_none() {
185-
let mut sequence_pointer = sequence_id.descend();
186-
while let Some(chunk) = chunks.pop_front() {
187-
yield (sequence_pointer.advance(), chunk)
188-
}
189-
break;
190-
}
191-
192-
if nbytes < 2 * buffer_size {
193-
continue;
194-
};
195-
// Wait until we're at 2x the buffer size before flushing 1x the buffer size
196-
// This avoids small tail stragglers being flushed at the end of the file.
197-
let mut sequence_pointer = sequence_id.descend();
198-
while nbytes > buffer_size {
199-
let Some(chunk) = chunks.pop_front() else {
200-
break;
201-
};
202-
nbytes -= chunk.nbytes() as u64;
203-
yield (sequence_pointer.advance(), chunk)
204-
}
205-
}
206-
};
207-
self.child.write_stream(
208-
ctx,
209-
sequence_writer,
210-
SequentialStreamAdapter::new(dtype, buffered_stream).sendable(),
211-
)
212-
}
213-
}

vortex-file/src/writer.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::future;
22
use std::sync::Arc;
33

44
use arcref::ArcRef;
5-
use futures::StreamExt;
5+
use futures::TryStreamExt;
66
use futures::future::try_join;
77
use vortex_array::ArrayContext;
88
use vortex_array::stats::{PRUNING_STATS, Stat};
@@ -33,7 +33,7 @@ pub struct VortexWriteOptions {
3333
impl Default for VortexWriteOptions {
3434
fn default() -> Self {
3535
Self {
36-
strategy: VortexLayoutStrategy::with_executor(Arc::new(LocalExecutor {})),
36+
strategy: VortexLayoutStrategy::with_executor(Arc::new(LocalExecutor)),
3737
exclude_dtype: false,
3838
file_statistics: PRUNING_STATS.to_vec(),
3939
max_variable_length_statistics_size: 64,
@@ -77,10 +77,8 @@ impl VortexWriteOptions {
7777
let (segment_writer, flusher) = SerialSegmentWriter::create();
7878
let sequence_writer = SequenceWriter::new(Box::new(segment_writer));
7979

80-
let stream = stream.filter(|item| match item {
81-
Ok(chunk) => future::ready(!chunk.is_empty()),
82-
Err(_) => future::ready(true),
83-
});
80+
let stream = stream.try_filter(|chunk| future::ready(!chunk.is_empty()));
81+
8482
let stream = sequence_writer.new_sequential(ArrayStreamExt::boxed(
8583
ArrayStreamAdapter::new(dtype.clone(), stream),
8684
));
@@ -95,7 +93,7 @@ impl VortexWriteOptions {
9593
let mut write = futures::io::Cursor::new(write);
9694
write.write_all(MAGIC_BYTES).await?;
9795

98-
let io_fut = async { flusher.flush(write).await };
96+
let io_fut = flusher.flush(write);
9997
let compute_fut = self.strategy.write_stream(&ctx, sequence_writer, stream);
10098
let (layout, (mut write, segment_specs)) = try_join(compute_fut, io_fut).await?;
10199

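[File header missing from this capture: new file defining the `buffered` module declared by `pub mod buffered;` in vortex-layout/src/layouts/mod.rs; exact path not shown.]

Lines changed: 76 additions & 0 deletions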
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
use std::collections::VecDeque;
2+
3+
use arcref::ArcRef;
4+
use async_stream::try_stream;
5+
use futures::{StreamExt as _, pin_mut};
6+
use vortex_array::ArrayContext;
7+
8+
use crate::segments::SequenceWriter;
9+
use crate::{
10+
LayoutStrategy, SendableLayoutWriter, SendableSequentialStream, SequentialStreamAdapter,
11+
SequentialStreamExt as _,
12+
};
13+
14+
/// A [`LayoutStrategy`] that queues incoming chunks and re-emits them to `child` in batches,
/// using the chunks' byte sizes to decide when to flush.
pub struct BufferedStrategy {
15+
/// Strategy that receives the re-emitted chunk stream.
child: ArcRef<dyn LayoutStrategy>,
16+
/// Byte threshold compared against the running sum of `chunk.nbytes()`.
buffer_size: u64,
17+
}
18+
19+
impl BufferedStrategy {
20+
/// Creates a strategy that buffers chunks before forwarding them to `child`.
///
/// `buffer_size` is stored unchanged and used as the byte threshold when flushing.
pub fn new(child: ArcRef<dyn LayoutStrategy>, buffer_size: u64) -> Self {
21+
Self { child, buffer_size }
22+
}
23+
}
24+
25+
impl LayoutStrategy for BufferedStrategy {
26+
/// Wraps `stream` in a buffering stream and delegates the actual write to `self.child`.
///
/// Chunks are queued in arrival order while their `nbytes()` are summed:
/// - when the upstream has no further element, every queued chunk is emitted and the
///   stream ends;
/// - otherwise nothing is emitted until the queued total reaches `2 * buffer_size`, at
///   which point chunks are emitted from the front until the total is no longer above
///   `buffer_size`.
///
/// Emitted chunks get new sequence ids derived from the id of the most recently received
/// chunk (`descend()` once per flush, then `advance()` per emitted chunk). An upstream
/// error is propagated by the `?` inside the stream body.
fn write_stream(
27+
&self,
28+
ctx: &ArrayContext,
29+
sequence_writer: SequenceWriter,
30+
stream: SendableSequentialStream,
31+
) -> SendableLayoutWriter {
32+
// Captured up front because `stream` is moved into the `try_stream!` body below.
let dtype = stream.dtype().clone();
33+
// Copied out of `self` so the stream body does not borrow `self`.
let buffer_size = self.buffer_size;
34+
let buffered_stream = try_stream! {
35+
// `peekable` lets the loop check whether another element follows the current one.
let stream = stream.peekable();
36+
pin_mut!(stream);
37+
38+
// Total `nbytes()` of the chunks currently held in `chunks`.
let mut nbytes = 0u64;
39+
// FIFO queue of chunks that have been received but not yet emitted.
let mut chunks = VecDeque::new();
40+
41+
while let Some(chunk) = stream.as_mut().next().await {
42+
let (sequence_id, chunk) = chunk?;
43+
nbytes += chunk.nbytes() as u64;
44+
chunks.push_back(chunk);
45+
46+
// if this is the last element, flush everything
47+
if stream.as_mut().peek().await.is_none() {
48+
let mut sequence_pointer = sequence_id.descend();
49+
while let Some(chunk) = chunks.pop_front() {
50+
yield (sequence_pointer.advance(), chunk)
51+
}
52+
break;
53+
}
54+
55+
// Below the 2x threshold: keep accumulating.
if nbytes < 2 * buffer_size {
56+
continue;
57+
};
58+
// Wait until we're at 2x the buffer size before flushing 1x the buffer size
59+
// This avoids small tail stragglers being flushed at the end of the file.
60+
let mut sequence_pointer = sequence_id.descend();
61+
while nbytes > buffer_size {
62+
// Stop early if the queue empties before the byte count drops far enough.
let Some(chunk) = chunks.pop_front() else {
63+
break;
64+
};
65+
nbytes -= chunk.nbytes() as u64;
66+
yield (sequence_pointer.advance(), chunk)
67+
}
68+
}
69+
};
70+
// Hand the buffered stream to the child strategy under the original dtype.
self.child.write_stream(
71+
ctx,
72+
sequence_writer,
73+
SequentialStreamAdapter::new(dtype, buffered_stream).sendable(),
74+
)
75+
}
76+
}
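[File header missing from this capture: new file defining the `compressed` module declared by `pub mod compressed;` in vortex-layout/src/layouts/mod.rs; exact path not shown.]

Lines changed: 68 additions & 0 deletions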
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use std::sync::Arc;
2+
3+
use arcref::ArcRef;
4+
use futures::{FutureExt as _, StreamExt as _};
5+
use vortex_array::ArrayContext;
6+
use vortex_array::stats::Stat;
7+
use vortex_btrblocks::BtrBlocksCompressor;
8+
9+
use crate::scan::{TaskExecutor, TaskExecutorExt as _};
10+
use crate::segments::SequenceWriter;
11+
use crate::{
12+
LayoutStrategy, SendableLayoutWriter, SendableSequentialStream, SequentialStreamAdapter,
13+
SequentialStreamExt as _,
14+
};
15+
16+
/// A [`LayoutStrategy`] that compresses each chunk with [`BtrBlocksCompressor`] before
/// passing the resulting stream on to `child`.
17+
pub struct BtrBlocksCompressedStrategy {
18+
/// Strategy that receives the compressed chunk stream.
child: ArcRef<dyn LayoutStrategy>,
19+
/// Executor on which the per-chunk compression futures are spawned.
executor: Arc<dyn TaskExecutor>,
20+
/// Value passed to `buffered(..)` on the stream of spawned compression tasks.
parallelism: usize,
21+
}
22+
23+
impl BtrBlocksCompressedStrategy {
24+
/// Creates a compressing strategy.
///
/// - `child`: strategy that receives the compressed chunks.
/// - `executor`: executor used to spawn one compression task per chunk.
/// - `parallelism`: stored unchanged and passed to `buffered(..)` when writing.
pub fn new(
25+
child: ArcRef<dyn LayoutStrategy>,
26+
executor: Arc<dyn TaskExecutor>,
27+
parallelism: usize,
28+
) -> Self {
29+
Self {
30+
child,
31+
executor,
32+
parallelism,
33+
}
34+
}
35+
}
36+
37+
impl LayoutStrategy for BtrBlocksCompressedStrategy {
38+
/// Compresses every chunk of `stream` and delegates the write to `self.child`.
///
/// For each `(sequence_id, chunk)` a boxed future is built that computes all statistics
/// on the uncompressed chunk and then compresses it with `BtrBlocksCompressor`, keeping
/// the same `sequence_id`. Each future is spawned on `self.executor`, and the spawned
/// tasks are combined with `buffered(self.parallelism)`. Errors from the upstream item,
/// the statistics computation or the compression are propagated with `?`.
fn write_stream(
39+
&self,
40+
ctx: &ArrayContext,
41+
sequence_writer: SequenceWriter,
42+
stream: SendableSequentialStream,
43+
) -> SendableLayoutWriter {
44+
// Cloned so the `move` closure below owns its own handle to the executor.
let executor = self.executor.clone();
45+
46+
// Captured before `stream` is consumed by the adaptor chain.
let dtype = stream.dtype().clone();
47+
let stream = stream
48+
.map(|chunk| {
49+
async {
50+
let (sequence_id, chunk) = chunk?;
51+
// Compute the stats for the chunk prior to compression
52+
chunk
53+
.statistics()
54+
.compute_all(&Stat::all().collect::<Vec<_>>())?;
55+
Ok((sequence_id, BtrBlocksCompressor.compress(&chunk)?))
56+
}
57+
.boxed()
58+
})
59+
// Spawn each compression future on the executor.
.map(move |compress_future| executor.spawn(compress_future))
60+
// Limit how many spawned tasks are buffered at once (see `StreamExt::buffered`).
.buffered(self.parallelism);
61+
62+
// Hand the compressed stream to the child strategy under the original dtype.
self.child.write_stream(
63+
ctx,
64+
sequence_writer,
65+
SequentialStreamAdapter::new(dtype, stream).sendable(),
66+
)
67+
}
68+
}

vortex-layout/src/layouts/dict/writer/mod.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use vortex_btrblocks::BtrBlocksCompressor;
1111
use vortex_dict::DictEncoding;
1212
use vortex_dict::builders::{DictConstraints, DictEncoder, dict_encoder};
1313
use vortex_dtype::{DType, PType};
14-
use vortex_error::{VortexExpect, VortexResult, VortexUnwrap, vortex_err};
14+
use vortex_error::{VortexExpect, VortexResult, VortexUnwrap, vortex_bail, vortex_err};
1515

1616
use super::DictLayout;
1717
use crate::layouts::chunked::ChunkedLayout;
@@ -203,10 +203,8 @@ impl DictStreamState {
203203
labeler: &mut DictChunkLabeler,
204204
chunk: ArrayRef,
205205
) -> Vec<VortexResult<DictionaryChunk>> {
206-
match self.try_encode(labeler, chunk) {
207-
Ok(chunks) => chunks,
208-
Err(e) => vec![Err(e)],
209-
}
206+
self.try_encode(labeler, chunk)
207+
.unwrap_or_else(|e| vec![Err(e)])
210208
}
211209

212210
fn try_encode(
@@ -304,10 +302,9 @@ impl DictEncodedRuns {
304302

305303
let (values_tx, values_rx) = oneshot::channel();
306304
let values_future = async {
307-
match values_rx.await {
308-
Ok(values) => values,
309-
Err(_) => Err(vortex_err!("sender dropped")),
310-
}
305+
values_rx
306+
.await
307+
.unwrap_or_else(|_| vortex_bail!("sender dropped"))
311308
};
312309

313310
let codes_stream = DictEncodedRunStream {

vortex-layout/src/layouts/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use futures::future::{BoxFuture, Shared};
44
use vortex_array::ArrayRef;
55
use vortex_error::SharedVortexResult;
66

7+
pub mod buffered;
78
pub mod chunked;
9+
pub mod compressed;
810
pub mod dict;
911
pub mod file_stats;
1012
pub mod filter;

0 commit comments

Comments
 (0)