diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
index e6b510ce6a8..940bbc0acbb 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
@@ -29,7 +29,7 @@ limitations under the License.
flink-cdc-pipeline-connector-paimon
- 0.8.2
+ 0.9.0
2.8.5
2.3.9
3.4.6
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
index 302ba629ac9..388658fe653 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
@@ -69,6 +69,8 @@ public DataSink createDataSink(Context context) {
}
});
Options options = Options.fromMap(catalogOptions);
+ // Avoid using previous table schema.
+ options.setString("cache-enabled", "false");
try (Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options)) {
Preconditions.checkNotNull(
catalog.listDatabases(), "catalog option of Paimon is invalid.");
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java
index 07abb03bf64..03c0be6be15 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java
@@ -45,8 +45,8 @@ public PaimonCommitter(Options catalogOptions, String commitUser) {
storeMultiCommitter =
new StoreMultiCommitter(
() -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions),
- commitUser,
- null);
+ org.apache.paimon.flink.sink.Committer.createContext(
+ commitUser, null, true, false, null));
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
index fb748954209..21b21d50df7 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
@@ -142,6 +142,9 @@ public void withCompactExecutor(ExecutorService compactExecutor) {
write.withCompactExecutor(compactExecutor);
}
+ @Override
+ public void withInsertOnly(boolean b) {}
+
@Override
public SinkRecord write(InternalRow internalRow) throws Exception {
return write.writeAndReturn(internalRow);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
index 9b3b3afb96c..b528f53aacf 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
@@ -151,7 +151,7 @@ public void processElement(StreamRecord streamRecord) throws Exception {
dataChangeEvent,
schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters());
switch (tuple4.f0) {
- case DYNAMIC:
+ case HASH_DYNAMIC:
{
bucket =
tuple4.f2.assign(
@@ -159,18 +159,18 @@ public void processElement(StreamRecord streamRecord) throws Exception {
tuple4.f3.trimmedPrimaryKey(genericRow).hashCode());
break;
}
- case FIXED:
+ case HASH_FIXED:
{
tuple4.f1.setRecord(genericRow);
bucket = tuple4.f1.bucket();
break;
}
- case UNAWARE:
+ case BUCKET_UNAWARE:
{
bucket = 0;
break;
}
- case GLOBAL_DYNAMIC:
+ case CROSS_PARTITION:
default:
{
throw new RuntimeException("Unsupported bucket mode: " + tuple4.f0);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
index 9f3cd806c47..7b362ee0d40 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
@@ -87,6 +87,7 @@ private void initialize(String metastore)
}
catalogOptions.setString("metastore", metastore);
catalogOptions.setString("warehouse", warehouse);
+ catalogOptions.setString("cache-enabled", "false");
this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
this.catalog.dropDatabase(TEST_DATABASE, true, true);
}
@@ -206,6 +207,30 @@ public void testApplySchemaChange(String metastore)
catalog.getTable(Identifier.fromString("test.table_with_partition"));
Assertions.assertEquals(tableSchema, tableWithPartition.rowType());
Assertions.assertEquals(Arrays.asList("col1", "dt"), tableWithPartition.primaryKeys());
+ // Create table with upper case.
+ catalogOptions.setString("allow-upper-case", "true");
+ metadataApplier = new PaimonMetadataApplier(catalogOptions);
+ createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.table_with_upper_case"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "COL1",
+ org.apache.flink.cdc.common.types.DataTypes.STRING()
+ .notNull())
+ .physicalColumn(
+ "col2", org.apache.flink.cdc.common.types.DataTypes.INT())
+ .primaryKey("COL1")
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+ tableSchema =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "COL1", DataTypes.STRING().notNull()),
+ new DataField(1, "col2", DataTypes.INT())));
+ Assertions.assertEquals(
+ tableSchema,
+ catalog.getTable(Identifier.fromString("test.table_with_upper_case")).rowType());
}
@ParameterizedTest
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
index 5635dcfd886..0dd60c2f7a4 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -127,7 +127,7 @@ private void initialize(String metastore)
+ "'metastore'='hive', "
+ "'hadoop-conf-dir'='%s', "
+ "'hive-conf-dir'='%s', "
- + "'cache-enabled'='false' "
+ + "'cache-enabled'='false'"
+ ")",
warehouse, HADOOP_CONF_DIR, HIVE_CONF_DIR));
} else {