Skip to content

Commit 4187dc2

Browse files
jerryshao and FANNG1 authored
[HOTFIX][#3705] fix(spark-connector): write data to Iceberg table failed if drop and recreate Iceberg table (#3718)
Co-authored-by: FANNG <xiaojing@datastrato.com> Fix: #3705
1 parent 1e29438 commit 4187dc2

File tree

7 files changed

+34
-14
lines changed

7 files changed

+34
-14
lines changed

docs/spark-connector/spark-catalog-iceberg.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,3 +132,8 @@ Gravitino spark connector will transform below property names which are defined
132132
| `warehouse` | `warehouse` | (none) | No | Catalog backend warehouse | 0.5.1 |
133133

134134
Gravitino catalog property names with the prefix `spark.bypass.` are passed to Spark Iceberg connector. For example, using `spark.bypass.io-impl` to pass the `io-impl` to the Spark Iceberg connector.
135+
136+
:::info
137+
Iceberg catalog property `cache-enabled` is set to `false` internally and is not allowed to be changed.
138+
:::
139+

spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,7 @@ public class IcebergPropertiesConstants {
7575
@VisibleForTesting
7676
public static final String ICEBERG_FORMAT_VERSION = IcebergTablePropertiesMetadata.FORMAT_VERSION;
7777

78+
static final String ICEBERG_CATALOG_CACHE_ENABLED = CatalogProperties.CACHE_ENABLED;
79+
7880
private IcebergPropertiesConstants() {}
7981
}

spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ public Map<String, String> toSparkCatalogProperties(Map<String, String> properti
5252
throw new IllegalArgumentException(
5353
"Unsupported Iceberg Catalog backend: " + catalogBackend);
5454
}
55+
56+
all.put(IcebergPropertiesConstants.ICEBERG_CATALOG_CACHE_ENABLED, "FALSE");
57+
5558
return all;
5659
}
5760

spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
1111
import com.datastrato.gravitino.spark.connector.SparkTypeConverter;
1212
import com.datastrato.gravitino.spark.connector.utils.GravitinoTableInfoHelper;
13-
import java.lang.reflect.Field;
1413
import java.util.Map;
1514
import org.apache.iceberg.spark.SparkCatalog;
1615
import org.apache.iceberg.spark.source.SparkTable;
@@ -37,7 +36,10 @@ public SparkIcebergTable(
3736
PropertiesConverter propertiesConverter,
3837
SparkTransformConverter sparkTransformConverter,
3938
SparkTypeConverter sparkTypeConverter) {
40-
super(sparkTable.table(), !isCacheEnabled(sparkCatalog));
39+
super(
40+
sparkTable.table(), true
41+
/** refreshEagerly */
42+
);
4143
this.gravitinoTableInfoHelper =
4244
new GravitinoTableInfoHelper(
4345
true,
@@ -78,14 +80,4 @@ public Transform[] partitioning() {
7880
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
7981
return ((SparkTable) sparkTable).newScanBuilder(options);
8082
}
81-
82-
private static boolean isCacheEnabled(SparkCatalog sparkCatalog) {
83-
try {
84-
Field cacheEnabled = sparkCatalog.getClass().getDeclaredField("cacheEnabled");
85-
cacheEnabled.setAccessible(true);
86-
return cacheEnabled.getBoolean(sparkCatalog);
87-
} catch (NoSuchFieldException | IllegalAccessException e) {
88-
throw new RuntimeException("Failed to get cacheEnabled field from SparkCatalog", e);
89-
}
90-
}
9183
}

spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/iceberg/TestIcebergPropertiesConverter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ void testCatalogPropertiesWithHiveBackend() {
2929
"value1"));
3030
Assertions.assertEquals(
3131
ImmutableMap.of(
32+
IcebergPropertiesConstants.ICEBERG_CATALOG_CACHE_ENABLED,
33+
"FALSE",
3234
IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
3335
IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE,
3436
IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
@@ -57,6 +59,8 @@ void testCatalogPropertiesWithJdbcBackend() {
5759
"value1"));
5860
Assertions.assertEquals(
5961
ImmutableMap.of(
62+
IcebergPropertiesConstants.ICEBERG_CATALOG_CACHE_ENABLED,
63+
"FALSE",
6064
IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
6165
IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_JDBC,
6266
IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
@@ -85,6 +89,8 @@ void testCatalogPropertiesWithRestBackend() {
8589
"value1"));
8690
Assertions.assertEquals(
8791
ImmutableMap.of(
92+
IcebergPropertiesConstants.ICEBERG_CATALOG_CACHE_ENABLED,
93+
"FALSE",
8894
IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
8995
IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST,
9096
IcebergPropertiesConstants.ICEBERG_CATALOG_URI,

spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkCommonIT.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,19 @@ void testTableOptions() {
748748
checkTableReadWrite(tableInfo);
749749
}
750750

751+
@Test
752+
void testDropAndWriteTable() {
753+
String tableName = "drop_then_create_write_table";
754+
755+
dropTableIfExists(tableName);
756+
createSimpleTable(tableName);
757+
checkTableReadWrite(getTableInfo(tableName));
758+
759+
dropTableIfExists(tableName);
760+
createSimpleTable(tableName);
761+
checkTableReadWrite(getTableInfo(tableName));
762+
}
763+
751764
@Test
752765
@EnabledIf("supportsDelete")
753766
void testDeleteOperation() {

spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,6 @@ void testIcebergReservedProperties() throws NoSuchTableException {
333333
"CREATE TABLE %s (id INT NOT NULL COMMENT 'id comment', name STRING COMMENT '', age INT)",
334334
tableName));
335335

336-
SparkIcebergTable sparkIcebergTable = getSparkIcebergTableInstance(tableName);
337-
338336
SparkTableInfo tableInfo = getTableInfo(tableName);
339337
Map<String, String> tableProperties = tableInfo.getTableProperties();
340338
Assertions.assertNotNull(tableProperties);
@@ -360,6 +358,7 @@ void testIcebergReservedProperties() throws NoSuchTableException {
360358
// create a new snapshot
361359
sql(String.format("INSERT INTO %s VALUES(1, '1', 1)", tableName));
362360

361+
SparkIcebergTable sparkIcebergTable = getSparkIcebergTableInstance(tableName);
363362
// set Identifier fields
364363
sparkIcebergTable.table().updateSchema().setIdentifierFields("id").commit();
365364

0 commit comments

Comments (0)