diff --git a/.gitignore b/.gitignore
index 8971e72..c251802 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,4 +4,5 @@
 package-lock.json
 package.json
 .DS_Store
-.cargo
\ No newline at end of file
+.cargo
+.zed
diff --git a/datafusion-federation/Cargo.toml b/datafusion-federation/Cargo.toml
index 8844cd2..1b49cdd 100644
--- a/datafusion-federation/Cargo.toml
+++ b/datafusion-federation/Cargo.toml
@@ -41,3 +41,8 @@ required-features = ["sql"]
 name = "df-csv-advanced"
 path = "examples/df-csv-advanced.rs"
 required-features = ["sql"]
+
+[[example]]
+name = "df-unnest"
+path = "examples/df-unnest.rs"
+required-features = ["sql"]
diff --git a/datafusion-federation/examples/data/unnest.ndjson b/datafusion-federation/examples/data/unnest.ndjson
new file mode 100644
index 0000000..f349b94
--- /dev/null
+++ b/datafusion-federation/examples/data/unnest.ndjson
@@ -0,0 +1,3 @@
+{ "id": 1, "foo": "bar1", "qux": [ 1, 2, 3 ] }
+{ "id": 2, "foo": "bar2", "qux": [ 4, 5, 6 ] }
+{ "id": 3, "foo": "bar3", "qux": [ 7, 8, 9 ] }
diff --git a/datafusion-federation/examples/df-unnest.rs b/datafusion-federation/examples/df-unnest.rs
new file mode 100644
index 0000000..a359ac0
--- /dev/null
+++ b/datafusion-federation/examples/df-unnest.rs
@@ -0,0 +1,119 @@
+mod shared;
+
+use std::sync::Arc;
+
+use datafusion::execution::SessionStateDefaults;
+use datafusion::prelude::{CsvReadOptions, NdJsonReadOptions};
+use datafusion::{
+    execution::{context::SessionContext, session_state::SessionStateBuilder},
+    optimizer::Optimizer,
+};
+
+use datafusion_federation::{
+    sql::{MultiSchemaProvider, SQLFederationProvider, SQLSchemaProvider},
+    FederatedQueryPlanner, FederationOptimizerRule,
+};
+
+use shared::{overwrite_default_schema, MockPostgresExecutor, MockSqliteExecutor};
+
+const JSON_PATH_POSTGRES: &str = "./examples/data/unnest.ndjson";
+const CSV_PATH_SQLITE: &str = "./examples/data/test.csv";
+const TABLE_NAME_POSTGRES: &str = "arrays_pg";
+const TABLE_NAME_SQLITE: &str = "items_sqlite";
+
+#[tokio::main]
+async fn main() {
+    // This example demonstrates DataFusion Federation with a cross-database JOIN
+    // and an unnest operation, federated across two engines:
+    //
+    // ```sql
+    // SELECT unnest(a.qux) as item_value
+    //      , a.foo as array_name
+    //      , i.foo as item_name
+    // FROM arrays_pg as a
+    // JOIN items_sqlite as i ON a.id = i.bar
+    // ```
+    //
+    // - `arrays_pg` is an NDJSON file with array data (the `qux` column holds [1,2,3] etc.)
+    // - `items_sqlite` is a CSV file with item data (`bar` holds ids, `foo` holds names)
+    // - The scans of `arrays_pg` and `items_sqlite` are federated to the mock
+    //   PostgreSQL and SQLite engines respectively
+    // - The unnest and the JOIN happen in the main DataFusion engine
+    // This validates that the Unnest fix enables proper federation of complex queries.
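+    //
+    // For the sample data above, `unnest(a.qux)` expands each array element into
+    // its own output row: e.g. { "id": 1, "qux": [1, 2, 3] } yields three rows
+    // with item_value 1, 2, and 3, each repeating id = 1 and foo = "bar1".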
+
+    /////////////////////
+    // Remote sqlite DB
+    /////////////////////
+    let sqlite_remote_ctx = Arc::new(SessionContext::new());
+    sqlite_remote_ctx
+        .register_csv(TABLE_NAME_SQLITE, CSV_PATH_SQLITE, CsvReadOptions::new())
+        .await
+        .expect("Register sqlite csv file");
+
+    let sqlite_known_tables: Vec<String> = [TABLE_NAME_SQLITE].iter().map(|&x| x.into()).collect();
+    let sqlite_executor = Arc::new(MockSqliteExecutor::new(sqlite_remote_ctx));
+    let sqlite_federation_provider = Arc::new(SQLFederationProvider::new(sqlite_executor));
+    let sqlite_schema_provider = Arc::new(
+        SQLSchemaProvider::new_with_tables(sqlite_federation_provider, sqlite_known_tables)
+            .await
+            .expect("Create sqlite schema provider"),
+    );
+
+    /////////////////////
+    // Remote postgres DB
+    /////////////////////
+    let postgres_remote_ctx = Arc::new(SessionContext::new());
+    postgres_remote_ctx
+        .register_json(
+            TABLE_NAME_POSTGRES,
+            JSON_PATH_POSTGRES,
+            NdJsonReadOptions {
+                file_extension: ".ndjson",
+                ..NdJsonReadOptions::default()
+            },
+        )
+        .await
+        .expect("Register postgres json file");
+
+    let postgres_known_tables: Vec<String> =
+        [TABLE_NAME_POSTGRES].iter().map(|&x| x.into()).collect();
+    let postgres_executor = Arc::new(MockPostgresExecutor::new(postgres_remote_ctx));
+    let postgres_federation_provider = Arc::new(SQLFederationProvider::new(postgres_executor));
+    let postgres_schema_provider = Arc::new(
+        SQLSchemaProvider::new_with_tables(postgres_federation_provider, postgres_known_tables)
+            .await
+            .expect("Create postgres schema provider"),
+    );
+
+    /////////////////////
+    // Main (local) DB
+    /////////////////////
+    let mut rules = Optimizer::new().rules;
+    rules.push(Arc::new(FederationOptimizerRule::new()));
+    let state = SessionStateBuilder::new()
+        .with_optimizer_rules(rules)
+        .with_query_planner(Arc::new(FederatedQueryPlanner::new()))
+        .build();
+
+    let schema_provider =
+        MultiSchemaProvider::new(vec![sqlite_schema_provider, postgres_schema_provider]);
+    overwrite_default_schema(&state, Arc::new(schema_provider))
+        .expect("Overwrite the default schema");
+
+    let ctx = SessionContext::new_with_state(state);
+    SessionStateDefaults::register_builtin_functions(&mut ctx.state_ref().write());
+
+    // Run a federated query with unnest and join:
+    // first unnest the arrays, then join with the items table.
+    let query = r#"
+        SELECT unnest(a.qux) as item_value
+             , a.foo as array_name
+             , i.foo as item_name
+        FROM arrays_pg as a JOIN items_sqlite as i ON a.id = i.bar
+    "#;
+    let df = ctx
+        .sql(query)
+        .await
+        .expect("Create a dataframe from federated sql query");
+
+    df.show()
+        .await
+        .expect("Execute the federated dataframe with unnest");
+}
diff --git a/datafusion-federation/src/optimizer/mod.rs b/datafusion-federation/src/optimizer/mod.rs
index 9c16dfc..3848904 100644
--- a/datafusion-federation/src/optimizer/mod.rs
+++ b/datafusion-federation/src/optimizer/mod.rs
@@ -167,10 +167,14 @@ impl FederationOptimizerRule {
             return Ok((None, ScanResult::None));
         }
 
+        // Unnest LogicalPlans should not be included in federation, otherwise an error occurs.
+        // By flagging their inputs as roots, the inputs can still be federated individually.
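+        // For example, in `SELECT unnest(a.qux) FROM arrays_pg AS a`, the scan of
+        // `arrays_pg` is still pushed down to the remote engine, while the unnest
+        // itself runs in the local DataFusion engine.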
+        let must_be_root = matches!(plan, LogicalPlan::Unnest(_));
+
         // Recursively optimize inputs
         let input_results = inputs
             .iter()
-            .map(|i| self.optimize_plan_recursively(i, false, _config))
+            .map(|i| self.optimize_plan_recursively(i, must_be_root, _config))
             .collect::<Result<Vec<_>>>()?;
 
         // Aggregate the input providers
@@ -178,7 +182,8 @@ impl FederationOptimizerRule {
             sole_provider.merge(scan_result.clone());
         });
 
-        if sole_provider.is_none() {
+        // If this node must be a root, a missing provider is expected; allow federation of its inputs.
+        if sole_provider.is_none() && !must_be_root {
             // No providers found
             // TODO: Is/should this be reachable?
             return Ok((None, ScanResult::None));
@@ -249,7 +254,13 @@
         };
 
         // Construct the optimized plan
-        let new_plan = plan.with_new_exprs(new_expressions, new_inputs)?;
+        let new_plan = match plan {
+            LogicalPlan::Unnest(_) => {
+                // Unnest doesn't accept expressions in with_new_exprs, despite returning them from expressions()
+                plan.with_new_exprs(vec![], new_inputs)?
+            }
+            _ => plan.with_new_exprs(new_expressions, new_inputs)?,
+        };
 
         // Return the federated plan
         Ok((Some(new_plan), ScanResult::Ambiguous))
@@ -366,3 +377,155 @@ pub fn get_table_source(
     // Return original FederatedTableSource
     Ok(Some(Arc::clone(&wrapper.source)))
 }
+
+#[cfg(all(test, feature = "sql"))]
+mod tests {
+    use super::*;
+    use crate::sql::{
+        RemoteTable, RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
+    };
+    use async_trait::async_trait;
+    use datafusion::{
+        arrow::datatypes::{DataType, Field, Schema, SchemaRef},
+        common::{DFSchema, UnnestOptions},
+        datasource::{DefaultTableSource, TableProvider},
+        execution::SendableRecordBatchStream,
+        logical_expr::{LogicalPlanBuilder, Unnest},
+        optimizer::OptimizerContext,
+        prelude::*,
+        sql::unparser::{self, dialect::Dialect},
+    };
+    use std::sync::Arc;
+
+    #[derive(Debug, Clone)]
+    struct TestExecutor {
+        compute_context: String,
+    }
+
+    #[async_trait]
+    impl SQLExecutor for TestExecutor {
+        fn name(&self) -> &str {
+            "TestExecutor"
+        }
+
+        fn compute_context(&self) -> Option<String> {
+            Some(self.compute_context.clone())
+        }
+
+        fn dialect(&self) -> Arc<dyn Dialect> {
+            Arc::new(unparser::dialect::DefaultDialect {})
+        }
+
+        fn execute(
+            &self,
+            _query: &str,
+            _schema: SchemaRef,
+        ) -> datafusion::error::Result<SendableRecordBatchStream> {
+            unimplemented!()
+        }
+
+        async fn table_names(&self) -> datafusion::error::Result<Vec<String>> {
+            unimplemented!()
+        }
+
+        async fn get_table_schema(
+            &self,
+            _table_name: &str,
+        ) -> datafusion::error::Result<SchemaRef> {
+            unimplemented!()
+        }
+    }
+
+    fn get_federated_table_provider() -> Arc<dyn TableProvider> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new(
+                "array_col",
+                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
+                false,
+            ),
+        ]));
+        let table_ref = RemoteTableRef::try_from("remote_table".to_string()).unwrap();
+        let table = Arc::new(RemoteTable::new(table_ref, schema));
+        let executor = TestExecutor {
+            compute_context: "test".to_string(),
+        };
+        let provider = Arc::new(SQLFederationProvider::new(Arc::new(executor)));
+        let table_source = Arc::new(SQLTableSource { provider, table });
+        Arc::new(FederatedTableProviderAdaptor::new(table_source))
+    }
+
+    #[test]
+    fn test_federation_optimizer_rule_handles_unnest() {
+        // Test that FederationOptimizerRule::rewrite can handle plans containing Unnest.
+        // This verifies the fix for the Unnest with_new_exprs issue.
+
+        // Create a federated table provider that will trigger transformation
+        let federated_provider = get_federated_table_provider();
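+        // DataFusion's SQL planner wraps table providers in a DefaultTableSource;
+        // get_table_source (above) later recovers the FederatedTableProviderAdaptor
+        // from that wrapper, which is what marks this scan as federatable.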
+        let table_source = Arc::new(DefaultTableSource::new(federated_provider));
+
+        // Create a table scan
+        let table_scan = LogicalPlanBuilder::scan("test_table", table_source, None)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        // Create a DFSchema for the Unnest
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new(
+                "array_col",
+                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
+                false,
+            ),
+        ]));
+        let df_schema = DFSchema::try_from(schema.as_ref().clone()).unwrap();
+
+        // Create an Unnest plan on top of the table scan
+        let unnest_plan = LogicalPlan::Unnest(Unnest {
+            input: Arc::new(table_scan),
+            exec_columns: vec![Column::from_name("array_col")],
+            list_type_columns: vec![(
+                0,
+                datafusion::logical_expr::ColumnUnnestList {
+                    output_column: Column::from_name("array_col"),
+                    depth: 1,
+                },
+            )],
+            struct_type_columns: vec![],
+            dependency_indices: vec![],
+            schema: Arc::new(df_schema),
+            options: UnnestOptions::default(),
+        });
+
+        // Test the FederationOptimizerRule
+        let optimizer_rule = FederationOptimizerRule::new();
+        let config = OptimizerContext::new();
+
+        // This should not panic or fail due to the Unnest with_new_exprs issue
+        let result = optimizer_rule.rewrite(unnest_plan, &config);
+
+        // The rewrite should succeed (whether it transforms or not depends on federation setup)
+        assert!(
+            result.is_ok(),
+            "FederationOptimizerRule should handle Unnest plans without error"
+        );
+
+        // Verify we get a transformed result back
+        let transformed = result.unwrap();
+
+        // The key assertion: the plan should be transformed (Transformed::yes).
+        // Before the fix, the rewrite would fail inside with_new_exprs.
+        assert!(
+            transformed.transformed,
+            "Plan should be transformed (Transformed::yes) - this validates the Unnest fix"
+        );
+
+        // Verify the transformed plan has a valid schema
+        assert!(
+            !transformed.data.schema().fields().is_empty(),
+            "Result should have a valid schema"
+        );
+    }
+}
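
For reviewers reading outside the diff context, the core workaround can be summarized in isolation. The sketch below is illustrative only: it assumes DataFusion's `LogicalPlan::expressions`/`with_new_exprs` API as used in the patch, and `rebuild` is a hypothetical helper name, not part of this crate.

```rust
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlan;

/// Hypothetical helper mirroring the reconstruction step in
/// FederationOptimizerRule above: rebuild a plan node over new inputs.
fn rebuild(plan: &LogicalPlan, new_inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
    match plan {
        // Unnest reports its columns via `expressions()`, but (per the
        // comment in the patch) its `with_new_exprs` does not accept them
        // back, so it must be rebuilt with an empty expression list.
        LogicalPlan::Unnest(_) => plan.with_new_exprs(vec![], new_inputs),
        // Every other node round-trips its own expressions unchanged.
        _ => plan.with_new_exprs(plan.expressions(), new_inputs),
    }
}
```

With the `sql` feature enabled, the example and test above should be runnable with something like `cargo run --example df-unnest --features sql` and `cargo test --features sql` from the `datafusion-federation` crate directory (the example's data paths are relative to that directory).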