diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index f8cb46ce4f..500546e641 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -68,7 +68,7 @@ public class PaimonLakeCatalog implements LakeCatalog {
// for consistent behavior
SYSTEM_COLUMNS.put(BUCKET_COLUMN_NAME, DataTypes.INT());
SYSTEM_COLUMNS.put(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
- SYSTEM_COLUMNS.put(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+ SYSTEM_COLUMNS.put(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_LTZ_MILLIS());
}
private final Catalog paimonCatalog;
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
index 0a050f362f..94580df179 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
@@ -25,12 +25,18 @@
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
import java.lang.reflect.Field;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
/** Utils to verify whether the existing Paimon table is compatible with the table to be created. */
public class PaimonTableValidation {
@@ -61,6 +67,13 @@ public static void validatePaimonSchemaCompatible(
existingOptions.entrySet().removeIf(entry -> !newOptions.containsKey(entry.getKey()));
if (!existingSchema.equals(newSchema)) {
+ // Allow different precisions for __timestamp column for backward compatibility,
+ // old cluster will use precision 6, but new cluster will use precision 3,
+ // we allow such precision difference
+ if (equalIgnoreSystemColumnTimestampPrecision(existingSchema, newSchema)) {
+ return;
+ }
+
throw new TableAlreadyExistException(
String.format(
"The table %s already exists in Paimon catalog, but the table schema is not compatible. "
@@ -70,6 +83,39 @@ public static void validatePaimonSchemaCompatible(
}
}
+ /**
+ * Check if the {@code existingSchema} is compatible with {@code newSchema} by ignoring the
+ * precision difference of the system column {@code __timestamp}.
+ *
+     *
+     * <p>This is crucial for backward compatibility during cluster upgrades or configuration
+ * changes (e.g., transitioning from precision 6 to 3). Without this relaxed check, users would
+ * be unable to re-enable lake synchronization for existing tables if the cluster-wide default
+ * timestamp precision has evolved.
+ *
+ * @param existingSchema the schema currently persisted in the Paimon catalog
+ * @param newSchema the new schema descriptor generated by the current Fluss cluster
+ * @return true if the schemas are identical, disregarding the precision of the system timestamp
+ */
+ private static boolean equalIgnoreSystemColumnTimestampPrecision(
+ Schema existingSchema, Schema newSchema) {
+        List<DataField> existingFields = new ArrayList<>(existingSchema.fields());
+ DataField systemTimestampField = existingFields.get(existingFields.size() - 1);
+ if (systemTimestampField.name().equals(TIMESTAMP_COLUMN_NAME)
+ && systemTimestampField
+ .type()
+ .equalsIgnoreFieldId(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())) {
+ existingFields.set(
+ existingFields.size() - 1,
+ new DataField(
+ systemTimestampField.id(),
+ systemTimestampField.name(),
+ DataTypes.TIMESTAMP_LTZ_MILLIS(),
+ systemTimestampField.description()));
+ }
+ existingSchema = existingSchema.copy(RowType.of(existingFields.toArray(new DataField[0])));
+ return existingSchema.equals(newSchema);
+ }
+
    private static void removeChangeableOptions(Map<String, String> options) {
options.entrySet()
.removeIf(
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 9491918b4c..6fb7003223 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -168,7 +168,7 @@ void testCreateLakeEnabledTable() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
- org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+ org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"log_c1",
@@ -206,7 +206,7 @@ void testCreateLakeEnabledTable() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
- org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+ org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"log_c1",
@@ -245,7 +245,7 @@ void testCreateLakeEnabledTable() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
- org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+ org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"pk_c1",
@@ -288,7 +288,7 @@ void testCreateLakeEnabledTable() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
- org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+ org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"c1",
@@ -355,7 +355,7 @@ void testCreateLakeEnabledTableWithAllTypes() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
- org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+ org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"log_c1",
@@ -456,8 +456,8 @@ void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
.isInstanceOf(LakeTableAlreadyExistException.class)
.hasMessage(
"The table `fluss`.`log_table_with_exist_lake_table` already exists in Paimon catalog, but the table schema is not compatible. "
- + "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
- + "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ + "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
+ + "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ "Please first drop the table in Paimon catalog or use a new table name.");
// create log table with different fields will throw exception
@@ -475,8 +475,8 @@ void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
.isInstanceOf(LakeTableAlreadyExistException.class)
.hasMessage(
"The table `fluss`.`log_table_with_exist_lake_table` already exists in Paimon catalog, but the table schema is not compatible. "
- + "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
- + "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ + "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
+ + "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ "Please first drop the table in Paimon catalog or use a new table name.");
// add an insignificant option to Paimon table will be ok
@@ -615,7 +615,7 @@ void testAlterLakeEnabledLogTable() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
- org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+ org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"log_c1",
@@ -711,7 +711,7 @@ void testAlterLakeEnabledTableProperties() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
- org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+ org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"c1",
@@ -742,7 +742,7 @@ void testAlterLakeEnabledTableProperties() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
- org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+ org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"c1",
@@ -832,7 +832,7 @@ void testEnableLakeTableAfterAlterTableProperties() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
- org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+ org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"c1",
@@ -885,7 +885,7 @@ void testAlterLakeEnabledTableSchema() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
- org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+ org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"c1",
@@ -927,6 +927,50 @@ void testAlterLakeEnabledTableSchema() throws Exception {
assertThat(alteredRowType.getField("c3").description()).isEqualTo("c3 comment");
}
+ @Test
+ void testEnableLakeTableWithLegacySystemTimestampColumn() throws Exception {
+ TablePath tablePath = TablePath.of(DATABASE, "timestamp_precision_compat");
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(Schema.newBuilder().column("c1", DataTypes.INT()).build())
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+ .build();
+
+ admin.createTable(tablePath, tableDescriptor, false).get();
+
+ Identifier paimonIdentifier = Identifier.create(DATABASE, tablePath.getTableName());
+
+ // alter to TIMESTAMP_WITH_LOCAL_TIME_ZONE to mock the legacy behavior
+ paimonCatalog.alterTable(
+ paimonIdentifier,
+ SchemaChange.updateColumnType(
+ TIMESTAMP_COLUMN_NAME,
+ org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()),
+ false);
+
+ // disable data lake
+ admin.alterTable(
+ tablePath,
+ Collections.singletonList(
+ TableChange.set(
+ ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false")),
+ false)
+ .get();
+ assertThat(admin.getTableInfo(tablePath).get().getTableConfig().isDataLakeEnabled())
+ .isFalse();
+
+ // enable data lake again, should still enable it
+ admin.alterTable(
+ tablePath,
+ Collections.singletonList(
+ TableChange.set(
+ ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")),
+ false)
+ .get();
+ assertThat(admin.getTableInfo(tablePath).get().getTableConfig().isDataLakeEnabled())
+ .isTrue();
+ }
+
private void verifyPaimonTable(
Table paimonTable,
TableDescriptor flussTable,
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
index cadd2a46ad..56b3f533d5 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -438,7 +438,7 @@ private void verifyLogTableRecordsMultiPartition(
// check system columns: __bucket, __offset, __timestamp
assertThat(actualRow.getInt(4)).isEqualTo(expectBucket);
assertThat(actualRow.getLong(5)).isEqualTo(expectRecord.logOffset());
- assertThat(actualRow.getTimestamp(6, 6).getMillisecond())
+ assertThat(actualRow.getTimestamp(6, 3).getMillisecond())
.isEqualTo(expectRecord.timestamp());
}
assertThat(actualRecords.hasNext()).isFalse();
@@ -468,7 +468,7 @@ private void verifyLogTableRecordsThreePartition(
// check system columns: __bucket, __offset, __timestamp
assertThat(actualRow.getInt(5)).isEqualTo(expectBucket);
assertThat(actualRow.getLong(6)).isEqualTo(expectRecord.logOffset());
- assertThat(actualRow.getTimestamp(7, 6).getMillisecond())
+ assertThat(actualRow.getTimestamp(7, 3).getMillisecond())
.isEqualTo(expectRecord.timestamp());
}
assertThat(actualRecords.hasNext()).isFalse();
@@ -496,7 +496,7 @@ private void verifyTableRecords(
// check system columns: __bucket, __offset, __timestamp
assertThat(actualRow.getInt(3)).isEqualTo(expectBucket);
assertThat(actualRow.getLong(4)).isEqualTo(expectRecord.logOffset());
- assertThat(actualRow.getTimestamp(5, 6).getMillisecond())
+ assertThat(actualRow.getTimestamp(5, 3).getMillisecond())
.isEqualTo(expectRecord.timestamp());
}
assertThat(actualRecords.hasNext()).isFalse();
@@ -814,8 +814,7 @@ private void doCreatePaimonTable(TablePath tablePath, Schema.Builder paimonSchem
throws Exception {
paimonSchemaBuilder.column(BUCKET_COLUMN_NAME, DataTypes.INT());
paimonSchemaBuilder.column(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
- paimonSchemaBuilder.column(
- TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+ paimonSchemaBuilder.column(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_LTZ_MILLIS());
paimonSchemaBuilder.option(
CoreOptions.COMMIT_CALLBACKS.key(),
PaimonLakeCommitter.PaimonCommitCallback.class.getName());