Skip to content

Commit a460c93

Browse files
committed
[hotfix] use ltz_millis 3 as system column timestamp datatype
1 parent eb75d55 commit a460c93

File tree

3 files changed

+16
-17
lines changed

3 files changed

+16
-17
lines changed

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public class PaimonLakeCatalog implements LakeCatalog {
6868
// for consistent behavior
6969
SYSTEM_COLUMNS.put(BUCKET_COLUMN_NAME, DataTypes.INT());
7070
SYSTEM_COLUMNS.put(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
71-
SYSTEM_COLUMNS.put(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
71+
SYSTEM_COLUMNS.put(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_LTZ_MILLIS());
7272
}
7373

7474
private final Catalog paimonCatalog;

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ void testCreateLakeEnabledTable() throws Exception {
168168
// for __bucket, __offset, __timestamp
169169
org.apache.paimon.types.DataTypes.INT(),
170170
org.apache.paimon.types.DataTypes.BIGINT(),
171-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
171+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
172172
},
173173
new String[] {
174174
"log_c1",
@@ -206,7 +206,7 @@ void testCreateLakeEnabledTable() throws Exception {
206206
// for __bucket, __offset, __timestamp
207207
org.apache.paimon.types.DataTypes.INT(),
208208
org.apache.paimon.types.DataTypes.BIGINT(),
209-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
209+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
210210
},
211211
new String[] {
212212
"log_c1",
@@ -245,7 +245,7 @@ void testCreateLakeEnabledTable() throws Exception {
245245
// for __bucket, __offset, __timestamp
246246
org.apache.paimon.types.DataTypes.INT(),
247247
org.apache.paimon.types.DataTypes.BIGINT(),
248-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
248+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
249249
},
250250
new String[] {
251251
"pk_c1",
@@ -288,7 +288,7 @@ void testCreateLakeEnabledTable() throws Exception {
288288
// for __bucket, __offset, __timestamp
289289
org.apache.paimon.types.DataTypes.INT(),
290290
org.apache.paimon.types.DataTypes.BIGINT(),
291-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
291+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
292292
},
293293
new String[] {
294294
"c1",
@@ -355,7 +355,7 @@ void testCreateLakeEnabledTableWithAllTypes() throws Exception {
355355
// for __bucket, __offset, __timestamp
356356
org.apache.paimon.types.DataTypes.INT(),
357357
org.apache.paimon.types.DataTypes.BIGINT(),
358-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
358+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
359359
},
360360
new String[] {
361361
"log_c1",
@@ -456,8 +456,8 @@ void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
456456
.isInstanceOf(LakeTableAlreadyExistException.class)
457457
.hasMessage(
458458
"The table `fluss`.`log_table_with_exist_lake_table` already exists in Paimon catalog, but the table schema is not compatible. "
459-
+ "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
460-
+ "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
459+
+ "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
460+
+ "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
461461
+ "Please first drop the table in Paimon catalog or use a new table name.");
462462

463463
// create log table with different fields will throw exception
@@ -475,8 +475,8 @@ void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
475475
.isInstanceOf(LakeTableAlreadyExistException.class)
476476
.hasMessage(
477477
"The table `fluss`.`log_table_with_exist_lake_table` already exists in Paimon catalog, but the table schema is not compatible. "
478-
+ "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
479-
+ "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
478+
+ "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
479+
+ "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
480480
+ "Please first drop the table in Paimon catalog or use a new table name.");
481481

482482
// add an insignificant option to Paimon table will be ok
@@ -615,7 +615,7 @@ void testAlterLakeEnabledLogTable() throws Exception {
615615
// for __bucket, __offset, __timestamp
616616
org.apache.paimon.types.DataTypes.INT(),
617617
org.apache.paimon.types.DataTypes.BIGINT(),
618-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
618+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
619619
},
620620
new String[] {
621621
"log_c1",
@@ -711,7 +711,7 @@ void testAlterLakeEnabledTableProperties() throws Exception {
711711
// for __bucket, __offset, __timestamp
712712
org.apache.paimon.types.DataTypes.INT(),
713713
org.apache.paimon.types.DataTypes.BIGINT(),
714-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
714+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
715715
},
716716
new String[] {
717717
"c1",
@@ -742,7 +742,7 @@ void testAlterLakeEnabledTableProperties() throws Exception {
742742
// for __bucket, __offset, __timestamp
743743
org.apache.paimon.types.DataTypes.INT(),
744744
org.apache.paimon.types.DataTypes.BIGINT(),
745-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
745+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
746746
},
747747
new String[] {
748748
"c1",
@@ -832,7 +832,7 @@ void testEnableLakeTableAfterAlterTableProperties() throws Exception {
832832
// for __bucket, __offset, __timestamp
833833
org.apache.paimon.types.DataTypes.INT(),
834834
org.apache.paimon.types.DataTypes.BIGINT(),
835-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
835+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
836836
},
837837
new String[] {
838838
"c1",
@@ -885,7 +885,7 @@ void testAlterLakeEnabledTableSchema() throws Exception {
885885
// for __bucket, __offset, __timestamp
886886
org.apache.paimon.types.DataTypes.INT(),
887887
org.apache.paimon.types.DataTypes.BIGINT(),
888-
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
888+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
889889
},
890890
new String[] {
891891
"c1",

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -814,8 +814,7 @@ private void doCreatePaimonTable(TablePath tablePath, Schema.Builder paimonSchem
814814
throws Exception {
815815
paimonSchemaBuilder.column(BUCKET_COLUMN_NAME, DataTypes.INT());
816816
paimonSchemaBuilder.column(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
817-
paimonSchemaBuilder.column(
818-
TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
817+
paimonSchemaBuilder.column(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_LTZ_MILLIS());
819818
paimonSchemaBuilder.option(
820819
CoreOptions.COMMIT_CALLBACKS.key(),
821820
PaimonLakeCommitter.PaimonCommitCallback.class.getName());

0 commit comments

Comments
 (0)