Skip to content

Commit 08b9a3c

Browse files
Merge branch 'main' into perf-octet-length
2 parents 82517db + 955fd41 commit 08b9a3c

File tree

28 files changed

+824
-317
lines changed

28 files changed

+824
-317
lines changed

.github/workflows/audit.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ jobs:
4242
steps:
4343
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
4444
- name: Install cargo-audit
45-
uses: taiki-e/install-action@e0db384ad69f5ba2c6dd0129d8934e0d0ba465c1 # v2.65.10
45+
uses: taiki-e/install-action@a983ca795126a4be4e8a44879ac5c4c3ef66cae1 # v2.65.11
4646
with:
4747
tool: cargo-audit
4848
- name: Run audit check

.github/workflows/rust.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ jobs:
421421
sudo apt-get update -qq
422422
sudo apt-get install -y -qq clang
423423
- name: Setup wasm-pack
424-
uses: taiki-e/install-action@e0db384ad69f5ba2c6dd0129d8934e0d0ba465c1 # v2.65.10
424+
uses: taiki-e/install-action@a983ca795126a4be4e8a44879ac5c4c3ef66cae1 # v2.65.11
425425
with:
426426
tool: wasm-pack
427427
- name: Run tests with headless mode
@@ -741,7 +741,7 @@ jobs:
741741
- name: Setup Rust toolchain
742742
uses: ./.github/actions/setup-builder
743743
- name: Install cargo-msrv
744-
uses: taiki-e/install-action@e0db384ad69f5ba2c6dd0129d8934e0d0ba465c1 # v2.65.10
744+
uses: taiki-e/install-action@a983ca795126a4be4e8a44879ac5c4c3ef66cae1 # v2.65.11
745745
with:
746746
tool: cargo-msrv
747747

Cargo.lock

Lines changed: 24 additions & 25 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,7 @@ sqlparser = { version = "0.59.0", default-features = false, features = ["std", "
185185
strum = "0.27.2"
186186
strum_macros = "0.27.2"
187187
tempfile = "3"
188-
testcontainers = { version = "0.25.2", features = ["default"] }
189-
testcontainers-modules = { version = "0.13" }
188+
testcontainers-modules = { version = "0.14" }
190189
tokio = { version = "1.48", features = ["macros", "rt", "sync"] }
191190
url = "2.5.7"
192191
zstd = { version = "0.13", default-features = false }

datafusion-cli/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,5 +74,4 @@ ctor = { workspace = true }
7474
insta = { workspace = true }
7575
insta-cmd = "0.6.0"
7676
rstest = { workspace = true }
77-
testcontainers = { workspace = true }
7877
testcontainers-modules = { workspace = true, features = ["minio"] }

datafusion-cli/tests/cli_integration.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ use insta::{Settings, glob};
2424
use insta_cmd::{assert_cmd_snapshot, get_cargo_bin};
2525
use std::path::PathBuf;
2626
use std::{env, fs};
27-
use testcontainers::core::{CmdWaitFor, ExecCommand, Mount};
28-
use testcontainers::runners::AsyncRunner;
29-
use testcontainers::{ContainerAsync, ImageExt, TestcontainersError};
3027
use testcontainers_modules::minio;
28+
use testcontainers_modules::testcontainers::core::{CmdWaitFor, ExecCommand, Mount};
29+
use testcontainers_modules::testcontainers::runners::AsyncRunner;
30+
use testcontainers_modules::testcontainers::{
31+
ContainerAsync, ImageExt, TestcontainersError,
32+
};
3133

3234
fn cli() -> Command {
3335
Command::new(get_cargo_bin("datafusion-cli"))

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 6 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,12 @@ use std::{
8383
any::Any,
8484
fmt::{self, Debug, Formatter},
8585
hash::{Hash, Hasher},
86-
ops::{Add, Div, Mul, Sub},
8786
pin::Pin,
88-
str::FromStr,
8987
sync::Arc,
9088
task::{Context, Poll},
9189
};
9290

91+
use arrow::datatypes::{Float64Type, Int64Type};
9392
use arrow::{
9493
array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt32Array},
9594
compute,
@@ -102,6 +101,7 @@ use futures::{
102101
use rand::{Rng, SeedableRng, rngs::StdRng};
103102
use tonic::async_trait;
104103

104+
use datafusion::optimizer::simplify_expressions::simplify_literal::parse_literal;
105105
use datafusion::{
106106
execution::{
107107
RecordBatchStream, SendableRecordBatchStream, SessionState, SessionStateBuilder,
@@ -410,11 +410,12 @@ impl RelationPlanner for TableSamplePlanner {
410410
"TABLESAMPLE requires a quantity (percentage, fraction, or row count)"
411411
);
412412
};
413+
let quantity_value_expr = context.sql_to_expr(quantity.value, input.schema())?;
413414

414415
match quantity.unit {
415416
// TABLESAMPLE (N ROWS) - exact row limit
416417
Some(TableSampleUnit::Rows) => {
417-
let rows = parse_quantity::<i64>(&quantity.value)?;
418+
let rows: i64 = parse_literal::<Int64Type>(&quantity_value_expr)?;
418419
if rows < 0 {
419420
return plan_err!("row count must be non-negative, got {}", rows);
420421
}
@@ -426,15 +427,15 @@ impl RelationPlanner for TableSamplePlanner {
426427

427428
// TABLESAMPLE (N PERCENT) - percentage sampling
428429
Some(TableSampleUnit::Percent) => {
429-
let percent = parse_quantity::<f64>(&quantity.value)?;
430+
let percent: f64 = parse_literal::<Float64Type>(&quantity_value_expr)?;
430431
let fraction = percent / 100.0;
431432
let plan = TableSamplePlanNode::new(input, fraction, seed).into_plan();
432433
Ok(RelationPlanning::Planned(PlannedRelation::new(plan, alias)))
433434
}
434435

435436
// TABLESAMPLE (N) - fraction if <1.0, row limit if >=1.0
436437
None => {
437-
let value = parse_quantity::<f64>(&quantity.value)?;
438+
let value = parse_literal::<Float64Type>(&quantity_value_expr)?;
438439
if value < 0.0 {
439440
return plan_err!("sample value must be non-negative, got {}", value);
440441
}
@@ -453,40 +454,6 @@ impl RelationPlanner for TableSamplePlanner {
453454
}
454455
}
455456

456-
/// Parse a SQL expression as a numeric value (supports basic arithmetic).
457-
fn parse_quantity<T>(expr: &ast::Expr) -> Result<T>
458-
where
459-
T: FromStr + Add<Output = T> + Sub<Output = T> + Mul<Output = T> + Div<Output = T>,
460-
{
461-
eval_numeric_expr(expr)
462-
.ok_or_else(|| plan_datafusion_err!("invalid numeric expression: {:?}", expr))
463-
}
464-
465-
/// Recursively evaluate numeric SQL expressions.
466-
fn eval_numeric_expr<T>(expr: &ast::Expr) -> Option<T>
467-
where
468-
T: FromStr + Add<Output = T> + Sub<Output = T> + Mul<Output = T> + Div<Output = T>,
469-
{
470-
match expr {
471-
ast::Expr::Value(v) => match &v.value {
472-
ast::Value::Number(n, _) => n.to_string().parse().ok(),
473-
_ => None,
474-
},
475-
ast::Expr::BinaryOp { left, op, right } => {
476-
let l = eval_numeric_expr::<T>(left)?;
477-
let r = eval_numeric_expr::<T>(right)?;
478-
match op {
479-
ast::BinaryOperator::Plus => Some(l + r),
480-
ast::BinaryOperator::Minus => Some(l - r),
481-
ast::BinaryOperator::Multiply => Some(l * r),
482-
ast::BinaryOperator::Divide => Some(l / r),
483-
_ => None,
484-
}
485-
}
486-
_ => None,
487-
}
488-
}
489-
490457
/// Custom logical plan node representing a TABLESAMPLE operation.
491458
///
492459
/// Stores sampling parameters (bounds, seed) and wraps the input plan.

datafusion/core/src/dataframe/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ impl DataFrameWriteOptions {
107107
}
108108

109109
/// Set the single_file_output value to true or false
110+
///
111+
/// When set to true, an output file will always be created even if the DataFrame is empty
110112
pub fn with_single_file_output(mut self, single_file_output: bool) -> Self {
111113
self.single_file_output = single_file_output;
112114
self

datafusion/core/src/datasource/file_format/arrow.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,96 @@
1717

1818
//! Re-exports the [`datafusion_datasource_arrow::file_format`] module, and contains tests for it.
1919
pub use datafusion_datasource_arrow::file_format::*;
20+
21+
#[cfg(test)]
22+
mod tests {
23+
use futures::StreamExt;
24+
use std::sync::Arc;
25+
26+
use arrow::array::{Int64Array, StringArray};
27+
use arrow::datatypes::{DataType, Field, Schema};
28+
use arrow::record_batch::RecordBatch;
29+
use datafusion_common::Result;
30+
31+
use crate::execution::options::ArrowReadOptions;
32+
use crate::prelude::SessionContext;
33+
34+
#[tokio::test]
35+
async fn test_write_empty_arrow_from_sql() -> Result<()> {
36+
let ctx = SessionContext::new();
37+
38+
let tmp_dir = tempfile::TempDir::new()?;
39+
let path = format!("{}/empty_sql.arrow", tmp_dir.path().to_string_lossy());
40+
41+
ctx.sql(&format!(
42+
"COPY (SELECT CAST(1 AS BIGINT) AS id LIMIT 0) TO '{path}' STORED AS ARROW",
43+
))
44+
.await?
45+
.collect()
46+
.await?;
47+
48+
assert!(std::path::Path::new(&path).exists());
49+
50+
let read_df = ctx.read_arrow(&path, ArrowReadOptions::default()).await?;
51+
let stream = read_df.execute_stream().await?;
52+
53+
assert_eq!(stream.schema().fields().len(), 1);
54+
assert_eq!(stream.schema().field(0).name(), "id");
55+
56+
let results: Vec<_> = stream.collect().await;
57+
let total_rows: usize = results
58+
.iter()
59+
.filter_map(|r| r.as_ref().ok())
60+
.map(|b| b.num_rows())
61+
.sum();
62+
assert_eq!(total_rows, 0);
63+
64+
Ok(())
65+
}
66+
67+
#[tokio::test]
68+
async fn test_write_empty_arrow_from_record_batch() -> Result<()> {
69+
let ctx = SessionContext::new();
70+
71+
let schema = Arc::new(Schema::new(vec![
72+
Field::new("id", DataType::Int64, false),
73+
Field::new("name", DataType::Utf8, true),
74+
]));
75+
let empty_batch = RecordBatch::try_new(
76+
schema.clone(),
77+
vec![
78+
Arc::new(Int64Array::from(Vec::<i64>::new())),
79+
Arc::new(StringArray::from(Vec::<Option<&str>>::new())),
80+
],
81+
)?;
82+
83+
let tmp_dir = tempfile::TempDir::new()?;
84+
let path = format!("{}/empty_batch.arrow", tmp_dir.path().to_string_lossy());
85+
86+
ctx.register_batch("empty_table", empty_batch)?;
87+
88+
ctx.sql(&format!("COPY empty_table TO '{path}' STORED AS ARROW"))
89+
.await?
90+
.collect()
91+
.await?;
92+
93+
assert!(std::path::Path::new(&path).exists());
94+
95+
let read_df = ctx.read_arrow(&path, ArrowReadOptions::default()).await?;
96+
let stream = read_df.execute_stream().await?;
97+
98+
assert_eq!(stream.schema().fields().len(), 2);
99+
assert_eq!(stream.schema().field(0).name(), "id");
100+
assert_eq!(stream.schema().field(1).name(), "name");
101+
102+
let results: Vec<_> = stream.collect().await;
103+
let total_rows: usize = results
104+
.iter()
105+
.filter_map(|r| r.as_ref().ok())
106+
.map(|b| b.num_rows())
107+
.sum();
108+
assert_eq!(total_rows, 0);
109+
110+
Ok(())
111+
}
112+
}

0 commit comments

Comments
 (0)