
 # Accelerating Apache Iceberg Parquet Scans using Comet (Experimental)

-**Note: Iceberg integration is a work-in-progress. It is currently necessary to build Iceberg from
-source rather than using available artifacts in Maven**
+**Note: Iceberg integration is a work-in-progress. Comet currently has two distinct Iceberg
+code paths: 1) a hybrid reader (native Parquet decoding, JVM otherwise) that requires
+building Iceberg from source rather than using available artifacts in Maven, and 2) a fully-native
+reader (based on [iceberg-rust](https://github.com/apache/iceberg-rust)). Directions for both
+designs are provided below.**

-## Build Comet
+## Hybrid Reader
+
+### Build Comet

 Run a Maven install so that we can compile Iceberg against latest Comet:

@@ -42,7 +47,7 @@ Set `COMET_JAR` env var:
 export COMET_JAR=`pwd`/spark/target/comet-spark-spark3.5_2.12-$COMET_VERSION.jar
 ```

-## Build Iceberg
+### Build Iceberg

 Clone the Iceberg repository and apply code changes needed by Comet

@@ -59,7 +64,7 @@ Perform a clean build
 ./gradlew clean build -x test -x integrationTest
 ```

-## Test
+### Test

 Set the `ICEBERG_JAR` environment variable.

@@ -140,7 +145,52 @@ scala> spark.sql(s"SELECT * from t1").explain()
 +- CometBatchScan spark_catalog.default.t1[c0#26, c1#27] spark_catalog.default.t1 (branch=null) [filters=, groupedBy=] RuntimeFilters: []
 ```

-## Known issues
+### Known issues

 - Spark Runtime Filtering isn't [working](https://github.com/apache/datafusion-comet/issues/2116)
   - You can bypass the issue by either setting `spark.sql.adaptive.enabled=false` or `spark.comet.exec.broadcastExchange.enabled=false`
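+
+As a sketch, either workaround can also be applied from a running `spark-shell` session before
+issuing queries, assuming both settings are dynamic SQL confs (setting either one alone is
+sufficient):
+
+```scala
+scala> spark.conf.set("spark.sql.adaptive.enabled", "false")
+// or, alternatively:
+scala> spark.conf.set("spark.comet.exec.broadcastExchange.enabled", "false")
+```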
+
+## Native Reader
+
+Comet's fully-native Iceberg integration does not require modifying Iceberg source
+code. Instead, Comet relies on reflection to extract `FileScanTask`s from Iceberg, which are
+then serialized to Comet's native execution engine (see
+[PR #2528](https://github.com/apache/datafusion-comet/pull/2528)).
+
+The example below uses Spark's package downloader to retrieve Comet 0.12.0 and Iceberg
+1.8.1, but Comet has been tested with Iceberg 1.5, 1.7, 1.8, and 1.10. The key configuration
+for enabling the fully-native Iceberg reader is `spark.comet.scan.icebergNative.enabled=true`. This
+configuration should **not** be combined with the hybrid reader configuration
+`spark.sql.iceberg.parquet.reader-type=COMET` from above.
+
+```shell
+$SPARK_HOME/bin/spark-shell \
+--packages org.apache.datafusion:comet-spark-spark3.5_2.12:0.12.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.apache.iceberg:iceberg-core:1.8.1 \
+--repositories https://repo1.maven.org/maven2/ \
+--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.comet.CometSparkSessionExtensions \
+--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
+--conf spark.sql.catalog.spark_catalog.type=hadoop \
+--conf spark.sql.catalog.spark_catalog.warehouse=/tmp/warehouse \
+--conf spark.plugins=org.apache.spark.CometPlugin \
+--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
+--conf spark.comet.scan.icebergNative.enabled=true \
+--conf spark.comet.explainFallback.enabled=true \
+--conf spark.memory.offHeap.enabled=true \
+--conf spark.memory.offHeap.size=2g
+```
+
+The same sample queries from above can be used to test Comet's fully-native Iceberg integration;
+however, the scan node to look for is `CometIcebergNativeScan`.
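+
+For example (a sketch, assuming a table `t1` created as in the earlier examples):
+
+```scala
+scala> spark.sql("SELECT * FROM t1").explain()
+```
+
+The physical plan should show a `CometIcebergNativeScan` node in place of `CometBatchScan`.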
+
+### Current limitations
+
+The following scenarios are not yet supported, but are a work in progress:
+
+- Iceberg table spec v3 scans will fall back.
+- Iceberg writes will fall back.
+- Iceberg table scans backed by Avro or ORC data files will fall back.
+- Iceberg table scans partitioned on `BINARY` or `DECIMAL` (with precision > 28) columns will fall back.
+- Iceberg scans with residual filters (_i.e._, filter expressions that are not partition values,
+  and are evaluated on the column values at scan time) of `truncate`, `bucket`, `year`, `month`,
+  `day`, or `hour` will fall back.