
# Accelerating Apache Iceberg Parquet Scans using Comet (Experimental)

**Note: Iceberg integration is a work-in-progress. Comet currently has two distinct Iceberg
code paths: 1) a hybrid reader (native Parquet decoding, JVM otherwise) that requires
building Iceberg from source rather than using available artifacts in Maven, and 2) a
fully-native reader (based on [iceberg-rust](https://github.com/apache/iceberg-rust)).
Directions for both designs are provided below.**

## Hybrid Reader

### Build Comet

Run a Maven install so that we can compile Iceberg against the latest Comet.

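A minimal sketch, assuming the repository provides the standard Maven wrapper (substitute `mvn` otherwise); the exact commands may differ from the project's current instructions:

```shell
# Hypothetical sketch: build and install Comet into the local Maven
# repository, skipping tests for speed.
./mvnw clean install -DskipTests

# COMET_VERSION is referenced below; deriving it from the project POM is an
# assumption, not a documented step.
export COMET_VERSION=$(./mvnw help:evaluate -Dexpression=project.version -q -DforceStdout)
```
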
Set `COMET_JAR` env var:

```shell
export COMET_JAR=`pwd`/spark/target/comet-spark-spark3.5_2.12-$COMET_VERSION.jar
```

### Build Iceberg

Clone the Iceberg repository and apply the code changes needed by Comet.

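A minimal sketch, assuming the upstream Iceberg repository and a Comet-provided patch file (the patch path below is a placeholder):

```shell
# Hypothetical sketch: clone upstream Iceberg and apply the Comet changes.
git clone https://github.com/apache/iceberg.git
cd iceberg
git apply /path/to/comet-iceberg.diff  # placeholder path for the Comet patch
```
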
Perform a clean build:

```shell
./gradlew clean build -x test -x integrationTest
```

### Test

Set the `ICEBERG_JAR` environment variable.

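A sketch with a placeholder path; point it at the Spark runtime jar produced by the Iceberg build above:

```shell
# Hypothetical path: adjust the location and version to match your Iceberg
# build output.
export ICEBERG_JAR=/path/to/iceberg/spark/v3.5/spark-runtime/build/libs/iceberg-spark-runtime-3.5_2.12-<version>.jar
```
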
After launching `spark-shell` with the Comet and Iceberg jars and creating a test table `t1`, running `explain()` should show the `CometBatchScan` node:

```scala
scala> spark.sql(s"SELECT * from t1").explain()
...
+- CometBatchScan spark_catalog.default.t1[c0#26, c1#27] spark_catalog.default.t1 (branch=null) [filters=, groupedBy=] RuntimeFilters: []
```

### Known issues

- Spark Runtime Filtering isn't [working](https://github.com/apache/datafusion-comet/issues/2116)
  - You can bypass the issue by setting either `spark.sql.adaptive.enabled=false` or `spark.comet.exec.broadcastExchange.enabled=false` (see the sketch below)
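
For example, a minimal sketch of the first workaround when launching `spark-shell` (only the relevant flag is shown):

```shell
# Workaround sketch: disable adaptive query execution so Spark does not
# inject runtime filters that trigger the issue above.
$SPARK_HOME/bin/spark-shell --conf spark.sql.adaptive.enabled=false
```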

## Native Reader

Comet's fully-native Iceberg integration does not require modifying Iceberg source
code. Instead, Comet relies on reflection to extract `FileScanTask`s from Iceberg, which are
then serialized to Comet's native execution engine (see
[PR #2528](https://github.com/apache/datafusion-comet/pull/2528)).

The example below uses Spark's package downloader to retrieve Comet 0.12.0 and Iceberg
1.8.1, but Comet has been tested with Iceberg 1.5, 1.7, 1.8, and 1.10. The key configuration
for enabling the fully-native Iceberg reader is `spark.comet.scan.icebergNative.enabled=true`. This
configuration should **not** be combined with the hybrid reader configuration
`spark.sql.iceberg.parquet.reader-type=COMET` described above.

```shell
$SPARK_HOME/bin/spark-shell \
  --packages org.apache.datafusion:comet-spark-spark3.5_2.12:0.12.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.apache.iceberg:iceberg-core:1.8.1 \
  --repositories https://repo1.maven.org/maven2/ \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.comet.CometSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hadoop \
  --conf spark.sql.catalog.spark_catalog.warehouse=/tmp/warehouse \
  --conf spark.plugins=org.apache.spark.CometPlugin \
  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
  --conf spark.comet.scan.icebergNative.enabled=true \
  --conf spark.comet.explainFallback.enabled=true \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=2g
```

The same sample queries from above can be used to test Comet's fully-native Iceberg
integration; however, the scan node to look for is `CometIcebergNativeScan`.
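
For example (a sketch; the exact plan text varies by query and table):

```scala
scala> spark.sql(s"SELECT * from t1").explain()
...
+- CometIcebergNativeScan spark_catalog.default.t1[c0#26, c1#27] ...
```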

### Current limitations

The following scenarios are not yet supported, but support is in progress:

- Iceberg table spec v3 scans will fall back.
- Iceberg writes will fall back.
- Iceberg table scans backed by Avro or ORC data files will fall back.
- Iceberg table scans partitioned on `BINARY` or `DECIMAL` (with precision >28) columns will fall back.
- Iceberg scans with residual filters (_i.e._, filter expressions that are not satisfied by
  partition values and must be evaluated against column values at scan time) involving the
  `truncate`, `bucket`, `year`, `month`, `day`, or `hour` transforms will fall back (see the
  sketch below).
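
As an illustration of the last point, a hedged sketch using a hypothetical table (all names are placeholders):

```scala
// Hypothetical table partitioned with a bucket transform.
spark.sql("CREATE TABLE demo (id BIGINT, data STRING) USING iceberg PARTITIONED BY (bucket(16, id))")

// Iceberg keeps `id = 5` as a residual filter on the bucketed column, so this
// scan is expected to fall back to Spark rather than use the native reader.
spark.sql("SELECT * FROM demo WHERE id = 5").explain()
```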