Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
930608a
fix check license header
xudong963 Aug 21, 2025
66ae588
fix cargo check: cargo check --profile ci --workspace --all-targets -…
xudong963 Aug 21, 2025
292641c
fix cargo example
xudong963 Aug 21, 2025
25058de
fix cargo check --profile ci --no-default-features -p datafusion-proto
xudong963 Aug 22, 2025
c46f7a9
fix cargo doc
xudong963 Aug 22, 2025
deaf2e2
fix ut:custom_sources_cases::statistics::sql_limit(with_node_id of Co…
xudong963 Aug 22, 2025
f1b1bd8
fix ut: test_no_pushdown_through_aggregates & test_plan_with_order_pr…
xudong963 Aug 22, 2025
7dd5e6e
fix format
xudong963 Aug 22, 2025
2eca4c0
fix roundtrip_test
xudong963 Aug 22, 2025
8baa05d
schema_force_view_types to true
xudong963 Aug 25, 2025
9b2fbbb
use Utf8View
xudong963 Aug 25, 2025
63c2ebc
schema_force_view_types to false (try true after DataFusion 49)
xudong963 Aug 25, 2025
ed718c0
fix page_index_filter_one_col and remove an example of proto
xudong963 Aug 25, 2025
0bb16fa
fix configs.md
xudong963 Aug 25, 2025
09ff8f7
fix clippy
xudong963 Aug 25, 2025
1545f2d
update configs.md
xudong963 Aug 25, 2025
ca5b0fb
fix flaky test limit.test
xudong963 Aug 25, 2025
d8c3e03
Simplify predicates in `PushDownFilter` optimizer rule (#16362)
xudong963 Jun 25, 2025
2099882
Fix intermittent SQL logic test failure in limit.slt by adding ORDER …
kosiew Jun 6, 2025
ff8418c
fix limit.rs
xudong963 Aug 25, 2025
2c7836a
fix tpch q19
xudong963 Aug 25, 2025
9191f39
public GroupValues & new_group_values
xudong963 Aug 25, 2025
d358db4
fix clippy
xudong963 Aug 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/audit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ jobs:
- name: Run audit check
# Ignored until https://github.com/apache/datafusion/issues/15571
# ignored py03 warning until arrow 55 upgrade
run: cargo audit --ignore RUSTSEC-2024-0370 --ignore RUSTSEC-2025-0020
run: cargo audit --ignore RUSTSEC-2024-0370 --ignore RUSTSEC-2025-0020 --ignore RUSTSEC-2025-0047
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 0 additions & 26 deletions datafusion-examples/examples/planner_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,35 +80,9 @@ async fn to_physical_plan_in_one_api_demo(
displayable(physical_plan.as_ref()).indent(false)
);

let traversal = extract_node_ids_from_execution_plan_tree(physical_plan.as_ref());
let expected_traversal = vec![
Some(0),
Some(1),
Some(2),
Some(3),
Some(4),
Some(5),
Some(6),
Some(7),
Some(8),
Some(9),
];
assert_eq!(expected_traversal, traversal);
Ok(())
}

fn extract_node_ids_from_execution_plan_tree(
physical_plan: &dyn ExecutionPlan,
) -> Vec<Option<usize>> {
let mut traversed_nodes: Vec<Option<usize>> = vec![];
for child in physical_plan.children() {
let node_ids = extract_node_ids_from_execution_plan_tree(child.as_ref());
traversed_nodes.extend(node_ids);
}
traversed_nodes.push(physical_plan.properties().node_id());
traversed_nodes
}

/// Converts a logical plan into a physical plan by utilizing the analyzer,
/// optimizer, and query planner APIs separately. This flavor gives more
/// control over the planning process.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/sort_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl SortTest {
/// Sort the input using SortExec and ensure the results are
/// correct according to `Vec::sort` both with and without spilling
async fn run(&self) -> (Vec<Vec<RecordBatch>>, Vec<RecordBatch>) {
let input = Arc::clone(self.input());
let input = self.input.clone();
let first_batch = input
.iter()
.flat_map(|p| p.iter())
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/parquet/page_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ async fn page_index_filter_one_col() {

// 5.create filter date_string_col == "01/01/09"`;
// Note this test doesn't apply type coercion so the literal must match the actual view type
// xudong: use new_utf8 here, because schema_force_view_types is now set to false.
let filter = col("date_string_col").eq(lit(ScalarValue::new_utf8("01/01/09")));
let parquet_exec = get_parquet_exec(&state, filter).await;
let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3603,18 +3603,19 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> {
);

// Apply the function
let result = replace_order_preserving_variants(dist_context)?;
let result = replace_order_preserving_variants(dist_context, false)?;

// Verify the plan was transformed to CoalescePartitionsExec
result
.0
.plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
.expect("Expected CoalescePartitionsExec");

// Verify fetch was preserved
assert_eq!(
result.plan.fetch(),
result.0.plan.fetch(),
Some(5),
"Fetch value was not preserved after transformation"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ fn test_no_pushdown_through_aggregates() {
Ok:
- FilterExec: b@1 = bar
- CoalesceBatchesExec: target_batch_size=100
- AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt]
- AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0])
- CoalesceBatchesExec: target_batch_size=10
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
"
Expand Down
4 changes: 2 additions & 2 deletions datafusion/datasource/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,14 @@ impl ExecutionPlan for DataSinkExec {

fn with_node_id(
self: Arc<Self>,
_node_id: usize,
node_id: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let mut new_plan = DataSinkExec::new(
Arc::clone(self.input()),
Arc::clone(&self.sink),
self.sort_order.clone(),
);
let new_props = new_plan.cache.clone().with_node_id(_node_id);
let new_props = new_plan.cache.clone().with_node_id(node_id);
new_plan.cache = new_props;
Ok(Some(Arc::new(new_plan)))
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/datasource/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,10 @@ impl ExecutionPlan for DataSourceExec {

fn with_node_id(
self: Arc<Self>,
_node_id: usize,
node_id: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let mut new_plan = DataSourceExec::new(self.data_source.clone());
let new_props = new_plan.cache.clone().with_node_id(_node_id);
let mut new_plan = DataSourceExec::new(Arc::clone(&self.data_source));
let new_props = new_plan.cache.clone().with_node_id(node_id);
new_plan.cache = new_props;
Ok(Some(Arc::new(new_plan)))
}
Expand Down
11 changes: 6 additions & 5 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1883,11 +1883,12 @@ impl Expr {
}
}

/// Check if the Expr is literal
pub fn is_literal(&self) -> bool {
match self {
Expr::Literal(_, _) => true,
_ => false,
/// Check if the Expr is literal and get the literal value if it is.
pub fn as_literal(&self) -> Option<&ScalarValue> {
if let Expr::Literal(lit, _) = self {
Some(lit)
} else {
None
}
}
}
Expand Down
1 change: 0 additions & 1 deletion datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ pub mod push_down_limit;
pub mod replace_distinct_aggregate;
pub mod scalar_subquery_to_join;
pub mod simplify_expressions;
mod simplify_predicates;
pub mod single_distinct_to_groupby;
pub mod utils;

Expand Down
10 changes: 6 additions & 4 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use datafusion_expr::{

use crate::optimizer::ApplyOrder;
use crate::utils::{has_all_column_refs, is_restrict_null_predicate};
use crate::{simplify_predicates::simplify_predicates, OptimizerConfig, OptimizerRule};
use crate::{simplify_expressions::simplify_predicates, OptimizerConfig, OptimizerRule};

/// Optimizer rule for pushing (moving) filter expressions down in a plan so
/// they are applied as early as possible.
Expand Down Expand Up @@ -783,7 +783,9 @@ impl OptimizerRule for PushDownFilter {
let new_predicates = simplify_predicates(predicate)?;
if old_predicate_len != new_predicates.len() {
let Some(new_predicate) = conjunction(new_predicates) else {
return plan_err!("at least one expression exists");
// new_predicates is empty - remove the filter entirely
// Return the child plan without the filter
return Ok(Transformed::yes(Arc::unwrap_or_clone(filter.input)));
};
filter.predicate = new_predicate;
}
Expand Down Expand Up @@ -2308,7 +2310,7 @@ mod tests {
plan,
@r"
Projection: test.a, test1.d
Cross Join:
Cross Join:
Projection: test.a, test.b, test.c
TableScan: test, full_filters=[test.a = Int32(1)]
Projection: test1.d, test1.e, test1.f
Expand Down Expand Up @@ -2338,7 +2340,7 @@ mod tests {
plan,
@r"
Projection: test.a, test1.a
Cross Join:
Cross Join:
Projection: test.a, test.b, test.c
TableScan: test, full_filters=[test.a = Int32(1)]
Projection: test1.a, test1.b, test1.c
Expand Down
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/simplify_expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod guarantees;
mod inlist_simplifier;
mod regex;
pub mod simplify_exprs;
mod simplify_predicates;
mod unwrap_cast;
mod utils;

Expand All @@ -31,6 +32,7 @@ pub use datafusion_expr::simplify::{SimplifyContext, SimplifyInfo};

pub use expr_simplifier::*;
pub use simplify_exprs::*;
pub use simplify_predicates::simplify_predicates;

// Export for test in datafusion/core/tests/optimizer_integration.rs
pub use guarantees::GuaranteeRewriter;
Loading
Loading