diff --git a/docs/source/contributor-guide/parquet_scans.md b/docs/source/contributor-guide/parquet_scans.md index 4aec9f347d..2b1b53792c 100644 --- a/docs/source/contributor-guide/parquet_scans.md +++ b/docs/source/contributor-guide/parquet_scans.md @@ -131,7 +131,505 @@ The S3 support of `native_datafusion` has the following limitations: 2. **Custom credential providers**: Custom implementations of AWS credential providers are not supported. The implementation only supports the standard credential providers listed in the table above. We are planning to add support for custom credential providers through a JNI-based adapter that will allow calling Java credential providers from native code. See [issue #1829](https://github.com/apache/datafusion-comet/issues/1829) for more details. +## Architecture Diagrams + +This section provides detailed architecture diagrams for each of the three Parquet scan implementations. + +### `native_comet` Scan Architecture + +The `native_comet` scan is a hybrid approach where the JVM reads Parquet files and passes data to native execution. + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ NATIVE_COMET SCAN FLOW │ +└─────────────────────────────────────────────────────────────────────────────┘ + +JVM (Scala/Java) Native (Rust) +════════════════════════════════════════════════════════════════════════════════ + +┌───────────────────────────────┐ +│ CometScanExec │ Physical operator for DataSource V1 +│ (CometScanExec.scala:61) │ +│ │ +│ scanImpl = "native_comet" │ +└───────────┬───────────────────┘ + │ + │ wraps + ↓ +┌───────────────────────────────┐ +│ FileSourceScanExec │ Original Spark scan (from Catalyst) +│ (from Spark) │ +└───────────┬───────────────────┘ + │ + │ creates RDD using + ↓ +┌───────────────────────────────┐ +│ CometParquetFileFormat │ Custom Parquet file format handler +│ (CometParquetFileFormat.scala)│ +│ │ +│ Key methods: │ +│ - buildReaderWithPartition │ +│ Values() │ +│ - supportBatch() │ +└───────────┬───────────────────┘ + │ + │ creates via factory + ↓ +┌───────────────────────────────┐ +│ CometParquetPartitionReader │ Factory for creating partition readers +│ Factory (with prefetch) │ Supports asynchronous prefetch optimization +│ (.../CometParquetPartition │ +│ ReaderFactory.scala) │ +└───────────┬───────────────────┘ + │ + │ creates + ↓ +┌───────────────────────────────┐ +│ BatchReader │ Main Parquet reader +│ (BatchReader.java:90) │ Reads Parquet files via Hadoop APIs +│ │ +│ Key components: │ +│ ├─ ParquetFileReader │ Opens Parquet file, reads metadata +│ ├─ PageReadStore │ Reads column pages from file +│ ├─ ColumnDescriptors │ Parquet column metadata +│ └─ Capacity (batch size) │ Configurable batch size +│ │ +│ Process per row group: │ +│ 1. Read file footer/metadata │ +│ 2. For each column: │ +│ - Get ColumnDescriptor │ +│ - Read pages via │ +│ PageReadStore │ +│ - Create CometVector │ +│ from native data │ +│ 3. 
Return ColumnarBatch │ +└───────────┬───────────────────┘ + │ + │ Uses JNI to access native decoders + │ (not for page reading, only for + │ specialized operations if needed) + │ + ↓ +┌───────────────────────────────┐ +│ ColumnarBatch │ +│ with CometVector[] │ Arrow-compatible columnar vectors +│ │ +│ Each CometVector wraps: │ +│ - Arrow ColumnVector │ +│ - Native memory buffers │ +└───────────┬───────────────────┘ + │ + │ Passed to native execution via JNI + │ + ↓ ┌───────────────────────────┐ +┌───────────────────────────────┐ │ ScanExec │ +│ CometExecIterator │──────→│ (operators/scan.rs:54) │ +│ (JNI boundary) │ JNI │ │ +└───────────────────────────────┘ │ Reads batches from JVM │ + │ via CometBatchIterator │ + │ │ + │ Key operations: │ + │ ├─ next_batch() │ + │ ├─ FFI Arrow conversion │ + │ └─ Selection vectors │ + │ (for deletes) │ + └───────────┬───────────────┘ + │ + │ RecordBatch + ↓ + ┌───────────────────────────┐ + │ Native Execution Pipeline │ + │ (DataFusion operators) │ + │ │ + │ - FilterExec │ + │ - ProjectExec │ + │ - AggregateExec │ + │ - etc. │ + └───────────────────────────┘ + +═══════════════════════════════════════════════════════════════════════════════ +STORAGE ACCESS (native_comet) +═══════════════════════════════════════════════════════════════════════════════ + +┌────────────────────────────────┐ +│ Hadoop FileSystem API │ Used by BatchReader +│ │ +│ - LocalFileSystem │ For local files +│ - HDFS │ For HDFS +│ - S3A (Hadoop-AWS module) │ For S3 (using AWS Java SDK) +│ └─ Uses AWS Java SDK │ +│ │ +│ Configuration: │ +│ - fs.s3a.access.key │ +│ - fs.s3a.secret.key │ +│ - fs.s3a.endpoint │ +│ - Standard Hadoop S3A configs │ +└────────────────────────────────┘ + +Key Characteristics: +- ✅ Maximum Spark compatibility (uses Spark's Parquet reading) +- ✅ All Hadoop FileSystem implementations supported (HDFS, S3A, etc.) +- ✅ Standard Hadoop S3A configuration +- ✅ Can use prefetch for improved performance +- ❌ No complex type support (structs, arrays, maps) +- ❌ Cannot read UINT_8/UINT_16 types by default +- ⚠️ Data crosses JVM→Native boundary via FFI +``` + +### `native_datafusion` Scan Architecture + +The `native_datafusion` scan is a fully native implementation using DataFusion's ParquetExec. + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ NATIVE_DATAFUSION SCAN FLOW │ +└─────────────────────────────────────────────────────────────────────────────┘ + +JVM (Scala/Java) Native (Rust) +════════════════════════════════════════════════════════════════════════════════ + +┌───────────────────────────────┐ +│ CometScanExec │ Initial scan created by CometScanRule +│ (CometScanExec.scala:61) │ +│ │ +│ scanImpl = "native_datafusion"│ +│ │ +│ Note: This is temporary! 
│ Only exists between CometScanRule +│ CometExecRule converts this │ and CometExecRule optimizer passes +│ to CometNativeScanExec │ +└───────────┬───────────────────┘ + │ + │ CometExecRule transformation + │ (CometExecRule.scala:160-162) + ↓ +┌───────────────────────────────┐ +│ CometNativeScanExec │ Fully native scan operator +│ (CometNativeScanExec.scala:46)│ +│ │ Extends CometLeafExec +│ Fields: │ (native execution root) +│ ├─ nativeOp: Operator │ Protobuf operator definition +│ ├─ relation: HadoopFsRelation │ File relation metadata +│ ├─ output: Seq[Attribute] │ Output schema +│ ├─ requiredSchema: StructType │ Projected schema +│ ├─ dataFilters: Seq[Expr] │ Pushed-down filters +│ └─ serializedPlanOpt │ Serialized native plan +└───────────┬───────────────────┘ + │ + │ Serialization via QueryPlanSerde + │ operator2Proto(scan) + ↓ +┌───────────────────────────────┐ +│ Protobuf Operator │ Serialized scan operator +│ (Operator protobuf message) │ +│ │ Contains: +│ Contains: │ - File paths +│ ├─ file_paths: Vec │ - Schema +│ ├─ schema: Schema │ - Filters (as Expr protobuf) +│ ├─ filters: Vec │ - Projection +│ └─ projection: Vec │ +└───────────┬───────────────────┘ + │ + │ Deserialization in native code + │ + ↓ ┌───────────────────────────┐ + │ PhysicalPlanner │ + │ (planner.rs) │ + │ │ + │ Deserializes protobuf │ + │ Creates physical plan │ + └───────────┬───────────────┘ + │ + │ calls + ↓ + ┌───────────────────────────┐ + │ init_datasource_exec() │ + │ (parquet_exec.rs) │ + │ │ + │ Creates DataFusion │ + │ DataSourceExec with: │ + │ │ + │ ├─ ParquetSource │ + │ ├─ SparkSchemaAdapter │ + │ │ (for Spark/Parquet │ + │ │ type compatibility) │ + │ ├─ Projection │ + │ ├─ Filters │ + │ └─ Partition fields │ + └───────────┬───────────────┘ + │ + ↓ + ┌───────────────────────────┐ + │ DataFusion │ + │ DataSourceExec │ + │ │ + │ ExecutionPlan trait │ + └───────────┬───────────────┘ + │ + │ execute() + ↓ + ┌───────────────────────────┐ + │ ParquetSource │ + │ │ + │ Opens and reads Parquet │ + │ files using arrow-rs │ + │ ParquetRecordBatchReader │ + └───────────┬───────────────┘ + │ + ↓ + ┌────────────────────────────┐ + │ Arrow Parquet Reader │ + │ (from arrow-rs crate) │ + │ │ + │ Operations: │ + │ ├─ Open file │ + │ ├─ Read metadata/foote r │ + │ ├─ Decode pages │ + │ ├─ Build RecordBatches │ + │ └─ Apply filters/projection│ + └───────────┬────────────────┘ + │ + │ RecordBatch stream + ↓ + ┌───────────────────────────┐ + │ Native Execution Pipeline │ + │ (all DataFusion) │ + │ │ + │ - FilterExec │ + │ - ProjectExec │ + │ - AggregateExec │ + │ - etc. 
│ + └───────────┬───────────────┘ + │ + │ Results via FFI + ↓ +┌───────────────────────────────┐ ┌───────────────────────────┐ +│ Spark execution continues │←──────│ Arrow C Data Interface │ +│ (receives ColumnarBatch) │ FFI │ (Arrow FFI) │ +└───────────────────────────────┘ └───────────────────────────┘ + +═══════════════════════════════════════════════════════════════════════════════ +STORAGE ACCESS (native_datafusion) +═══════════════════════════════════════════════════════════════════════════════ + + ┌───────────────────────────┐ + │ object_store crate │ + │ (objectstore/mod.rs) │ + │ │ + │ Implementations: │ + │ ├─ LocalFileSystem │ + │ ├─ S3 (via aws-sdk-rust) │ + │ │ └─ AWS Rust SDK │ + │ ├─ GCS │ + │ └─ Azure │ + │ │ + │ Configuration translation:│ + │ Hadoop S3A configs → │ + │ object_store configs │ + │ │ + │ - fs.s3a.access.key │ + │ - fs.s3a.secret.key │ + │ - fs.s3a.endpoint │ + │ - fs.s3a.endpoint.region │ + └───────────────────────────┘ + +Key Characteristics: +- ✅ Fully native execution (no JVM→Native data transfer for scan) +- ✅ Complex type support (structs, arrays, maps) +- ✅ Leverages DataFusion community improvements +- ✅ Better performance for some workloads +- ❌ More restrictions than native_comet (no bucketed scans) +- ❌ Cannot read UINT_8/UINT_16 types by default +- ❌ No support for ignoreMissingFiles/ignoreCorruptFiles +- ⚠️ Requires COMET_EXEC_ENABLED=true +- ⚠️ Uses object_store (different from Hadoop FileSystem) +``` + +### `native_iceberg_compat` Scan Architecture + +The `native_iceberg_compat` scan is designed for Iceberg integration, similar to `native_datafusion` but with an Iceberg-specific API layer. + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ NATIVE_ICEBERG_COMPAT SCAN FLOW │ +└─────────────────────────────────────────────────────────────────────────────┘ + +JVM (Scala/Java) Native (Rust) +════════════════════════════════════════════════════════════════════════════════ + +┌───────────────────────────────┐ +│ CometScanExec │ Physical operator for DataSource V1 +│ (CometScanExec.scala:61) │ +│ │ Auto-selected for Iceberg tables +│ scanImpl = │ by CometScanRule.selectScan() +│ "native_iceberg_compat" │ (CometScanRule.scala:357) +│ │ +│ Key difference: │ +│ - Prefetch is DISABLED │ Iceberg manages its own data access +│ (line 434: !usingDataFusion │ patterns +│ Reader) │ +└───────────┬───────────────────┘ + │ + │ Uses Iceberg-specific reader + ↓ +┌───────────────────────────────┐ +│ IcebergCometBatchReader │ Public API for Iceberg integration +│ (IcebergCometBatchReader.java │ +│ :29-43) │ +│ │ +│ Extends: BatchReader │ Inherits batch reading functionality +│ │ +│ Key method: │ +│ init(AbstractColumnReader[]) │ Iceberg provides column readers +│ │ +│ Purpose: │ +│ - Allows Iceberg to control │ +│ column reader initialization│ +│ - Iceberg passes its own │ +│ AbstractColumnReader[] │ +│ - Comet provides batch reading│ +│ interface │ +└───────────┬───────────────────┘ + │ + │ Iceberg creates AbstractColumnReader[] + │ for each Parquet column + │ + ↓ +┌───────────────────────────────┐ +│ AbstractColumnReader[] │ Iceberg-managed column readers +│ (from Iceberg) │ +│ │ +│ Each reader handles: │ +│ ├─ Column metadata │ +│ ├─ Page reading │ +│ ├─ Value decoding │ +│ └─ Deletion vectors │ Iceberg-specific feature +│ (row-level deletes) │ +└───────────┬───────────────────┘ + │ + │ IcebergCometBatchReader delegates to + │ parent BatchReader for batch creation + ↓ +┌───────────────────────────────┐ +│ BatchReader │ Same as 
native_comet +│ (BatchReader.java:90) │ +│ │ +│ Creates ColumnarBatch with: │ +│ - Data from Iceberg readers │ +│ - CometVector[] wrappers │ +│ - Partition values │ +└───────────┬───────────────────┘ + │ + │ Uses same native path as + │ native_datafusion for execution + │ + ↓ ┌───────────────────────────┐ +┌───────────────────────────────┐ │ Same as native_datafusion:│ +│ CometExecIterator │──────→│ │ +│ (JNI boundary) │ JNI │ - init_datasource_exec() │ +└───────────────────────────────┘ │ (parquet_exec.rs) │ + │ - DataSourceExec │ + │ - ParquetSource │ + │ - Arrow Parquet Reader │ + └───────────┬───────────────┘ + │ + │ RecordBatch + ↓ + ┌───────────────────────────┐ + │ Native Execution Pipeline │ + │ (DataFusion operators) │ + │ │ + │ With Iceberg features: │ + │ - Deletion vectors applied│ + │ - Schema evolution │ + │ - Partition evolution │ + └───────────────────────────┘ + +═══════════════════════════════════════════════════════════════════════════════ +ICEBERG-SPECIFIC FEATURES +═══════════════════════════════════════════════════════════════════════════════ + +┌───────────────────────────────┐ +│ Iceberg Table Integration │ +│ │ +│ Features supported: │ +│ ├─ Table metadata │ Via Iceberg catalog +│ ├─ Manifest files │ List of data files +│ ├─ Data files (Parquet) │ Actual table data +│ ├─ Deletion files │ Row-level deletes +│ │ └─ Position deletes │ Delete by row position +│ │ └─ Equality deletes │ Delete by column values +│ ├─ Schema evolution │ Add/remove/rename columns +│ └─ Partition evolution │ Change partitioning scheme +│ │ +│ Iceberg reads via: │ +│ ├─ FileScanTask objects │ Pre-planned file scans +│ ├─ PartitionSpec │ Partitioning metadata +│ └─ DataFile objects │ File-level metadata +└───────────────────────────────┘ + +═══════════════════════════════════════════════════════════════════════════════ +STORAGE ACCESS (native_iceberg_compat) +═══════════════════════════════════════════════════════════════════════════════ + +JVM Side: Native Side: + +┌────────────────────────────────┐ ┌────────────────────────────┐ +│ Hadoop FileSystem API │ │ object_store crate │ +│ (used by IcebergCometBatch │ │ (same as native_datafusion)│ +│ Reader for metadata) │ │ │ +│ │ │ - LocalFileSystem │ +│ - Local files │ │ - S3 (aws-sdk-rust) │ +│ - S3A (for metadata) │ │ - GCS │ +│ │ │ - Azure │ +└────────────────────────────────┘ └────────────────────────────┘ + +Auto-Selection Criteria (CometScanRule.scala:303-365): +──────────────────────────────────────────────────────── +✅ Iceberg table with SupportsComet trait +✅ Files on local filesystem OR S3 +✅ Valid S3 configuration (if S3) +✅ No unsupported complex types +✅ COMET_EXEC_ENABLED=true +❌ Falls back to native_comet if ANY check fails + +Key Characteristics: +- ✅ Designed for Iceberg integration +- ✅ Iceberg controls column reader initialization +- ✅ Supports Iceberg-specific features (deletes, schema evolution) +- ✅ Complex type support (via DataFusion) +- ✅ Same native execution as native_datafusion +- ❌ Prefetch disabled (Iceberg manages access patterns) +- ❌ Same restrictions as native_datafusion +- ⚠️ Only supports local filesystem and S3 +- ⚠️ Requires COMET_EXEC_ENABLED=true +``` + +## Implementation File Locations + +For reference, here are the key implementation files for each scan mode: + +### JVM Components + +| Component | File Path | Description | +|-----------|-----------|-------------| +| `CometScanExec` | `spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala` | Main scan operator for native_comet and 
native_iceberg_compat |
+| `CometNativeScanExec` | `spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala` | Fully native scan operator for native_datafusion |
+| `CometParquetFileFormat` | `spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala` | Custom Parquet file format handler |
+| `CometParquetPartitionReaderFactory` | `spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala` | Factory for partition readers with prefetch |
+| `BatchReader` | `common/src/main/java/org/apache/comet/parquet/BatchReader.java` | Main Parquet batch reader (native_comet) |
+| `IcebergCometBatchReader` | `common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java` | Iceberg-specific batch reader API |
+| `CometScanRule` | `spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala` | Optimizer rule for scan selection |
+
+### Native Components
+
+| Component | File Path | Description |
+|-----------|-----------|-------------|
+| `parquet_exec.rs` | `native/core/src/parquet/parquet_exec.rs` | DataFusion ParquetExec initialization |
+| `scan.rs` | `native/core/src/execution/operators/scan.rs` | Generic ScanExec for reading from JVM |
+| `planner.rs` | `native/core/src/execution/planner.rs` | Physical plan creation from protobuf |
+| `objectstore/mod.rs` | `native/core/src/parquet/objectstore/mod.rs` | Object store abstraction for file access |
+
 [#1545]: https://github.com/apache/datafusion-comet/issues/1545
 [#1758]: https://github.com/apache/datafusion-comet/issues/1758
+[#1829]: https://github.com/apache/datafusion-comet/issues/1829
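+
+## Illustrative Code Sketches
+
+The following sketches accompany the diagrams above. They are simplified and not copied from the
+Comet sources; any class name, constructor, or configuration key that is not mentioned elsewhere
+in this guide should be treated as an assumption made for illustration.
+
+### Selecting a scan implementation and configuring S3 access
+
+The scan implementation is chosen through Comet's Spark configuration. The `native_comet` path
+reads S3 through the Hadoop S3A connector using the `fs.s3a.*` settings listed above, while the
+native scans translate those same settings into `object_store` options. A minimal sketch, assuming
+the `spark.comet.scan.impl` key selects the implementation and `spark.comet.exec.enabled`
+corresponds to COMET_EXEC_ENABLED:
+
+```scala
+import org.apache.spark.sql.SparkSession
+
+val spark = SparkSession
+  .builder()
+  .appName("comet-parquet-scan-example")
+  // Choose one of: native_comet, native_datafusion, native_iceberg_compat
+  .config("spark.comet.scan.impl", "native_datafusion")
+  // native_datafusion and native_iceberg_compat require native execution
+  .config("spark.comet.exec.enabled", "true")
+  // Standard Hadoop S3A settings; native_comet uses them via the S3A connector,
+  // while the native scans translate them to object_store options
+  .config("spark.hadoop.fs.s3a.access.key", sys.env.getOrElse("AWS_ACCESS_KEY_ID", ""))
+  .config("spark.hadoop.fs.s3a.secret.key", sys.env.getOrElse("AWS_SECRET_ACCESS_KEY", ""))
+  .config("spark.hadoop.fs.s3a.endpoint", "s3.us-east-1.amazonaws.com")
+  .getOrCreate()
+
+// Any Parquet read now goes through the selected Comet scan implementation
+val df = spark.read.parquet("s3a://my-bucket/path/to/table")
+df.show()
+```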
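+
+### How CometExecRule replaces the temporary scan (sketch)
+
+As the `native_datafusion` diagram notes, a `CometScanExec` created with
+`scanImpl = "native_datafusion"` only exists between the CometScanRule and CometExecRule passes;
+CometExecRule rewrites it into `CometNativeScanExec` after serializing the scan with
+`QueryPlanSerde.operator2Proto`. The sketch below uses hypothetical stand-in plan nodes to show the
+shape of that rewrite; it is not the real rule, which operates on Spark physical operators.
+
+```scala
+object CometExecRuleSketch extends App {
+
+  // Hypothetical stand-ins for the real plan nodes.
+  sealed trait Plan
+  case class CometScan(scanImpl: String) extends Plan
+  case class CometNativeScan(serializedPlan: Array[Byte]) extends Plan
+  case class Filter(condition: String, child: Plan) extends Plan
+
+  // Stand-in for QueryPlanSerde.operator2Proto: produce the serialized
+  // operator that the native PhysicalPlanner later deserializes.
+  def operator2Proto(scan: CometScan): Array[Byte] =
+    s"parquet_scan(impl=${scan.scanImpl})".getBytes("UTF-8")
+
+  // Shape of the rewrite: only native_datafusion scans are converted,
+  // other nodes are left in place (with children rewritten recursively).
+  def applyRule(plan: Plan): Plan = plan match {
+    case scan @ CometScan("native_datafusion") =>
+      CometNativeScan(operator2Proto(scan))
+    case Filter(condition, child) =>
+      Filter(condition, applyRule(child))
+    case other =>
+      other
+  }
+
+  val before = Filter("x > 1", CometScan("native_datafusion"))
+  println(applyRule(before)) // the scan under the filter is now CometNativeScan
+}
+```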
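+
+### Scan auto-selection for Iceberg tables (sketch)
+
+The auto-selection criteria listed for `native_iceberg_compat` amount to a series of checks with a
+fallback: if any check fails, `native_comet` is used instead. The sketch below condenses that
+logic; `ScanInfo` and its fields are hypothetical, and the real `CometScanRule.selectScan` inspects
+the actual scan node and session configuration.
+
+```scala
+// Hypothetical summary of the properties that CometScanRule.selectScan checks.
+final case class ScanInfo(
+    icebergTableWithSupportsComet: Boolean,
+    filesOnLocalFsOrS3: Boolean,
+    s3ConfigValid: Boolean,
+    hasUnsupportedComplexTypes: Boolean,
+    cometExecEnabled: Boolean)
+
+def selectScan(info: ScanInfo): String = {
+  val eligible =
+    info.icebergTableWithSupportsComet &&
+      info.filesOnLocalFsOrS3 &&
+      info.s3ConfigValid &&
+      !info.hasUnsupportedComplexTypes &&
+      info.cometExecEnabled
+  // Falls back to native_comet if ANY check fails
+  if (eligible) "native_iceberg_compat" else "native_comet"
+}
+```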
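+
+### Handing Iceberg column readers to Comet (sketch)
+
+`IcebergCometBatchReader` extends `BatchReader` and exposes `init(AbstractColumnReader[])` so that
+Iceberg, not Comet, decides how each column reader is created (including deletion vectors and
+schema evolution). In the sketch below, the constructor argument, the batch-iteration calls, and
+the import location of `AbstractColumnReader` are assumptions made for illustration; only the
+`init` method is described above.
+
+```scala
+import org.apache.comet.parquet.{AbstractColumnReader, IcebergCometBatchReader}
+
+def readTask(columnReaders: Array[AbstractColumnReader], batchSize: Int): Unit = {
+  // Assumed constructor: the batch size mirrors BatchReader's capacity field.
+  val reader = new IcebergCometBatchReader(batchSize)
+
+  // Iceberg supplies one AbstractColumnReader per Parquet column; Comet only
+  // assembles the resulting vectors into ColumnarBatches.
+  reader.init(columnReaders)
+
+  // Assumed iteration API inherited from BatchReader.
+  while (reader.nextBatch()) {
+    val batch = reader.currentBatch() // org.apache.spark.sql.vectorized.ColumnarBatch
+    // ... pass the batch to CometExecIterator / downstream native execution ...
+  }
+  reader.close()
+}
+```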