diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs
index d540b9124..06602076b 100644
--- a/kernel/src/engine/default/parquet.rs
+++ b/kernel/src/engine/default/parquet.rs
@@ -21,7 +21,7 @@ use uuid::Uuid;
 
 use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
 use super::UrlExt;
-use crate::engine::arrow_conversion::TryIntoArrow as _;
+use crate::engine::arrow_conversion::{TryFromArrow as _, TryIntoArrow as _};
 use crate::engine::arrow_data::ArrowEngineData;
 use crate::engine::arrow_utils::{
     fixup_parquet_read, generate_mask, get_requested_indices, ordering_needs_row_indexes,
@@ -29,7 +29,7 @@ use crate::engine::arrow_utils::{
 };
 use crate::engine::default::executor::TaskExecutor;
 use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
-use crate::schema::SchemaRef;
+use crate::schema::{SchemaRef, StructType};
 use crate::{
     DeltaResult, EngineData, Error, FileDataReadResultIterator, FileMeta, ParquetHandler,
     PredicateRef,
@@ -245,6 +245,41 @@ impl ParquetHandler for DefaultParquetHandler {
             self.readahead,
         )
     }
+
+    fn get_parquet_schema(&self, file: &FileMeta) -> DeltaResult<SchemaRef> {
+        let store = self.store.clone();
+        let location = file.location.clone();
+
+        self.task_executor.block_on(async move {
+            let arrow_schema =
+                if location.is_presigned() {
+                    // Handle presigned URLs by fetching the file via HTTP
+                    let client = reqwest::Client::new();
+                    let response = client.get(location.as_str()).send().await.map_err(|e| {
+                        Error::generic(format!("Failed to fetch presigned URL: {}", e))
+                    })?;
+                    let bytes = response.bytes().await.map_err(|e| {
+                        Error::generic(format!("Failed to read response bytes: {}", e))
+                    })?;
+
+                    // Load metadata from bytes
+                    let metadata = ArrowReaderMetadata::load(&bytes, Default::default())?;
+                    metadata.schema().clone()
+                } else {
+                    // Handle object store paths
+                    let path = Path::from_url_path(location.path())?;
+                    let mut reader = ParquetObjectReader::new(store, path);
+                    let metadata =
+                        ArrowReaderMetadata::load_async(&mut reader, Default::default()).await?;
+                    metadata.schema().clone()
+                };
+
+            // Convert Arrow schema to Kernel schema
+            StructType::try_from_arrow(arrow_schema.as_ref())
+                .map(Arc::new)
+                .map_err(Error::Arrow)
+        })
+    }
 }
 
 /// Implements [`FileOpener`] for a parquet file
@@ -641,4 +676,80 @@ mod tests {
             "Generic delta kernel error: Path must end with a trailing slash: memory:///data",
         );
     }
+
+    #[test]
+    fn test_get_parquet_schema() {
+        let store = Arc::new(LocalFileSystem::new());
+        let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
+
+        // Use a checkpoint parquet file
+        let path = std::fs::canonicalize(PathBuf::from(
+            "./tests/data/with_checkpoint_no_last_checkpoint/_delta_log/00000000000000000002.checkpoint.parquet",
+        ))
+        .unwrap();
+        let url = Url::from_file_path(path).unwrap();
+
+        let file_meta = FileMeta {
+            location: url,
+            last_modified: 0,
+            size: 0,
+        };
+
+        // Get the schema
+        let schema = handler.get_parquet_schema(&file_meta).unwrap();
+
+        // Verify the schema has fields
+        assert!(
+            schema.fields().count() > 0,
+            "Schema should have at least one field"
+        );
+
+        // Verify this is a checkpoint schema with expected fields
+        let field_names: Vec<&String> = schema.fields().map(|f| f.name()).collect();
+        assert!(
+            field_names.iter().any(|&name| name == "txn"),
+            "Checkpoint should have 'txn' field"
+        );
+        assert!(
+            field_names.iter().any(|&name| name == "add"),
+            "Checkpoint should have 'add' field"
+        );
+        assert!(
+            field_names.iter().any(|&name| name == "remove"),
name == "remove"), + "Checkpoint should have 'remove' field" + ); + assert!( + field_names.iter().any(|&name| name == "metaData"), + "Checkpoint should have 'metaData' field" + ); + assert!( + field_names.iter().any(|&name| name == "protocol"), + "Checkpoint should have 'protocol' field" + ); + + // Verify we can access field properties + for field in schema.fields() { + assert!(!field.name().is_empty(), "Field name should not be empty"); + let _data_type = field.data_type(); // Should not panic + } + } + + #[test] + fn test_get_parquet_schema_invalid_file() { + let store = Arc::new(LocalFileSystem::new()); + let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); + + // Test with a non-existent file + let mut temp_path = std::env::temp_dir(); + temp_path.push("non_existent_file_for_test.parquet"); + let url = Url::from_file_path(temp_path).unwrap(); + let file_meta = FileMeta { + location: url, + last_modified: 0, + size: 0, + }; + + let result = handler.get_parquet_schema(&file_meta); + assert!(result.is_err(), "Should error on non-existent file"); + } } diff --git a/kernel/src/engine/sync/parquet.rs b/kernel/src/engine/sync/parquet.rs index d170a2334..dc7ad3caa 100644 --- a/kernel/src/engine/sync/parquet.rs +++ b/kernel/src/engine/sync/parquet.rs @@ -1,17 +1,21 @@ use std::fs::File; +use std::sync::Arc; use crate::arrow::datatypes::SchemaRef as ArrowSchemaRef; use crate::parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReaderBuilder}; use super::read_files; +use crate::engine::arrow_conversion::TryFromArrow as _; use crate::engine::arrow_data::ArrowEngineData; use crate::engine::arrow_utils::{ fixup_parquet_read, generate_mask, get_requested_indices, ordering_needs_row_indexes, RowIndexBuilder, }; use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping; -use crate::schema::SchemaRef; -use crate::{DeltaResult, FileDataReadResultIterator, FileMeta, ParquetHandler, PredicateRef}; +use crate::schema::{SchemaRef, StructType}; +use crate::{ + DeltaResult, Error, FileDataReadResultIterator, FileMeta, ParquetHandler, PredicateRef, +}; pub(crate) struct SyncParquetHandler; @@ -52,4 +56,106 @@ impl ParquetHandler for SyncParquetHandler { ) -> DeltaResult { read_files(files, schema, predicate, try_create_from_parquet) } + + fn get_parquet_schema(&self, file: &FileMeta) -> DeltaResult { + // Convert URL to file path + let path = file + .location + .to_file_path() + .map_err(|_| Error::generic("SyncEngine can only read local files"))?; + + // Open the file + let file = File::open(path)?; + + // Load metadata from the file + let metadata = ArrowReaderMetadata::load(&file, Default::default())?; + let arrow_schema = metadata.schema(); + + // Convert Arrow schema to Kernel schema + StructType::try_from_arrow(arrow_schema.as_ref()) + .map(Arc::new) + .map_err(Error::Arrow) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::path::PathBuf; + use url::Url; + + #[test] + fn test_sync_get_parquet_schema() -> DeltaResult<()> { + let handler = SyncParquetHandler; + + // Use a checkpoint parquet file + let path = std::fs::canonicalize(PathBuf::from( + "./tests/data/with_checkpoint_no_last_checkpoint/_delta_log/00000000000000000002.checkpoint.parquet", + ))?; + let url = Url::from_file_path(path).unwrap(); + + let file_meta = FileMeta { + location: url, + last_modified: 0, + size: 0, + }; + + // Get the schema + let schema = handler.get_parquet_schema(&file_meta)?; + + // Verify the schema has fields + assert!( + 
+            schema.fields().count() > 0,
+            "Schema should have at least one field"
+        );
+
+        // Verify this is a checkpoint schema with expected fields
+        let field_names: Vec<&String> = schema.fields().map(|f| f.name()).collect();
+        assert!(
+            field_names.iter().any(|&name| name == "txn"),
+            "Checkpoint should have 'txn' field"
+        );
+        assert!(
+            field_names.iter().any(|&name| name == "add"),
+            "Checkpoint should have 'add' field"
+        );
+        assert!(
+            field_names.iter().any(|&name| name == "remove"),
+            "Checkpoint should have 'remove' field"
+        );
+        assert!(
+            field_names.iter().any(|&name| name == "metaData"),
+            "Checkpoint should have 'metaData' field"
+        );
+        assert!(
+            field_names.iter().any(|&name| name == "protocol"),
+            "Checkpoint should have 'protocol' field"
+        );
+
+        // Verify we can access field properties
+        for field in schema.fields() {
+            assert!(!field.name().is_empty(), "Field name should not be empty");
+            let _data_type = field.data_type(); // Should not panic
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_sync_get_parquet_schema_invalid_file() {
+        let handler = SyncParquetHandler;
+
+        // Test with a non-existent file
+        let mut temp_path = std::env::temp_dir();
+        temp_path.push("non_existent_file_for_sync_test.parquet");
+        let url = Url::from_file_path(temp_path).unwrap();
+        let file_meta = FileMeta {
+            location: url,
+            last_modified: 0,
+            size: 0,
+        };
+
+        let result = handler.get_parquet_schema(&file_meta);
+        assert!(result.is_err(), "Should error on non-existent file");
+    }
 }
diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs
index 0f611cfbc..d437df370 100644
--- a/kernel/src/lib.rs
+++ b/kernel/src/lib.rs
@@ -696,6 +696,30 @@ pub trait ParquetHandler: AsAny {
         physical_schema: SchemaRef,
         predicate: Option<PredicateRef>,
     ) -> DeltaResult<FileDataReadResultIterator>;
+
+    /// Read the schema from a Parquet file's footer metadata without reading the data.
+    ///
+    /// This method reads only the Parquet file footer (metadata section) to extract the schema,
+    /// which is useful for schema inspection, compatibility checking, and determining whether
+    /// parsed statistics columns are present and compatible with the current table schema.
+    ///
+    /// # Parameters
+    ///
+    /// - `file` - File metadata for the Parquet file whose schema should be read.
+    ///
+    /// # Returns
+    ///
+    /// A [`DeltaResult`] containing a [`SchemaRef`] representing the Parquet file's schema,
+    /// converted to Delta Kernel's schema format.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - The file cannot be accessed or does not exist
+    /// - The file is not a valid Parquet file
+    /// - The footer cannot be read or parsed
+    /// - The schema cannot be converted to Delta Kernel's format
+    fn get_parquet_schema(&self, file: &FileMeta) -> DeltaResult<SchemaRef>;
 }
 
 /// The `Engine` trait encapsulates all the functionality an engine or connector needs to provide
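---

For reviewers, a minimal caller-side sketch of the new API (not part of the diff): it mirrors the unit tests above and assumes the `default-engine` feature with its `tokio` executor; the file path is hypothetical, so substitute any local Parquet file. `ParquetHandler` must be in scope to call the trait method.

```rust
use std::sync::Arc;

use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::parquet::DefaultParquetHandler;
use delta_kernel::{DeltaResult, FileMeta, ParquetHandler};
use object_store::local::LocalFileSystem;
use url::Url;

fn main() -> DeltaResult<()> {
    // Hypothetical path; point this at any Parquet file on local disk.
    let url = Url::from_file_path("/tmp/example.parquet").unwrap();
    let file_meta = FileMeta {
        location: url,
        last_modified: 0,
        size: 0,
    };

    let store = Arc::new(LocalFileSystem::new());
    let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));

    // Footer-only read: extracts the kernel schema without fetching any row data.
    let schema = handler.get_parquet_schema(&file_meta)?;
    for field in schema.fields() {
        println!("{}: {}", field.name(), field.data_type());
    }
    Ok(())
}
```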