Commit c0d6d42

Spark: Change delete file granularity to file in Spark 3.5 (apache#11478)
1 parent 4d35682 commit c0d6d42

7 files changed: +33 −23 lines
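
In summary: SparkWriteConf.deleteGranularity() in Spark 3.5 now defaults to DeleteGranularity.FILE instead of PARTITION, and the tests that depend on one delete file per partition pin the old behavior through the write.delete.granularity table property. A table that wants partition-scoped deletes back can set that property explicitly; a minimal sketch, where the table name db.sample is a placeholder:

import org.apache.spark.sql.SparkSession;

public class PinPartitionGranularity {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("pin-granularity").getOrCreate();

    // Pins the pre-change behavior for one table; the property key matches
    // TableProperties.DELETE_GRANULARITY used throughout this diff.
    spark.sql("ALTER TABLE db.sample SET TBLPROPERTIES ('write.delete.granularity'='partition')");

    spark.stop();
  }
}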

spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java

Lines changed: 6 additions & 4 deletions
@@ -63,6 +63,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -151,12 +152,15 @@ public void testCoalesceDelete() throws Exception {

     // set the open file cost large enough to produce a separate scan task per file
     // use range distribution to trigger a shuffle
+    // set partition-scoped deletes so that 1 delete file is written as part of the output task
     Map<String, String> tableProps =
         ImmutableMap.of(
             SPLIT_OPEN_FILE_COST,
             String.valueOf(Integer.MAX_VALUE),
             DELETE_DISTRIBUTION_MODE,
-            DistributionMode.RANGE.modeName());
+            DistributionMode.RANGE.modeName(),
+            TableProperties.DELETE_GRANULARITY,
+            DeleteGranularity.PARTITION.toString());
     sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));

     createBranchIfNeeded();
@@ -1306,10 +1310,8 @@ public void testDeleteWithMultipleSpecs() {
     Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
     if (mode(table) == COPY_ON_WRITE) {
       validateCopyOnWrite(currentSnapshot, "3", "4", "1");
-    } else if (mode(table) == MERGE_ON_READ && formatVersion >= 3) {
-      validateMergeOnRead(currentSnapshot, "3", "4", null);
     } else {
-      validateMergeOnRead(currentSnapshot, "3", "3", null);
+      validateMergeOnRead(currentSnapshot, "3", "4", null);
     }

     assertEquals(

spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java

Lines changed: 8 additions & 2 deletions
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.extensions;

 import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
 import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
 import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
 import static org.apache.iceberg.TableProperties.MERGE_DISTRIBUTION_MODE;
@@ -56,6 +57,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -233,7 +235,6 @@ public void testMergeWithVectorizedReads() {

   @TestTemplate
   public void testCoalesceMerge() {
-    assumeThat(formatVersion).isLessThan(3);
     createAndInitTable("id INT, salary INT, dep STRING");

     String[] records = new String[100];
@@ -252,7 +253,9 @@ public void testCoalesceMerge() {
             SPLIT_OPEN_FILE_COST,
             String.valueOf(Integer.MAX_VALUE),
             MERGE_DISTRIBUTION_MODE,
-            DistributionMode.NONE.modeName());
+            DistributionMode.NONE.modeName(),
+            TableProperties.DELETE_GRANULARITY,
+            DeleteGranularity.PARTITION.toString());
     sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));

     createBranchIfNeeded();
@@ -295,6 +298,9 @@ public void testCoalesceMerge() {
       // AQE detects that all shuffle blocks are small and processes them in 1 task
       // otherwise, there would be 200 tasks writing to the table
       validateProperty(currentSnapshot, SnapshotSummary.ADDED_FILES_PROP, "1");
+    } else if (mode(table) == MERGE_ON_READ && formatVersion >= 3) {
+      validateProperty(currentSnapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
+      validateProperty(currentSnapshot, SnapshotSummary.ADDED_DVS_PROP, "4");
     } else {
       // MoR MERGE would perform a join on `id`
       // every task has data for each of 200 reducers

spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java

Lines changed: 1 addition & 1 deletion
@@ -49,7 +49,7 @@ private void createTable(boolean partitioned) throws Exception {
     String partitionStmt = partitioned ? "PARTITIONED BY (id)" : "";
     sql(
         "CREATE TABLE %s (id bigint, data string) USING iceberg %s TBLPROPERTIES"
-            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read', 'write.delete.granularity'='partition')",
         tableName, partitionStmt);

     List<SimpleRecord> records =

spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java

Lines changed: 6 additions & 4 deletions
@@ -58,6 +58,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -131,12 +132,15 @@ public void testCoalesceUpdate() {

     // set the open file cost large enough to produce a separate scan task per file
     // use range distribution to trigger a shuffle
+    // set partition-scoped deletes so that 1 delete file is written as part of the output task
     Map<String, String> tableProps =
         ImmutableMap.of(
             SPLIT_OPEN_FILE_COST,
             String.valueOf(Integer.MAX_VALUE),
             UPDATE_DISTRIBUTION_MODE,
-            DistributionMode.RANGE.modeName());
+            DistributionMode.RANGE.modeName(),
+            TableProperties.DELETE_GRANULARITY,
+            DeleteGranularity.PARTITION.toString());
     sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));

     createBranchIfNeeded();
@@ -440,10 +444,8 @@ public void testUpdateWithoutCondition() {
       validateProperty(currentSnapshot, CHANGED_PARTITION_COUNT_PROP, "2");
       validateProperty(currentSnapshot, DELETED_FILES_PROP, "3");
       validateProperty(currentSnapshot, ADDED_FILES_PROP, ImmutableSet.of("2", "3"));
-    } else if (mode(table) == MERGE_ON_READ && formatVersion >= 3) {
-      validateMergeOnRead(currentSnapshot, "2", "3", "2");
     } else {
-      validateMergeOnRead(currentSnapshot, "2", "2", "2");
+      validateMergeOnRead(currentSnapshot, "2", "3", "2");
     }

     assertEquals(

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java

Lines changed: 6 additions & 8 deletions
@@ -714,14 +714,12 @@ private double shuffleCompressionRatio(FileFormat outputFileFormat, String outpu
   }

   public DeleteGranularity deleteGranularity() {
-    String valueAsString =
-        confParser
-            .stringConf()
-            .option(SparkWriteOptions.DELETE_GRANULARITY)
-            .tableProperty(TableProperties.DELETE_GRANULARITY)
-            .defaultValue(TableProperties.DELETE_GRANULARITY_DEFAULT)
-            .parse();
-    return DeleteGranularity.fromString(valueAsString);
+    return confParser
+        .enumConf(DeleteGranularity::fromString)
+        .option(SparkWriteOptions.DELETE_GRANULARITY)
+        .tableProperty(TableProperties.DELETE_GRANULARITY)
+        .defaultValue(DeleteGranularity.FILE)
+        .parse();
   }

   public boolean useDVs() {
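
For orientation, here is a standalone sketch of the lookup order the new enumConf-based code follows: write option first, then table property, then the hard-coded FILE default. The option key "delete-granularity" and the simplified enum below are assumptions for illustration, not Iceberg's implementation:

import java.util.Locale;
import java.util.Map;

final class DeleteGranularitySketch {
  enum Granularity { FILE, PARTITION }

  // Resolution order mirrored from deleteGranularity(): option > table property > default.
  static Granularity resolve(Map<String, String> writeOptions, Map<String, String> tableProps) {
    String value = writeOptions.get("delete-granularity"); // assumed write-option key
    if (value == null) {
      value = tableProps.get("write.delete.granularity"); // TableProperties.DELETE_GRANULARITY
    }
    return value == null ? Granularity.FILE : Granularity.valueOf(value.toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    System.out.println(resolve(Map.of(), Map.of())); // FILE, the new default
    System.out.println(resolve(Map.of(), Map.of("write.delete.granularity", "partition"))); // PARTITION
  }
}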

spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java

Lines changed: 3 additions & 3 deletions
@@ -142,7 +142,7 @@ public void testDeleteGranularityDefault() {
     SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());

     DeleteGranularity value = writeConf.deleteGranularity();
-    assertThat(value).isEqualTo(DeleteGranularity.PARTITION);
+    assertThat(value).isEqualTo(DeleteGranularity.FILE);
   }

   @TestTemplate
@@ -151,13 +151,13 @@ public void testDeleteGranularityTableProperty() {

     table
         .updateProperties()
-        .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.FILE.toString())
+        .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.PARTITION.toString())
         .commit();

     SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());

     DeleteGranularity value = writeConf.deleteGranularity();
-    assertThat(value).isEqualTo(DeleteGranularity.FILE);
+    assertThat(value).isEqualTo(DeleteGranularity.PARTITION);
   }

   @TestTemplate
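
The updated tests cover the default and the table property; the file's remaining cases presumably exercise the per-write option, which should win over both. For illustration, a sketch of what such a check looks like, reusing the spark and table fixtures above and assuming SparkWriteOptions.DELETE_GRANULARITY is the matching option key (this test is not part of the commit):

@TestTemplate
public void testDeleteGranularityWriteOption() {
  // hypothetical test: the write option should override the table property
  table
      .updateProperties()
      .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.FILE.toString())
      .commit();

  Map<String, String> options =
      ImmutableMap.of(SparkWriteOptions.DELETE_GRANULARITY, DeleteGranularity.PARTITION.toString());
  SparkWriteConf writeConf = new SparkWriteConf(spark, table, options);

  assertThat(writeConf.deleteGranularity()).isEqualTo(DeleteGranularity.PARTITION);
}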

spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java

Lines changed: 3 additions & 1 deletion
@@ -757,7 +757,9 @@ private Map<String, String> tableProperties() {
         TableProperties.FORMAT_VERSION,
         "2",
         TableProperties.DEFAULT_FILE_FORMAT,
-        format.toString());
+        format.toString(),
+        TableProperties.DELETE_GRANULARITY,
+        DeleteGranularity.PARTITION.toString());
   }

   private void writeRecords(Table table, int files, int numRecords) {
