Skip to content

Commit daabd21

Browse files
committed
fix: Allow writer options for DataFusion
Signed-off-by: peasee <[email protected]>
1 parent ffd8429 commit daabd21

File tree

3 files changed

+161
-28
lines changed

3 files changed

+161
-28
lines changed

vortex-datafusion/src/persistent/format.rs

Lines changed: 65 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use object_store::{ObjectMeta, ObjectStore};
3232
use vortex::dtype::arrow::FromArrowType;
3333
use vortex::dtype::{DType, Nullability, PType};
3434
use vortex::error::{VortexExpect, VortexResult, vortex_err};
35-
use vortex::file::VORTEX_FILE_EXTENSION;
35+
use vortex::file::{VORTEX_FILE_EXTENSION, VortexWriteOptionsFactory};
3636
use vortex::metrics::VortexMetrics;
3737
use vortex::scalar::Scalar;
3838
use vortex::session::VortexSession;
@@ -50,6 +50,7 @@ pub struct VortexFormat {
5050
session: Arc<VortexSession>,
5151
file_cache: VortexFileCache,
5252
opts: VortexOptions,
53+
write_options_factory: Arc<VortexWriteOptionsFactory>,
5354
}
5455

5556
impl Debug for VortexFormat {
@@ -81,6 +82,7 @@ impl Eq for VortexOptions {}
8182
pub struct VortexFormatFactory {
8283
session: Arc<VortexSession>,
8384
options: Option<VortexOptions>,
85+
write_options_factory: Option<VortexWriteOptionsFactory>,
8486
}
8587

8688
impl GetExt for VortexFormatFactory {
@@ -96,16 +98,7 @@ impl VortexFormatFactory {
9698
Self {
9799
session: Arc::new(VortexSession::default()),
98100
options: None,
99-
}
100-
}
101-
102-
/// Creates a new instance with customized session and default options for all [`VortexFormat`] instances created from this factory.
103-
///
104-
/// The options can be overridden by table-level configuration pass in [`FileFormatFactory::create`].
105-
pub fn new_with_options(session: Arc<VortexSession>, options: VortexOptions) -> Self {
106-
Self {
107-
session,
108-
options: Some(options),
101+
write_options_factory: None,
109102
}
110103
}
111104

@@ -121,6 +114,23 @@ impl VortexFormatFactory {
121114
self.options = Some(options);
122115
self
123116
}
117+
118+
/// Override the default write options for this factory.
119+
////
120+
/// For example:
121+
/// ```rust
122+
/// use vortex_datafusion::VortexFormatFactory;
123+
/// use vortex::file::VortexWriteOptionsFactory;
124+
///
125+
/// let factory = VortexFormatFactory::new().with_write_options(VortexWriteOptionsFactory::default());
126+
/// ```
127+
pub fn with_write_options_factory(
128+
mut self,
129+
write_options_factory: VortexWriteOptionsFactory,
130+
) -> Self {
131+
self.write_options_factory = Some(write_options_factory);
132+
self
133+
}
124134
}
125135

126136
impl FileFormatFactory for VortexFormatFactory {
@@ -139,10 +149,13 @@ impl FileFormatFactory for VortexFormatFactory {
139149
}
140150
}
141151

142-
Ok(Arc::new(VortexFormat::new_with_options(
143-
self.session.clone(),
144-
opts,
145-
)))
152+
let write_opts = self.write_options_factory.clone().unwrap_or_default();
153+
154+
Ok(Arc::new(
155+
VortexFormat::new(self.session.clone())
156+
.with_options(opts)
157+
.with_write_options_factory(write_opts),
158+
))
146159
}
147160

148161
fn default(&self) -> Arc<dyn FileFormat> {
@@ -163,11 +176,7 @@ impl Default for VortexFormat {
163176
impl VortexFormat {
164177
/// Create a new instance with default options.
165178
pub fn new(session: Arc<VortexSession>) -> Self {
166-
Self::new_with_options(session, VortexOptions::default())
167-
}
168-
169-
/// Creates a new instance with configured by a [`VortexOptions`].
170-
pub fn new_with_options(session: Arc<VortexSession>, opts: VortexOptions) -> Self {
179+
let opts = VortexOptions::default();
171180
Self {
172181
session: session.clone(),
173182
file_cache: VortexFileCache::new(
@@ -176,13 +185,48 @@ impl VortexFormat {
176185
session,
177186
),
178187
opts,
188+
write_options_factory: VortexWriteOptionsFactory::default().into(),
179189
}
180190
}
181191

192+
/// Override the default options for this format.
193+
////
194+
/// For example:
195+
/// ```rust
196+
/// use vortex_datafusion::{VortexFormat, VortexOptions};
197+
///
198+
/// let format = VortexFormat::default().with_options(VortexOptions::default());
199+
/// ```
200+
pub fn with_options(mut self, opts: VortexOptions) -> Self {
201+
self.opts = opts;
202+
self
203+
}
204+
205+
/// Override the default write options for this format.
206+
//// For example:
207+
/// ```rust
208+
/// use vortex_datafusion::VortexFormat;
209+
/// use vortex::file::VortexWriteOptions;
210+
///
211+
/// let format = VortexFormat::default().with_write_options(VortexWriteOptions::default());
212+
/// ```
213+
pub fn with_write_options_factory(
214+
mut self,
215+
write_options_factory: impl Into<Arc<VortexWriteOptionsFactory>>,
216+
) -> Self {
217+
self.write_options_factory = write_options_factory.into();
218+
self
219+
}
220+
182221
/// Return the format specific configuration
183222
pub fn options(&self) -> &VortexOptions {
184223
&self.opts
185224
}
225+
226+
/// Return the write options
227+
pub fn write_options_factory(&self) -> Arc<VortexWriteOptionsFactory> {
228+
Arc::clone(&self.write_options_factory)
229+
}
186230
}
187231

188232
#[async_trait]
@@ -395,7 +439,7 @@ impl FileFormat for VortexFormat {
395439
}
396440

397441
let schema = conf.output_schema().clone();
398-
let sink = Arc::new(VortexSink::new(conf, schema));
442+
let sink = Arc::new(VortexSink::new(conf, schema, self.write_options_factory()));
399443

400444
Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
401445
}

vortex-datafusion/src/persistent/sink.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,27 @@ use vortex::arrow::FromArrowArray;
2525
use vortex::dtype::DType;
2626
use vortex::dtype::arrow::FromArrowType;
2727
use vortex::error::VortexResult;
28-
use vortex::file::VortexWriteOptions;
28+
use vortex::file::VortexWriteOptionsFactory;
2929
use vortex::io::{ObjectStoreWriter, VortexWrite};
3030
use vortex::stream::ArrayStreamAdapter;
3131

3232
pub struct VortexSink {
3333
config: FileSinkConfig,
3434
schema: SchemaRef,
35+
write_options_factory: Arc<VortexWriteOptionsFactory>,
3536
}
3637

3738
impl VortexSink {
38-
pub fn new(config: FileSinkConfig, schema: SchemaRef) -> Self {
39-
Self { config, schema }
39+
pub fn new(
40+
config: FileSinkConfig,
41+
schema: SchemaRef,
42+
write_options_factory: Arc<VortexWriteOptionsFactory>,
43+
) -> Self {
44+
Self {
45+
config,
46+
schema,
47+
write_options_factory,
48+
}
4049
}
4150
}
4251

@@ -106,6 +115,7 @@ impl FileSink for VortexSink {
106115
let row_counter = row_counter.clone();
107116
let object_store = object_store.clone();
108117
let writer_schema = get_writer_schema(&self.config);
118+
let write_options_factory = Arc::clone(&self.write_options_factory);
109119
let dtype = DType::from_arrow(writer_schema);
110120

111121
// We need to spawn work because there's a dependency between the different files. If one file has too many batches buffered,
@@ -126,7 +136,8 @@ impl FileSink for VortexSink {
126136
))
127137
})?;
128138

129-
VortexWriteOptions::default()
139+
let write_options = write_options_factory.build();
140+
write_options
130141
.write(&mut sink, stream_adapter)
131142
.await
132143
.map_err(|e| {

vortex-file/src/writer.rs

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,84 @@ use crate::footer::FileStatistics;
2929
use crate::segments::writer::BufferedSegmentSink;
3030
use crate::{Footer, MAGIC_BYTES, WriteStrategyBuilder};
3131

32+
const DEFAULT_EXCLUDE_DTYPE: bool = false;
33+
const DEFAULT_MAX_VARIABLE_LENGTH_STATISTICS_SIZE: usize = 64;
34+
const DEFAULT_FILE_STATISTICS: &[Stat] = PRUNING_STATS;
35+
36+
#[derive(Clone)]
37+
pub struct VortexWriteOptionsFactory {
38+
strategy: Arc<dyn LayoutStrategy>,
39+
exclude_dtype: Option<bool>,
40+
max_variable_length_statistics_size: Option<usize>,
41+
file_statistics: Option<Vec<Stat>>,
42+
}
43+
44+
impl std::fmt::Debug for VortexWriteOptionsFactory {
45+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46+
f.debug_struct("VortexWriteOptions")
47+
.field("exclude_dtype", &self.exclude_dtype)
48+
.field(
49+
"max_variable_length_statistics_size",
50+
&self.max_variable_length_statistics_size,
51+
)
52+
.field("file_statistics", &self.file_statistics)
53+
.finish()
54+
}
55+
}
56+
57+
impl Default for VortexWriteOptionsFactory {
58+
fn default() -> Self {
59+
Self {
60+
strategy: WriteStrategyBuilder::new().build(),
61+
exclude_dtype: None,
62+
max_variable_length_statistics_size: None,
63+
file_statistics: None,
64+
}
65+
}
66+
}
67+
68+
impl VortexWriteOptionsFactory {
69+
pub fn new() -> Self {
70+
Self::default()
71+
}
72+
73+
pub fn with_strategy(mut self, strategy: Arc<dyn LayoutStrategy>) -> Self {
74+
self.strategy = strategy;
75+
self
76+
}
77+
78+
pub fn exclude_dtype(mut self) -> Self {
79+
self.exclude_dtype = Some(true);
80+
self
81+
}
82+
83+
pub fn with_max_variable_length_statistics_size(mut self, size: usize) -> Self {
84+
self.max_variable_length_statistics_size = Some(size);
85+
self
86+
}
87+
88+
pub fn with_file_statistics(mut self, stats: Vec<Stat>) -> Self {
89+
self.file_statistics = Some(stats);
90+
self
91+
}
92+
93+
pub fn build(&self) -> VortexWriteOptions {
94+
VortexWriteOptions {
95+
strategy: self.strategy.clone(),
96+
exclude_dtype: self.exclude_dtype.clone().unwrap_or(DEFAULT_EXCLUDE_DTYPE),
97+
max_variable_length_statistics_size: self
98+
.max_variable_length_statistics_size
99+
.clone()
100+
.unwrap_or(DEFAULT_MAX_VARIABLE_LENGTH_STATISTICS_SIZE),
101+
file_statistics: self
102+
.file_statistics
103+
.clone()
104+
.unwrap_or_else(|| DEFAULT_FILE_STATISTICS.to_vec()),
105+
handle: None,
106+
}
107+
}
108+
}
109+
32110
/// Configure a new writer, which can eventually be used to write an [`ArrayStream`] into a sink that implements [`VortexWrite`].
33111
///
34112
/// Unless overridden, the default [write strategy][crate::WriteStrategyBuilder] will be used with no
@@ -45,9 +123,9 @@ impl Default for VortexWriteOptions {
45123
fn default() -> Self {
46124
Self {
47125
strategy: WriteStrategyBuilder::new().build(),
48-
exclude_dtype: false,
49-
file_statistics: PRUNING_STATS.to_vec(),
50-
max_variable_length_statistics_size: 64,
126+
exclude_dtype: DEFAULT_EXCLUDE_DTYPE,
127+
file_statistics: DEFAULT_FILE_STATISTICS.to_vec(),
128+
max_variable_length_statistics_size: DEFAULT_MAX_VARIABLE_LENGTH_STATISTICS_SIZE,
51129
handle: Handle::find(),
52130
}
53131
}

0 commit comments

Comments
 (0)