Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vortex-file/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ pub fn vortex_file::WriteStrategyBuilder::build(self) -> alloc::sync::Arc<dyn vo

pub fn vortex_file::WriteStrategyBuilder::with_allow_encodings(self, vortex_utils::aliases::hash_set::HashSet<vortex_array::array::ArrayId>) -> Self

pub fn vortex_file::WriteStrategyBuilder::with_array_tree(self, bool) -> Self

pub fn vortex_file::WriteStrategyBuilder::with_btrblocks_builder(self, vortex_btrblocks::builder::BtrBlocksCompressorBuilder) -> Self

pub fn vortex_file::WriteStrategyBuilder::with_compressor<C: vortex_layout::layouts::compressed::CompressorPlugin>(self, C) -> Self
Expand Down
91 changes: 70 additions & 21 deletions vortex-file/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use vortex_fastlanes::FoR;
use vortex_fastlanes::RLE;
use vortex_fsst::FSST;
use vortex_layout::LayoutStrategy;
use vortex_layout::layouts::array_tree::writer as array_tree_writer;
use vortex_layout::layouts::buffered::BufferedStrategy;
use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
use vortex_layout::layouts::collect::CollectStrategy;
Expand Down Expand Up @@ -143,6 +144,7 @@ pub struct WriteStrategyBuilder {
field_writers: HashMap<FieldPath, Arc<dyn LayoutStrategy>>,
allow_encodings: Option<HashSet<ArrayId>>,
flat_strategy: Option<Arc<dyn LayoutStrategy>>,
array_tree: bool,
}

impl Default for WriteStrategyBuilder {
Expand All @@ -155,6 +157,7 @@ impl Default for WriteStrategyBuilder {
field_writers: HashMap::new(),
allow_encodings: Some(ALLOWED_ENCODINGS.clone()),
flat_strategy: None,
array_tree: false,
}
}
}
Expand Down Expand Up @@ -187,11 +190,32 @@ impl WriteStrategyBuilder {
///
/// By default, this uses [`FlatLayoutStrategy`]. This can be used to substitute a custom
/// layout strategy, e.g. one that inlines constant array buffers for GPU reads.
///
/// Passing a custom flat strategy implicitly disables the array-tree outlining feature
/// (see [`Self::with_array_tree`]), since the custom strategy owns the leaf format.
pub fn with_flat_strategy(self, flat: Arc<dyn LayoutStrategy>) -> Self {
    // Rebind the consumed builder mutably, record the caller-supplied leaf strategy, and
    // hand the builder back so calls can be chained.
    let mut builder = self;
    builder.flat_strategy = Some(flat);
    builder
}

/// Toggle array-tree outlining for the written file.
///
/// With outlining on, the encoding tree of every chunk (minus per-chunk statistics) is
/// gathered into one auxiliary segment per column instead of being stored inline next to
/// that chunk's data.
///
/// The feature is off by default. Turning it on makes the writer emit two encodings that
/// older readers do not understand:
/// [`vortex_layout::layouts::array_tree::ArrayTreeFlatLayout`] for the data leaves, and an
/// enclosing [`vortex_layout::layouts::array_tree::ArrayTreeLayout`] that owns the
/// consolidated auxiliary segment. A reader must recognize both encodings to open files
/// produced with the feature enabled.
///
/// This setting is ignored when a custom flat strategy has been supplied through
/// [`Self::with_flat_strategy`] — the user-supplied leaf format takes precedence.
pub fn with_array_tree(self, array_tree: bool) -> Self {
    let mut builder = self;
    builder.array_tree = array_tree;
    builder
}

/// Override the default [`BtrBlocksCompressorBuilder`] used for compression.
///
/// The builder is finalized during [`build`](Self::build), producing two compressors: one for
Expand All @@ -212,23 +236,17 @@ impl WriteStrategyBuilder {
/// Builds the canonical [`LayoutStrategy`] implementation, with the configured overrides
/// applied.
pub fn build(self) -> Arc<dyn LayoutStrategy> {
let flat: Arc<dyn LayoutStrategy> = if let Some(flat) = self.flat_strategy {
flat
} else if let Some(allow_encodings) = self.allow_encodings {
Arc::new(FlatLayoutStrategy::default().with_allow_encodings(allow_encodings))
let flat: Arc<dyn LayoutStrategy> = if let Some(flat) = &self.flat_strategy {
Arc::clone(flat)
} else if let Some(allow_encodings) = &self.allow_encodings {
Arc::new(FlatLayoutStrategy::default().with_allow_encodings(allow_encodings.clone()))
} else {
Arc::new(FlatLayoutStrategy::default())
};

// 7. for each chunk create a flat layout
let chunked = ChunkedLayoutStrategy::new(Arc::clone(&flat));
// 6. buffer chunks so they end up with closer segment ids physically
let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB

// 5. compress each chunk.
// Exclude IntDictScheme from the data compressor because DictStrategy (step 3) already
// dictionary-encodes columns. Allowing IntDictScheme here would redundantly
// dictionary-encode the integer codes produced by that earlier step.
// Data compressor: excludes IntDictScheme because DictStrategy (step 3 below) already
// dictionary-encodes columns; allowing it here would redundantly dictionary-encode the
// integer codes produced by that earlier step.
let data_compressor: Arc<dyn CompressorPlugin> = match &self.compressor {
CompressorConfig::BtrBlocks(builder) => Arc::new(
builder
Expand All @@ -238,6 +256,37 @@ impl WriteStrategyBuilder {
),
CompressorConfig::Opaque(compressor) => Arc::clone(compressor),
};
// Stats compressor: used for zone-map tables, dict values, and (when enabled) the
// consolidated array-trees segment.
let stats_compressor: Arc<dyn CompressorPlugin> = match &self.compressor {
CompressorConfig::BtrBlocks(builder) => Arc::new(builder.clone().build()),
CompressorConfig::Opaque(compressor) => Arc::clone(compressor),
};
let compress_then_flat =
CompressingStrategy::new(Arc::clone(&flat), Arc::clone(&stats_compressor));
let compress_then_flat_arc: Arc<dyn LayoutStrategy> = Arc::new(compress_then_flat.clone());

let array_tree_enabled = self.array_tree && self.flat_strategy.is_none();
let (data_leaf, array_tree_collector): (Arc<dyn LayoutStrategy>, _) = if !array_tree_enabled
{
(Arc::clone(&flat), None)
} else {
let data_flat = if let Some(allow_encodings) = &self.allow_encodings {
FlatLayoutStrategy::default().with_allow_encodings(allow_encodings.clone())
} else {
FlatLayoutStrategy::default()
};
let (collector, leaf) =
array_tree_writer::writer(data_flat, Arc::clone(&compress_then_flat_arc));
(Arc::new(leaf), Some(collector))
};

// 7. for each chunk create a flat layout
let chunked = ChunkedLayoutStrategy::new(data_leaf);
// 6. buffer chunks so they end up with closer segment ids physically
let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB

// 5. compress each chunk.
let compressing = CompressingStrategy::new(buffered, data_compressor);

// 4. prior to compression, coalesce up to a minimum size
Expand All @@ -257,13 +306,6 @@ impl WriteStrategyBuilder {
},
);

// 2.1. | 3.1. compress stats tables and dict values.
let stats_compressor: Arc<dyn CompressorPlugin> = match self.compressor {
CompressorConfig::BtrBlocks(builder) => Arc::new(builder.build()),
CompressorConfig::Opaque(compressor) => compressor,
};
let compress_then_flat = CompressingStrategy::new(flat, stats_compressor);

// 3. apply dict encoding or fallback
let dict = DictStrategy::new(
coalescing.clone(),
Expand All @@ -272,9 +314,16 @@ impl WriteStrategyBuilder {
Default::default(),
);

// 2.5. wrap dict in the array-tree collector if outlining is enabled.
let data_pipeline: Arc<dyn LayoutStrategy> = if let Some(collector) = array_tree_collector {
Arc::new(collector.wrap(dict))
} else {
Arc::new(dict)
};

// 2. calculate stats for each row group
let stats = ZonedStrategy::new(
dict,
data_pipeline,
compress_then_flat.clone(),
ZonedLayoutOptions {
block_size: self.row_block_size,
Expand Down
Loading
Loading