115 changes: 113 additions & 2 deletions kernel/src/engine/default/parquet.rs
@@ -21,15 +21,15 @@ use uuid::Uuid;

use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use super::UrlExt;
use crate::engine::arrow_conversion::TryIntoArrow as _;
use crate::engine::arrow_conversion::{TryFromArrow as _, TryIntoArrow as _};
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::{
fixup_parquet_read, generate_mask, get_requested_indices, ordering_needs_row_indexes,
RowIndexBuilder,
};
use crate::engine::default::executor::TaskExecutor;
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
use crate::schema::{SchemaRef, StructType};
use crate::{
DeltaResult, EngineData, Error, FileDataReadResultIterator, FileMeta, ParquetHandler,
PredicateRef,
@@ -245,6 +245,41 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
self.readahead,
)
}

fn get_parquet_schema(&self, file: &FileMeta) -> DeltaResult<SchemaRef> {
let store = self.store.clone();
let location = file.location.clone();

self.task_executor.block_on(async move {
let arrow_schema = if location.is_presigned() {
// Presigned URLs: fetch the whole object over HTTP and read the
// footer from the buffered bytes
let client = reqwest::Client::new();
let response = client.get(location.as_str()).send().await.map_err(|e| {
Error::generic(format!("Failed to fetch presigned URL: {}", e))
})?;
let response = response.error_for_status().map_err(|e| {
Error::generic(format!("Presigned URL request returned an error status: {}", e))
})?;
let bytes = response.bytes().await.map_err(|e| {
Error::generic(format!("Failed to read response bytes: {}", e))
})?;

// Load metadata from bytes
let metadata = ArrowReaderMetadata::load(&bytes, Default::default())?;
metadata.schema().clone()
} else {
// Handle object store paths
let path = Path::from_url_path(location.path())?;
let mut reader = ParquetObjectReader::new(store, path);
let metadata =
ArrowReaderMetadata::load_async(&mut reader, Default::default()).await?;
metadata.schema().clone()
};

// Convert Arrow schema to Kernel schema
StructType::try_from_arrow(arrow_schema.as_ref())
.map(Arc::new)
.map_err(Error::Arrow)
})
}
}
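
A minimal usage sketch of the new method from the engine side (a sketch only, not part of this diff; the file path is hypothetical, and the crate paths are assumed to match the imports used by the tests below):

    use std::sync::Arc;
    use object_store::local::LocalFileSystem;
    use url::Url;
    use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
    use delta_kernel::engine::default::parquet::DefaultParquetHandler;
    use delta_kernel::{DeltaResult, FileMeta, ParquetHandler as _};

    fn print_footer_schema() -> DeltaResult<()> {
        let store = Arc::new(LocalFileSystem::new());
        let handler =
            DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
        let file_meta = FileMeta {
            location: Url::from_file_path("/tmp/example.parquet").unwrap(),
            last_modified: 0,
            size: 0,
        };
        // Only the footer is read; no row-group data is fetched.
        let schema = handler.get_parquet_schema(&file_meta)?;
        println!("{} top-level fields", schema.fields().count());
        Ok(())
    }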

/// Implements [`FileOpener`] for a parquet file
@@ -641,4 +676,80 @@ mod tests {
"Generic delta kernel error: Path must end with a trailing slash: memory:///data",
);
}

#[test]
fn test_get_parquet_schema() {
let store = Arc::new(LocalFileSystem::new());
let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));

// Use a checkpoint parquet file
let path = std::fs::canonicalize(PathBuf::from(
"./tests/data/with_checkpoint_no_last_checkpoint/_delta_log/00000000000000000002.checkpoint.parquet",
))
.unwrap();
let url = Url::from_file_path(path).unwrap();

let file_meta = FileMeta {
location: url,
last_modified: 0,
size: 0,
};

// Get the schema
let schema = handler.get_parquet_schema(&file_meta).unwrap();

// Verify the schema has fields
assert!(
schema.fields().count() > 0,
"Schema should have at least one field"
);

// Verify this is a checkpoint schema with expected fields
let field_names: Vec<&String> = schema.fields().map(|f| f.name()).collect();
assert!(
field_names.iter().any(|&name| name == "txn"),
"Checkpoint should have 'txn' field"
);
assert!(
field_names.iter().any(|&name| name == "add"),
"Checkpoint should have 'add' field"
);
assert!(
field_names.iter().any(|&name| name == "remove"),
"Checkpoint should have 'remove' field"
);
assert!(
field_names.iter().any(|&name| name == "metaData"),
"Checkpoint should have 'metaData' field"
);
assert!(
field_names.iter().any(|&name| name == "protocol"),
"Checkpoint should have 'protocol' field"
);

// Verify we can access field properties
for field in schema.fields() {
assert!(!field.name().is_empty(), "Field name should not be empty");
let _data_type = field.data_type(); // Should not panic
}
}

#[test]
fn test_get_parquet_schema_invalid_file() {
let store = Arc::new(LocalFileSystem::new());
let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));

// Test with a non-existent file
let mut temp_path = std::env::temp_dir();
temp_path.push("non_existent_file_for_test.parquet");
let url = Url::from_file_path(temp_path).unwrap();
let file_meta = FileMeta {
location: url,
last_modified: 0,
size: 0,
};

let result = handler.get_parquet_schema(&file_meta);
assert!(result.is_err(), "Should error on non-existent file");
}
}
110 changes: 108 additions & 2 deletions kernel/src/engine/sync/parquet.rs
@@ -1,17 +1,21 @@
use std::fs::File;
use std::sync::Arc;

use crate::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use crate::parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReaderBuilder};

use super::read_files;
use crate::engine::arrow_conversion::TryFromArrow as _;
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::{
fixup_parquet_read, generate_mask, get_requested_indices, ordering_needs_row_indexes,
RowIndexBuilder,
};
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
use crate::{DeltaResult, FileDataReadResultIterator, FileMeta, ParquetHandler, PredicateRef};
use crate::schema::{SchemaRef, StructType};
use crate::{
DeltaResult, Error, FileDataReadResultIterator, FileMeta, ParquetHandler, PredicateRef,
};

pub(crate) struct SyncParquetHandler;

@@ -52,4 +56,106 @@ impl ParquetHandler for SyncParquetHandler {
) -> DeltaResult<FileDataReadResultIterator> {
read_files(files, schema, predicate, try_create_from_parquet)
}

fn get_parquet_schema(&self, file: &FileMeta) -> DeltaResult<SchemaRef> {
// Convert URL to file path
let path = file
.location
.to_file_path()
.map_err(|_| Error::generic("SyncEngine can only read local files"))?;

// Open the file and load only the footer metadata (row groups are not read)
let reader = File::open(path)?;
let metadata = ArrowReaderMetadata::load(&reader, Default::default())?;
let arrow_schema = metadata.schema();

// Convert Arrow schema to Kernel schema
StructType::try_from_arrow(arrow_schema.as_ref())
.map(Arc::new)
.map_err(Error::Arrow)
}
}
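
The footer-only behavior comes from the parquet crate itself: ArrowReaderMetadata::load parses the footer of any ChunkReader and does not decode row groups. A standalone sketch of the same call outside the kernel (the path is hypothetical):

    use std::fs::File;
    use parquet::arrow::arrow_reader::ArrowReaderMetadata;

    fn footer_arrow_schema(path: &str) -> Result<(), Box<dyn std::error::Error>> {
        let file = File::open(path)?;
        // Reads and parses only the footer; row-group data stays untouched.
        let metadata = ArrowReaderMetadata::load(&file, Default::default())?;
        println!("{:#?}", metadata.schema());
        Ok(())
    }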

#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
use url::Url;

#[test]
fn test_sync_get_parquet_schema() -> DeltaResult<()> {
let handler = SyncParquetHandler;

// Use a checkpoint parquet file
let path = std::fs::canonicalize(PathBuf::from(
"./tests/data/with_checkpoint_no_last_checkpoint/_delta_log/00000000000000000002.checkpoint.parquet",
))?;
let url = Url::from_file_path(path).unwrap();

let file_meta = FileMeta {
location: url,
last_modified: 0,
size: 0,
};

// Get the schema
let schema = handler.get_parquet_schema(&file_meta)?;

// Verify the schema has fields
assert!(
schema.fields().count() > 0,
"Schema should have at least one field"
);

// Verify this is a checkpoint schema with expected fields
let field_names: Vec<&String> = schema.fields().map(|f| f.name()).collect();
assert!(
field_names.iter().any(|&name| name == "txn"),
"Checkpoint should have 'txn' field"
);
assert!(
field_names.iter().any(|&name| name == "add"),
"Checkpoint should have 'add' field"
);
assert!(
field_names.iter().any(|&name| name == "remove"),
"Checkpoint should have 'remove' field"
);
assert!(
field_names.iter().any(|&name| name == "metaData"),
"Checkpoint should have 'metaData' field"
);
assert!(
field_names.iter().any(|&name| name == "protocol"),
"Checkpoint should have 'protocol' field"
);

// Verify we can access field properties
for field in schema.fields() {
assert!(!field.name().is_empty(), "Field name should not be empty");
let _data_type = field.data_type(); // Should not panic
}

Ok(())
}

#[test]
fn test_sync_get_parquet_schema_invalid_file() {
let handler = SyncParquetHandler;

// Test with a non-existent file
let mut temp_path = std::env::temp_dir();
temp_path.push("non_existent_file_for_sync_test.parquet");
let url = Url::from_file_path(temp_path).unwrap();
let file_meta = FileMeta {
location: url,
last_modified: 0,
size: 0,
};

let result = handler.get_parquet_schema(&file_meta);
assert!(result.is_err(), "Should error on non-existent file");
}
}
24 changes: 24 additions & 0 deletions kernel/src/lib.rs
@@ -696,6 +696,30 @@ pub trait ParquetHandler: AsAny {
physical_schema: SchemaRef,
predicate: Option<PredicateRef>,
) -> DeltaResult<FileDataReadResultIterator>;

/// Read the schema from a Parquet file's footer metadata without reading the data.
///
/// This method reads only the Parquet file footer (metadata section) to extract the schema,
/// which is useful for schema inspection, compatibility checking, and determining whether
/// parsed statistics columns are present and compatible with the current table schema.
///
/// # Parameters
///
/// - `file` - File metadata for the Parquet file whose schema should be read.
///
/// # Returns
///
/// A [`DeltaResult`] containing a [`SchemaRef`] representing the Parquet file's schema,
/// converted to Delta Kernel's schema format.
///
/// # Errors
///
/// Returns an error if:
/// - The file cannot be accessed or does not exist
/// - The file is not a valid Parquet file
/// - The footer cannot be read or parsed
/// - The schema cannot be converted to Delta Kernel's format
fn get_parquet_schema(&self, file: &FileMeta) -> DeltaResult<SchemaRef>;
}
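
A custom engine can satisfy this method the same way the sync handler in this PR does. A minimal free-function sketch under that assumption (local files only; it presumes the parquet crate's ArrowReaderMetadata and the kernel's TryFromArrow conversion are in scope, as in the sync handler's imports):

    fn footer_schema_for(file: &FileMeta) -> DeltaResult<SchemaRef> {
        let path = file
            .location
            .to_file_path()
            .map_err(|_| Error::generic("expected a local file URL"))?;
        let reader = std::fs::File::open(path)?;
        let metadata = ArrowReaderMetadata::load(&reader, Default::default())?;
        // Convert the Arrow schema from the footer into kernel schema types.
        StructType::try_from_arrow(metadata.schema().as_ref())
            .map(Arc::new)
            .map_err(Error::Arrow)
    }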

/// The `Engine` trait encapsulates all the functionality an engine or connector needs to provide