1 change: 0 additions & 1 deletion native/Cargo.lock

Some generated files are not rendered by default.

5 changes: 1 addition & 4 deletions native/core/Cargo.toml
@@ -76,7 +76,7 @@ parking_lot = "0.12.5"
datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default-features = false, features = ["hdfs"] }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"] }
object_store_opendal = {version = "0.55.0", optional = true}
hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3"]}
hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3", "vendored"]}
@Kimahriman (Contributor, Author) commented on Jan 4, 2026:

With opendal enabled by default in #2929, I was getting failures finding libhdfs.so when running unit tests on Linux. The hdrs dependency below for macOS was purely activating the vendored feature of hdfs-sys, so I just enabled that globally here so you don't need libhdfs available for developing on Linux either.

opendal = { version ="0.55.0", optional = true, features = ["services-hdfs"] }
iceberg = { workspace = true }
serde_json = "1.0"
@@ -85,9 +85,6 @@ uuid = "1.18.1"
[target.'cfg(target_os = "linux")'.dependencies]
procfs = "0.18.0"

[target.'cfg(target_os = "macos")'.dependencies]
hdrs = { version = "0.3.2", features = ["vendored"] }

[dev-dependencies]
pprof = { version = "0.15", features = ["flamegraph"] }
criterion = { version = "0.7", features = ["async", "async_tokio", "async_std"] }
24 changes: 24 additions & 0 deletions spark/pom.xml
@@ -112,6 +112,12 @@ under the License.
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>failureaccess</artifactId>
<version>1.0.3</version>
<scope>test</scope>
</dependency>
Comment on lines +115 to +120
@Kimahriman (Contributor, Author) commented on Jan 4, 2026:


Without this I'm getting an error creating the Delta tables:

Cause: java.lang.ClassNotFoundException: com.google.common.util.concurrent.internal.InternalFutureFailureAccess
  at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
  at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
  at java.base/java.lang.ClassLoader.defineClass1(Native Method)
  at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
  at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:150)
  at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:862)
  at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:760)
  at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:681)
  at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:639)
  ...

Not sure why; could use some help figuring out what's missing with Guava and where it should be coming from.
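
(For context: `InternalFutureFailureAccess` lives in Guava's companion `failureaccess` artifact, which Guava has declared as a dependency since 27.0, so this error usually means `failureaccess` got excluded or dropped somewhere on the test classpath. Running `mvn dependency:tree -Dincludes=com.google.guava` in `spark/` should show which dependency path is supposed to pull it in; the explicit test-scoped dependency above works around it either way.)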

<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
@@ -188,6 +194,12 @@ under the License.
<version>1.5.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_${scala.binary.version}</artifactId>
<version>2.4.0</version>
<scope>test</scope>
</dependency>
<!-- Jetty 9.4.x for Spark 3.4 (JDK 11, javax.* packages) -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
@@ -216,6 +228,12 @@ under the License.
<version>1.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-spark_${scala.binary.version}</artifactId>
<version>3.3.2</version>
<scope>test</scope>
</dependency>
<!-- Jetty 9.4.x for Spark 3.5 (JDK 11, javax.* packages) -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
@@ -241,6 +259,12 @@ under the License.
<version>1.10.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-spark_${scala.binary.version}</artifactId>
<version>4.0.0</version>
<scope>test</scope>
</dependency>
<!-- Jetty 11.x for Spark 4.0 (jakarta.servlet) -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
121 changes: 121 additions & 0 deletions spark/src/main/scala/org/apache/comet/delta/DeltaReflection.scala
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.delta

import org.apache.spark.internal.Logging

/**
* Shared reflection utilities for Delta operations.
*
* This object provides common reflection methods used across Comet for interacting with Delta
* classes without requiring a runtime dependency on Delta.
*/
object DeltaReflection extends Logging {

/**
* Gets the Delta Protocol from a Spark FileFormat.
*/
def getProtocol(fileFormat: Any): Option[Any] = {
try {
val field = fileFormat.getClass.getDeclaredField("protocol")
field.setAccessible(true)
Some(field.get(fileFormat))
} catch {
case e: Exception =>
logError(
s"Delta reflection failure: Failed to get protocol from FileFormat: ${e.getMessage}")
None
}
}

/**
* Gets the minReaderVersion from a Delta Protocol.
*/
def getMinReaderVersion(protocol: Any): Option[Int] = {
try {
val field = protocol.getClass.getDeclaredField("minReaderVersion")
field.setAccessible(true)
Some(field.get(protocol).asInstanceOf[Int])
} catch {
case e: Exception =>
logError(
"Delta reflection failure: Failed to get minReaderVersion from protocol: " +
s"${e.getMessage}")
None
}
}

/**
* Gets the set of enabled reader features from a Delta Protocol, treating an absent
* readerFeatures field as the empty set.
*/
def getReaderFeatures(protocol: Any): Option[Set[String]] = {
try {
val field = protocol.getClass.getDeclaredField("readerFeatures")
field.setAccessible(true)
Some(field.get(protocol).asInstanceOf[Option[Set[String]]].getOrElse(Set.empty))
} catch {
case e: Exception =>
logError(
"Delta reflection failure: Failed to get minReaderVersion from protocol: " +
s"${e.getMessage}")
None
}
}
}

/**
* Pre-extracted Delta metadata for native scan execution.
*
* This class holds all metadata extracted from Delta during the planning/validation phase in
* CometScanRule. By extracting all metadata once during validation (where reflection failures
* trigger fallback to Spark), we avoid redundant reflection during serialization (where failures
* would be fatal runtime errors).
*
* @param minReaderVersion
* The minimum reader version of the table
* @param readerFeatures
* A list of enabled reader features on the table
*/
case class CometDeltaNativeScanMetadata(minReaderVersion: Int, readerFeatures: Set[String])

object CometDeltaNativeScanMetadata extends Logging {
import DeltaReflection._

/**
* Extracts all Delta metadata needed for native scan execution.
*
* This method performs all reflection operations once during planning/validation. If any
* reflection operation fails, returns None to trigger fallback to Spark.
*
* @param fileFormat
* The FileFormat instance from the HadoopFsRelation
* @return
* Some(metadata) if all reflection succeeds, None to trigger fallback
*/
def extract(fileFormat: Any): Option[CometDeltaNativeScanMetadata] = {
getProtocol(fileFormat).flatMap { protocol =>
for {
minReaderVersion <- getMinReaderVersion(protocol)
readerFeatures <- getReaderFeatures(protocol)
} yield {
CometDeltaNativeScanMetadata(minReaderVersion, readerFeatures)
}
}
}
}
spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -45,6 +45,7 @@ import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.collection._

import org.apache.comet.{CometConf, MetricsSupport}
import org.apache.comet.delta.CometDeltaNativeScanMetadata
import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetPartitionReaderFactory}

/**
@@ -583,7 +584,33 @@ object CometScanExec {
def isFileFormatSupported(fileFormat: FileFormat): Boolean = {
// Support Spark's built-in Parquet scans, plus Delta scans that don't rely on reader
// features the native scan can't handle (see isSupportedDeltaScan below).
fileFormat.getClass().equals(classOf[ParquetFileFormat])
fileFormat.getClass().equals(classOf[ParquetFileFormat]) || isSupportedDeltaScan(fileFormat)
}

val unsupportedDeltaReaderFeatures: Set[String] = Set("columnMapping", "deletionVectors")

def isSupportedDeltaScan(fileFormat: FileFormat): Boolean = {
if (fileFormat.getClass().getName() != "org.apache.spark.sql.delta.DeltaParquetFileFormat") {
return false
}

// Delta scans without certain features enabled are simply normal Parquet scans that can
// take advantage of the native scan, so check to see if it is compatible
val deltaMetadata = CometDeltaNativeScanMetadata.extract(fileFormat) match {
case Some(m) => m
case None => return false
}

// Version 1 has no special features
// Version 2 introduced column mapping, which is not supported
// Version 3 changes to use the readerFeatures list instead, so we check for incompatible
// features
deltaMetadata.minReaderVersion match {
case 1 => true
case 2 => false
case 3 =>
deltaMetadata.readerFeatures.intersect(unsupportedDeltaReaderFeatures).isEmpty
// Fail closed on reader versions newer than we know about
case _ => false
}
}

}
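
To make the reader-version matrix concrete, here is a small self-contained sketch of the same decision logic (a standalone mirror, not the PR's actual code; the DeltaCompatSketch and isCompatible names are hypothetical, and timestampNtz stands in for a reader feature Comet doesn't list as unsupported):

// Standalone mirror of the minReaderVersion check in isSupportedDeltaScan above.
case class CometDeltaNativeScanMetadata(minReaderVersion: Int, readerFeatures: Set[String])

object DeltaCompatSketch {
  // Same feature set as unsupportedDeltaReaderFeatures in CometScanExec
  val unsupported: Set[String] = Set("columnMapping", "deletionVectors")

  // Hypothetical helper mirroring the match in isSupportedDeltaScan
  def isCompatible(m: CometDeltaNativeScanMetadata): Boolean = m.minReaderVersion match {
    case 1 => true // no special reader features exist at this version
    case 2 => false // column mapping, which the native scan can't handle
    case 3 => m.readerFeatures.intersect(unsupported).isEmpty
    case _ => false // fail closed on reader versions we don't know about
  }

  def main(args: Array[String]): Unit = {
    assert(isCompatible(CometDeltaNativeScanMetadata(1, Set.empty)))
    assert(!isCompatible(CometDeltaNativeScanMetadata(2, Set.empty)))
    assert(!isCompatible(CometDeltaNativeScanMetadata(3, Set("deletionVectors"))))
    // A v3 table whose reader features are all benign still gets the native scan
    assert(isCompatible(CometDeltaNativeScanMetadata(3, Set("timestampNtz"))))
  }
}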