Commit da01e2e

[hotfix] use ltz_millis as paimon system column for timestamp datatype (#2320)
1 parent 22b30ce commit da01e2e

File tree

4 files changed: +109 additions, -20 deletions


fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ public class PaimonLakeCatalog implements LakeCatalog {
         // for consistent behavior
         SYSTEM_COLUMNS.put(BUCKET_COLUMN_NAME, DataTypes.INT());
         SYSTEM_COLUMNS.put(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
-        SYSTEM_COLUMNS.put(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+        SYSTEM_COLUMNS.put(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_LTZ_MILLIS());
     }
 
     private final Catalog paimonCatalog;
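
For context, a minimal sketch (not part of this commit) of the type difference driving the change, using only the org.apache.paimon.types.DataTypes factories shown above; the class name is illustrative:

import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;

// Illustrative only: the legacy and new __timestamp types differ solely in precision.
public class TimestampPrecisionDemo {
    public static void main(String[] args) {
        // Legacy system-column type written by old clusters: TIMESTAMP(6) WITH LOCAL TIME ZONE.
        DataType legacy = DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
        // Type registered by this commit for __timestamp: TIMESTAMP(3) WITH LOCAL TIME ZONE.
        DataType millis = DataTypes.TIMESTAMP_LTZ_MILLIS();

        // Different precision means a strict schema equality check treats them as incompatible.
        System.out.println(legacy.equals(millis)); // false
    }
}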

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java

Lines changed: 46 additions & 0 deletions
@@ -25,12 +25,18 @@
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 
 /** Utils to verify whether the existing Paimon table is compatible with the table to be created. */
 public class PaimonTableValidation {
@@ -61,6 +67,13 @@ public static void validatePaimonSchemaCompatible(
         existingOptions.entrySet().removeIf(entry -> !newOptions.containsKey(entry.getKey()));
 
         if (!existingSchema.equals(newSchema)) {
+            // Allow different precisions for __timestamp column for backward compatibility,
+            // old cluster will use precision 6, but new cluster will use precision 3,
+            // we allow such precision difference
+            if (equalIgnoreSystemColumnTimestampPrecision(existingSchema, newSchema)) {
+                return;
+            }
+
             throw new TableAlreadyExistException(
                     String.format(
                             "The table %s already exists in Paimon catalog, but the table schema is not compatible. "
@@ -70,6 +83,39 @@ public static void validatePaimonSchemaCompatible(
         }
     }
 
+    /**
+     * Check if the {@code existingSchema} is compatible with {@code newSchema} by ignoring the
+     * precision difference of the system column {@code __timestamp}.
+     *
+     * <p>This is crucial for backward compatibility during cluster upgrades or configuration
+     * changes (e.g., transitioning from precision 6 to 3). Without this relaxed check, users would
+     * be unable to re-enable lake synchronization for existing tables if the cluster-wide default
+     * timestamp precision has evolved.
+     *
+     * @param existingSchema the schema currently persisted in the Paimon catalog
+     * @param newSchema the new schema descriptor generated by the current Fluss cluster
+     * @return true if the schemas are identical, disregarding the precision of the system timestamp
+     */
+    private static boolean equalIgnoreSystemColumnTimestampPrecision(
+            Schema existingSchema, Schema newSchema) {
+        List<DataField> existingFields = new ArrayList<>(existingSchema.fields());
+        DataField systemTimestampField = existingFields.get(existingFields.size() - 1);
+        if (systemTimestampField.name().equals(TIMESTAMP_COLUMN_NAME)
+                && systemTimestampField
+                        .type()
+                        .equalsIgnoreFieldId(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())) {
+            existingFields.set(
+                    existingFields.size() - 1,
+                    new DataField(
+                            systemTimestampField.id(),
+                            systemTimestampField.name(),
+                            DataTypes.TIMESTAMP_LTZ_MILLIS(),
+                            systemTimestampField.description()));
+        }
+        existingSchema = existingSchema.copy(RowType.of(existingFields.toArray(new DataField[0])));
+        return existingSchema.equals(newSchema);
+    }
+
     private static void removeChangeableOptions(Map<String, String> options) {
         options.entrySet()
                 .removeIf(
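
A minimal sketch (not part of the commit) of the normalization performed by equalIgnoreSystemColumnTimestampPrecision, built only from the Paimon APIs that appear in the hunk above (Schema.newBuilder, Schema#fields, Schema#copy, DataField, RowType.of); the class name and the single data column are hypothetical:

import java.util.ArrayList;
import java.util.List;

import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

// Illustrative only: rewrite a legacy precision-6 __timestamp field to precision 3 before comparing.
public class PrecisionNormalizationSketch {
    public static void main(String[] args) {
        // Schema persisted by an old cluster: __timestamp has precision 6.
        Schema legacy =
                Schema.newBuilder()
                        .column("c1", DataTypes.INT())
                        .column("__timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
                        .build();
        // Schema generated by the current cluster: __timestamp has precision 3.
        Schema current =
                Schema.newBuilder()
                        .column("c1", DataTypes.INT())
                        .column("__timestamp", DataTypes.TIMESTAMP_LTZ_MILLIS())
                        .build();

        System.out.println(legacy.equals(current)); // false: only the precision differs

        // Replace the trailing system column's type with the millis variant, as the new check does.
        List<DataField> fields = new ArrayList<>(legacy.fields());
        DataField last = fields.get(fields.size() - 1);
        fields.set(
                fields.size() - 1,
                new DataField(
                        last.id(), last.name(), DataTypes.TIMESTAMP_LTZ_MILLIS(), last.description()));
        Schema normalized = legacy.copy(RowType.of(fields.toArray(new DataField[0])));

        System.out.println(normalized.equals(current)); // true once the precision is aligned
    }
}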

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java

Lines changed: 58 additions & 14 deletions
@@ -168,7 +168,7 @@ void testCreateLakeEnabledTable() throws Exception {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "log_c1",
@@ -206,7 +206,7 @@ void testCreateLakeEnabledTable() throws Exception {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "log_c1",
@@ -245,7 +245,7 @@ void testCreateLakeEnabledTable() throws Exception {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "pk_c1",
@@ -288,7 +288,7 @@ void testCreateLakeEnabledTable() throws Exception {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "c1",
@@ -355,7 +355,7 @@ void testCreateLakeEnabledTableWithAllTypes() throws Exception {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "log_c1",
@@ -456,8 +456,8 @@ void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
                 .isInstanceOf(LakeTableAlreadyExistException.class)
                 .hasMessage(
                         "The table `fluss`.`log_table_with_exist_lake_table` already exists in Paimon catalog, but the table schema is not compatible. "
-                                + "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
-                                + "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
+                                + "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
+                                + "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
                                 + "Please first drop the table in Paimon catalog or use a new table name.");
 
         // create log table with different fields will throw exception
@@ -475,8 +475,8 @@ void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
                 .isInstanceOf(LakeTableAlreadyExistException.class)
                 .hasMessage(
                         "The table `fluss`.`log_table_with_exist_lake_table` already exists in Paimon catalog, but the table schema is not compatible. "
-                                + "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
-                                + "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
+                                + "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
+                                + "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
                                 + "Please first drop the table in Paimon catalog or use a new table name.");
 
         // add an insignificant option to Paimon table will be ok
@@ -615,7 +615,7 @@ void testAlterLakeEnabledLogTable() throws Exception {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "log_c1",
@@ -711,7 +711,7 @@ void testAlterLakeEnabledTableProperties() throws Exception {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "c1",
@@ -742,7 +742,7 @@ void testAlterLakeEnabledTableProperties() throws Exception {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "c1",
@@ -832,7 +832,7 @@ void testEnableLakeTableAfterAlterTableProperties() throws Exception {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "c1",
@@ -885,7 +885,7 @@ void testAlterLakeEnabledTableSchema() throws Exception {
                             // for __bucket, __offset, __timestamp
                             org.apache.paimon.types.DataTypes.INT(),
                             org.apache.paimon.types.DataTypes.BIGINT(),
-                            org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                            org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
                         },
                         new String[] {
                             "c1",
@@ -927,6 +927,50 @@ void testAlterLakeEnabledTableSchema() throws Exception {
         assertThat(alteredRowType.getField("c3").description()).isEqualTo("c3 comment");
     }
 
+    @Test
+    void testEnableLakeTableWithLegacySystemTimestampColumn() throws Exception {
+        TablePath tablePath = TablePath.of(DATABASE, "timestamp_precision_compat");
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(Schema.newBuilder().column("c1", DataTypes.INT()).build())
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .build();
+
+        admin.createTable(tablePath, tableDescriptor, false).get();
+
+        Identifier paimonIdentifier = Identifier.create(DATABASE, tablePath.getTableName());
+
+        // alter to TIMESTAMP_WITH_LOCAL_TIME_ZONE to mock the legacy behavior
+        paimonCatalog.alterTable(
+                paimonIdentifier,
+                SchemaChange.updateColumnType(
+                        TIMESTAMP_COLUMN_NAME,
+                        org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()),
+                false);
+
+        // disable data lake
+        admin.alterTable(
+                        tablePath,
+                        Collections.singletonList(
+                                TableChange.set(
+                                        ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false")),
+                        false)
+                .get();
+        assertThat(admin.getTableInfo(tablePath).get().getTableConfig().isDataLakeEnabled())
+                .isFalse();
+
+        // enable data lake again, should still enable it
+        admin.alterTable(
+                        tablePath,
+                        Collections.singletonList(
+                                TableChange.set(
+                                        ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")),
+                        false)
+                .get();
+        assertThat(admin.getTableInfo(tablePath).get().getTableConfig().isDataLakeEnabled())
+                .isTrue();
+    }
+
     private void verifyPaimonTable(
             Table paimonTable,
             TableDescriptor flussTable,

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java

Lines changed: 4 additions & 5 deletions
@@ -438,7 +438,7 @@ private void verifyLogTableRecordsMultiPartition(
                 // check system columns: __bucket, __offset, __timestamp
                 assertThat(actualRow.getInt(4)).isEqualTo(expectBucket);
                 assertThat(actualRow.getLong(5)).isEqualTo(expectRecord.logOffset());
-                assertThat(actualRow.getTimestamp(6, 6).getMillisecond())
+                assertThat(actualRow.getTimestamp(6, 3).getMillisecond())
                         .isEqualTo(expectRecord.timestamp());
             }
             assertThat(actualRecords.hasNext()).isFalse();
@@ -468,7 +468,7 @@ private void verifyLogTableRecordsThreePartition(
                 // check system columns: __bucket, __offset, __timestamp
                 assertThat(actualRow.getInt(5)).isEqualTo(expectBucket);
                 assertThat(actualRow.getLong(6)).isEqualTo(expectRecord.logOffset());
-                assertThat(actualRow.getTimestamp(7, 6).getMillisecond())
+                assertThat(actualRow.getTimestamp(7, 3).getMillisecond())
                         .isEqualTo(expectRecord.timestamp());
             }
             assertThat(actualRecords.hasNext()).isFalse();
@@ -496,7 +496,7 @@ private void verifyTableRecords(
                 // check system columns: __bucket, __offset, __timestamp
                 assertThat(actualRow.getInt(3)).isEqualTo(expectBucket);
                 assertThat(actualRow.getLong(4)).isEqualTo(expectRecord.logOffset());
-                assertThat(actualRow.getTimestamp(5, 6).getMillisecond())
+                assertThat(actualRow.getTimestamp(5, 3).getMillisecond())
                         .isEqualTo(expectRecord.timestamp());
             }
             assertThat(actualRecords.hasNext()).isFalse();
@@ -814,8 +814,7 @@ private void doCreatePaimonTable(TablePath tablePath, Schema.Builder paimonSchem
             throws Exception {
         paimonSchemaBuilder.column(BUCKET_COLUMN_NAME, DataTypes.INT());
         paimonSchemaBuilder.column(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
-        paimonSchemaBuilder.column(
-                TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+        paimonSchemaBuilder.column(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_LTZ_MILLIS());
         paimonSchemaBuilder.option(
                 CoreOptions.COMMIT_CALLBACKS.key(),
                 PaimonLakeCommitter.PaimonCommitCallback.class.getName());
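
As a side note (not part of the commit), a minimal sketch of why the precision argument passed to getTimestamp changes from 6 to 3, assuming Paimon's GenericRow and Timestamp classes; the class name is illustrative, and while GenericRow itself does not enforce the precision, columnar readers use it when decoding the value, so it should match the declared TIMESTAMP_LTZ(3) column type:

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;

// Illustrative only: reading the __timestamp system column with its declared precision.
public class TimestampReadSketch {
    public static void main(String[] args) {
        long epochMillis = 1_700_000_000_000L;
        // A single-column row standing in for the __timestamp system column.
        InternalRow row = GenericRow.of(Timestamp.fromEpochMillis(epochMillis));

        // Pass the column's declared precision, now 3 (millis) rather than 6 (micros).
        Timestamp ts = row.getTimestamp(0, 3);
        System.out.println(ts.getMillisecond() == epochMillis); // true
    }
}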
