diff --git a/docs/source/user-guide/latest/iceberg.md b/docs/source/user-guide/latest/iceberg.md
index 0813eeeb2c..9bf681cb08 100644
--- a/docs/source/user-guide/latest/iceberg.md
+++ b/docs/source/user-guide/latest/iceberg.md
@@ -19,10 +19,15 @@ # Accelerating Apache Iceberg Parquet Scans using Comet (Experimental)
 
-**Note: Iceberg integration is a work-in-progress. It is currently necessary to build Iceberg from
-source rather than using available artifacts in Maven**
+**Note: Iceberg integration is a work-in-progress. Comet currently has two distinct Iceberg
+code paths: 1) a hybrid reader (native Parquet decoding, JVM otherwise) that requires
+building Iceberg from source rather than using available artifacts in Maven, and 2) a fully-native
+reader (based on [iceberg-rust](https://github.com/apache/iceberg-rust)). Directions for both
+designs are provided below.**
 
-## Build Comet
+## Hybrid Reader
+
+### Build Comet
 
 Run a Maven install so that we can compile Iceberg against latest Comet:
 
@@ -42,7 +47,7 @@ Set `COMET_JAR` env var:
 export COMET_JAR=`pwd`/spark/target/comet-spark-spark3.5_2.12-$COMET_VERSION.jar
 ```
 
-## Build Iceberg
+### Build Iceberg
 
 Clone the Iceberg repository and apply code changes needed by Comet
 
@@ -59,7 +64,7 @@ Perform a clean build
 ./gradlew clean build -x test -x integrationTest
 ```
 
-## Test
+### Test
 
 Set `ICEBERG_JAR` environment variable.
 
@@ -140,7 +145,52 @@ scala> spark.sql(s"SELECT * from t1").explain()
 +- CometBatchScan spark_catalog.default.t1[c0#26, c1#27] spark_catalog.default.t1 (branch=null) [filters=, groupedBy=] RuntimeFilters: []
 ```
 
-## Known issues
+### Known issues
 
 - Spark Runtime Filtering isn't [working](https://github.com/apache/datafusion-comet/issues/2116)
 - You can bypass the issue by either setting `spark.sql.adaptive.enabled=false` or `spark.comet.exec.broadcastExchange.enabled=false`
+
+## Native Reader
+
+Comet's fully-native Iceberg integration does not require modifying Iceberg source
+code. 
Instead, Comet relies on reflection to extract `FileScanTask`s from Iceberg, which are
+then serialized to Comet's native execution engine (see
+[PR #2528](https://github.com/apache/datafusion-comet/pull/2528)).
+
+The example below uses Spark's package downloader to retrieve Comet 0.12.0 and Iceberg
+1.8.1, but Comet has been tested with Iceberg 1.5, 1.7, 1.8, and 1.10. The key configuration
+to enable the fully-native Iceberg reader is `spark.comet.scan.icebergNative.enabled=true`. This
+configuration should **not** be used with the hybrid Iceberg configuration
+`spark.sql.iceberg.parquet.reader-type=COMET` from above. Note that the Iceberg and Comet
+session extensions are combined into a single `spark.sql.extensions` value; specifying
+`--conf spark.sql.extensions=...` twice would cause the second value to override the first.
+
+```shell
+$SPARK_HOME/bin/spark-shell \
+  --packages org.apache.datafusion:comet-spark-spark3.5_2.12:0.12.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.apache.iceberg:iceberg-core:1.8.1 \
+  --repositories https://repo1.maven.org/maven2/ \
+  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.comet.CometSparkSessionExtensions \
+  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
+  --conf spark.sql.catalog.spark_catalog.type=hadoop \
+  --conf spark.sql.catalog.spark_catalog.warehouse=/tmp/warehouse \
+  --conf spark.plugins=org.apache.spark.CometPlugin \
+  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
+  --conf spark.comet.scan.icebergNative.enabled=true \
+  --conf spark.comet.explainFallback.enabled=true \
+  --conf spark.memory.offHeap.enabled=true \
+  --conf spark.memory.offHeap.size=2g
+```
+
+The same sample queries from above can be used to test Comet's fully-native Iceberg integration;
+however, the scan node to look for is `CometIcebergNativeScan`.
+
+### Current limitations
+
+The following scenarios are not yet supported, but are a work in progress:
+
+- Iceberg table spec v3 scans will fall back.
+- Iceberg writes will fall back.
+- Iceberg table scans backed by Avro or ORC data files will fall back.
+- Iceberg table scans partitioned on `BINARY` or `DECIMAL` (with precision >28) columns will fall back.
+- Iceberg scans with residual filters (_i.e._, filter expressions that cannot be satisfied by
+  partition values alone and must be evaluated against column values at scan time) that involve the
+  `truncate`, `bucket`, `year`, `month`, `day`, or `hour` transforms will fall back.
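+
+### Verifying the native scan
+
+To confirm the native reader is active, the explain output can be inspected just as in the
+hybrid example above. The snippet below is a minimal sketch run inside the `spark-shell`
+session configured above; the table name `t2` and its schema are illustrative:
+
+```scala
+scala> spark.sql("CREATE TABLE t2 (c0 INT, c1 STRING) USING iceberg")
+scala> spark.sql("INSERT INTO t2 VALUES (1, 'a'), (2, 'b')")
+scala> spark.sql("SELECT * FROM t2").explain()
+```
+
+When the native reader is active, the physical plan should contain a `CometIcebergNativeScan`
+node. The `INSERT` itself falls back to Spark, per the write limitation listed above, but the
+subsequent scan is still accelerated.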