Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
package-lock.json
package.json
.DS_Store
.cargo
.cargo
.zed
5 changes: 5 additions & 0 deletions datafusion-federation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,8 @@ required-features = ["sql"]
name = "df-csv-advanced"
path = "examples/df-csv-advanced.rs"
required-features = ["sql"]

[[example]]
name = "df-unnest"
path = "examples/df-unnest.rs"
required-features = ["sql"]
3 changes: 3 additions & 0 deletions datafusion-federation/examples/data/unnest.ndjson
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{ "id": 1, "foo": "bar1", "qux": [ 1, 2, 3 ] }
{ "id": 2, "foo": "bar2", "qux": [ 4, 5, 6 ] }
{ "id": 3, "foo": "bar3", "qux": [ 7, 8, 9 ] }
119 changes: 119 additions & 0 deletions datafusion-federation/examples/df-unnest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
mod shared;

use std::sync::Arc;

use datafusion::execution::SessionStateDefaults;
use datafusion::prelude::{CsvReadOptions, NdJsonReadOptions};
use datafusion::{
execution::{context::SessionContext, session_state::SessionStateBuilder},
optimizer::Optimizer,
};

use datafusion_federation::{
sql::{MultiSchemaProvider, SQLFederationProvider, SQLSchemaProvider},
FederatedQueryPlanner, FederationOptimizerRule,
};

use shared::{overwrite_default_schema, MockPostgresExecutor, MockSqliteExecutor};

/// Path of the NDJSON fixture that is registered as the mock Postgres table.
const JSON_PATH_POSTGRES: &str = "./examples/data/unnest.ndjson";
/// Path of the CSV fixture that is registered as the mock SQLite table.
const CSV_PATH_SQLITE: &str = "./examples/data/test.csv";
/// Table name under which the NDJSON fixture is registered in the mock Postgres context.
const TABLE_NAME_POSTGRES: &str = "arrays_pg";
/// Table name under which the CSV fixture is registered in the mock SQLite context.
const TABLE_NAME_SQLITE: &str = "items_sqlite";

/// Runs a federated query that combines a cross-database JOIN with `unnest`.
///
/// Two "remote" databases are simulated with separate in-process
/// [`SessionContext`]s, each wrapped in a mock executor from the `shared`
/// example module:
///
/// - `arrays_pg` (mock Postgres): an NDJSON file whose `qux` column holds
///   integer arrays such as `[1, 2, 3]`.
/// - `items_sqlite` (mock SQLite): a CSV file; the query reads its `foo` and
///   `bar` columns.
///
/// The query that is executed is:
///
/// ```sql
/// SELECT unnest(a.qux) as item_value
///      , a.foo as array_name
///      , i.foo as item_name
/// FROM arrays_pg as a JOIN items_sqlite as i ON a.id = i.bar
/// ```
///
/// The federation optimizer keeps `Unnest` plan nodes out of federated
/// sub-plans and federates their inputs instead, so the table scans are what
/// get handed to the remote executors; the join across the two engines and the
/// unnest itself are expected to run in the local DataFusion engine.
///
/// # Panics
///
/// Panics (via `expect`) if a fixture cannot be registered, a schema provider
/// cannot be created, or planning/executing the query fails.
#[tokio::main]
async fn main() {
    /////////////////////
    // Remote sqlite DB
    /////////////////////
    let sqlite_remote_ctx = Arc::new(SessionContext::new());
    sqlite_remote_ctx
        .register_csv(TABLE_NAME_SQLITE, CSV_PATH_SQLITE, CsvReadOptions::new())
        .await
        .expect("Register sqlite csv file");

    let sqlite_known_tables: Vec<String> = vec![TABLE_NAME_SQLITE.to_string()];
    let sqlite_executor = Arc::new(MockSqliteExecutor::new(sqlite_remote_ctx));
    let sqlite_federation_provider = Arc::new(SQLFederationProvider::new(sqlite_executor));
    let sqlite_schema_provider = Arc::new(
        SQLSchemaProvider::new_with_tables(sqlite_federation_provider, sqlite_known_tables)
            .await
            .expect("Create sqlite schema provider"),
    );

    /////////////////////
    // Remote postgres DB
    /////////////////////
    let postgres_remote_ctx = Arc::new(SessionContext::new());
    postgres_remote_ctx
        .register_json(
            TABLE_NAME_POSTGRES,
            JSON_PATH_POSTGRES,
            // The fixture uses the `.ndjson` extension, so override the reader's default.
            NdJsonReadOptions {
                file_extension: ".ndjson",
                ..NdJsonReadOptions::default()
            },
        )
        .await
        .expect("Register postgres json file");

    let postgres_known_tables: Vec<String> = vec![TABLE_NAME_POSTGRES.to_string()];
    let postgres_executor = Arc::new(MockPostgresExecutor::new(postgres_remote_ctx));
    let postgres_federation_provider = Arc::new(SQLFederationProvider::new(postgres_executor));
    let postgres_schema_provider = Arc::new(
        SQLSchemaProvider::new_with_tables(postgres_federation_provider, postgres_known_tables)
            .await
            .expect("Create postgres schema provider"),
    );

    /////////////////////
    // Main (local) DB
    /////////////////////
    // Append the federation rule after DataFusion's default optimizer rules.
    let mut rules = Optimizer::new().rules;
    rules.push(Arc::new(FederationOptimizerRule::new()));
    let state = SessionStateBuilder::new()
        .with_optimizer_rules(rules)
        .with_query_planner(Arc::new(FederatedQueryPlanner::new()))
        .build();

    // Expose both remote schemas through the local default schema.
    let schema_provider =
        MultiSchemaProvider::new(vec![sqlite_schema_provider, postgres_schema_provider]);
    overwrite_default_schema(&state, Arc::new(schema_provider))
        .expect("Overwrite the default schema");

    let ctx = SessionContext::new_with_state(state);
    // The state was built without defaults, so register the built-in functions explicitly.
    SessionStateDefaults::register_builtin_functions(&mut ctx.state_ref().write());

    // Run a federated query with unnest and join.
    let query = r#"
SELECT unnest(a.qux) as item_value
, a.foo as array_name
, i.foo as item_name
FROM arrays_pg as a JOIN items_sqlite as i ON a.id = i.bar
"#;
    let df = ctx
        .sql(query)
        .await
        .expect("Create a dataframe from federated sql query");

    df.show()
        .await
        .expect("Execute the federated dataframe with unnest");
}
169 changes: 166 additions & 3 deletions datafusion-federation/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,18 +167,23 @@ impl FederationOptimizerRule {
return Ok((None, ScanResult::None));
}

// Unnest LogicalPlans should not be included in federation, otherwise an error will occur.
// By flagging inputs as root, they can be federated individually.
let must_be_root = matches!(plan, LogicalPlan::Unnest(_));

// Recursively optimize inputs
let input_results = inputs
.iter()
.map(|i| self.optimize_plan_recursively(i, false, _config))
.map(|i| self.optimize_plan_recursively(i, must_be_root, _config))
.collect::<Result<Vec<_>>>()?;

// Aggregate the input providers
input_results.iter().for_each(|(_, scan_result)| {
sole_provider.merge(scan_result.clone());
});

if sole_provider.is_none() {
// If the provider must be root, it's ambiguous, allow federation of inputs
if sole_provider.is_none() && !must_be_root {
// No providers found
// TODO: Is/should this be reachable?
return Ok((None, ScanResult::None));
Expand Down Expand Up @@ -249,7 +254,13 @@ impl FederationOptimizerRule {
};

// Construct the optimized plan
let new_plan = plan.with_new_exprs(new_expressions, new_inputs)?;
let new_plan = match plan {
LogicalPlan::Unnest(_) => {
// Unnest doesn't accept expressions in with_new_exprs despite returning them
plan.with_new_exprs(vec![], new_inputs)?
}
_ => plan.with_new_exprs(new_expressions, new_inputs)?,
};

// Return the federated plan
Ok((Some(new_plan), ScanResult::Ambiguous))
Expand Down Expand Up @@ -366,3 +377,155 @@ pub fn get_table_source(
// Return original FederatedTableSource
Ok(Some(Arc::clone(&wrapper.source)))
}

#[cfg(all(test, feature = "sql"))]
mod tests {
    use super::*;
    use crate::sql::{
        RemoteTable, RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
    };
    use async_trait::async_trait;
    use datafusion::{
        arrow::datatypes::{DataType, Field, Schema, SchemaRef},
        datasource::{DefaultTableSource, TableProvider},
        execution::SendableRecordBatchStream,
        logical_expr::LogicalPlanBuilder,
        optimizer::OptimizerContext,
        prelude::*,
        sql::unparser::{self, dialect::Dialect},
    };
    use std::sync::Arc;

    /// Minimal [`SQLExecutor`] used only for planning-time tests.
    ///
    /// It reports a fixed name, the configured compute context and the default
    /// unparser dialect. Every method that would touch a real database is
    /// `unimplemented!()` and panics if called.
    #[derive(Debug, Clone)]
    struct TestExecutor {
        compute_context: String,
    }

    #[async_trait]
    impl SQLExecutor for TestExecutor {
        fn name(&self) -> &str {
            "TestExecutor"
        }

        fn compute_context(&self) -> Option<String> {
            Some(self.compute_context.clone())
        }

        fn dialect(&self) -> Arc<dyn Dialect> {
            Arc::new(unparser::dialect::DefaultDialect {})
        }

        fn execute(
            &self,
            _query: &str,
            _schema: SchemaRef,
        ) -> datafusion::error::Result<SendableRecordBatchStream> {
            unimplemented!()
        }

        async fn table_names(&self) -> datafusion::error::Result<Vec<String>> {
            unimplemented!()
        }

        async fn get_table_schema(
            &self,
            _table_name: &str,
        ) -> datafusion::error::Result<SchemaRef> {
            unimplemented!()
        }
    }

    /// Builds a federated table provider for a remote table named
    /// `remote_table` with columns `id: Int32` and `array_col: List<Int32>`,
    /// backed by a [`TestExecutor`].
    fn get_federated_table_provider() -> Arc<dyn TableProvider> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "array_col",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                false,
            ),
        ]));
        let table_ref = RemoteTableRef::try_from("remote_table".to_string()).unwrap();
        let table = Arc::new(RemoteTable::new(table_ref, schema));
        let executor = TestExecutor {
            compute_context: "test".to_string(),
        };
        let provider = Arc::new(SQLFederationProvider::new(Arc::new(executor)));
        let table_source = Arc::new(SQLTableSource { provider, table });
        Arc::new(FederatedTableProviderAdaptor::new(table_source))
    }

    /// `FederationOptimizerRule::rewrite` must accept a plan whose root is an
    /// `Unnest` over a federated table scan and report it as transformed.
    #[test]
    fn test_federation_optimizer_rule_handles_unnest() {
        // A federated table source is required for the rule to rewrite anything.
        let table_source = Arc::new(DefaultTableSource::new(get_federated_table_provider()));

        // Let DataFusion derive the Unnest node instead of hand-assembling it, so
        // the list-column index, dependency indices and output schema are
        // guaranteed to agree with the input schema.
        let unnest_plan = LogicalPlanBuilder::scan("test_table", table_source, None)
            .expect("Scan the federated table")
            .unnest_column(Column::from_name("array_col"))
            .expect("Unnest the list column")
            .build()
            .expect("Build the Unnest plan");
        assert!(
            matches!(unnest_plan, LogicalPlan::Unnest(_)),
            "Test setup should produce an Unnest plan at the root"
        );

        let optimizer_rule = FederationOptimizerRule::new();
        let config = OptimizerContext::new();

        // Before the Unnest handling was added this step failed inside
        // `with_new_exprs`; it must now succeed.
        let transformed = optimizer_rule
            .rewrite(unnest_plan, &config)
            .expect("FederationOptimizerRule should handle Unnest plans without error");

        assert!(
            transformed.transformed,
            "Plan should be transformed (Transformed::yes) - this validates the Unnest fix"
        );

        assert!(
            !transformed.data.schema().fields().is_empty(),
            "Result should have a valid schema"
        );
    }
}