diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml
index e3b0e40566..712f6900c0 100644
--- a/.github/workflows/pr_build_linux.yml
+++ b/.github/workflows/pr_build_linux.yml
@@ -127,6 +127,7 @@ jobs: org.apache.spark.sql.comet.ParquetEncryptionITCase org.apache.comet.exec.CometNativeReaderSuite org.apache.comet.CometIcebergNativeSuite + org.apache.comet.CometDeltaNativeSuite - name: "exec" value: | org.apache.comet.exec.CometAggregateSuite
diff --git a/docs/source/user-guide/latest/datasources.md b/docs/source/user-guide/latest/datasources.md
index cfce4d13d4..a29aac8a9b 100644
--- a/docs/source/user-guide/latest/datasources.md
+++ b/docs/source/user-guide/latest/datasources.md
@@ -34,6 +34,10 @@ Comet accelerates Iceberg scans of Parquet files. See the [Iceberg Guide] for mo [Iceberg Guide]: iceberg.md +### Delta Lake + +Comet can perform native scans of Delta tables when they do not have column mapping or deletion vectors enabled. Scans of Delta tables without these features are simply regular Spark Parquet scans, so they follow the same behavior described above for the `parquet` format. Support for tables that use these features may be added in the future. + ### CSV Comet does not provide native CSV scan, but when `spark.comet.convert.csv.enabled` is enabled, data is immediately
diff --git a/native/Cargo.lock b/native/Cargo.lock
index dd908ce467..6eacfe258d 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -1793,7 +1793,6 @@ dependencies = [ "datafusion-spark", "futures", "hdfs-sys", - "hdrs", "hex", "iceberg", "itertools 0.14.0",
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 7b32be36a2..9fa5dde1d9 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -76,7 +76,7 @@ parking_lot = "0.12.5" datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default-features = false, features = ["hdfs"] } reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"] } object_store_opendal = {version = "0.55.0", optional = true} -hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3"]} +hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3", "vendored"]} opendal = { version ="0.55.0", optional = true, features = ["services-hdfs"] } iceberg = { workspace = true } serde_json = "1.0"
@@ -85,9 +85,6 @@ uuid = "1.18.1" [target.'cfg(target_os = "linux")'.dependencies] procfs = "0.18.0" -[target.'cfg(target_os = "macos")'.dependencies] -hdrs = { version = "0.3.2", features = ["vendored"] } - [dev-dependencies] pprof = { version = "0.15", features = ["flamegraph"] } criterion = { version = "0.7", features = ["async", "async_tokio", "async_std"] }
diff --git a/spark/pom.xml b/spark/pom.xml
index 3b832e37a2..793b37920a 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -112,6 +112,12 @@ under the License. + + com.google.guava + failureaccess + 1.0.3 + test + org.codehaus.jackson jackson-mapper-asl
@@ -188,6 +194,12 @@ under the License. 1.5.2 test + + io.delta + delta-core_${scala.binary.version} + 2.4.0 + test + + org.eclipse.jetty
@@ -216,6 +228,12 @@ under the License. 1.8.1 test + + io.delta + delta-spark_${scala.binary.version} + 3.3.2 + test + + org.eclipse.jetty
@@ -241,6 +259,12 @@ under the License.
1.10.0 test + + io.delta + delta-spark_${scala.binary.version} + 4.0.0 + test + + org.eclipse.jetty
diff --git a/spark/src/main/scala/org/apache/comet/delta/DeltaReflection.scala b/spark/src/main/scala/org/apache/comet/delta/DeltaReflection.scala
new file mode 100644
index 0000000000..ee1eeca4ea
--- /dev/null
+++ b/spark/src/main/scala/org/apache/comet/delta/DeltaReflection.scala
@@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.delta + +import org.apache.spark.internal.Logging + +/** + * Shared reflection utilities for Delta operations. + * + * This object provides common reflection methods used across Comet for interacting with Delta + * classes without requiring a compile-time dependency on Delta. + */ +object DeltaReflection extends Logging { + + /** + * Gets the Delta Protocol from a Spark FileFormat. + */ + def getProtocol(fileFormat: Any): Option[Any] = { + try { + val field = fileFormat.getClass.getDeclaredField("protocol") + field.setAccessible(true) + Some(field.get(fileFormat)) + } catch { + case e: Exception => + logError( + s"Delta reflection failure: Failed to get protocol from FileFormat: ${e.getMessage}") + None + } + } + + /** + * Gets the minReaderVersion from a Delta Protocol. + * + * Comet uses this to decide whether the table is compatible with the native scan. + */ + def getMinReaderVersion(protocol: Any): Option[Int] = { + try { + val field = protocol.getClass.getDeclaredField("minReaderVersion") + field.setAccessible(true) + Some(field.get(protocol).asInstanceOf[Int]) + } catch { + case e: Exception => + logError( + "Delta reflection failure: Failed to get minReaderVersion from protocol: " + + s"${e.getMessage}") + None + } + } + + def getReaderFeatures(protocol: Any): Option[Set[String]] = { + try { + val field = protocol.getClass.getDeclaredField("readerFeatures") + field.setAccessible(true) + Some(field.get(protocol).asInstanceOf[Option[Set[String]]].getOrElse(Set.empty)) + } catch { + case e: Exception => + logError( + "Delta reflection failure: Failed to get readerFeatures from protocol: " + + s"${e.getMessage}") + None + } + } +} + +/** + * Pre-extracted Delta metadata for native scan execution. + * + * This class holds all metadata extracted from Delta during the planning/validation phase in + * CometScanRule. By extracting all metadata once during validation (where reflection failures + * trigger fallback to Spark), we avoid redundant reflection during serialization (where failures + * would be fatal runtime errors).
+ * + * @param minReaderVersion + * The minimum reader version of the table + * @param readerFeatures + * A list of enabled reader features on the table + */ +case class CometDeltaNativeScanMetadata(minReaderVersion: Int, readerFeatures: Set[String]) + +object CometDeltaNativeScanMetadata extends Logging { + import DeltaReflection._ + + /** + * Extracts all Delta metadata needed for native scan execution. + * + * This method performs all reflection operations once during planning/validation. If any + * reflection operation fails, returns None to trigger fallback to Spark. + * + * @param fileFormat + * The FileFormat instance from the HadoopFsRelation + * @return + * Some(metadata) if all reflection succeeds, None to trigger fallback + */ + def extract(fileFormat: Any): Option[CometDeltaNativeScanMetadata] = { + getProtocol(fileFormat).flatMap { protocol => + for { + minReaderVersion <- getMinReaderVersion(protocol) + readerFeatures <- getReaderFeatures(protocol) + } yield { + CometDeltaNativeScanMetadata(minReaderVersion, readerFeatures) + } + } + } +}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index e283f6b2cf..df30fd6874 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -45,6 +45,7 @@ import org.apache.spark.util.SerializableConfiguration import org.apache.spark.util.collection._ import org.apache.comet.{CometConf, MetricsSupport} +import org.apache.comet.delta.CometDeltaNativeScanMetadata import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetPartitionReaderFactory} /**
@@ -583,7 +584,33 @@ object CometScanExec { def isFileFormatSupported(fileFormat: FileFormat): Boolean = { // Only support Spark's built-in Parquet scans, not others such as Delta which use a subclass // of ParquetFileFormat. - fileFormat.getClass().equals(classOf[ParquetFileFormat]) + fileFormat.getClass().equals(classOf[ParquetFileFormat]) || isSupportedDeltaScan(fileFormat) + } + + val unsupportedDeltaReaderFeatures: Set[String] = Set("columnMapping", "deletionVectors") + + def isSupportedDeltaScan(fileFormat: FileFormat): Boolean = { + if (fileFormat.getClass().getName() != "org.apache.spark.sql.delta.DeltaParquetFileFormat") { + return false + } + + // Delta scans without certain features enabled are simply normal Parquet scans that can + // take advantage of the native scan, so check to see if it is compatible + val deltaMetadata = CometDeltaNativeScanMetadata.extract(fileFormat) match { + case Some(m) => m + case None => return false + } + + // Version 1 has no special features + // Version 2 introduced column mapping, which is not supported + // Version 3 changes to use the readerFeatures list instead, so we check for incompatible + // features + deltaMetadata.minReaderVersion match { + case 1 => true + case 2 => false + case 3 => + deltaMetadata.readerFeatures.intersect(unsupportedDeltaReaderFeatures).isEmpty + } } }
diff --git a/spark/src/test/scala/org/apache/comet/CometDeltaNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometDeltaNativeSuite.scala
new file mode 100644
index 0000000000..b1923b1bf1
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/CometDeltaNativeSuite.scala
@@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkConf +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.Row +import org.apache.spark.sql.comet.CometNativeScanExec +import org.apache.spark.sql.execution.SparkPlan + +import io.delta.tables.DeltaTable + +/** + * Test suite for native Delta scans using the built-in native Parquet reader for simple reads. + */ +class CometDeltaNativeSuite extends CometTestBase { + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") + .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") + .set("spark.databricks.delta.snapshotPartitions", "2") + .set(CometConf.COMET_NATIVE_SCAN_IMPL.key, "native_datafusion") + } + + /** Collects all CometNativeScanExec nodes from a plan */ + private def collectNativeScans(plan: SparkPlan): Seq[CometNativeScanExec] = { + collect(plan) { case scan: CometNativeScanExec => + scan + } + } + + /** + * Helper to verify query correctness and whether the native scan operator is used. When `native` + * is true, the plan must contain exactly one CometNativeScanExec; otherwise it must contain none. + */ + private def checkNativeScan(query: String, native: Boolean): Unit = { + val (_, plan) = checkSparkAnswer(query) + val nativeScans = collectNativeScans(plan) + + if (native) { + assert( + nativeScans.length == 1, + s"Expected exactly 1 CometNativeScanExec but found ${nativeScans.length}. Plan:\n$plan") + } else { + assert( + nativeScans.length == 0, + s"Expected no CometNativeScanExec but found ${nativeScans.length}. Plan:\n$plan")
+ } + } + + test("create and query simple Delta table") { + import testImplicits._ + + withTempDir { dir => + withTable("test_table") { + DeltaTable + .create(spark) + .tableName("test_table") + .addColumn("id", "INT") + .addColumn("name", "STRING") + .addColumn("value", "DOUBLE") + .location(dir.getAbsolutePath) + .execute() + + Seq((1, "Alice", 10.5), (2, "Bob", 20.3), (3, "Charlie", 30.7)) + .toDF("id", "name", "value") + .write + .format("delta") + .mode("append") + .save(dir.getAbsolutePath) + + checkNativeScan("SELECT * FROM test_table ORDER BY id", true) + } + } + } + + test("supported reader features") { + import testImplicits._ + + withTempDir { dir => + withTable("test_table") { + val table = DeltaTable + .create(spark) + .tableName("test_table") + .addColumn("id", "INT") + .addColumn("name", "STRING") + .addColumn("value", "DOUBLE") + .location(dir.getAbsolutePath) + .execute() + + table.addFeatureSupport("timestampNtz") + + Seq((1, "Alice", 10.5), (2, "Bob", 20.3), (3, "Charlie", 30.7)) + .toDF("id", "name", "value") + .write + .format("delta") + .mode("append") + .save(dir.getAbsolutePath) + + checkNativeScan("SELECT * FROM test_table ORDER BY id", true) + } + } + } + + test("deletion vectors not supported") { + import testImplicits._ + + withTempDir { dir => + withTable("test_table") { + DeltaTable + .create(spark) + .tableName("test_table") + .addColumn("id", "INT") + .addColumn("name", "STRING") + .addColumn("value", "DOUBLE") + .property("delta.enableDeletionVectors", "true") + .location(dir.getAbsolutePath) + .execute() + + Seq((1, "Alice", 10.5), (2, "Bob", 20.3), (3, "Charlie", 30.7)) + .toDF("id", "name", "value") + .write + .format("delta") + .mode("append") + .save(dir.getAbsolutePath) + + checkNativeScan("SELECT * FROM test_table ORDER BY id", false) + } + } + } + + test("column mapping not supported") { + import testImplicits._ + + withTempDir { dir => + withTable("test_table") { + DeltaTable + .create(spark) + .tableName("test_table") + .addColumn("id", "INT") + .addColumn("name", "STRING") + .addColumn("value", "DOUBLE") + .property("delta.columnMapping.mode", "name") + .location(dir.getAbsolutePath) + .execute() + + Seq((1, "Alice", 10.5), (2, "Bob", 20.3), (3, "Charlie", 30.7)) + .toDF("id", "name", "value") + .write + .format("delta") + .mode("append") + .save(dir.getAbsolutePath) + + checkNativeScan("SELECT * FROM test_table ORDER BY id", false) + } + } + } + + test("change data feed enabled") { + import testImplicits._ + + withTempDir { dir => + withTable("test_table") { + DeltaTable + .create(spark) + .tableName("test_table") + .addColumn("id", "INT") + .addColumn("name", "STRING") + .addColumn("value", "DOUBLE") + .property("delta.enableChangeDataFeed", "true") + .location(dir.getAbsolutePath) + .execute() + + Seq((1, "Alice", 10.5), (2, "Bob", 20.3), (3, "Charlie", 30.7)) + .toDF("id", "name", "value") + .write + .format("delta") + .mode("append") + .save(dir.getAbsolutePath) + + // Reading from a table with change data feed enabled is supported as long as the change + // feed itself is not being read + checkNativeScan("SELECT * FROM test_table ORDER BY id", true) + + // Reading the change feed uses a different relation, which is not supported + checkNativeScan("SELECT * FROM table_changes('test_table', 0)", false) + } + } + } + + test("complex Delta table") { + + withTempDir { dir => + withTable("test_table") { + val table = DeltaTable + .create(spark) + .tableName("test_table") + .addColumn("nested", "struct") + .addColumn("arr", "array>") + .location(dir.getAbsolutePath) + .execute() + + spark + .createDataFrame( + List( + Row((1, 10.5), Seq((1, 10.5), (2, 15.0))), + Row((2, 20.3), Seq((2, 20.3), (3, 25.5)))).asJava, + table.toDF.schema) + .write + .format("delta") + .mode("append") + .save(dir.getAbsolutePath) + + checkNativeScan("SELECT * FROM test_table ORDER BY nested.id", true) + } + } + } +}
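For reference, a minimal sketch of how the new native Delta scan path can be exercised end to end, assuming a typical Comet deployment: the Delta extension, catalog, and `CometConf.COMET_NATIVE_SCAN_IMPL` settings mirror `CometDeltaNativeSuite` above, while the `spark.plugins`/`spark.comet.enabled` settings and the table path are illustrative assumptions rather than values taken from this change.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.comet.CometNativeScanExec
import org.apache.comet.CometConf

object DeltaNativeScanExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      // Assumed plugin/enable settings for a typical Comet deployment.
      .config("spark.plugins", "org.apache.spark.CometPlugin")
      .config("spark.comet.enabled", "true")
      // Delta and native scan settings, mirroring CometDeltaNativeSuite.
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .config(CometConf.COMET_NATIVE_SCAN_IMPL.key, "native_datafusion")
      .getOrCreate()
    import spark.implicits._

    // A plain Delta table: no column mapping and no deletion vectors, so it is
    // eligible for the native scan. Tables using those reader features fall back to Spark.
    val path = "/tmp/comet_delta_example" // illustrative path
    Seq((1, "Alice", 10.5), (2, "Bob", 20.3))
      .toDF("id", "name", "value")
      .write
      .format("delta")
      .mode("overwrite")
      .save(path)

    val df = spark.read.format("delta").load(path)
    df.collect() // trigger execution so the final physical plan is available

    // Rough check: does the executed plan contain a CometNativeScanExec node?
    val nativeScans = df.queryExecution.executedPlan.collect { case s: CometNativeScanExec => s }
    println(s"native Delta scan used: ${nativeScans.nonEmpty}")

    spark.stop()
  }
}
```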