From 0ce372f6d797d0bc98ac29665db5c4d944b6feff Mon Sep 17 00:00:00 2001
From: DrakeLin
Date: Wed, 19 Nov 2025 01:50:08 +0000
Subject: [PATCH 1/9] vibed

---
 kernel/src/engine/default/parquet.rs | 39 ++++++++++++++++++++++++--
 kernel/src/engine/sync/parquet.rs    | 28 +++++++++++++++++--
 kernel/src/lib.rs                    | 41 ++++++++++++++++++++++++++++
 3 files changed, 104 insertions(+), 4 deletions(-)

diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs
index d540b9124..ad350b105 100644
--- a/kernel/src/engine/default/parquet.rs
+++ b/kernel/src/engine/default/parquet.rs
@@ -21,7 +21,7 @@ use uuid::Uuid;
 
 use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
 use super::UrlExt;
-use crate::engine::arrow_conversion::TryIntoArrow as _;
+use crate::engine::arrow_conversion::{TryFromArrow as _, TryIntoArrow as _};
 use crate::engine::arrow_data::ArrowEngineData;
 use crate::engine::arrow_utils::{
     fixup_parquet_read, generate_mask, get_requested_indices, ordering_needs_row_indexes,
@@ -29,7 +29,7 @@ use crate::engine::arrow_utils::{
 };
 use crate::engine::default::executor::TaskExecutor;
 use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
-use crate::schema::SchemaRef;
+use crate::schema::{SchemaRef, StructType};
 use crate::{
     DeltaResult, EngineData, Error, FileDataReadResultIterator, FileMeta, ParquetHandler,
     PredicateRef,
@@ -245,6 +245,41 @@ impl ParquetHandler for DefaultParquetHandler {
             self.readahead,
         )
     }
+
+    fn get_parquet_schema(&self, file: &FileMeta) -> DeltaResult<SchemaRef> {
+        let store = self.store.clone();
+        let location = file.location.clone();
+
+        self.task_executor.block_on(async move {
+            let arrow_schema =
+                if location.is_presigned() {
+                    // Handle presigned URLs by fetching the file via HTTP
+                    let client = reqwest::Client::new();
+                    let response = client.get(location.as_str()).send().await.map_err(|e| {
+                        Error::generic(format!("Failed to fetch presigned URL: {}", e))
+                    })?;
+                    let bytes = response.bytes().await.map_err(|e| {
+                        Error::generic(format!("Failed to read response bytes: {}", e))
+                    })?;
+
+                    // Load metadata from bytes
+                    let metadata = ArrowReaderMetadata::load(&bytes, Default::default())?;
+                    metadata.schema().clone()
+                } else {
+                    // Handle object store paths
+                    let path = Path::from_url_path(location.path())?;
+                    let mut reader = ParquetObjectReader::new(store, path);
+                    let metadata =
+                        ArrowReaderMetadata::load_async(&mut reader, Default::default()).await?;
+                    metadata.schema().clone()
+                };
+
+            // Convert Arrow schema to Kernel schema
+            StructType::try_from_arrow(arrow_schema.as_ref())
+                .map(Arc::new)
+                .map_err(Error::Arrow)
+        })
+    }
 }
 
 /// Implements [`FileOpener`] for a parquet file
diff --git a/kernel/src/engine/sync/parquet.rs b/kernel/src/engine/sync/parquet.rs
index d170a2334..b3140c589 100644
--- a/kernel/src/engine/sync/parquet.rs
+++ b/kernel/src/engine/sync/parquet.rs
@@ -1,17 +1,21 @@
 use std::fs::File;
+use std::sync::Arc;
 
 use crate::arrow::datatypes::SchemaRef as ArrowSchemaRef;
 use crate::parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReaderBuilder};
 
 use super::read_files;
+use crate::engine::arrow_conversion::TryFromArrow as _;
 use crate::engine::arrow_data::ArrowEngineData;
 use crate::engine::arrow_utils::{
     fixup_parquet_read, generate_mask, get_requested_indices, ordering_needs_row_indexes,
     RowIndexBuilder,
 };
 use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
-use crate::schema::SchemaRef;
-use crate::{DeltaResult, FileDataReadResultIterator, FileMeta, ParquetHandler, PredicateRef};
+use crate::schema::{SchemaRef, StructType};
+use crate::{
+    DeltaResult, Error, FileDataReadResultIterator, FileMeta, ParquetHandler, PredicateRef,
+};
 
 pub(crate) struct SyncParquetHandler;
 
@@ -52,4 +56,24 @@ impl ParquetHandler for SyncParquetHandler {
     ) -> DeltaResult<FileDataReadResultIterator> {
         read_files(files, schema, predicate, try_create_from_parquet)
     }
+
+    fn get_parquet_schema(&self, file: &FileMeta) -> DeltaResult<SchemaRef> {
+        // Convert URL to file path
+        let path = file
+            .location
+            .to_file_path()
+            .map_err(|_| Error::generic("SyncEngine can only read local files"))?;
+
+        // Open the file
+        let file = File::open(path)?;
+
+        // Load metadata from the file
+        let metadata = ArrowReaderMetadata::load(&file, Default::default())?;
+        let arrow_schema = metadata.schema();
+
+        // Convert Arrow schema to Kernel schema
+        StructType::try_from_arrow(arrow_schema.as_ref())
+            .map(Arc::new)
+            .map_err(Error::Arrow)
+    }
 }
diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs
index 0f611cfbc..38aee3617 100644
--- a/kernel/src/lib.rs
+++ b/kernel/src/lib.rs
@@ -696,6 +696,47 @@ pub trait ParquetHandler: AsAny {
         physical_schema: SchemaRef,
         predicate: Option<PredicateRef>,
     ) -> DeltaResult<FileDataReadResultIterator>;
+
+    /// Read the schema from a Parquet file's footer metadata without reading the data.
+    ///
+    /// This method reads only the Parquet file footer (metadata section) to extract the schema,
+    /// which is useful for schema inspection, compatibility checking, and determining whether
+    /// parsed statistics columns are present and compatible with the current table schema.
+    ///
+    /// # Use Case
+    ///
+    /// This method is primarily used for checking whether a checkpoint file contains a
+    /// `stats_parsed` column with a compatible schema before attempting to read it.
+    /// Schema compatibility checking allows the kernel to:
+    /// - Determine if parsed stats are available
+    /// - Validate that the stats schema matches the current table schema
+    /// - Decide whether to use parsed stats or fall back to JSON stats
+    ///
+    /// # Performance
+    ///
+    /// Reading the footer is efficient as it:
+    /// - Requires only a single I/O operation
+    /// - Reads a small amount of data (typically 10KB - 1MB)
+    /// - Does not require decompression or data scanning
+    /// - Much faster than reading actual file data
+    ///
+    /// # Parameters
+    ///
+    /// - `file` - File metadata for the Parquet file whose schema should be read.
+    ///
+    /// # Returns
+    ///
+    /// A [`DeltaResult`] containing a [`SchemaRef`] representing the Parquet file's schema,
+    /// converted to Delta Kernel's schema format.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - The file cannot be accessed or does not exist
+    /// - The file is not a valid Parquet file
+    /// - The footer cannot be read or parsed
+    /// - The schema cannot be converted to Delta Kernel's format
+    fn get_parquet_schema(&self, file: &FileMeta) -> DeltaResult<SchemaRef>;
 }
 
 /// The `Engine` trait encapsulates all the functionality an engine or connector needs to provide
From 9335a6676e990dc06018cfbd68a47abb2b0b8011 Mon Sep 17 00:00:00 2001
From: DrakeLin
Date: Wed, 19 Nov 2025 02:40:38 +0000
Subject: [PATCH 2/9] test

---
 kernel/src/engine/default/parquet.rs | 51 ++++++++++++++++++++++++++++
 kernel/src/lib.rs                    | 17 ----------
 2 files changed, 51 insertions(+), 17 deletions(-)

diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs
index ad350b105..b992c0fc2 100644
--- a/kernel/src/engine/default/parquet.rs
+++ b/kernel/src/engine/default/parquet.rs
@@ -676,4 +676,55 @@ mod tests {
             "Generic delta kernel error: Path must end with a trailing slash: memory:///data",
         );
     }
+
+    #[test]
+    fn test_get_parquet_schema() {
+        let store = Arc::new(LocalFileSystem::new());
+        let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
+
+        // Test with an existing Parquet file
+        let path = std::fs::canonicalize(PathBuf::from(
+            "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet",
+        ))
+        .unwrap();
+        let url = Url::from_file_path(path).unwrap();
+
+        let file_meta = FileMeta {
+            location: url,
+            last_modified: 0,
+            size: 0,
+        };
+
+        // Get the schema
+        let schema = handler.get_parquet_schema(&file_meta).unwrap();
+
+        // Verify the schema has fields
+        assert!(
+            schema.fields().count() > 0,
+            "Schema should have at least one field"
+        );
+
+        // Verify we can access field properties
+        for field in schema.fields() {
+            assert!(!field.name().is_empty(), "Field name should not be empty");
+            let _data_type = field.data_type(); // Should not panic
+        }
+    }
+
+    #[test]
+    fn test_get_parquet_schema_invalid_file() {
+        let store = Arc::new(LocalFileSystem::new());
+        let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
+
+        // Test with a non-existent file
+        let url = Url::from_file_path("/tmp/non_existent_file_for_test.parquet").unwrap();
+        let file_meta = FileMeta {
+            location: url,
+            last_modified: 0,
+            size: 0,
+        };
+
+        let result = handler.get_parquet_schema(&file_meta);
+        assert!(result.is_err(), "Should error on non-existent file");
+    }
 }
diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs
index 38aee3617..d437df370 100644
--- a/kernel/src/lib.rs
+++ b/kernel/src/lib.rs
@@ -703,23 +703,6 @@ pub trait ParquetHandler: AsAny {
     /// which is useful for schema inspection, compatibility checking, and determining whether
     /// parsed statistics columns are present and compatible with the current table schema.
     ///
-    /// # Use Case
-    ///
-    /// This method is primarily used for checking whether a checkpoint file contains a
-    /// `stats_parsed` column with a compatible schema before attempting to read it.
- /// Schema compatibility checking allows the kernel to: - /// - Determine if parsed stats are available - /// - Validate that the stats schema matches the current table schema - /// - Decide whether to use parsed stats or fall back to JSON stats - /// - /// # Performance - /// - /// Reading the footer is efficient as it: - /// - Requires only a single I/O operation - /// - Reads a small amount of data (typically 10KB - 1MB) - /// - Does not require decompression or data scanning - /// - Much faster than reading actual file data - /// /// # Parameters /// /// - `file` - File metadata for the Parquet file whose schema should be read. From c3b540bbc6127b6d40785b43752399af37144970 Mon Sep 17 00:00:00 2001 From: DrakeLin Date: Wed, 19 Nov 2025 18:45:18 +0000 Subject: [PATCH 3/9] fix --- kernel/src/engine/default/parquet.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index b992c0fc2..cbec172c5 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -717,7 +717,9 @@ mod tests { let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); // Test with a non-existent file - let url = Url::from_file_path("/tmp/non_existent_file_for_test.parquet").unwrap(); + let mut temp_path = std::env::temp_dir(); + temp_path.push("non_existent_file_for_test.parquet"); + let url = Url::from_file_path(temp_path).unwrap(); let file_meta = FileMeta { location: url, last_modified: 0, From 69766a0c32d503d3d924612cc3d10779494d2470 Mon Sep 17 00:00:00 2001 From: DrakeLin Date: Thu, 20 Nov 2025 19:47:08 +0000 Subject: [PATCH 4/9] tests --- kernel/tests/parquet_schema_tests.rs | 220 +++++++++++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100644 kernel/tests/parquet_schema_tests.rs diff --git a/kernel/tests/parquet_schema_tests.rs b/kernel/tests/parquet_schema_tests.rs new file mode 100644 index 000000000..a89b5b4b2 --- /dev/null +++ b/kernel/tests/parquet_schema_tests.rs @@ -0,0 +1,220 @@ +//! 
Tests for getting Parquet schema from file footer
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use delta_kernel::engine::default::DefaultEngine;
+use delta_kernel::{Engine, FileMeta};
+use object_store::local::LocalFileSystem;
+use url::Url;
+
+#[test]
+fn test_get_parquet_schema_simple() -> Result<(), Box<dyn std::error::Error>> {
+    // Use an existing test Parquet file
+    let path = std::fs::canonicalize(PathBuf::from(
+        "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet",
+    ))?;
+    let url = Url::from_file_path(path).unwrap();
+
+    let store = Arc::new(LocalFileSystem::new());
+    let engine = Arc::new(DefaultEngine::new(store));
+    let parquet_handler = engine.parquet_handler();
+
+    let file_meta = FileMeta {
+        location: url,
+        last_modified: 0,
+        size: 0,
+    };
+
+    // Get the schema
+    let schema = parquet_handler.get_parquet_schema(&file_meta)?;
+
+    // Verify the schema has expected fields
+    assert!(
+        schema.fields().count() > 0,
+        "Schema should have at least one field"
+    );
+
+    // This test file should have specific columns - let's verify some basic properties
+    let field_names: Vec<_> = schema.fields().map(|f| f.name()).collect();
+    assert!(!field_names.is_empty(), "Should have field names");
+
+    Ok(())
+}
+
+#[test]
+fn test_get_parquet_schema_with_stats_parsed() -> Result<(), Box<dyn std::error::Error>> {
+    // Use a checkpoint file that might have stats_parsed
+    let checkpoint_files: Vec<&str> = vec![
+        "./tests/data/table-with-dv-small/_delta_log/00000000000000000001.checkpoint.parquet",
+        "./tests/data/table-without-dv-small/_delta_log/00000000000000000001.checkpoint.parquet",
+    ];
+
+    let store = Arc::new(LocalFileSystem::new());
+    let engine = Arc::new(DefaultEngine::new(store));
+    let parquet_handler = engine.parquet_handler();
+
+    for checkpoint_path in checkpoint_files {
+        let path_buf = PathBuf::from(checkpoint_path);
+        if !path_buf.exists() {
+            // Skip if file doesn't exist in test data
+            continue;
+        }
+
+        let path = std::fs::canonicalize(path_buf)?;
+        let url = Url::from_file_path(path).unwrap();
+
+        let file_meta = FileMeta {
+            location: url,
+            last_modified: 0,
+            size: 0,
+        };
+
+        // Get the schema
+        let schema = parquet_handler.get_parquet_schema(&file_meta)?;
+
+        // Verify the schema is valid
+        assert!(
+            schema.fields().count() > 0,
+            "Checkpoint schema should have fields"
+        );
+
+        // Check if add field exists (checkpoints should have add actions)
+        let fields: Vec<_> = schema.fields().collect();
+        let has_add = fields.iter().any(|f| f.name() == "add");
+        assert!(has_add, "Checkpoint should have 'add' field");
+
+        // If stats_parsed exists, verify it's a struct
+        if let Some(add_field) = fields.iter().find(|f| f.name() == "add") {
+            if let delta_kernel::schema::DataType::Struct(add_struct) = add_field.data_type() {
+                let add_fields: Vec<_> = add_struct.fields().collect();
+                let has_stats_parsed = add_fields.iter().any(|f| f.name() == "stats_parsed");
+                let has_stats = add_fields.iter().any(|f| f.name() == "stats");
+
+                // Should have at least one stats field
+                assert!(
+                    has_stats_parsed || has_stats,
+                    "Add action should have either stats or stats_parsed"
+                );
+            }
+        }
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_get_parquet_schema_invalid_file() {
+    let store = Arc::new(LocalFileSystem::new());
+    let engine = Arc::new(DefaultEngine::new(store));
+    let parquet_handler = engine.parquet_handler();
+
+    // Try with a non-existent file
+    let url = Url::from_file_path("/tmp/non_existent_file.parquet").unwrap();
+    let file_meta = FileMeta {
+        location: url,
+        last_modified: 0,
+        size: 0,
+    };
+
+    let result = parquet_handler.get_parquet_schema(&file_meta);
+    assert!(result.is_err(), "Should error on non-existent file");
+}
+
+#[test]
+fn test_get_parquet_schema_non_parquet_file() -> Result<(), Box<dyn std::error::Error>> {
+    let store = Arc::new(LocalFileSystem::new());
+    let engine = Arc::new(DefaultEngine::new(store));
+    let parquet_handler = engine.parquet_handler();
+
+    // Try with a non-Parquet file (use a JSON log file)
+    let path = std::fs::canonicalize(PathBuf::from(
+        "./tests/data/table-with-dv-small/_delta_log/00000000000000000000.json",
+    ))?;
+    let url = Url::from_file_path(path).unwrap();
+
+    let file_meta = FileMeta {
+        location: url,
+        last_modified: 0,
+        size: 0,
+    };
+
+    let result = parquet_handler.get_parquet_schema(&file_meta);
+    assert!(result.is_err(), "Should error on non-Parquet file");
+
+    Ok(())
+}
+
+#[test]
+fn test_get_parquet_schema_with_nested_types() -> Result<(), Box<dyn std::error::Error>> {
+    // Find a test file with nested types (struct, array, map)
+    // Using type-widening test data which might have nested structures
+    let test_files = vec![
+        "./tests/data/type-widening/part-00000-61accb66-b740-416b-9f5b-f0fccaceb415-c000.snappy.parquet",
+    ];
+
+    let store = Arc::new(LocalFileSystem::new());
+    let engine = Arc::new(DefaultEngine::new(store));
+    let parquet_handler = engine.parquet_handler();
+
+    for test_file in test_files {
+        let path_buf = PathBuf::from(test_file);
+        if !path_buf.exists() {
+            continue;
+        }
+
+        let path = std::fs::canonicalize(path_buf)?;
+        let url = Url::from_file_path(path).unwrap();
+
+        let file_meta = FileMeta {
+            location: url,
+            last_modified: 0,
+            size: 0,
+        };
+
+        // Get the schema
+        let schema = parquet_handler.get_parquet_schema(&file_meta)?;
+
+        // Verify we can read the schema
+        assert!(schema.fields().count() > 0, "Schema should have fields");
+
+        // Print field types for debugging (if needed)
+        for field in schema.fields() {
+            let _data_type = field.data_type();
+            // Schema was successfully read and converted
+        }
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_get_parquet_schema_preserves_field_metadata() -> Result<(), Box<dyn std::error::Error>> {
+    // Test that field metadata (like parquet.field.id) is preserved
+    let path = std::fs::canonicalize(PathBuf::from(
+        "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet",
+    ))?;
+    let url = Url::from_file_path(path).unwrap();
+
+    let store = Arc::new(LocalFileSystem::new());
+    let engine = Arc::new(DefaultEngine::new(store));
+    let parquet_handler = engine.parquet_handler();
+
+    let file_meta = FileMeta {
+        location: url,
+        last_modified: 0,
+        size: 0,
+    };
+
+    // Get the schema
+    let schema = parquet_handler.get_parquet_schema(&file_meta)?;
+
+    // Verify fields have been converted properly
+    for field in schema.fields() {
+        // Each field should have a name and data type
+        assert!(!field.name().is_empty(), "Field name should not be empty");
+        let _data_type = field.data_type();
+        // Successfully accessed field properties
+    }
+
+    Ok(())
+}
From 62eee21231e9e24056332ae28965cd8c854f20a7 Mon Sep 17 00:00:00 2001
From: DrakeLin
Date: Thu, 20 Nov 2025 19:48:34 +0000
Subject: [PATCH 5/9] smaller

---
 kernel/tests/parquet_schema_tests.rs | 93 ----------------------------
 1 file changed, 93 deletions(-)

diff --git a/kernel/tests/parquet_schema_tests.rs b/kernel/tests/parquet_schema_tests.rs
index a89b5b4b2..51e9f7d1f 100644
--- a/kernel/tests/parquet_schema_tests.rs
+++ b/kernel/tests/parquet_schema_tests.rs
@@ -41,67 +41,6 @@ fn test_get_parquet_schema_simple() -> Result<(), Box<dyn std::error::Error>> {
     Ok(())
 }
 
-#[test]
-fn test_get_parquet_schema_with_stats_parsed() -> Result<(), Box<dyn std::error::Error>> {
-    // Use a checkpoint file that might have stats_parsed
-    let checkpoint_files: Vec<&str> = vec![
-        "./tests/data/table-with-dv-small/_delta_log/00000000000000000001.checkpoint.parquet",
-        "./tests/data/table-without-dv-small/_delta_log/00000000000000000001.checkpoint.parquet",
-    ];
-
-    let store = Arc::new(LocalFileSystem::new());
-    let engine = Arc::new(DefaultEngine::new(store));
-    let parquet_handler = engine.parquet_handler();
-
-    for checkpoint_path in checkpoint_files {
-        let path_buf = PathBuf::from(checkpoint_path);
-        if !path_buf.exists() {
-            // Skip if file doesn't exist in test data
-            continue;
-        }
-
-        let path = std::fs::canonicalize(path_buf)?;
-        let url = Url::from_file_path(path).unwrap();
-
-        let file_meta = FileMeta {
-            location: url,
-            last_modified: 0,
-            size: 0,
-        };
-
-        // Get the schema
-        let schema = parquet_handler.get_parquet_schema(&file_meta)?;
-
-        // Verify the schema is valid
-        assert!(
-            schema.fields().count() > 0,
-            "Checkpoint schema should have fields"
-        );
-
-        // Check if add field exists (checkpoints should have add actions)
-        let fields: Vec<_> = schema.fields().collect();
-        let has_add = fields.iter().any(|f| f.name() == "add");
-        assert!(has_add, "Checkpoint should have 'add' field");
-
-        // If stats_parsed exists, verify it's a struct
-        if let Some(add_field) = fields.iter().find(|f| f.name() == "add") {
-            if let delta_kernel::schema::DataType::Struct(add_struct) = add_field.data_type() {
-                let add_fields: Vec<_> = add_struct.fields().collect();
-                let has_stats_parsed = add_fields.iter().any(|f| f.name() == "stats_parsed");
-                let has_stats = add_fields.iter().any(|f| f.name() == "stats");
-
-                // Should have at least one stats field
-                assert!(
-                    has_stats_parsed || has_stats,
-                    "Add action should have either stats or stats_parsed"
-                );
-            }
-        }
-    }
-
-    Ok(())
-}
-
 #[test]
 fn test_get_parquet_schema_invalid_file() {
     let store = Arc::new(LocalFileSystem::new());
@@ -186,35 +125,3 @@ fn test_get_parquet_schema_with_nested_types() -> Result<(), Box<dyn std::error::Error>> {
 
     Ok(())
 }
-
-#[test]
-fn test_get_parquet_schema_preserves_field_metadata() -> Result<(), Box<dyn std::error::Error>> {
-    // Test that field metadata (like parquet.field.id) is preserved
-    let path = std::fs::canonicalize(PathBuf::from(
-        "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet",
-    ))?;
-    let url = Url::from_file_path(path).unwrap();
-
-    let store = Arc::new(LocalFileSystem::new());
-    let engine = Arc::new(DefaultEngine::new(store));
-    let parquet_handler = engine.parquet_handler();
-
-    let file_meta = FileMeta {
-        location: url,
-        last_modified: 0,
-        size: 0,
-    };
-
-    // Get the schema
-    let schema = parquet_handler.get_parquet_schema(&file_meta)?;
-
-    // Verify fields have been converted properly
-    for field in schema.fields() {
-        // Each field should have a name and data type
-        assert!(!field.name().is_empty(), "Field name should not be empty");
-        let _data_type = field.data_type();
-        // Successfully accessed field properties
-    }
-
-    Ok(())
-}
From 8dcf0ce6afd70a7efe41e56f4c04b0b571e59b48 Mon Sep 17 00:00:00 2001
From: DrakeLin
Date: Thu, 20 Nov 2025 23:07:15 +0000
Subject: [PATCH 6/9] fix

---
 kernel/src/engine/default/parquet.rs | 12 +++-
 kernel/src/engine/sync/parquet.rs    | 84 ++++++++++++++++++++++++++++
 2 files changed, 94 insertions(+), 2 deletions(-)

diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs
index cbec172c5..d136e467e 100644
--- a/kernel/src/engine/default/parquet.rs
+++ b/kernel/src/engine/default/parquet.rs
@@ -682,9 +682,9 @@ mod tests { let store = Arc::new(LocalFileSystem::new()); let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); - // Test with an existing Parquet file + // Use a checkpoint parquet file let path = std::fs::canonicalize(PathBuf::from( - "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet", + "./tests/data/with_checkpoint_no_last_checkpoint/_delta_log/00000000000000000002.checkpoint.parquet", )) .unwrap(); let url = Url::from_file_path(path).unwrap(); @@ -704,6 +704,14 @@ mod tests { "Schema should have at least one field" ); + // Verify this is a checkpoint schema with expected fields + let field_names: Vec<&str> = schema.fields().map(|f| f.name()).collect(); + assert!(field_names.contains(&"txn"), "Checkpoint should have 'txn' field"); + assert!(field_names.contains(&"add"), "Checkpoint should have 'add' field"); + assert!(field_names.contains(&"remove"), "Checkpoint should have 'remove' field"); + assert!(field_names.contains(&"metaData"), "Checkpoint should have 'metaData' field"); + assert!(field_names.contains(&"protocol"), "Checkpoint should have 'protocol' field"); + // Verify we can access field properties for field in schema.fields() { assert!(!field.name().is_empty(), "Field name should not be empty"); diff --git a/kernel/src/engine/sync/parquet.rs b/kernel/src/engine/sync/parquet.rs index b3140c589..f10fe88c4 100644 --- a/kernel/src/engine/sync/parquet.rs +++ b/kernel/src/engine/sync/parquet.rs @@ -77,3 +77,87 @@ impl ParquetHandler for SyncParquetHandler { .map_err(Error::Arrow) } } + +#[cfg(test)] +mod tests { + use super::*; + use std::path::PathBuf; + use url::Url; + + #[test] + fn test_sync_get_parquet_schema() -> DeltaResult<()> { + let handler = SyncParquetHandler; + + // Use a checkpoint parquet file + let path = std::fs::canonicalize(PathBuf::from( + "./tests/data/with_checkpoint_no_last_checkpoint/_delta_log/00000000000000000002.checkpoint.parquet", + ))?; + let url = Url::from_file_path(path).unwrap(); + + let file_meta = FileMeta { + location: url, + last_modified: 0, + size: 0, + }; + + // Get the schema + let schema = handler.get_parquet_schema(&file_meta)?; + + println!("schema: {:?}", schema); + + // Verify the schema has fields + assert!( + schema.fields().count() > 0, + "Schema should have at least one field" + ); + + // Verify this is a checkpoint schema with expected fields + let field_names: Vec<&str> = schema.fields().map(|f| f.name()).collect(); + assert!( + field_names.contains(&"txn"), + "Checkpoint should have 'txn' field" + ); + assert!( + field_names.contains(&"add"), + "Checkpoint should have 'add' field" + ); + assert!( + field_names.contains(&"remove"), + "Checkpoint should have 'remove' field" + ); + assert!( + field_names.contains(&"metaData"), + "Checkpoint should have 'metaData' field" + ); + assert!( + field_names.contains(&"protocol"), + "Checkpoint should have 'protocol' field" + ); + + // Verify we can access field properties + for field in schema.fields() { + assert!(!field.name().is_empty(), "Field name should not be empty"); + let _data_type = field.data_type(); // Should not panic + } + + Ok(()) + } + + #[test] + fn test_sync_get_parquet_schema_invalid_file() { + let handler = SyncParquetHandler; + + // Test with a non-existent file + let mut temp_path = std::env::temp_dir(); + temp_path.push("non_existent_file_for_sync_test.parquet"); + let url = Url::from_file_path(temp_path).unwrap(); + let file_meta = FileMeta { + location: 
url, + last_modified: 0, + size: 0, + }; + + let result = handler.get_parquet_schema(&file_meta); + assert!(result.is_err(), "Should error on non-existent file"); + } +} From 131f2ec72d76fcf46b4e515b99a83fabbf5b7916 Mon Sep 17 00:00:00 2001 From: DrakeLin Date: Thu, 20 Nov 2025 23:19:46 +0000 Subject: [PATCH 7/9] parquet --- kernel/src/engine/default/parquet.rs | 25 ++++++++++++++++++++----- kernel/src/engine/sync/parquet.rs | 2 -- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index d136e467e..6b8363f13 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -706,11 +706,26 @@ mod tests { // Verify this is a checkpoint schema with expected fields let field_names: Vec<&str> = schema.fields().map(|f| f.name()).collect(); - assert!(field_names.contains(&"txn"), "Checkpoint should have 'txn' field"); - assert!(field_names.contains(&"add"), "Checkpoint should have 'add' field"); - assert!(field_names.contains(&"remove"), "Checkpoint should have 'remove' field"); - assert!(field_names.contains(&"metaData"), "Checkpoint should have 'metaData' field"); - assert!(field_names.contains(&"protocol"), "Checkpoint should have 'protocol' field"); + assert!( + field_names.contains(&"txn"), + "Checkpoint should have 'txn' field" + ); + assert!( + field_names.contains(&"add"), + "Checkpoint should have 'add' field" + ); + assert!( + field_names.contains(&"remove"), + "Checkpoint should have 'remove' field" + ); + assert!( + field_names.contains(&"metaData"), + "Checkpoint should have 'metaData' field" + ); + assert!( + field_names.contains(&"protocol"), + "Checkpoint should have 'protocol' field" + ); // Verify we can access field properties for field in schema.fields() { diff --git a/kernel/src/engine/sync/parquet.rs b/kernel/src/engine/sync/parquet.rs index f10fe88c4..9c914f183 100644 --- a/kernel/src/engine/sync/parquet.rs +++ b/kernel/src/engine/sync/parquet.rs @@ -103,8 +103,6 @@ mod tests { // Get the schema let schema = handler.get_parquet_schema(&file_meta)?; - println!("schema: {:?}", schema); - // Verify the schema has fields assert!( schema.fields().count() > 0, From 61d49055bd39eb63b573265c2a4526a1dc90fd0f Mon Sep 17 00:00:00 2001 From: DrakeLin Date: Thu, 20 Nov 2025 23:25:45 +0000 Subject: [PATCH 8/9] fix --- kernel/src/engine/default/parquet.rs | 12 ++++++------ kernel/src/engine/sync/parquet.rs | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index 6b8363f13..06602076b 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -705,25 +705,25 @@ mod tests { ); // Verify this is a checkpoint schema with expected fields - let field_names: Vec<&str> = schema.fields().map(|f| f.name()).collect(); + let field_names: Vec<&String> = schema.fields().map(|f| f.name()).collect(); assert!( - field_names.contains(&"txn"), + field_names.iter().any(|&name| name == "txn"), "Checkpoint should have 'txn' field" ); assert!( - field_names.contains(&"add"), + field_names.iter().any(|&name| name == "add"), "Checkpoint should have 'add' field" ); assert!( - field_names.contains(&"remove"), + field_names.iter().any(|&name| name == "remove"), "Checkpoint should have 'remove' field" ); assert!( - field_names.contains(&"metaData"), + field_names.iter().any(|&name| name == "metaData"), "Checkpoint should have 'metaData' field" ); assert!( - 
field_names.contains(&"protocol"), + field_names.iter().any(|&name| name == "protocol"), "Checkpoint should have 'protocol' field" ); diff --git a/kernel/src/engine/sync/parquet.rs b/kernel/src/engine/sync/parquet.rs index 9c914f183..dc7ad3caa 100644 --- a/kernel/src/engine/sync/parquet.rs +++ b/kernel/src/engine/sync/parquet.rs @@ -110,25 +110,25 @@ mod tests { ); // Verify this is a checkpoint schema with expected fields - let field_names: Vec<&str> = schema.fields().map(|f| f.name()).collect(); + let field_names: Vec<&String> = schema.fields().map(|f| f.name()).collect(); assert!( - field_names.contains(&"txn"), + field_names.iter().any(|&name| name == "txn"), "Checkpoint should have 'txn' field" ); assert!( - field_names.contains(&"add"), + field_names.iter().any(|&name| name == "add"), "Checkpoint should have 'add' field" ); assert!( - field_names.contains(&"remove"), + field_names.iter().any(|&name| name == "remove"), "Checkpoint should have 'remove' field" ); assert!( - field_names.contains(&"metaData"), + field_names.iter().any(|&name| name == "metaData"), "Checkpoint should have 'metaData' field" ); assert!( - field_names.contains(&"protocol"), + field_names.iter().any(|&name| name == "protocol"), "Checkpoint should have 'protocol' field" ); From ecb6d0c7b3dbe2f4ec003adaa2f6c3770d607c6c Mon Sep 17 00:00:00 2001 From: DrakeLin Date: Thu, 20 Nov 2025 23:33:45 +0000 Subject: [PATCH 9/9] rm --- kernel/tests/parquet_schema_tests.rs | 127 --------------------------- 1 file changed, 127 deletions(-) delete mode 100644 kernel/tests/parquet_schema_tests.rs diff --git a/kernel/tests/parquet_schema_tests.rs b/kernel/tests/parquet_schema_tests.rs deleted file mode 100644 index 51e9f7d1f..000000000 --- a/kernel/tests/parquet_schema_tests.rs +++ /dev/null @@ -1,127 +0,0 @@ -//! 
Tests for getting Parquet schema from file footer
-use std::path::PathBuf;
-use std::sync::Arc;
-
-use delta_kernel::engine::default::DefaultEngine;
-use delta_kernel::{Engine, FileMeta};
-use object_store::local::LocalFileSystem;
-use url::Url;
-
-#[test]
-fn test_get_parquet_schema_simple() -> Result<(), Box<dyn std::error::Error>> {
-    // Use an existing test Parquet file
-    let path = std::fs::canonicalize(PathBuf::from(
-        "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet",
-    ))?;
-    let url = Url::from_file_path(path).unwrap();
-
-    let store = Arc::new(LocalFileSystem::new());
-    let engine = Arc::new(DefaultEngine::new(store));
-    let parquet_handler = engine.parquet_handler();
-
-    let file_meta = FileMeta {
-        location: url,
-        last_modified: 0,
-        size: 0,
-    };
-
-    // Get the schema
-    let schema = parquet_handler.get_parquet_schema(&file_meta)?;
-
-    // Verify the schema has expected fields
-    assert!(
-        schema.fields().count() > 0,
-        "Schema should have at least one field"
-    );
-
-    // This test file should have specific columns - let's verify some basic properties
-    let field_names: Vec<_> = schema.fields().map(|f| f.name()).collect();
-    assert!(!field_names.is_empty(), "Should have field names");
-
-    Ok(())
-}
-
-#[test]
-fn test_get_parquet_schema_invalid_file() {
-    let store = Arc::new(LocalFileSystem::new());
-    let engine = Arc::new(DefaultEngine::new(store));
-    let parquet_handler = engine.parquet_handler();
-
-    // Try with a non-existent file
-    let url = Url::from_file_path("/tmp/non_existent_file.parquet").unwrap();
-    let file_meta = FileMeta {
-        location: url,
-        last_modified: 0,
-        size: 0,
-    };
-
-    let result = parquet_handler.get_parquet_schema(&file_meta);
-    assert!(result.is_err(), "Should error on non-existent file");
-}
-
-#[test]
-fn test_get_parquet_schema_non_parquet_file() -> Result<(), Box<dyn std::error::Error>> {
-    let store = Arc::new(LocalFileSystem::new());
-    let engine = Arc::new(DefaultEngine::new(store));
-    let parquet_handler = engine.parquet_handler();
-
-    // Try with a non-Parquet file (use a JSON log file)
-    let path = std::fs::canonicalize(PathBuf::from(
-        "./tests/data/table-with-dv-small/_delta_log/00000000000000000000.json",
-    ))?;
-    let url = Url::from_file_path(path).unwrap();
-
-    let file_meta = FileMeta {
-        location: url,
-        last_modified: 0,
-        size: 0,
-    };
-
-    let result = parquet_handler.get_parquet_schema(&file_meta);
-    assert!(result.is_err(), "Should error on non-Parquet file");
-
-    Ok(())
-}
-
-#[test]
-fn test_get_parquet_schema_with_nested_types() -> Result<(), Box<dyn std::error::Error>> {
-    // Find a test file with nested types (struct, array, map)
-    // Using type-widening test data which might have nested structures
-    let test_files = vec![
-        "./tests/data/type-widening/part-00000-61accb66-b740-416b-9f5b-f0fccaceb415-c000.snappy.parquet",
-    ];
-
-    let store = Arc::new(LocalFileSystem::new());
-    let engine = Arc::new(DefaultEngine::new(store));
-    let parquet_handler = engine.parquet_handler();
-
-    for test_file in test_files {
-        let path_buf = PathBuf::from(test_file);
-        if !path_buf.exists() {
-            continue;
-        }
-
-        let path = std::fs::canonicalize(path_buf)?;
-        let url = Url::from_file_path(path).unwrap();
-
-        let file_meta = FileMeta {
-            location: url,
-            last_modified: 0,
-            size: 0,
-        };
-
-        // Get the schema
-        let schema = parquet_handler.get_parquet_schema(&file_meta)?;
-
-        // Verify we can read the schema
-        assert!(schema.fields().count() > 0, "Schema should have fields");
-
-        // Print field types for debugging (if needed)
-        for field in schema.fields() {
- let _data_type = field.data_type(); - // Schema was successfully read and converted - } - } - - Ok(()) -}
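For reference, here is a minimal sketch of how a caller might drive the `get_parquet_schema` method this series adds: a footer-only read of a checkpoint file followed by a check for a `stats_parsed` column inside the `add` action struct, which is the motivating use case named in the trait documentation. This is illustrative and not part of the PR; the `DefaultEngine::new(store)` constructor and the `FileMeta` literal mirror the tests above (signatures may differ by kernel version), and the checkpoint path is hypothetical.

use std::sync::Arc;

use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::schema::DataType;
use delta_kernel::{DeltaResult, Engine, FileMeta};
use object_store::local::LocalFileSystem;
use url::Url;

fn main() -> DeltaResult<()> {
    // Hypothetical path; substitute a real local checkpoint file.
    let url = Url::from_file_path(
        "/path/to/table/_delta_log/00000000000000000002.checkpoint.parquet",
    )
    .expect("path must be absolute");

    // Engine construction as in the tests in this series.
    let store = Arc::new(LocalFileSystem::new());
    let engine = DefaultEngine::new(store);
    let parquet_handler = engine.parquet_handler();

    // Footer-only read: no row groups are fetched or decoded.
    let file_meta = FileMeta {
        location: url,
        last_modified: 0,
        size: 0,
    };
    let schema = parquet_handler.get_parquet_schema(&file_meta)?;

    // The motivating check: does the checkpoint's `add` struct carry `stats_parsed`?
    let has_stats_parsed = schema
        .fields()
        .find(|f| f.name() == "add")
        .and_then(|add| match add.data_type() {
            DataType::Struct(s) => Some(s.fields().any(|f| f.name() == "stats_parsed")),
            _ => None,
        })
        .unwrap_or(false);
    println!("stats_parsed present: {has_stats_parsed}");
    Ok(())
}

If the column is present and its schema is compatible with the table schema, a reader can consume the pre-parsed statistics directly; otherwise it falls back to parsing the JSON `stats` strings, which is exactly the decision this footer-only read is meant to make cheap.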