Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 89 additions & 34 deletions datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use datafusion::logical_expr::{
use std::sync::Arc;
use substrait::proto::expression::MaskExpression;
use substrait::proto::read_rel::ReadType;
use substrait::proto::read_rel::local_files::file_or_files::PathType::UriFile;
use substrait::proto::read_rel::local_files::file_or_files::PathType;
use substrait::proto::{Expression, ReadRel};
use url::Url;

Expand Down Expand Up @@ -176,45 +176,100 @@ pub async fn from_read_rel(
}))
}
Some(ReadType::LocalFiles(lf)) => {
fn extract_filename(name: &str) -> Option<String> {
let corrected_url =
if name.starts_with("file://") && !name.starts_with("file:///") {
name.replacen("file://", "file:///", 1)
} else {
name.to_string()
};

Url::parse(&corrected_url).ok().and_then(|url| {
let path = url.path();
std::path::Path::new(path)
.file_name()
.map(|filename| filename.to_string_lossy().to_string())
})
/// Extracts the path string from a PathType variant.
/// Normalizes file:// URLs to file:/// format for proper URL parsing.
fn extract_path(path_type: Option<&PathType>) -> Option<String> {
let path_str = match path_type? {
PathType::UriPath(p) => p.clone(),
PathType::UriPathGlob(p) => p.clone(),
PathType::UriFile(p) => p.clone(),
PathType::UriFolder(p) => p.clone(),
};

// Normalize file:// to file:/// for proper URL parsing
let normalized = if path_str.starts_with("file://")
&& !path_str.starts_with("file:///")
{
path_str.replacen("file://", "file:///", 1)
} else {
path_str
};

// Parse URL and extract the file system path component
Url::parse(&normalized)
.ok()
.map(|url| url.path().to_string())
}

// we could use the file name to check the original table provider
// TODO: currently does not support multiple local files
let filename: Option<String> =
lf.items.first().and_then(|x| match x.path_type.as_ref() {
Some(UriFile(name)) => extract_filename(name),
_ => None,
});
// Collect all file paths from LocalFiles items
let paths: Vec<String> = lf
.items
.iter()
.filter_map(|item| extract_path(item.path_type.as_ref()))
.collect();

if lf.items.len() > 1 || filename.is_none() {
return not_impl_err!("Only single file reads are supported");
if paths.is_empty() {
return plan_err!("No valid file paths found in LocalFiles");
}
let name = filename.unwrap();
// directly use unwrap here since we could determine it is a valid one
let table_reference = TableReference::Bare { table: name.into() };

read_with_schema(
consumer,
// Generate a table name from the first file path
let table_name = std::path::Path::new(&paths[0])
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| "local_files".to_string());

let table_reference = TableReference::Bare {
table: table_name.clone().into(),
};

// Try to resolve files using the new trait method.
// If not implemented, fall back to the legacy single-file behavior
// for backward compatibility.
let provider = match consumer
.resolve_local_files(&paths, &substrait_schema)
.await
{
Ok(provider) => provider,
Err(_) => {
// Fallback: use the filename to look up a pre-registered table
// This maintains backward compatibility with the original behavior
match consumer.resolve_table_ref(&table_reference).await? {
Some(provider) => provider,
None => {
return plan_err!(
"No table named '{}' found. \
To support LocalFiles with multiple files or custom file resolution, \
implement the resolve_local_files method on your SubstraitConsumer.",
table_name
);
}
}
}
};

// Build the scan plan inline
let schema = substrait_schema.replace_qualifier(table_reference.clone());

let filters = if let Some(f) = &read.filter {
let filter_expr = consumer.consume_expression(f, &schema).await?;
split_conjunction_owned(filter_expr)
} else {
vec![]
};

let plan = LogicalPlanBuilder::scan_with_filters(
table_reference,
substrait_schema,
&read.projection,
&read.filter,
)
.await
provider_as_source(provider),
None,
filters,
)?
.build()?;

ensure_schema_compatibility(plan.schema(), schema.clone())?;

let schema = apply_masking(schema, &read.projection)?;

apply_projection(plan, schema)
}
_ => {
not_impl_err!("Unsupported Readtype: {:?}", read.read_type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,26 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
table_ref: &TableReference,
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>>;

/// Resolves a list of local file paths to a TableProvider.
///
/// Override this method to customize how file paths from Substrait LocalFiles
/// are resolved to table providers. The default implementation returns an error
/// since resolving arbitrary file paths requires access to an object store or
/// file system which may not be available in all contexts.
///
/// # Arguments
/// * `paths` - List of file paths (URIs) from Substrait LocalFiles
/// * `schema` - The expected schema of the files
async fn resolve_local_files(
&self,
_paths: &[String],
_schema: &DFSchema,
) -> datafusion::common::Result<Arc<dyn TableProvider>> {
not_impl_err!(
"resolve_local_files is not implemented. Override this method to support LocalFiles read types."
)
}

// TODO: Remove these two methods
// Ideally, the abstract consumer should not place any constraints on implementations.
// The functionality for which the Extensions and FunctionRegistry is needed should be abstracted
Expand Down
Loading