diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs index 832110e11131c..d226509f7ac42 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs @@ -31,7 +31,7 @@ use datafusion::logical_expr::{ use std::sync::Arc; use substrait::proto::expression::MaskExpression; use substrait::proto::read_rel::ReadType; -use substrait::proto::read_rel::local_files::file_or_files::PathType::UriFile; +use substrait::proto::read_rel::local_files::file_or_files::PathType; use substrait::proto::{Expression, ReadRel}; use url::Url; @@ -176,45 +176,100 @@ pub async fn from_read_rel( })) } Some(ReadType::LocalFiles(lf)) => { - fn extract_filename(name: &str) -> Option { - let corrected_url = - if name.starts_with("file://") && !name.starts_with("file:///") { - name.replacen("file://", "file:///", 1) - } else { - name.to_string() - }; - - Url::parse(&corrected_url).ok().and_then(|url| { - let path = url.path(); - std::path::Path::new(path) - .file_name() - .map(|filename| filename.to_string_lossy().to_string()) - }) + /// Extracts the path string from a PathType variant. + /// Normalizes file:// URLs to file:/// format for proper URL parsing. + fn extract_path(path_type: Option<&PathType>) -> Option { + let path_str = match path_type? { + PathType::UriPath(p) => p.clone(), + PathType::UriPathGlob(p) => p.clone(), + PathType::UriFile(p) => p.clone(), + PathType::UriFolder(p) => p.clone(), + }; + + // Normalize file:// to file:/// for proper URL parsing + let normalized = if path_str.starts_with("file://") + && !path_str.starts_with("file:///") + { + path_str.replacen("file://", "file:///", 1) + } else { + path_str + }; + + // Parse URL and extract the file system path component + Url::parse(&normalized) + .ok() + .map(|url| url.path().to_string()) } - // we could use the file name to check the original table provider - // TODO: currently does not support multiple local files - let filename: Option = - lf.items.first().and_then(|x| match x.path_type.as_ref() { - Some(UriFile(name)) => extract_filename(name), - _ => None, - }); + // Collect all file paths from LocalFiles items + let paths: Vec = lf + .items + .iter() + .filter_map(|item| extract_path(item.path_type.as_ref())) + .collect(); - if lf.items.len() > 1 || filename.is_none() { - return not_impl_err!("Only single file reads are supported"); + if paths.is_empty() { + return plan_err!("No valid file paths found in LocalFiles"); } - let name = filename.unwrap(); - // directly use unwrap here since we could determine it is a valid one - let table_reference = TableReference::Bare { table: name.into() }; - read_with_schema( - consumer, + // Generate a table name from the first file path + let table_name = std::path::Path::new(&paths[0]) + .file_name() + .map(|n| n.to_string_lossy().to_string()) + .unwrap_or_else(|| "local_files".to_string()); + + let table_reference = TableReference::Bare { + table: table_name.clone().into(), + }; + + // Try to resolve files using the new trait method. + // If not implemented, fall back to the legacy single-file behavior + // for backward compatibility. + let provider = match consumer + .resolve_local_files(&paths, &substrait_schema) + .await + { + Ok(provider) => provider, + Err(_) => { + // Fallback: use the filename to look up a pre-registered table + // This maintains backward compatibility with the original behavior + match consumer.resolve_table_ref(&table_reference).await? { + Some(provider) => provider, + None => { + return plan_err!( + "No table named '{}' found. \ + To support LocalFiles with multiple files or custom file resolution, \ + implement the resolve_local_files method on your SubstraitConsumer.", + table_name + ); + } + } + } + }; + + // Build the scan plan inline + let schema = substrait_schema.replace_qualifier(table_reference.clone()); + + let filters = if let Some(f) = &read.filter { + let filter_expr = consumer.consume_expression(f, &schema).await?; + split_conjunction_owned(filter_expr) + } else { + vec![] + }; + + let plan = LogicalPlanBuilder::scan_with_filters( table_reference, - substrait_schema, - &read.projection, - &read.filter, - ) - .await + provider_as_source(provider), + None, + filters, + )? + .build()?; + + ensure_schema_compatibility(plan.schema(), schema.clone())?; + + let schema = apply_masking(schema, &read.projection)?; + + apply_projection(plan, schema) } _ => { not_impl_err!("Unsupported Readtype: {:?}", read.read_type) diff --git a/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs b/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs index 4c19227a30c75..5a256de930cdf 100644 --- a/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs @@ -156,6 +156,26 @@ pub trait SubstraitConsumer: Send + Sync + Sized { table_ref: &TableReference, ) -> datafusion::common::Result>>; + /// Resolves a list of local file paths to a TableProvider. + /// + /// Override this method to customize how file paths from Substrait LocalFiles + /// are resolved to table providers. The default implementation returns an error + /// since resolving arbitrary file paths requires access to an object store or + /// file system which may not be available in all contexts. + /// + /// # Arguments + /// * `paths` - List of file paths (URIs) from Substrait LocalFiles + /// * `schema` - The expected schema of the files + async fn resolve_local_files( + &self, + _paths: &[String], + _schema: &DFSchema, + ) -> datafusion::common::Result> { + not_impl_err!( + "resolve_local_files is not implemented. Override this method to support LocalFiles read types." + ) + } + // TODO: Remove these two methods // Ideally, the abstract consumer should not place any constraints on implementations. // The functionality for which the Extensions and FunctionRegistry is needed should be abstracted diff --git a/datafusion/substrait/tests/cases/local_files_tests.rs b/datafusion/substrait/tests/cases/local_files_tests.rs new file mode 100644 index 0000000000000..316501fa64d31 --- /dev/null +++ b/datafusion/substrait/tests/cases/local_files_tests.rs @@ -0,0 +1,363 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for LocalFiles support in the logical plan consumer + +use async_trait::async_trait; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::catalog::TableProvider; +use datafusion::common::{DFSchema, TableReference}; +use datafusion::datasource::MemTable; +use datafusion::error::Result; +use datafusion::execution::{FunctionRegistry, SessionState}; +use datafusion::prelude::SessionContext; +use datafusion_substrait::extensions::Extensions; +use datafusion_substrait::logical_plan::consumer::{ + SubstraitConsumer, from_substrait_plan, +}; +use std::sync::Arc; +use substrait::proto::read_rel::local_files::FileOrFiles; +use substrait::proto::read_rel::local_files::file_or_files::PathType; +use substrait::proto::read_rel::{LocalFiles, ReadType}; +use substrait::proto::rel::RelType; +use substrait::proto::{NamedStruct, Plan, PlanRel, ReadRel, Rel, Type, plan_rel}; + +/// Helper to create a simple schema for testing +fn create_test_schema() -> Schema { + Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Utf8, true), + ]) +} + +/// Helper to create a NamedStruct from a Schema +fn schema_to_named_struct(schema: &Schema) -> NamedStruct { + let names: Vec = schema.fields().iter().map(|f| f.name().clone()).collect(); + + let types: Vec = + vec![substrait::proto::r#type::Struct { + types: schema + .fields() + .iter() + .map(|f| { + let kind = match f.data_type() { + DataType::Int64 => substrait::proto::r#type::Kind::I64( + substrait::proto::r#type::I64 { + nullability: if f.is_nullable() { 1 } else { 2 }, + type_variation_reference: 0, + }, + ), + DataType::Utf8 => substrait::proto::r#type::Kind::String( + substrait::proto::r#type::String { + nullability: if f.is_nullable() { 1 } else { 2 }, + type_variation_reference: 0, + }, + ), + _ => unimplemented!("Only Int64 and Utf8 supported in test"), + }; + Type { kind: Some(kind) } + }) + .collect(), + nullability: 0, + type_variation_reference: 0, + }]; + + NamedStruct { + names, + r#struct: Some(substrait::proto::r#type::Struct { + types: types[0].types.clone(), + nullability: 0, + type_variation_reference: 0, + }), + } +} + +/// Creates a Substrait Plan with LocalFiles read type +fn create_local_files_plan(file_paths: Vec<(PathType, String)>) -> Plan { + let schema = create_test_schema(); + let named_struct = schema_to_named_struct(&schema); + + let items: Vec = file_paths + .into_iter() + .enumerate() + .map(|(idx, (path_type, path))| { + let pt = match path_type { + PathType::UriPath(_) => PathType::UriPath(path), + PathType::UriPathGlob(_) => PathType::UriPathGlob(path), + PathType::UriFile(_) => PathType::UriFile(path), + PathType::UriFolder(_) => PathType::UriFolder(path), + }; + FileOrFiles { + path_type: Some(pt), + partition_index: idx as u64, + start: 0, + length: 0, + file_format: None, + } + }) + .collect(); + + let read_rel = ReadRel { + common: None, + base_schema: Some(named_struct), + filter: None, + best_effort_filter: None, + projection: None, + advanced_extension: None, + read_type: Some(ReadType::LocalFiles(LocalFiles { + items, + advanced_extension: None, + })), + }; + + #[allow(deprecated)] + Plan { + version: None, + extension_uris: vec![], + extensions: vec![], + extension_urns: vec![], + parameter_bindings: vec![], + type_aliases: vec![], + relations: vec![PlanRel { + rel_type: Some(plan_rel::RelType::Root(substrait::proto::RelRoot { + input: Some(Rel { + rel_type: Some(RelType::Read(Box::new(read_rel))), + }), + names: vec!["a".to_string(), "b".to_string()], + })), + }], + advanced_extensions: None, + expected_type_urls: vec![], + } +} + +/// Custom consumer that implements resolve_local_files +struct LocalFilesConsumer { + extensions: Extensions, + state: SessionState, + table: Arc, +} + +#[async_trait] +impl SubstraitConsumer for LocalFilesConsumer { + async fn resolve_table_ref( + &self, + _table_ref: &TableReference, + ) -> Result>> { + // Return None to test that resolve_local_files is called + Ok(None) + } + + async fn resolve_local_files( + &self, + _paths: &[String], + _schema: &DFSchema, + ) -> Result> { + // Return the pre-configured table + Ok(Arc::clone(&self.table)) + } + + fn get_extensions(&self) -> &Extensions { + &self.extensions + } + + fn get_function_registry(&self) -> &impl FunctionRegistry { + &self.state + } +} + +#[tokio::test] +async fn test_local_files_multiple_files() -> Result<()> { + // Create a plan with multiple files + let plan = create_local_files_plan(vec![ + ( + PathType::UriFile("".to_string()), + "file:///path/to/file1.parquet".to_string(), + ), + ( + PathType::UriFile("".to_string()), + "file:///path/to/file2.parquet".to_string(), + ), + ( + PathType::UriFile("".to_string()), + "file:///path/to/file3.parquet".to_string(), + ), + ]); + + // Create a consumer that implements resolve_local_files + let ctx = SessionContext::new(); + let schema = Arc::new(create_test_schema()); + let table = Arc::new(MemTable::try_new(schema, vec![vec![]])?); + + let consumer = LocalFilesConsumer { + extensions: Extensions::default(), + state: ctx.state(), + table, + }; + + // Consume the plan - should succeed with multiple files + let result = + datafusion_substrait::logical_plan::consumer::from_substrait_plan_with_consumer( + &consumer, &plan, + ) + .await; + + assert!(result.is_ok(), "Should handle multiple files: {result:?}"); + let logical_plan = result.unwrap(); + + // Verify the plan structure + let plan_str = format!("{}", logical_plan.display_indent()); + assert!( + plan_str.contains("TableScan"), + "Plan should contain TableScan: {plan_str}", + ); + + Ok(()) +} + +#[tokio::test] +async fn test_local_files_fallback_behavior() -> Result<()> { + // Create a plan with single file - use a simple filename without extension + // to avoid dot being parsed as schema.table separator + let plan = create_local_files_plan(vec![( + PathType::UriFile("".to_string()), + "file:///path/to/testdata".to_string(), + )]); + + // Create a context with the table registered by filename + let ctx = SessionContext::new(); + let schema = Arc::new(create_test_schema()); + let table = Arc::new(MemTable::try_new(schema, vec![vec![]])?); + // Register with the basename that will be extracted from the path ("testdata") + ctx.register_table("testdata", table)?; + + // Use from_substrait_plan directly - this uses DefaultSubstraitConsumer + // which doesn't implement resolve_local_files, triggering the fallback + let result = from_substrait_plan(&ctx.state(), &plan).await; + + assert!( + result.is_ok(), + "Should fall back to resolve_table_ref: {result:?}", + ); + + Ok(()) +} + +#[tokio::test] +async fn test_local_files_path_type_variants() -> Result<()> { + // Test all PathType variants + let test_cases = vec![ + ( + PathType::UriPath("".to_string()), + "file:///path/to/uri_path.parquet", + ), + ( + PathType::UriPathGlob("".to_string()), + "file:///path/to/*.parquet", + ), + ( + PathType::UriFile("".to_string()), + "file:///path/to/uri_file.parquet", + ), + ( + PathType::UriFolder("".to_string()), + "file:///path/to/folder/", + ), + ]; + + for (path_type, path) in test_cases { + let plan = create_local_files_plan(vec![(path_type.clone(), path.to_string())]); + + let ctx = SessionContext::new(); + let schema = Arc::new(create_test_schema()); + let table = Arc::new(MemTable::try_new(schema, vec![vec![]])?); + + let consumer = LocalFilesConsumer { + extensions: Extensions::default(), + state: ctx.state(), + table, + }; + + let result = + datafusion_substrait::logical_plan::consumer::from_substrait_plan_with_consumer( + &consumer, + &plan, + ) + .await; + + assert!( + result.is_ok(), + "PathType {path_type:?} with path {path} should work: {result:?}", + ); + } + + Ok(()) +} + +#[tokio::test] +async fn test_local_files_no_valid_paths() -> Result<()> { + // Create a plan with empty items + let schema = create_test_schema(); + let named_struct = schema_to_named_struct(&schema); + + let read_rel = ReadRel { + common: None, + base_schema: Some(named_struct), + filter: None, + best_effort_filter: None, + projection: None, + advanced_extension: None, + read_type: Some(ReadType::LocalFiles(LocalFiles { + items: vec![], + advanced_extension: None, + })), + }; + + #[allow(deprecated)] + let plan = Plan { + version: None, + extension_uris: vec![], + extensions: vec![], + extension_urns: vec![], + parameter_bindings: vec![], + type_aliases: vec![], + relations: vec![PlanRel { + rel_type: Some(plan_rel::RelType::Root(substrait::proto::RelRoot { + input: Some(Rel { + rel_type: Some(RelType::Read(Box::new(read_rel))), + }), + names: vec!["a".to_string(), "b".to_string()], + })), + }], + advanced_extensions: None, + expected_type_urls: vec![], + }; + + let ctx = SessionContext::new(); + + // This should fail because there are no valid paths + let result = from_substrait_plan(&ctx.state(), &plan).await; + + assert!(result.is_err(), "Should fail with no valid paths"); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("No valid file paths"), + "Error should mention no valid paths: {err_msg}", + ); + + Ok(()) +} diff --git a/datafusion/substrait/tests/cases/mod.rs b/datafusion/substrait/tests/cases/mod.rs index 0870c56cd3ba2..615a7e6cd0bcb 100644 --- a/datafusion/substrait/tests/cases/mod.rs +++ b/datafusion/substrait/tests/cases/mod.rs @@ -20,6 +20,7 @@ mod builtin_expr_semantics_tests; mod consumer_integration; mod emit_kind_tests; mod function_test; +mod local_files_tests; mod logical_plans; mod roundtrip_logical_plan; #[cfg(feature = "physical")] diff --git a/datafusion/substrait/tests/utils.rs b/datafusion/substrait/tests/utils.rs index 2d63980aadf0d..686bdbe488ef4 100644 --- a/datafusion/substrait/tests/utils.rs +++ b/datafusion/substrait/tests/utils.rs @@ -172,7 +172,8 @@ pub mod test { match read_type { // Virtual Tables do not contribute to the schema ReadType::VirtualTable(_) => (), - ReadType::LocalFiles(_) => todo!(), + // LocalFiles are resolved by the consumer, not pre-registered + ReadType::LocalFiles(_) => (), ReadType::NamedTable(nt) => self.collect_named_table(r, nt)?, ReadType::ExtensionTable(_) => todo!(), ReadType::IcebergTable(_) => todo!(),