From c5013d87bdadd1a4d027f10b427828ad59a406b6 Mon Sep 17 00:00:00 2001
From: manuzhang
Date: Thu, 6 Nov 2025 23:36:47 +0800
Subject: [PATCH] chore: [iceberg] test iceberg 1.10.0

---
 .github/workflows/iceberg_spark_test.yml |    6 +-
 dev/diffs/iceberg/1.10.0.diff            | 1770 ++++++++++++++++++++++
 2 files changed, 1773 insertions(+), 3 deletions(-)
 create mode 100644 dev/diffs/iceberg/1.10.0.diff

diff --git a/.github/workflows/iceberg_spark_test.yml b/.github/workflows/iceberg_spark_test.yml
index 36c94fa6ef..7eb0a6c2bb 100644
--- a/.github/workflows/iceberg_spark_test.yml
+++ b/.github/workflows/iceberg_spark_test.yml
@@ -46,7 +46,7 @@ jobs:
       matrix:
         os: [ubuntu-24.04]
         java-version: [11, 17]
-        iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}]
+        iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}, {short: '1.10', full: '1.10.0'}]
         spark-version: [{short: '3.5', full: '3.5.7'}]
         scala-version: ['2.13']
       fail-fast: false
@@ -85,7 +85,7 @@ jobs:
       matrix:
         os: [ubuntu-24.04]
         java-version: [11, 17]
-        iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}]
+        iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}, {short: '1.10', full: '1.10.0'}]
         spark-version: [{short: '3.5', full: '3.5.7'}]
         scala-version: ['2.13']
       fail-fast: false
@@ -124,7 +124,7 @@ jobs:
       matrix:
         os: [ubuntu-24.04]
         java-version: [11, 17]
-        iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}]
+        iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}, {short: '1.10', full: '1.10.0'}]
         spark-version: [{short: '3.5', full: '3.5.7'}]
         scala-version: ['2.13']
       fail-fast: false
diff --git a/dev/diffs/iceberg/1.10.0.diff b/dev/diffs/iceberg/1.10.0.diff
new file mode 100644
index 0000000000..551fc59faa
--- /dev/null
+++ b/dev/diffs/iceberg/1.10.0.diff
@@ -0,0 +1,1770 @@
+diff --git a/build.gradle b/build.gradle
+index 6bc052885fc..db2aca3a5ee 100644
+--- a/build.gradle
++++ b/build.gradle
+@@ -878,6 +878,13 @@ project(':iceberg-parquet') {
+     implementation project(':iceberg-core')
+     implementation project(':iceberg-common')
+
++    implementation("org.apache.datafusion:comet-spark-spark${sparkVersionsString}_${scalaVersion}:${libs.versions.comet.get()}") {
++      exclude group: 'org.apache.arrow'
++      exclude group: 'org.apache.parquet'
++      exclude group: 'org.apache.spark'
++      exclude group: 'org.apache.iceberg'
++    }
++
+     implementation(libs.parquet.avro) {
+       exclude group: 'org.apache.avro', module: 'avro'
+       // already shaded by Parquet
+diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
+index eeabe54f5f0..867018058ee 100644
+--- a/gradle/libs.versions.toml
++++ b/gradle/libs.versions.toml
+@@ -37,7 +37,7 @@ awssdk-s3accessgrants = "2.3.0"
+ bson-ver = "4.11.5"
+ caffeine = "2.9.3"
+ calcite = "1.40.0"
+-comet = "0.8.1"
++comet = "0.12.0-SNAPSHOT"
+ datasketches = "6.2.0"
+ delta-standalone = "3.3.2"
+ delta-spark = "3.3.2"
+diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java
+new file mode 100644
+index 00000000000..ddf6c7de5ae
+--- /dev/null
++++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java
+@@ -0,0 +1,255 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.
The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++package org.apache.iceberg.parquet; ++ ++import java.util.Map; ++import org.apache.comet.parquet.ParquetColumnSpec; ++import org.apache.iceberg.relocated.com.google.common.collect.Maps; ++import org.apache.parquet.column.ColumnDescriptor; ++import org.apache.parquet.schema.LogicalTypeAnnotation; ++import org.apache.parquet.schema.PrimitiveType; ++import org.apache.parquet.schema.Type; ++import org.apache.parquet.schema.Types; ++ ++public class CometTypeUtils { ++ ++ private CometTypeUtils() {} ++ ++ public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor descriptor) { ++ ++ String[] path = descriptor.getPath(); ++ PrimitiveType primitiveType = descriptor.getPrimitiveType(); ++ String physicalType = primitiveType.getPrimitiveTypeName().name(); ++ ++ int typeLength = ++ primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY ++ ? primitiveType.getTypeLength() ++ : 0; ++ ++ boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED; ++ ++ // ToDo: extract this into a Util method ++ String logicalTypeName = null; ++ Map logicalTypeParams = Maps.newHashMap(); ++ LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation(); ++ ++ if (logicalType != null) { ++ logicalTypeName = logicalType.getClass().getSimpleName(); ++ ++ // Handle specific logical types ++ if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { ++ LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal = ++ (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType; ++ logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision())); ++ logicalTypeParams.put("scale", String.valueOf(decimal.getScale())); ++ } else if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { ++ LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp = ++ (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType; ++ logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC())); ++ logicalTypeParams.put("unit", timestamp.getUnit().name()); ++ } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { ++ LogicalTypeAnnotation.TimeLogicalTypeAnnotation time = ++ (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType; ++ logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC())); ++ logicalTypeParams.put("unit", time.getUnit().name()); ++ } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) { ++ LogicalTypeAnnotation.IntLogicalTypeAnnotation intType = ++ (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType; ++ logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned())); ++ logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth())); ++ } ++ } ++ ++ return new ParquetColumnSpec( ++ 1, // ToDo: pass in the correct id ++ path, ++ physicalType, 
++ typeLength, ++ isRepeated, ++ descriptor.getMaxDefinitionLevel(), ++ descriptor.getMaxRepetitionLevel(), ++ logicalTypeName, ++ logicalTypeParams); ++ } ++ ++ public static ColumnDescriptor buildColumnDescriptor(ParquetColumnSpec columnSpec) { ++ PrimitiveType.PrimitiveTypeName primType = ++ PrimitiveType.PrimitiveTypeName.valueOf(columnSpec.getPhysicalType()); ++ ++ Type.Repetition repetition; ++ if (columnSpec.getMaxRepetitionLevel() > 0) { ++ repetition = Type.Repetition.REPEATED; ++ } else if (columnSpec.getMaxDefinitionLevel() > 0) { ++ repetition = Type.Repetition.OPTIONAL; ++ } else { ++ repetition = Type.Repetition.REQUIRED; ++ } ++ ++ String name = columnSpec.getPath()[columnSpec.getPath().length - 1]; ++ // Reconstruct the logical type from parameters ++ LogicalTypeAnnotation logicalType = null; ++ if (columnSpec.getLogicalTypeName() != null) { ++ logicalType = ++ reconstructLogicalType( ++ columnSpec.getLogicalTypeName(), columnSpec.getLogicalTypeParams()); ++ } ++ ++ PrimitiveType primitiveType; ++ if (primType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { ++ primitiveType = ++ org.apache.parquet.schema.Types.primitive(primType, repetition) ++ .length(columnSpec.getTypeLength()) ++ .as(logicalType) ++ .id(columnSpec.getFieldId()) ++ .named(name); ++ } else { ++ primitiveType = ++ Types.primitive(primType, repetition) ++ .as(logicalType) ++ .id(columnSpec.getFieldId()) ++ .named(name); ++ } ++ ++ return new ColumnDescriptor( ++ columnSpec.getPath(), ++ primitiveType, ++ columnSpec.getMaxRepetitionLevel(), ++ columnSpec.getMaxDefinitionLevel()); ++ } ++ ++ private static LogicalTypeAnnotation reconstructLogicalType( ++ String logicalTypeName, java.util.Map params) { ++ ++ switch (logicalTypeName) { ++ // MAP ++ case "MapLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.mapType(); ++ ++ // LIST ++ case "ListLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.listType(); ++ ++ // STRING ++ case "StringLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.stringType(); ++ ++ // MAP_KEY_VALUE ++ case "MapKeyValueLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance(); ++ ++ // ENUM ++ case "EnumLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.enumType(); ++ ++ // DECIMAL ++ case "DecimalLogicalTypeAnnotation": ++ if (!params.containsKey("scale") || !params.containsKey("precision")) { ++ throw new IllegalArgumentException( ++ "Missing required parameters for DecimalLogicalTypeAnnotation: " + params); ++ } ++ int scale = Integer.parseInt(params.get("scale")); ++ int precision = Integer.parseInt(params.get("precision")); ++ return LogicalTypeAnnotation.decimalType(scale, precision); ++ ++ // DATE ++ case "DateLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.dateType(); ++ ++ // TIME ++ case "TimeLogicalTypeAnnotation": ++ if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) { ++ throw new IllegalArgumentException( ++ "Missing required parameters for TimeLogicalTypeAnnotation: " + params); ++ } ++ ++ boolean isUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC")); ++ String timeUnitStr = params.get("unit"); ++ ++ LogicalTypeAnnotation.TimeUnit timeUnit; ++ switch (timeUnitStr) { ++ case "MILLIS": ++ timeUnit = LogicalTypeAnnotation.TimeUnit.MILLIS; ++ break; ++ case "MICROS": ++ timeUnit = LogicalTypeAnnotation.TimeUnit.MICROS; ++ break; ++ case "NANOS": ++ timeUnit = LogicalTypeAnnotation.TimeUnit.NANOS; ++ break; ++ default: ++ throw new IllegalArgumentException("Unknown time 
unit: " + timeUnitStr); ++ } ++ return LogicalTypeAnnotation.timeType(isUTC, timeUnit); ++ ++ // TIMESTAMP ++ case "TimestampLogicalTypeAnnotation": ++ if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) { ++ throw new IllegalArgumentException( ++ "Missing required parameters for TimestampLogicalTypeAnnotation: " + params); ++ } ++ boolean isAdjustedToUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC")); ++ String unitStr = params.get("unit"); ++ ++ LogicalTypeAnnotation.TimeUnit unit; ++ switch (unitStr) { ++ case "MILLIS": ++ unit = LogicalTypeAnnotation.TimeUnit.MILLIS; ++ break; ++ case "MICROS": ++ unit = LogicalTypeAnnotation.TimeUnit.MICROS; ++ break; ++ case "NANOS": ++ unit = LogicalTypeAnnotation.TimeUnit.NANOS; ++ break; ++ default: ++ throw new IllegalArgumentException("Unknown timestamp unit: " + unitStr); ++ } ++ return LogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit); ++ ++ // INTEGER ++ case "IntLogicalTypeAnnotation": ++ if (!params.containsKey("isSigned") || !params.containsKey("bitWidth")) { ++ throw new IllegalArgumentException( ++ "Missing required parameters for IntLogicalTypeAnnotation: " + params); ++ } ++ boolean isSigned = Boolean.parseBoolean(params.get("isSigned")); ++ int bitWidth = Integer.parseInt(params.get("bitWidth")); ++ return LogicalTypeAnnotation.intType(bitWidth, isSigned); ++ ++ // JSON ++ case "JsonLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.jsonType(); ++ ++ // BSON ++ case "BsonLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.bsonType(); ++ ++ // UUID ++ case "UUIDLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.uuidType(); ++ ++ // INTERVAL ++ case "IntervalLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance(); ++ ++ default: ++ throw new IllegalArgumentException("Unknown logical type: " + logicalTypeName); ++ } ++ } ++} +diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java +new file mode 100644 +index 00000000000..a3cba401827 +--- /dev/null ++++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java +@@ -0,0 +1,260 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ */ ++package org.apache.iceberg.parquet; ++ ++import java.io.IOException; ++import java.io.UncheckedIOException; ++import java.nio.ByteBuffer; ++import java.util.List; ++import java.util.Map; ++import java.util.NoSuchElementException; ++import java.util.function.Function; ++import org.apache.comet.parquet.FileReader; ++import org.apache.comet.parquet.ParquetColumnSpec; ++import org.apache.comet.parquet.ReadOptions; ++import org.apache.comet.parquet.RowGroupReader; ++import org.apache.comet.parquet.WrappedInputFile; ++import org.apache.hadoop.conf.Configuration; ++import org.apache.iceberg.Schema; ++import org.apache.iceberg.exceptions.RuntimeIOException; ++import org.apache.iceberg.expressions.Expression; ++import org.apache.iceberg.expressions.Expressions; ++import org.apache.iceberg.io.CloseableGroup; ++import org.apache.iceberg.io.CloseableIterable; ++import org.apache.iceberg.io.CloseableIterator; ++import org.apache.iceberg.io.InputFile; ++import org.apache.iceberg.mapping.NameMapping; ++import org.apache.iceberg.relocated.com.google.common.collect.Lists; ++import org.apache.iceberg.util.ByteBuffers; ++import org.apache.parquet.ParquetReadOptions; ++import org.apache.parquet.column.ColumnDescriptor; ++import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; ++import org.apache.parquet.hadoop.metadata.ColumnPath; ++import org.apache.parquet.schema.MessageType; ++ ++public class CometVectorizedParquetReader extends CloseableGroup ++ implements CloseableIterable { ++ private final InputFile input; ++ private final ParquetReadOptions options; ++ private final Schema expectedSchema; ++ private final Function> batchReaderFunc; ++ private final Expression filter; ++ private final boolean reuseContainers; ++ private final boolean caseSensitive; ++ private final int batchSize; ++ private final NameMapping nameMapping; ++ private final Map properties; ++ private Long start = null; ++ private Long length = null; ++ private ByteBuffer fileEncryptionKey = null; ++ private ByteBuffer fileAADPrefix = null; ++ ++ public CometVectorizedParquetReader( ++ InputFile input, ++ Schema expectedSchema, ++ ParquetReadOptions options, ++ Function> readerFunc, ++ NameMapping nameMapping, ++ Expression filter, ++ boolean reuseContainers, ++ boolean caseSensitive, ++ int maxRecordsPerBatch, ++ Map properties, ++ Long start, ++ Long length, ++ ByteBuffer fileEncryptionKey, ++ ByteBuffer fileAADPrefix) { ++ this.input = input; ++ this.expectedSchema = expectedSchema; ++ this.options = options; ++ this.batchReaderFunc = readerFunc; ++ // replace alwaysTrue with null to avoid extra work evaluating a trivial filter ++ this.filter = filter == Expressions.alwaysTrue() ? 
null : filter; ++ this.reuseContainers = reuseContainers; ++ this.caseSensitive = caseSensitive; ++ this.batchSize = maxRecordsPerBatch; ++ this.nameMapping = nameMapping; ++ this.properties = properties; ++ this.start = start; ++ this.length = length; ++ this.fileEncryptionKey = fileEncryptionKey; ++ this.fileAADPrefix = fileAADPrefix; ++ } ++ ++ private ReadConf conf = null; ++ ++ private ReadConf init() { ++ if (conf == null) { ++ ReadConf readConf = ++ new ReadConf( ++ input, ++ options, ++ expectedSchema, ++ filter, ++ null, ++ batchReaderFunc, ++ nameMapping, ++ reuseContainers, ++ caseSensitive, ++ batchSize); ++ this.conf = readConf.copy(); ++ return readConf; ++ } ++ return conf; ++ } ++ ++ @Override ++ public CloseableIterator iterator() { ++ FileIterator iter = ++ new FileIterator<>(init(), properties, start, length, fileEncryptionKey, fileAADPrefix); ++ addCloseable(iter); ++ return iter; ++ } ++ ++ private static class FileIterator implements CloseableIterator { ++ // private final ParquetFileReader reader; ++ private final boolean[] shouldSkip; ++ private final VectorizedReader model; ++ private final long totalValues; ++ private final int batchSize; ++ private final List> columnChunkMetadata; ++ private final boolean reuseContainers; ++ private int nextRowGroup = 0; ++ private long nextRowGroupStart = 0; ++ private long valuesRead = 0; ++ private T last = null; ++ private final FileReader cometReader; ++ private ReadConf conf; ++ ++ FileIterator( ++ ReadConf conf, ++ Map properties, ++ Long start, ++ Long length, ++ ByteBuffer fileEncryptionKey, ++ ByteBuffer fileAADPrefix) { ++ this.shouldSkip = conf.shouldSkip(); ++ this.totalValues = conf.totalValues(); ++ this.reuseContainers = conf.reuseContainers(); ++ this.model = conf.vectorizedModel(); ++ this.batchSize = conf.batchSize(); ++ this.model.setBatchSize(this.batchSize); ++ this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups(); ++ this.cometReader = ++ newCometReader( ++ conf.file(), ++ conf.projection(), ++ properties, ++ start, ++ length, ++ fileEncryptionKey, ++ fileAADPrefix); ++ this.conf = conf; ++ } ++ ++ private FileReader newCometReader( ++ InputFile file, ++ MessageType projection, ++ Map properties, ++ Long start, ++ Long length, ++ ByteBuffer fileEncryptionKey, ++ ByteBuffer fileAADPrefix) { ++ try { ++ ReadOptions cometOptions = ReadOptions.builder(new Configuration()).build(); ++ ++ FileReader fileReader = ++ new FileReader( ++ new WrappedInputFile(file), ++ cometOptions, ++ properties, ++ start, ++ length, ++ ByteBuffers.toByteArray(fileEncryptionKey), ++ ByteBuffers.toByteArray(fileAADPrefix)); ++ ++ List columnDescriptors = projection.getColumns(); ++ ++ List specs = Lists.newArrayList(); ++ ++ for (ColumnDescriptor descriptor : columnDescriptors) { ++ ParquetColumnSpec spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor); ++ specs.add(spec); ++ } ++ ++ fileReader.setRequestedSchemaFromSpecs(specs); ++ return fileReader; ++ } catch (IOException e) { ++ throw new UncheckedIOException("Failed to open Parquet file: " + file.location(), e); ++ } ++ } ++ ++ @Override ++ public boolean hasNext() { ++ return valuesRead < totalValues; ++ } ++ ++ @Override ++ public T next() { ++ if (!hasNext()) { ++ throw new NoSuchElementException(); ++ } ++ if (valuesRead >= nextRowGroupStart) { ++ advance(); ++ } ++ ++ // batchSize is an integer, so casting to integer is safe ++ int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize); ++ if (reuseContainers) { ++ this.last = 
model.read(last, numValuesToRead); ++ } else { ++ this.last = model.read(null, numValuesToRead); ++ } ++ valuesRead += numValuesToRead; ++ ++ return last; ++ } ++ ++ private void advance() { ++ while (shouldSkip[nextRowGroup]) { ++ nextRowGroup += 1; ++ cometReader.skipNextRowGroup(); ++ } ++ RowGroupReader pages; ++ try { ++ pages = cometReader.readNextRowGroup(); ++ } catch (IOException e) { ++ throw new RuntimeIOException(e); ++ } ++ ++ model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup)); ++ nextRowGroupStart += pages.getRowCount(); ++ nextRowGroup += 1; ++ } ++ ++ @Override ++ public void close() throws IOException { ++ model.close(); ++ cometReader.close(); ++ if (conf != null && conf.reader() != null) { ++ conf.reader().close(); ++ } ++ } ++ } ++} +diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +index 6f68fbe150f..b740543f3c9 100644 +--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java ++++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +@@ -1161,6 +1161,7 @@ public class Parquet { + private NameMapping nameMapping = null; + private ByteBuffer fileEncryptionKey = null; + private ByteBuffer fileAADPrefix = null; ++ private boolean isComet; + + private ReadBuilder(InputFile file) { + this.file = file; +@@ -1205,6 +1206,11 @@ public class Parquet { + return this; + } + ++ public ReadBuilder enableComet(boolean enableComet) { ++ this.isComet = enableComet; ++ return this; ++ } ++ + /** + * @deprecated will be removed in 2.0.0; use {@link #createReaderFunc(Function)} instead + */ +@@ -1300,7 +1306,7 @@ public class Parquet { + } + + @Override +- @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"}) ++ @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity", "MethodLength"}) + public CloseableIterable build() { + FileDecryptionProperties fileDecryptionProperties = null; + if (fileEncryptionKey != null) { +@@ -1352,16 +1358,35 @@ public class Parquet { + } + + if (batchedReaderFunc != null) { +- return new VectorizedParquetReader<>( +- file, +- schema, +- options, +- batchedReaderFunc, +- mapping, +- filter, +- reuseContainers, +- caseSensitive, +- maxRecordsPerBatch); ++ if (isComet) { ++ LOG.info("Comet enabled"); ++ return new CometVectorizedParquetReader<>( ++ file, ++ schema, ++ options, ++ batchedReaderFunc, ++ mapping, ++ filter, ++ reuseContainers, ++ caseSensitive, ++ maxRecordsPerBatch, ++ properties, ++ start, ++ length, ++ fileEncryptionKey, ++ fileAADPrefix); ++ } else { ++ return new VectorizedParquetReader<>( ++ file, ++ schema, ++ options, ++ batchedReaderFunc, ++ mapping, ++ filter, ++ reuseContainers, ++ caseSensitive, ++ maxRecordsPerBatch); ++ } + } else { + Function> readBuilder = + readerFuncWithSchema != null +diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java +index 1fb2372ba56..142e5fbadf1 100644 +--- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java ++++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java +@@ -157,6 +157,14 @@ class ReadConf { + return newReader; + } + ++ InputFile file() { ++ return file; ++ } ++ ++ MessageType projection() { ++ return projection; ++ } ++ + ParquetValueReader model() { + return model; + } +diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle +index 69700d84366..49ea338a458 100644 +--- a/spark/v3.5/build.gradle ++++ b/spark/v3.5/build.gradle +@@ 
-264,6 +264,7 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio + integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') + integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') + integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') ++ integrationImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}" + + // runtime dependencies for running Hive Catalog based integration test + integrationRuntimeOnly project(':iceberg-hive-metastore') +diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java +index 4c1a5095916..964f196daad 100644 +--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java ++++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java +@@ -59,6 +59,16 @@ public abstract class ExtensionsTestBase extends CatalogTestBase { + .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") + .config( + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), String.valueOf(RANDOM.nextBoolean())) ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config( ++ "spark.shuffle.manager", ++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .enableHiveSupport() + .getOrCreate(); + +diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java +index ecf9e6f8a59..0f8cced69aa 100644 +--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java ++++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java +@@ -56,6 +56,16 @@ public class TestCallStatementParser { + .master("local[2]") + .config("spark.sql.extensions", IcebergSparkSessionExtensions.class.getName()) + .config("spark.extra.prop", "value") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config( ++ "spark.shuffle.manager", ++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .getOrCreate(); + TestCallStatementParser.parser = spark.sessionState().sqlParser(); + } +diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java 
+index 64edb1002e9..5bb449f1ac7 100644 +--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java ++++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java +@@ -179,6 +179,16 @@ public class DeleteOrphanFilesBenchmark { + .config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName()) + .config("spark.sql.catalog.spark_catalog.type", "hadoop") + .config("spark.sql.catalog.spark_catalog.warehouse", catalogWarehouse()) ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config( ++ "spark.shuffle.manager", ++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .master("local"); + spark = builder.getOrCreate(); + } +diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java +index 77b79384a6d..08f7de1c0de 100644 +--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java ++++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java +@@ -392,6 +392,16 @@ public class IcebergSortCompactionBenchmark { + "spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") + .config("spark.sql.catalog.spark_catalog.type", "hadoop") + .config("spark.sql.catalog.spark_catalog.warehouse", getCatalogWarehouse()) ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config( ++ "spark.shuffle.manager", ++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .master("local[*]"); + spark = builder.getOrCreate(); + Configuration sparkHadoopConf = spark.sessionState().newHadoopConf(); +diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java +index c6794e43c63..f7359197407 100644 +--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java ++++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java +@@ -239,6 +239,16 @@ public class DVReaderBenchmark { + .config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName()) + .config("spark.sql.catalog.spark_catalog.type", "hadoop") + .config("spark.sql.catalog.spark_catalog.warehouse", newWarehouseDir()) ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config( ++ "spark.shuffle.manager", ++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ 
.config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .master("local[*]") + .getOrCreate(); + } +diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java +index ac74fb5a109..e011b8b2510 100644 +--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java ++++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java +@@ -223,6 +223,16 @@ public class DVWriterBenchmark { + .config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName()) + .config("spark.sql.catalog.spark_catalog.type", "hadoop") + .config("spark.sql.catalog.spark_catalog.warehouse", newWarehouseDir()) ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config( ++ "spark.shuffle.manager", ++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .master("local[*]") + .getOrCreate(); + } +diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java +index 68c537e34a4..f66be2f3896 100644 +--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java ++++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java +@@ -94,7 +94,19 @@ public abstract class IcebergSourceBenchmark { + } + + protected void setupSpark(boolean enableDictionaryEncoding) { +- SparkSession.Builder builder = SparkSession.builder().config("spark.ui.enabled", false); ++ SparkSession.Builder builder = ++ SparkSession.builder() ++ .config("spark.ui.enabled", false) ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config( ++ "spark.shuffle.manager", ++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true"); + if (!enableDictionaryEncoding) { + builder + .config("parquet.dictionary.page.size", "1") +diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java +index 81b7d83a707..eba1a2a0fb1 100644 +--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java ++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java +@@ -19,18 +19,22 @@ + package org.apache.iceberg.spark.data.vectorized; + + import java.io.IOException; ++import org.apache.comet.CometConf; + import org.apache.comet.CometSchemaImporter; + import org.apache.comet.parquet.AbstractColumnReader; + import org.apache.comet.parquet.ColumnReader; ++import org.apache.comet.parquet.ParquetColumnSpec; ++import 
org.apache.comet.parquet.RowGroupReader; + import org.apache.comet.parquet.TypeUtil; + import org.apache.comet.parquet.Utils; + import org.apache.comet.shaded.arrow.memory.RootAllocator; ++import org.apache.iceberg.parquet.CometTypeUtils; + import org.apache.iceberg.parquet.VectorizedReader; + import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + import org.apache.iceberg.spark.SparkSchemaUtil; + import org.apache.iceberg.types.Types; + import org.apache.parquet.column.ColumnDescriptor; +-import org.apache.parquet.column.page.PageReader; ++import org.apache.spark.sql.internal.SQLConf; + import org.apache.spark.sql.types.DataType; + import org.apache.spark.sql.types.Metadata; + import org.apache.spark.sql.types.StructField; +@@ -42,23 +46,28 @@ class CometColumnReader implements VectorizedReader { + + private final ColumnDescriptor descriptor; + private final DataType sparkType; ++ private final int fieldId; + + // The delegated ColumnReader from Comet side + private AbstractColumnReader delegate; + private boolean initialized = false; + private int batchSize = DEFAULT_BATCH_SIZE; + private CometSchemaImporter importer; ++ private ParquetColumnSpec spec; + +- CometColumnReader(DataType sparkType, ColumnDescriptor descriptor) { ++ CometColumnReader(DataType sparkType, ColumnDescriptor descriptor, int fieldId) { + this.sparkType = sparkType; + this.descriptor = descriptor; ++ this.fieldId = fieldId; + } + + CometColumnReader(Types.NestedField field) { + DataType dataType = SparkSchemaUtil.convert(field.type()); + StructField structField = new StructField(field.name(), dataType, false, Metadata.empty()); + this.sparkType = dataType; +- this.descriptor = TypeUtil.convertToParquet(structField); ++ this.descriptor = ++ CometTypeUtils.buildColumnDescriptor(TypeUtil.convertToParquetSpec(structField)); ++ this.fieldId = field.fieldId(); + } + + public AbstractColumnReader delegate() { +@@ -92,7 +101,26 @@ class CometColumnReader implements VectorizedReader { + } + + this.importer = new CometSchemaImporter(new RootAllocator()); +- this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false); ++ ++ spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor); ++ ++ boolean useLegacyTime = ++ Boolean.parseBoolean( ++ SQLConf.get() ++ .getConfString( ++ CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP().key(), "false")); ++ boolean useLazyMaterialization = ++ Boolean.parseBoolean( ++ SQLConf.get().getConfString(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), "false")); ++ this.delegate = ++ Utils.getColumnReader( ++ sparkType, ++ spec, ++ importer, ++ batchSize, ++ true, // Comet sets this to true for native execution ++ useLazyMaterialization, ++ useLegacyTime); + this.initialized = true; + } + +@@ -111,9 +139,9 @@ class CometColumnReader implements VectorizedReader { + *
NOTE: this should be called before reading a new Parquet column chunk, and after {@link + * CometColumnReader#reset} is called. + */ +- public void setPageReader(PageReader pageReader) throws IOException { ++ public void setPageReader(RowGroupReader pageStore) throws IOException { + Preconditions.checkState(initialized, "Invalid state: 'reset' should be called first"); +- ((ColumnReader) delegate).setPageReader(pageReader); ++ ((ColumnReader) delegate).setRowGroupReader(pageStore, spec); + } + + @Override +diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java +index 04ac69476ad..916face2bf2 100644 +--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java ++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java +@@ -22,8 +22,12 @@ import java.io.IOException; + import java.io.UncheckedIOException; + import java.util.List; + import java.util.Map; ++import org.apache.comet.CometRuntimeException; + import org.apache.comet.parquet.AbstractColumnReader; +-import org.apache.comet.parquet.BatchReader; ++import org.apache.comet.parquet.IcebergCometBatchReader; ++import org.apache.comet.parquet.RowGroupReader; ++import org.apache.comet.vector.CometSelectionVector; ++import org.apache.comet.vector.CometVector; + import org.apache.iceberg.Schema; + import org.apache.iceberg.data.DeleteFilter; + import org.apache.iceberg.parquet.VectorizedReader; +@@ -55,7 +59,7 @@ class CometColumnarBatchReader implements VectorizedReader { + // calling BatchReader.nextBatch, the isDeleted value is not yet available, so + // DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is + // available. 
+- private final BatchReader delegate; ++ private final IcebergCometBatchReader delegate; + private DeleteFilter deletes = null; + private long rowStartPosInBatch = 0; + +@@ -65,9 +69,7 @@ class CometColumnarBatchReader implements VectorizedReader { + this.hasIsDeletedColumn = + readers.stream().anyMatch(reader -> reader instanceof CometDeleteColumnReader); + +- AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()]; +- this.delegate = new BatchReader(abstractColumnReaders); +- delegate.setSparkSchema(SparkSchemaUtil.convert(schema)); ++ this.delegate = new IcebergCometBatchReader(readers.size(), SparkSchemaUtil.convert(schema)); + } + + @Override +@@ -79,19 +81,22 @@ class CometColumnarBatchReader implements VectorizedReader { + && !(readers[i] instanceof CometPositionColumnReader) + && !(readers[i] instanceof CometDeleteColumnReader)) { + readers[i].reset(); +- readers[i].setPageReader(pageStore.getPageReader(readers[i].descriptor())); ++ readers[i].setPageReader((RowGroupReader) pageStore); + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to setRowGroupInfo for Comet vectorization", e); + } + } + ++ AbstractColumnReader[] delegateReaders = new AbstractColumnReader[readers.length]; + for (int i = 0; i < readers.length; i++) { +- delegate.getColumnReaders()[i] = this.readers[i].delegate(); ++ delegateReaders[i] = readers[i].delegate(); + } + ++ delegate.init(delegateReaders); ++ + this.rowStartPosInBatch = +- pageStore ++ ((RowGroupReader) pageStore) + .getRowIndexOffset() + .orElseThrow( + () -> +@@ -148,9 +153,17 @@ class CometColumnarBatchReader implements VectorizedReader { + Pair pair = buildRowIdMapping(vectors); + if (pair != null) { + int[] rowIdMapping = pair.first(); +- numLiveRows = pair.second(); +- for (int i = 0; i < vectors.length; i++) { +- vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping); ++ if (pair.second() != null) { ++ numLiveRows = pair.second(); ++ for (int i = 0; i < vectors.length; i++) { ++ if (vectors[i] instanceof CometVector) { ++ vectors[i] = ++ new CometSelectionVector((CometVector) vectors[i], rowIdMapping, numLiveRows); ++ } else { ++ throw new CometRuntimeException( ++ "Unsupported column vector type: " + vectors[i].getClass()); ++ } ++ } + } + } + } +diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java +index 047c96314b1..88d691a607a 100644 +--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java ++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java +@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.data.vectorized; + import java.math.BigDecimal; + import java.nio.ByteBuffer; + import org.apache.comet.parquet.ConstantColumnReader; ++import org.apache.iceberg.parquet.CometTypeUtils; + import org.apache.iceberg.types.Types; + import org.apache.spark.sql.types.DataType; + import org.apache.spark.sql.types.DataTypes; +@@ -34,7 +35,11 @@ class CometConstantColumnReader extends CometColumnReader { + super(field); + // use delegate to set constant value on the native side to be consumed by native execution. 
+ setDelegate( +- new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false)); ++ new ConstantColumnReader( ++ sparkType(), ++ CometTypeUtils.descriptorToParquetColumnSpec(descriptor()), ++ convertToSparkValue(value), ++ false)); + } + + @Override +diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java +index 6235bfe4865..cba108e4326 100644 +--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java ++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java +@@ -51,10 +51,10 @@ class CometDeleteColumnReader extends CometColumnReader { + DeleteColumnReader() { + super( + DataTypes.BooleanType, +- TypeUtil.convertToParquet( ++ TypeUtil.convertToParquetSpec( + new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())), + false /* useDecimal128 = false */, +- false /* isConstant = false */); ++ false /* isConstant */); + this.isDeleted = new boolean[0]; + } + +diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java +index bcc0e514c28..98e80068c51 100644 +--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java ++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java +@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.data.vectorized; + + import org.apache.comet.parquet.MetadataColumnReader; + import org.apache.comet.parquet.Native; ++import org.apache.iceberg.parquet.CometTypeUtils; + import org.apache.iceberg.types.Types; + import org.apache.parquet.column.ColumnDescriptor; + import org.apache.spark.sql.types.DataTypes; +@@ -44,7 +45,7 @@ class CometPositionColumnReader extends CometColumnReader { + PositionColumnReader(ColumnDescriptor descriptor) { + super( + DataTypes.LongType, +- descriptor, ++ CometTypeUtils.descriptorToParquetColumnSpec(descriptor), + false /* useDecimal128 = false */, + false /* isConstant = false */); + } +diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java +index d36f1a72747..56f8c9bff93 100644 +--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java ++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java +@@ -142,6 +142,7 @@ class CometVectorizedReaderBuilder extends TypeWithSchemaVisitor extends BaseReader taskGroup = (ScanTaskGroup) task; ++ return taskGroup.tasks().stream().allMatch(this::supportsCometBatchReads); ++ ++ } else if (task.isFileScanTask() && !task.isDataTask()) { ++ FileScanTask fileScanTask = task.asFileScanTask(); ++ // Comet can't handle delete files for now ++ return fileScanTask.file().format() == FileFormat.PARQUET; ++ ++ } else { ++ return false; ++ } ++ } ++ + // conditions for using ORC batch reads: + // - ORC vectorization is enabled + // - all tasks are of type FileScanTask and read only ORC files with no delete files +diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +index 106b296de09..967b0d41d08 100644 +--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java ++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +@@ -24,6 +24,7 @@ import java.util.Map; + import java.util.Optional; + import java.util.function.Supplier; + import java.util.stream.Collectors; ++import org.apache.comet.parquet.SupportsComet; + import org.apache.iceberg.BlobMetadata; + import org.apache.iceberg.ScanTask; + import org.apache.iceberg.ScanTaskGroup; +@@ -95,7 +96,7 @@ import org.apache.spark.sql.types.StructType; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + +-abstract class SparkScan implements Scan, SupportsReportStatistics { ++abstract class SparkScan implements Scan, SupportsReportStatistics, SupportsComet { + private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class); + private static final String NDV_KEY = "ndv"; + +@@ -351,4 +352,10 @@ abstract class SparkScan implements Scan, SupportsReportStatistics { + return splitSize; + } + } ++ ++ @Override ++ public boolean isCometEnabled() { ++ SparkBatch batch = (SparkBatch) this.toBatch(); ++ return batch.useCometBatchReads(); ++ } + } +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java +index 404ba728460..26d6f9b613f 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java +@@ -90,6 +90,16 @@ public abstract class SparkDistributedDataScanTestBase + .master("local[2]") + .config("spark.serializer", serializer) + .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config( ++ "spark.shuffle.manager", ++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .getOrCreate(); + } + } +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java +index 659507e4c5e..eb9cedc34c5 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java +@@ -73,6 +73,16 @@ public class TestSparkDistributedDataScanDeletes + .master("local[2]") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config( ++ "spark.shuffle.manager", ++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ 
.config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .getOrCreate(); + } + +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java +index a218f965ea6..395c02441e7 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java +@@ -62,6 +62,16 @@ public class TestSparkDistributedDataScanFilterFiles + .master("local[2]") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config( ++ "spark.shuffle.manager", ++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .getOrCreate(); + } + +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java +index 2665d7ba8d3..306e859ce1a 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java +@@ -63,6 +63,16 @@ public class TestSparkDistributedDataScanReporting + .master("local[2]") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config( ++ "spark.shuffle.manager", ++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .getOrCreate(); + } + +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java +index daf4e29ac07..fc9ee40c502 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java +@@ -79,6 +79,17 @@ public abstract class TestBase extends SparkTestHelperBase { + .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") + .config("spark.hadoop." 
+ METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
+ .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
++ .config("spark.comet.exec.broadcastExchange.enabled", "false")
+ .enableHiveSupport()
+ .getOrCreate();
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
+index 973a17c9a38..dd0fd5cc9aa 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
+@@ -65,6 +65,16 @@ public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVect
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
+index 1c5905744a7..6db62e1f90d 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
+@@ -61,6 +61,16 @@ public abstract class ScanTestBase extends AvroDataTestBase {
+ ScanTestBase.spark =
+ SparkSession.builder()
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .master("local[2]")
+ .getOrCreate();
+ ScanTestBase.sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
+index 19ec6d13dd5..bf7c837cf38 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
+@@ -144,7 +144,20 @@ public class TestCompressionSettings extends CatalogTestBase {
+
+ @BeforeAll
+ public static void startSpark() {
+- TestCompressionSettings.spark = SparkSession.builder().master("local[2]").getOrCreate();
++ TestCompressionSettings.spark =
++ SparkSession.builder()
++ .master("local[2]")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
++ .getOrCreate();
+ }
+
+ @BeforeEach
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+index a7702b169a6..bbb85f7d5c6 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+@@ -74,7 +74,20 @@ public class TestDataSourceOptions extends TestBaseWithCatalog {
+
+ @BeforeAll
+ public static void startSpark() {
+- TestDataSourceOptions.spark = SparkSession.builder().master("local[2]").getOrCreate();
++ TestDataSourceOptions.spark =
++ SparkSession.builder()
++ .master("local[2]")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
++ .getOrCreate();
+ }
+
+ @AfterAll
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+index fd7d52178f2..929ebd405c5 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+@@ -114,6 +114,16 @@ public class TestFilteredScan {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
+index 153564f7d12..761c20f5d80 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
+@@ -98,6 +98,16 @@ public class TestForwardCompatibility {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
+index f4f57157e47..d1a7cc64179 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
+@@ -51,6 +51,16 @@ public class TestIcebergSpark {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
+index e1402396fa7..ca4212f52e6 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
+@@ -118,6 +118,16 @@ public class TestPartitionPruning {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ TestPartitionPruning.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+index 0b6ab2052b6..a8176332fb7 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+@@ -112,6 +112,16 @@ public class TestPartitionValues {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+index 11865db7fce..8fe32e8300c 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+@@ -91,6 +91,16 @@ public class TestSnapshotSelection {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+index 3051e27d720..6c39f76c286 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+@@ -125,6 +125,16 @@ public class TestSparkDataFile {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ TestSparkDataFile.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ }
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+index 4ccbf86f125..40cff1f69a7 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+@@ -100,6 +100,16 @@ public class TestSparkDataWrite {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+@@ -144,7 +154,7 @@ public class TestSparkDataWrite {
+ Dataset result = spark.read().format("iceberg").load(targetLocation);
+
+ List actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ for (ManifestFile manifest :
+ SnapshotUtil.latestSnapshot(table, branch).allManifests(table.io())) {
+@@ -213,7 +223,7 @@ public class TestSparkDataWrite {
+ Dataset result = spark.read().format("iceberg").load(targetLocation);
+
+ List actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ }
+
+@@ -258,7 +268,7 @@ public class TestSparkDataWrite {
+ Dataset result = spark.read().format("iceberg").load(targetLocation);
+
+ List actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ }
+
+@@ -310,7 +320,7 @@ public class TestSparkDataWrite {
+ Dataset result = spark.read().format("iceberg").load(targetLocation);
+
+ List actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ }
+
+@@ -352,7 +362,7 @@ public class TestSparkDataWrite {
+ Dataset result = spark.read().format("iceberg").load(targetLocation);
+
+ List actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ }
+
+@@ -391,7 +401,7 @@ public class TestSparkDataWrite {
+ Dataset result = spark.read().format("iceberg").load(targetLocation);
+
+ List actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+
+ List files = Lists.newArrayList();
+@@ -459,7 +469,7 @@ public class TestSparkDataWrite {
+ Dataset result = spark.read().format("iceberg").load(targetLocation);
+
+ List actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ }
+
+@@ -706,7 +716,7 @@ public class TestSparkDataWrite {
+ // Since write and commit succeeded, the rows should be readable
+ Dataset result = spark.read().format("iceberg").load(targetLocation);
+ List actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual)
+ .hasSize(records.size() + records2.size())
+ .containsExactlyInAnyOrder(
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
+index 596d05d30b5..dc8563314c7 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
+@@ -88,6 +88,16 @@ public class TestSparkReadProjection extends TestReadProjection {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ ImmutableMap config =
+ ImmutableMap.of(
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+index 42699f46623..058c2d79b62 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+@@ -138,6 +138,16 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
+ .config("spark.ui.liveUpdate.period", 0)
+ .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
+ .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .enableHiveSupport()
+ .getOrCreate();
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
+index baf7fa8f88a..509c5deba51 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
+@@ -182,6 +182,16 @@ public class TestSparkReaderWithBloomFilter {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .enableHiveSupport()
+ .getOrCreate();
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
+index 54048bbf218..b1a2ca92098 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
+@@ -69,6 +69,16 @@ public class TestStructuredStreaming {
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
+ .config("spark.sql.shuffle.partitions", 4)
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
+index 8b1e3fbfc77..74936e2487e 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
+@@ -75,7 +75,20 @@ public class TestTimestampWithoutZone extends TestBase {
+
+ @BeforeAll
+ public static void startSpark() {
+- TestTimestampWithoutZone.spark = SparkSession.builder().master("local[2]").getOrCreate();
++ TestTimestampWithoutZone.spark =
++ SparkSession.builder()
++ .master("local[2]")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
++ .getOrCreate();
+ }
+
+ @AfterAll
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
+index c3fac70dd3f..b7f2431c119 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
+@@ -84,6 +84,16 @@ public class TestWriteMetricsConfig {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ TestWriteMetricsConfig.sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ }
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
+index 5ce56b4feca..0def2a156d4 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
+@@ -63,6 +63,16 @@ public class TestAggregatePushDown extends CatalogTestBase {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.sql.iceberg.aggregate_pushdown", "true")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.sql.iceberg.parquet.reader-type", "COMET")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .enableHiveSupport()
+ .getOrCreate();
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+index 9d2ce2b388a..5e233688488 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+@@ -598,9 +598,7 @@ public class TestFilterPushDown extends TestBaseWithCatalog {
+ String planAsString = sparkPlan.toString().replaceAll("#(\\d+L?)", "");
+
+ if (sparkFilter != null) {
+- assertThat(planAsString)
+- .as("Post scan filter should match")
+- .contains("Filter (" + sparkFilter + ")");
++ assertThat(planAsString).as("Post scan filter should match").contains("CometFilter");
+ } else {
+ assertThat(planAsString).as("Should be no post scan filter").doesNotContain("Filter (");
+ }