1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
@@ -127,6 +127,7 @@ jobs:
org.apache.spark.sql.comet.ParquetEncryptionITCase
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
org.apache.comet.CometDeltaNativeSuite
- name: "exec"
value: |
org.apache.comet.exec.CometAggregateSuite
4 changes: 4 additions & 0 deletions docs/source/user-guide/latest/datasources.md
@@ -34,6 +34,10 @@ Comet accelerates Iceberg scans of Parquet files. See the [Iceberg Guide] for mo

[Iceberg Guide]: iceberg.md

### Delta Lake

Comet can perform native scans of Delta tables when they do not have column mapping or deletion vectors enabled. Scans of Delta tables without these features are simply regular Spark Parquet scans, so they follow the same behavior as the `parquet` format described above. Scans of tables that use these features may be supported in the future.
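
As a rough illustration (a sketch, not taken from Comet's documentation; the table path, the `spark` session setup, and the property checks are assumptions about a standard Delta Lake configuration), a plain Delta table can be written and scanned like any other Parquet-backed source, and `DESCRIBE DETAIL` shows whether the unsupported features are enabled:

```scala
// Minimal sketch: assumes a SparkSession (`spark`) with the Delta Lake extension
// configured and Comet enabled. The table path is a placeholder.
val path = "/tmp/comet_delta_example"
spark.range(0, 1000).toDF("id").write.format("delta").mode("overwrite").save(path)

// Without column mapping or deletion vectors, this read is effectively a regular
// Parquet scan that Comet can execute natively.
spark.read.format("delta").load(path).agg("id" -> "sum").show()

// Inspect the table's properties to confirm neither feature is enabled.
val detail = spark.sql(s"DESCRIBE DETAIL delta.`$path`").first()
val props = detail.getMap[String, String](detail.fieldIndex("properties"))
val usesColumnMapping = props.get("delta.columnMapping.mode").exists(_ != "none")
val usesDeletionVectors = props.get("delta.enableDeletionVectors").contains("true")
println(s"eligible for native Comet scan: ${!(usesColumnMapping || usesDeletionVectors)}")
```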

### CSV

Comet does not provide native CSV scan, but when `spark.comet.convert.csv.enabled` is enabled, data is immediately
1 change: 0 additions & 1 deletion native/Cargo.lock

Some generated files are not rendered by default.

5 changes: 1 addition & 4 deletions native/core/Cargo.toml
@@ -76,7 +76,7 @@ parking_lot = "0.12.5"
datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default-features = false, features = ["hdfs"] }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"] }
object_store_opendal = {version = "0.55.0", optional = true}
hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3"]}
hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3", "vendored"]}
@Kimahriman (Contributor, Author) commented on Jan 4, 2026:

With opendal enabled by default in #2929, I was getting failures finding libhdfs.so when running unit tests on Linux. The hdrs dependency below for macOS was purely activating the vendored feature of hdfs-sys, so I enabled that feature globally here so that libhdfs does not need to be available for developing on Linux either.

opendal = { version ="0.55.0", optional = true, features = ["services-hdfs"] }
iceberg = { workspace = true }
serde_json = "1.0"
@@ -85,9 +85,6 @@ uuid = "1.18.1"
[target.'cfg(target_os = "linux")'.dependencies]
procfs = "0.18.0"

[target.'cfg(target_os = "macos")'.dependencies]
hdrs = { version = "0.3.2", features = ["vendored"] }

[dev-dependencies]
pprof = { version = "0.15", features = ["flamegraph"] }
criterion = { version = "0.7", features = ["async", "async_tokio", "async_std"] }
24 changes: 24 additions & 0 deletions spark/pom.xml
@@ -112,6 +112,12 @@ under the License.
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>failureaccess</artifactId>
<version>1.0.3</version>
<scope>test</scope>
</dependency>
Comment on lines +115 to +120

@Kimahriman (Contributor, Author) commented on Jan 4, 2026:

Without this I'm getting an error creating the Delta tables:

Cause: java.lang.ClassNotFoundException: com.google.common.util.concurrent.internal.InternalFutureFailureAccess
  at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
  at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
  at java.base/java.lang.ClassLoader.defineClass1(Native Method)
  at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
  at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:150)
  at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:862)
  at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:760)
  at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:681)
  at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:639)
  ...

Not sure why; could use some help figuring out where the missing Guava class is supposed to come from.

<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
@@ -188,6 +194,12 @@ under the License.
<version>1.5.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_${scala.binary.version}</artifactId>
<version>2.4.0</version>
<scope>test</scope>
</dependency>
<!-- Jetty 9.4.x for Spark 3.4 (JDK 11, javax.* packages) -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
@@ -216,6 +228,12 @@ under the License.
<version>1.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-spark_${scala.binary.version}</artifactId>
<version>3.3.2</version>
<scope>test</scope>
</dependency>
<!-- Jetty 9.4.x for Spark 3.5 (JDK 11, javax.* packages) -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
@@ -241,6 +259,12 @@ under the License.
<version>1.10.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-spark_${scala.binary.version}</artifactId>
<version>4.0.0</version>
<scope>test</scope>
</dependency>
<!-- Jetty 11.x for Spark 4.0 (jakarta.servlet) -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
121 changes: 121 additions & 0 deletions spark/src/main/scala/org/apache/comet/delta/DeltaReflection.scala
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.delta

import org.apache.spark.internal.Logging

/**
* Shared reflection utilities for Delta operations.
*
* This object provides common reflection methods used across Comet for interacting with Delta
* classes without requiring a runtime dependency on Delta.
*/
object DeltaReflection extends Logging {

/**
* Gets the Delta Protocol from a Spark FileFormat.
*/
def getProtocol(fileFormat: Any): Option[Any] = {
try {
val field = fileFormat.getClass.getDeclaredField("protocol")
field.setAccessible(true)
Some(field.get(fileFormat))
} catch {
case e: Exception =>
logError(
s"Delta reflection failure: Failed to get protocol from FileFormat: ${e.getMessage}")
None
}
}

/**
* Gets the minReaderVersion from a Delta Protocol.
*/
def getMinReaderVersion(protocol: Any): Option[Int] = {
try {
val field = protocol.getClass.getDeclaredField("minReaderVersion")
field.setAccessible(true)
Some(field.get(protocol).asInstanceOf[Int])
} catch {
case e: Exception =>
logError(
"Delta reflection failure: Failed to get minReaderVersion from protocol: " +
s"${e.getMessage}")
None
}
}

def getReaderFeatures(protocol: Any): Option[Set[String]] = {
try {
val field = protocol.getClass.getDeclaredField("readerFeatures")
field.setAccessible(true)
Some(field.get(protocol).asInstanceOf[Option[Set[String]]].getOrElse(Set.empty))
} catch {
case e: Exception =>
logError(
"Delta reflection failure: Failed to get minReaderVersion from protocol: " +
s"${e.getMessage}")
None
}
}
}

/**
* Pre-extracted Delta metadata for native scan execution.
*
* This class holds all metadata extracted from Delta during the planning/validation phase in
* CometScanRule. By extracting all metadata once during validation (where reflection failures
* trigger fallback to Spark), we avoid redundant reflection during serialization (where failures
* would be fatal runtime errors).
*
* @param minReaderVersion
* The minimum reader version of the table
* @param readerFeatures
* The set of reader features enabled on the table
*/
case class CometDeltaNativeScanMetadata(minReaderVersion: Int, readerFeatures: Set[String])

object CometDeltaNativeScanMetadata extends Logging {
import DeltaReflection._

/**
* Extracts all Delta metadata needed for native scan execution.
*
* This method performs all reflection operations once during planning/validation. If any
* reflection operation fails, returns None to trigger fallback to Spark.
*
* @param fileFormat
* The FileFormat instance from the HadoopFsRelation
* @return
* Some(metadata) if all reflection succeeds, None to trigger fallback
*/
def extract(fileFormat: Any): Option[CometDeltaNativeScanMetadata] = {
getProtocol(fileFormat).flatMap { protocol =>
for {
minReaderVersion <- getMinReaderVersion(protocol)
readerFeatures <- getReaderFeatures(protocol)
} yield {
CometDeltaNativeScanMetadata(minReaderVersion, readerFeatures)
}
}
}
}
@@ -45,6 +45,7 @@ import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.collection._

import org.apache.comet.{CometConf, MetricsSupport}
import org.apache.comet.delta.CometDeltaNativeScanMetadata
import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetPartitionReaderFactory}

/**
@@ -583,7 +584,33 @@ object CometScanExec {
def isFileFormatSupported(fileFormat: FileFormat): Boolean = {
// Support Spark's built-in Parquet scans, plus Delta scans that are effectively plain
// Parquet scans (see isSupportedDeltaScan); other ParquetFileFormat subclasses are not supported.
fileFormat.getClass().equals(classOf[ParquetFileFormat])
fileFormat.getClass().equals(classOf[ParquetFileFormat]) || isSupportedDeltaScan(fileFormat)
}

val unsupportedDeltaReaderFeatures: Set[String] = Set("columnMapping", "deletionVectors")

def isSupportedDeltaScan(fileFormat: FileFormat): Boolean = {
if (fileFormat.getClass().getName() != "org.apache.spark.sql.delta.DeltaParquetFileFormat") {
return false
}

// Delta scans without certain features enabled are simply normal Parquet scans that can
// take advantage of the native scan, so check to see if it is compatible
val deltaMetadata = CometDeltaNativeScanMetadata.extract(fileFormat) match {
case Some(m) => m
case None => return false
}

// Version 1 has no special features
// Version 2 introduced column mapping, which is not supported
// Version 3 changes to use the readerFeatures list instead, so we check for incompatible
// features
deltaMetadata.minReaderVersion match {
case 1 => true
case 2 => false
case 3 =>
deltaMetadata.readerFeatures.intersect(unsupportedDeltaReaderFeatures).isEmpty
// Unknown (newer) reader versions are not supported, so fall back to Spark
case _ => false
}
}

}