Skip to content

Commit 2f59458

Browse files
authored
[spark] Adding support for Iceberg compatibility options to be passed as table properties with dataframe APIs (#6803)
1 parent 3acf89b commit 2f59458

File tree

6 files changed

+102
-12
lines changed

6 files changed

+102
-12
lines changed

paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,10 @@
2626
import org.apache.paimon.options.description.TextElement;
2727
import org.apache.paimon.utils.Preconditions;
2828

29+
import java.lang.reflect.Field;
30+
import java.util.ArrayList;
2931
import java.util.HashMap;
32+
import java.util.List;
3033
import java.util.Map;
3134

3235
import static org.apache.paimon.options.ConfigOptions.key;
@@ -255,4 +258,24 @@ public InlineElement getDescription() {
255258
return TextElement.text(description);
256259
}
257260
}
261+
262+
/**
263+
* Returns all ConfigOption fields defined in this class. This method uses reflection to
264+
* dynamically discover all ConfigOption fields, ensuring that new options are automatically
265+
* included without code changes.
266+
*/
267+
public static List<ConfigOption<?>> getOptions() {
268+
final Field[] fields = IcebergOptions.class.getFields();
269+
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
270+
for (Field field : fields) {
271+
if (ConfigOption.class.isAssignableFrom(field.getType())) {
272+
try {
273+
list.add((ConfigOption<?>) field.get(IcebergOptions.class));
274+
} catch (IllegalAccessException e) {
275+
throw new RuntimeException(e);
276+
}
277+
}
278+
}
279+
return list;
280+
}
258281
}

paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
package org.apache.spark.sql.execution.shim
2020

2121
import org.apache.paimon.CoreOptions
22+
import org.apache.paimon.iceberg.IcebergOptions
2223
import org.apache.paimon.spark.catalog.FormatTableCatalog
2324

2425
import org.apache.spark.sql.{SparkSession, Strategy}
@@ -39,10 +40,16 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) extends Strate
3940
throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
4041
case _ =>
4142
val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
42-
val (coreOptions, writeOptions) = options.partition {
43-
case (key, _) => coreOptionKeys.contains(key)
43+
44+
// Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
45+
val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
46+
47+
val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
48+
49+
val (tableOptions, writeOptions) = options.partition {
50+
case (key, _) => allTableOptionKeys.contains(key)
4451
}
45-
val newProps = CatalogV2Util.withDefaultOwnership(props) ++ coreOptions
52+
val newProps = CatalogV2Util.withDefaultOwnership(props) ++ tableOptions
4653

4754
val isPartitionedFormatTable = {
4855
catalog match {

paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
package org.apache.spark.sql.execution.shim
2020

2121
import org.apache.paimon.CoreOptions
22+
import org.apache.paimon.iceberg.IcebergOptions
2223
import org.apache.paimon.spark.SparkCatalog
2324
import org.apache.paimon.spark.catalog.FormatTableCatalog
2425

@@ -51,10 +52,16 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
5152
throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
5253
case _ =>
5354
val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
54-
val (coreOptions, writeOptions) = options.partition {
55-
case (key, _) => coreOptionKeys.contains(key)
55+
56+
// Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
57+
val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
58+
59+
val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
60+
61+
val (tableOptions, writeOptions) = options.partition {
62+
case (key, _) => allTableOptionKeys.contains(key)
5663
}
57-
val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ coreOptions)
64+
val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions)
5865

5966
val isPartitionedFormatTable = {
6067
catalog match {

paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
package org.apache.spark.sql.execution.shim
2020

2121
import org.apache.paimon.CoreOptions
22+
import org.apache.paimon.iceberg.IcebergOptions
2223
import org.apache.paimon.spark.SparkCatalog
2324
import org.apache.paimon.spark.catalog.FormatTableCatalog
2425

@@ -53,10 +54,16 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
5354
throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
5455
case _ =>
5556
val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
56-
val (coreOptions, writeOptions) = options.partition {
57-
case (key, _) => coreOptionKeys.contains(key)
57+
58+
// Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
59+
val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
60+
61+
val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
62+
63+
val (tableOptions, writeOptions) = options.partition {
64+
case (key, _) => allTableOptionKeys.contains(key)
5865
}
59-
val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ coreOptions)
66+
val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions)
6067

6168
val isPartitionedFormatTable = {
6269
catalog match {

paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
package org.apache.spark.sql.execution.shim
2020

2121
import org.apache.paimon.CoreOptions
22+
import org.apache.paimon.iceberg.IcebergOptions
2223
import org.apache.paimon.spark.SparkCatalog
2324
import org.apache.paimon.spark.catalog.FormatTableCatalog
2425

@@ -51,10 +52,16 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
5152
throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
5253
case _ =>
5354
val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
54-
val (coreOptions, writeOptions) = options.partition {
55-
case (key, _) => coreOptionKeys.contains(key)
55+
56+
// Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
57+
val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
58+
59+
val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
60+
61+
val (tableOptions, writeOptions) = options.partition {
62+
case (key, _) => allTableOptionKeys.contains(key)
5663
}
57-
val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ coreOptions)
64+
val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions)
5865

5966
val isPartitionedFormatTable = {
6067
catalog match {

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala

Lines changed: 39 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -55,6 +55,45 @@ class PaimonOptionTest extends PaimonSparkTestBase {
5555
}
5656
}
5757

58+
test("Paimon Option: create table with Iceberg compatibility options via DataFrame writer") {
  // Iceberg compatibility options that must be persisted as table properties.
  val icebergProps = Seq(
    "metadata.iceberg.database" -> "db_t",
    "metadata.iceberg.table" -> "t_ib",
    "metadata.iceberg.storage" -> "hadoop-catalog",
    "metadata.iceberg.storage-location" -> "table-location",
    "metadata.iceberg.manifest-legacy-version" -> "true",
    "metadata.iceberg.manifest-compression" -> "snappy",
    "metadata.iceberg.previous-versions-max" -> "5",
    "metadata.iceberg.uri" -> ""
  )

  val baseWriter = Seq((1L, "x1"), (2L, "x2"))
    .toDF("a", "b")
    .write
    .format("paimon")
    .option("primary-key", "a")
    .option("bucket", "-1")

  // Apply every Iceberg option to the writer, then create the table.
  icebergProps
    .foldLeft(baseWriter) { case (writer, (key, value)) => writer.option(key, value) }
    .saveAsTable("T_IB")

  val table = loadTable("T_IB")

  // Primary-key handling is unaffected by the new options.
  Assertions.assertEquals(1, table.primaryKeys().size())
  Assertions.assertEquals("a", table.primaryKeys().get(0))

  // Bucket configuration is stored as before.
  Assertions.assertEquals("-1", table.options().get("bucket"))

  // Each Iceberg compatibility option must be stored permanently on the table.
  icebergProps.foreach {
    case (key, expected) => Assertions.assertEquals(expected, table.options().get(key))
  }
}
96+
5897
test("Paimon Option: query table with sql conf") {
5998
sql("CREATE TABLE T (id INT)")
6099
sql("INSERT INTO T VALUES 1")

0 commit comments

Comments
 (0)