Skip to content

Commit bbb77a8

Browse files
author
CI Builder
committed
Cherry-pick commits from the iceberg-1.5 branch into the spark-3.3 version.
1 parent 1753a6d commit bbb77a8

File tree

5 files changed

+247
-4
lines changed

5 files changed

+247
-4
lines changed

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,16 @@ private SparkSQLProperties() {}
6666

6767
// Controls whether to report locality information to Spark while allocating input partitions
public static final String LOCALITY = "spark.sql.iceberg.locality.enabled";

// Controls compression codec for write operations (applies to the data/delete file format in use)
public static final String COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec";

// Controls compression level for write operations (consumed for Parquet and Avro; not ORC)
public static final String COMPRESSION_LEVEL = "spark.sql.iceberg.compression-level";

// Controls compression strategy for write operations (consumed for ORC only)
public static final String COMPRESSION_STRATEGY = "spark.sql.iceberg.compression-strategy";

// Controls advisory partition size for write operations
public static final String ADVISORY_PARTITION_SIZE = "spark.sql.iceberg.advisory-partition-size";
6981
}

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,18 @@
2121
import static org.apache.iceberg.DistributionMode.HASH;
2222
import static org.apache.iceberg.DistributionMode.NONE;
2323
import static org.apache.iceberg.DistributionMode.RANGE;
24+
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
25+
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
26+
import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION;
27+
import static org.apache.iceberg.TableProperties.DELETE_AVRO_COMPRESSION_LEVEL;
28+
import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION;
29+
import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY;
30+
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
31+
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
32+
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
33+
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
34+
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
35+
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
2436

2537
import java.util.Locale;
2638
import java.util.Map;
@@ -373,4 +385,197 @@ public String branch() {
373385

374386
return branch;
375387
}
388+
389+
public Map<String, String> writeProperties() {
390+
Map<String, String> writeProperties = Maps.newHashMap();
391+
writeProperties.putAll(dataWriteProperties());
392+
writeProperties.putAll(deleteWriteProperties());
393+
return writeProperties;
394+
}
395+
396+
private Map<String, String> dataWriteProperties() {
397+
Map<String, String> writeProperties = Maps.newHashMap();
398+
FileFormat dataFormat = dataFileFormat();
399+
400+
switch (dataFormat) {
401+
case PARQUET:
402+
writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec());
403+
String parquetCompressionLevel = parquetCompressionLevel();
404+
if (parquetCompressionLevel != null) {
405+
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
406+
}
407+
break;
408+
409+
case AVRO:
410+
writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec());
411+
String avroCompressionLevel = avroCompressionLevel();
412+
if (avroCompressionLevel != null) {
413+
writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
414+
}
415+
break;
416+
417+
case ORC:
418+
writeProperties.put(ORC_COMPRESSION, orcCompressionCodec());
419+
writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy());
420+
break;
421+
422+
default:
423+
// skip
424+
}
425+
426+
return writeProperties;
427+
}
428+
429+
private Map<String, String> deleteWriteProperties() {
430+
Map<String, String> writeProperties = Maps.newHashMap();
431+
FileFormat deleteFormat = deleteFileFormat();
432+
433+
switch (deleteFormat) {
434+
case PARQUET:
435+
writeProperties.put(DELETE_PARQUET_COMPRESSION, deleteParquetCompressionCodec());
436+
String deleteParquetCompressionLevel = deleteParquetCompressionLevel();
437+
if (deleteParquetCompressionLevel != null) {
438+
writeProperties.put(DELETE_PARQUET_COMPRESSION_LEVEL, deleteParquetCompressionLevel);
439+
}
440+
break;
441+
442+
case AVRO:
443+
writeProperties.put(DELETE_AVRO_COMPRESSION, deleteAvroCompressionCodec());
444+
String deleteAvroCompressionLevel = deleteAvroCompressionLevel();
445+
if (deleteAvroCompressionLevel != null) {
446+
writeProperties.put(DELETE_AVRO_COMPRESSION_LEVEL, deleteAvroCompressionLevel);
447+
}
448+
break;
449+
450+
case ORC:
451+
writeProperties.put(DELETE_ORC_COMPRESSION, deleteOrcCompressionCodec());
452+
writeProperties.put(DELETE_ORC_COMPRESSION_STRATEGY, deleteOrcCompressionStrategy());
453+
break;
454+
455+
default:
456+
// skip
457+
}
458+
459+
return writeProperties;
460+
}
461+
462+
private String parquetCompressionCodec() {
463+
return confParser
464+
.stringConf()
465+
.option(SparkWriteOptions.COMPRESSION_CODEC)
466+
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
467+
.tableProperty(TableProperties.PARQUET_COMPRESSION)
468+
.defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT)
469+
.parse();
470+
}
471+
472+
// Parquet compression codec for delete files; falls back to the data-file codec when no
// delete-specific value is configured. Note: the defaultValue argument is evaluated eagerly,
// so parquetCompressionCodec() runs even when a delete-specific value is present.
private String deleteParquetCompressionCodec() {
  return confParser
      .stringConf()
      .option(SparkWriteOptions.COMPRESSION_CODEC)
      .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
      .tableProperty(DELETE_PARQUET_COMPRESSION)
      .defaultValue(parquetCompressionCodec())
      .parse();
}
481+
482+
private String parquetCompressionLevel() {
483+
return confParser
484+
.stringConf()
485+
.option(SparkWriteOptions.COMPRESSION_LEVEL)
486+
.sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
487+
.tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL)
488+
.defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT)
489+
.parseOptional();
490+
}
491+
492+
// Parquet compression level for delete files; falls back to the data-file level and may still
// be null (parseOptional) when neither is configured.
private String deleteParquetCompressionLevel() {
  return confParser
      .stringConf()
      .option(SparkWriteOptions.COMPRESSION_LEVEL)
      .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
      .tableProperty(DELETE_PARQUET_COMPRESSION_LEVEL)
      .defaultValue(parquetCompressionLevel())
      .parseOptional();
}
501+
502+
private String avroCompressionCodec() {
503+
return confParser
504+
.stringConf()
505+
.option(SparkWriteOptions.COMPRESSION_CODEC)
506+
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
507+
.tableProperty(TableProperties.AVRO_COMPRESSION)
508+
.defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT)
509+
.parse();
510+
}
511+
512+
// Avro compression codec for delete files; falls back to the data-file codec when no
// delete-specific value is configured.
private String deleteAvroCompressionCodec() {
  return confParser
      .stringConf()
      .option(SparkWriteOptions.COMPRESSION_CODEC)
      .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
      .tableProperty(DELETE_AVRO_COMPRESSION)
      .defaultValue(avroCompressionCodec())
      .parse();
}
521+
522+
private String avroCompressionLevel() {
523+
return confParser
524+
.stringConf()
525+
.option(SparkWriteOptions.COMPRESSION_LEVEL)
526+
.sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
527+
.tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL)
528+
.defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT)
529+
.parseOptional();
530+
}
531+
532+
// Avro compression level for delete files; falls back to the data-file level and may still be
// null (parseOptional) when neither is configured.
private String deleteAvroCompressionLevel() {
  return confParser
      .stringConf()
      .option(SparkWriteOptions.COMPRESSION_LEVEL)
      .sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
      .tableProperty(DELETE_AVRO_COMPRESSION_LEVEL)
      .defaultValue(avroCompressionLevel())
      .parseOptional();
}
541+
542+
private String orcCompressionCodec() {
543+
return confParser
544+
.stringConf()
545+
.option(SparkWriteOptions.COMPRESSION_CODEC)
546+
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
547+
.tableProperty(TableProperties.ORC_COMPRESSION)
548+
.defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT)
549+
.parse();
550+
}
551+
552+
// ORC compression codec for delete files; falls back to the data-file codec when no
// delete-specific value is configured.
private String deleteOrcCompressionCodec() {
  return confParser
      .stringConf()
      .option(SparkWriteOptions.COMPRESSION_CODEC)
      .sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
      .tableProperty(DELETE_ORC_COMPRESSION)
      .defaultValue(orcCompressionCodec())
      .parse();
}
561+
562+
private String orcCompressionStrategy() {
563+
return confParser
564+
.stringConf()
565+
.option(SparkWriteOptions.COMPRESSION_STRATEGY)
566+
.sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY)
567+
.tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY)
568+
.defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT)
569+
.parse();
570+
}
571+
572+
// ORC compression strategy for delete files; falls back to the data-file strategy when no
// delete-specific value is configured.
private String deleteOrcCompressionStrategy() {
  return confParser
      .stringConf()
      .option(SparkWriteOptions.COMPRESSION_STRATEGY)
      .sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY)
      .tableProperty(DELETE_ORC_COMPRESSION_STRATEGY)
      .defaultValue(orcCompressionStrategy())
      .parse();
}
376581
}

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,15 @@ private SparkWriteOptions() {}
7979

8080
// Isolation Level for DataFrame calls. Currently supported by overwritePartitions
public static final String ISOLATION_LEVEL = "isolation-level";

// Per-write compression overrides (codec, level, strategy); session-level equivalents live in
// SparkSQLProperties
public static final String COMPRESSION_CODEC = "compression-codec";
public static final String COMPRESSION_LEVEL = "compression-level";
public static final String COMPRESSION_STRATEGY = "compression-strategy";

// Overrides the advisory partition size
public static final String ADVISORY_PARTITION_SIZE = "advisory-partition-size";

// Overrides the delete granularity
public static final String DELETE_GRANULARITY = "delete-granularity";
8293
}

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.IOException;
2222
import java.util.Arrays;
2323
import java.util.List;
24+
import java.util.Map;
2425
import org.apache.iceberg.DeleteFile;
2526
import org.apache.iceberg.FileFormat;
2627
import org.apache.iceberg.MetadataColumns;
@@ -74,6 +75,7 @@ public class SparkPositionDeletesRewrite implements Write {
7475
private final String fileSetId;
7576
private final int specId;
7677
private final StructLike partition;
78+
private final Map<String, String> writeProperties;
7779

7880
/**
7981
* Constructs a {@link SparkPositionDeletesRewrite}.
@@ -106,6 +108,7 @@ public class SparkPositionDeletesRewrite implements Write {
106108
this.fileSetId = writeConf.rewrittenFileSetId();
107109
this.specId = specId;
108110
this.partition = partition;
111+
this.writeProperties = writeConf.writeProperties();
109112
}
110113

111114
@Override
@@ -129,7 +132,8 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
129132
writeSchema,
130133
dsSchema,
131134
specId,
132-
partition);
135+
partition,
136+
writeProperties);
133137
}
134138

135139
@Override
@@ -174,6 +178,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
174178
private final StructType dsSchema;
175179
private final int specId;
176180
private final StructLike partition;
181+
private final Map<String, String> writeProperties;
177182

178183
PositionDeletesWriterFactory(
179184
Broadcast<Table> tableBroadcast,
@@ -183,7 +188,8 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
183188
Schema writeSchema,
184189
StructType dsSchema,
185190
int specId,
186-
StructLike partition) {
191+
StructLike partition,
192+
Map<String, String> writeProperties) {
187193
this.tableBroadcast = tableBroadcast;
188194
this.queryId = queryId;
189195
this.format = format;
@@ -192,6 +198,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
192198
this.dsSchema = dsSchema;
193199
this.specId = specId;
194200
this.partition = partition;
201+
this.writeProperties = writeProperties;
195202
}
196203

197204
@Override

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
102102
private final Map<String, String> extraSnapshotMetadata;
103103
private final Distribution requiredDistribution;
104104
private final SortOrder[] requiredOrdering;
105+
private final Map<String, String> writeProperties;
105106

106107
private boolean cleanupOnAbort = true;
107108

@@ -130,6 +131,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
130131
this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
131132
this.requiredDistribution = requiredDistribution;
132133
this.requiredOrdering = requiredOrdering;
134+
this.writeProperties = writeConf.writeProperties();
133135
}
134136

135137
@Override
@@ -154,7 +156,7 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
154156
// broadcast the table metadata as the writer factory will be sent to executors
155157
Broadcast<Table> tableBroadcast =
156158
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
157-
return new PositionDeltaWriteFactory(tableBroadcast, command, context);
159+
return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties);
158160
}
159161

160162
@Override
@@ -331,11 +333,17 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory {
331333
private final Broadcast<Table> tableBroadcast;
332334
private final Command command;
333335
private final Context context;
336+
private final Map<String, String> writeProperties;
334337

335-
PositionDeltaWriteFactory(Broadcast<Table> tableBroadcast, Command command, Context context) {
338+
// Captures everything the executor-side delta writers need, including the resolved
// compression write properties (see SparkWriteConf#writeProperties).
PositionDeltaWriteFactory(
    Broadcast<Table> tableBroadcast,
    Command command,
    Context context,
    Map<String, String> writeProperties) {
  this.tableBroadcast = tableBroadcast;
  this.command = command;
  this.context = context;
  this.writeProperties = writeProperties;
}
340348

341349
@Override

0 commit comments

Comments
 (0)