Skip to content

Commit f961dc9

Browse files
Merge branch 'main' into add-filterexec-with-projection
2 parents aa4a892 + 418f62a commit f961dc9

File tree

34 files changed

+1112
-113
lines changed

34 files changed

+1112
-113
lines changed

Cargo.lock

Lines changed: 24 additions & 25 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -185,8 +185,7 @@ sqlparser = { version = "0.59.0", default-features = false, features = ["std", "
185185
strum = "0.27.2"
186186
strum_macros = "0.27.2"
187187
tempfile = "3"
188-
testcontainers = { version = "0.25.2", features = ["default"] }
189-
testcontainers-modules = { version = "0.13" }
188+
testcontainers-modules = { version = "0.14" }
190189
tokio = { version = "1.48", features = ["macros", "rt", "sync"] }
191190
url = "2.5.7"
192191
zstd = { version = "0.13", default-features = false }

datafusion-cli/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -74,5 +74,4 @@ ctor = { workspace = true }
7474
insta = { workspace = true }
7575
insta-cmd = "0.6.0"
7676
rstest = { workspace = true }
77-
testcontainers = { workspace = true }
7877
testcontainers-modules = { workspace = true, features = ["minio"] }

datafusion-cli/tests/cli_integration.rs

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -24,10 +24,12 @@ use insta::{Settings, glob};
2424
use insta_cmd::{assert_cmd_snapshot, get_cargo_bin};
2525
use std::path::PathBuf;
2626
use std::{env, fs};
27-
use testcontainers::core::{CmdWaitFor, ExecCommand, Mount};
28-
use testcontainers::runners::AsyncRunner;
29-
use testcontainers::{ContainerAsync, ImageExt, TestcontainersError};
3027
use testcontainers_modules::minio;
28+
use testcontainers_modules::testcontainers::core::{CmdWaitFor, ExecCommand, Mount};
29+
use testcontainers_modules::testcontainers::runners::AsyncRunner;
30+
use testcontainers_modules::testcontainers::{
31+
ContainerAsync, ImageExt, TestcontainersError,
32+
};
3133

3234
fn cli() -> Command {
3335
Command::new(get_cargo_bin("datafusion-cli"))

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 6 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -83,13 +83,12 @@ use std::{
8383
any::Any,
8484
fmt::{self, Debug, Formatter},
8585
hash::{Hash, Hasher},
86-
ops::{Add, Div, Mul, Sub},
8786
pin::Pin,
88-
str::FromStr,
8987
sync::Arc,
9088
task::{Context, Poll},
9189
};
9290

91+
use arrow::datatypes::{Float64Type, Int64Type};
9392
use arrow::{
9493
array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt32Array},
9594
compute,
@@ -102,6 +101,7 @@ use futures::{
102101
use rand::{Rng, SeedableRng, rngs::StdRng};
103102
use tonic::async_trait;
104103

104+
use datafusion::optimizer::simplify_expressions::simplify_literal::parse_literal;
105105
use datafusion::{
106106
execution::{
107107
RecordBatchStream, SendableRecordBatchStream, SessionState, SessionStateBuilder,
@@ -410,11 +410,12 @@ impl RelationPlanner for TableSamplePlanner {
410410
"TABLESAMPLE requires a quantity (percentage, fraction, or row count)"
411411
);
412412
};
413+
let quantity_value_expr = context.sql_to_expr(quantity.value, input.schema())?;
413414

414415
match quantity.unit {
415416
// TABLESAMPLE (N ROWS) - exact row limit
416417
Some(TableSampleUnit::Rows) => {
417-
let rows = parse_quantity::<i64>(&quantity.value)?;
418+
let rows: i64 = parse_literal::<Int64Type>(&quantity_value_expr)?;
418419
if rows < 0 {
419420
return plan_err!("row count must be non-negative, got {}", rows);
420421
}
@@ -426,15 +427,15 @@ impl RelationPlanner for TableSamplePlanner {
426427

427428
// TABLESAMPLE (N PERCENT) - percentage sampling
428429
Some(TableSampleUnit::Percent) => {
429-
let percent = parse_quantity::<f64>(&quantity.value)?;
430+
let percent: f64 = parse_literal::<Float64Type>(&quantity_value_expr)?;
430431
let fraction = percent / 100.0;
431432
let plan = TableSamplePlanNode::new(input, fraction, seed).into_plan();
432433
Ok(RelationPlanning::Planned(PlannedRelation::new(plan, alias)))
433434
}
434435

435436
// TABLESAMPLE (N) - fraction if <1.0, row limit if >=1.0
436437
None => {
437-
let value = parse_quantity::<f64>(&quantity.value)?;
438+
let value = parse_literal::<Float64Type>(&quantity_value_expr)?;
438439
if value < 0.0 {
439440
return plan_err!("sample value must be non-negative, got {}", value);
440441
}
@@ -453,40 +454,6 @@ impl RelationPlanner for TableSamplePlanner {
453454
}
454455
}
455456

456-
/// Parse a SQL expression as a numeric value (supports basic arithmetic).
457-
fn parse_quantity<T>(expr: &ast::Expr) -> Result<T>
458-
where
459-
T: FromStr + Add<Output = T> + Sub<Output = T> + Mul<Output = T> + Div<Output = T>,
460-
{
461-
eval_numeric_expr(expr)
462-
.ok_or_else(|| plan_datafusion_err!("invalid numeric expression: {:?}", expr))
463-
}
464-
465-
/// Recursively evaluate numeric SQL expressions.
466-
fn eval_numeric_expr<T>(expr: &ast::Expr) -> Option<T>
467-
where
468-
T: FromStr + Add<Output = T> + Sub<Output = T> + Mul<Output = T> + Div<Output = T>,
469-
{
470-
match expr {
471-
ast::Expr::Value(v) => match &v.value {
472-
ast::Value::Number(n, _) => n.to_string().parse().ok(),
473-
_ => None,
474-
},
475-
ast::Expr::BinaryOp { left, op, right } => {
476-
let l = eval_numeric_expr::<T>(left)?;
477-
let r = eval_numeric_expr::<T>(right)?;
478-
match op {
479-
ast::BinaryOperator::Plus => Some(l + r),
480-
ast::BinaryOperator::Minus => Some(l - r),
481-
ast::BinaryOperator::Multiply => Some(l * r),
482-
ast::BinaryOperator::Divide => Some(l / r),
483-
_ => None,
484-
}
485-
}
486-
_ => None,
487-
}
488-
}
489-
490457
/// Custom logical plan node representing a TABLESAMPLE operation.
491458
///
492459
/// Stores sampling parameters (bounds, seed) and wraps the input plan.

datafusion/common/src/config.rs

Lines changed: 35 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ use arrow_ipc::CompressionType;
2323
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
2424
use crate::error::_config_err;
2525
use crate::format::{ExplainAnalyzeLevel, ExplainFormat};
26+
use crate::parquet_config::DFParquetWriterVersion;
2627
use crate::parsers::CompressionTypeVariant;
2728
use crate::utils::get_available_parallelism;
2829
use crate::{DataFusionError, Result};
@@ -742,7 +743,7 @@ config_namespace! {
742743

743744
/// (writing) Sets parquet writer version
744745
/// valid values are "1.0" and "2.0"
745-
pub writer_version: String, default = "1.0".to_string()
746+
pub writer_version: DFParquetWriterVersion, default = DFParquetWriterVersion::default()
746747

747748
/// (writing) Skip encoding the embedded arrow metadata in the KV_meta
748749
///
@@ -3455,4 +3456,37 @@ mod tests {
34553456
let parsed_metadata = table_config.parquet.key_value_metadata;
34563457
assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
34573458
}
3459+
#[cfg(feature = "parquet")]
3460+
#[test]
3461+
fn test_parquet_writer_version_validation() {
3462+
use crate::{config::ConfigOptions, parquet_config::DFParquetWriterVersion};
3463+
3464+
let mut config = ConfigOptions::default();
3465+
3466+
// Valid values should work
3467+
config
3468+
.set("datafusion.execution.parquet.writer_version", "1.0")
3469+
.unwrap();
3470+
assert_eq!(
3471+
config.execution.parquet.writer_version,
3472+
DFParquetWriterVersion::V1_0
3473+
);
3474+
3475+
config
3476+
.set("datafusion.execution.parquet.writer_version", "2.0")
3477+
.unwrap();
3478+
assert_eq!(
3479+
config.execution.parquet.writer_version,
3480+
DFParquetWriterVersion::V2_0
3481+
);
3482+
3483+
// Invalid value should error immediately at SET time
3484+
let err = config
3485+
.set("datafusion.execution.parquet.writer_version", "3.0")
3486+
.unwrap_err();
3487+
assert_eq!(
3488+
err.to_string(),
3489+
"Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0"
3490+
);
3491+
}
34583492
}

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 9 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,7 @@ use parquet::{
3333
metadata::KeyValue,
3434
properties::{
3535
DEFAULT_STATISTICS_ENABLED, EnabledStatistics, WriterProperties,
36-
WriterPropertiesBuilder, WriterVersion,
36+
WriterPropertiesBuilder,
3737
},
3838
},
3939
schema::types::ColumnPath,
@@ -214,7 +214,7 @@ impl ParquetOptions {
214214
let mut builder = WriterProperties::builder()
215215
.set_data_page_size_limit(*data_pagesize_limit)
216216
.set_write_batch_size(*write_batch_size)
217-
.set_writer_version(parse_version_string(writer_version.as_str())?)
217+
.set_writer_version((*writer_version).into())
218218
.set_dictionary_page_size_limit(*dictionary_page_size_limit)
219219
.set_statistics_enabled(
220220
statistics_enabled
@@ -373,18 +373,6 @@ pub fn parse_compression_string(
373373
}
374374
}
375375

376-
pub(crate) fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
377-
let str_setting_lower: &str = &str_setting.to_lowercase();
378-
match str_setting_lower {
379-
"1.0" => Ok(WriterVersion::PARQUET_1_0),
380-
"2.0" => Ok(WriterVersion::PARQUET_2_0),
381-
_ => Err(DataFusionError::Configuration(format!(
382-
"Unknown or unsupported parquet writer version {str_setting} \
383-
valid options are 1.0 and 2.0"
384-
))),
385-
}
386-
}
387-
388376
pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
389377
let str_setting_lower: &str = &str_setting.to_lowercase();
390378
match str_setting_lower {
@@ -405,6 +393,7 @@ mod tests {
405393
#[cfg(feature = "parquet_encryption")]
406394
use crate::config::ConfigFileEncryptionProperties;
407395
use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
396+
use crate::parquet_config::DFParquetWriterVersion;
408397
use parquet::basic::Compression;
409398
use parquet::file::properties::{
410399
BloomFilterProperties, DEFAULT_BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_NDV,
@@ -431,16 +420,17 @@ mod tests {
431420

432421
fn parquet_options_with_non_defaults() -> ParquetOptions {
433422
let defaults = ParquetOptions::default();
434-
let writer_version = if defaults.writer_version.eq("1.0") {
435-
"2.0"
423+
let writer_version = if defaults.writer_version.eq(&DFParquetWriterVersion::V1_0)
424+
{
425+
DFParquetWriterVersion::V2_0
436426
} else {
437-
"1.0"
427+
DFParquetWriterVersion::V1_0
438428
};
439429

440430
ParquetOptions {
441431
data_pagesize_limit: 42,
442432
write_batch_size: 42,
443-
writer_version: writer_version.into(),
433+
writer_version,
444434
compression: Some("zstd(22)".into()),
445435
dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
446436
dictionary_page_size_limit: 42,
@@ -548,7 +538,7 @@ mod tests {
548538
// global options
549539
data_pagesize_limit: props.dictionary_page_size_limit(),
550540
write_batch_size: props.write_batch_size(),
551-
writer_version: format!("{}.0", props.writer_version().as_num()),
541+
writer_version: props.writer_version().into(),
552542
dictionary_page_size_limit: props.dictionary_page_size_limit(),
553543
max_row_group_size: props.max_row_group_size(),
554544
created_by: props.created_by().to_string(),

0 commit comments

Comments
 (0)