Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class PaimonLakeCatalog implements LakeCatalog {
// for consistent behavior
SYSTEM_COLUMNS.put(BUCKET_COLUMN_NAME, DataTypes.INT());
SYSTEM_COLUMNS.put(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
SYSTEM_COLUMNS.put(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
SYSTEM_COLUMNS.put(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_LTZ_MILLIS());
}

private final Catalog paimonCatalog;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,18 @@
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX;
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;

/** Utils to verify whether the existing Paimon table is compatible with the table to be created. */
public class PaimonTableValidation {
Expand Down Expand Up @@ -61,6 +67,13 @@ public static void validatePaimonSchemaCompatible(
existingOptions.entrySet().removeIf(entry -> !newOptions.containsKey(entry.getKey()));

if (!existingSchema.equals(newSchema)) {
// Allow different precisions for __timestamp column for backward compatibility,
// old cluster will use precision 6, but new cluster will use precision 3,
// we allow such precision difference
if (equalIgnoreSystemColumnTimestampPrecision(existingSchema, newSchema)) {
return;
}

throw new TableAlreadyExistException(
String.format(
"The table %s already exists in Paimon catalog, but the table schema is not compatible. "
Expand All @@ -70,6 +83,39 @@ public static void validatePaimonSchemaCompatible(
}
}

/**
* Check if the {@code existingSchema} is compatible with {@code newSchema} by ignoring the
* precision difference of the system column {@code __timestamp}.
*
* <p>This is crucial for backward compatibility during cluster upgrades or configuration
* changes (e.g., transitioning from precision 6 to 3). Without this relaxed check, users would
* be unable to re-enable lake synchronization for existing tables if the cluster-wide default
* timestamp precision has evolved.
*
* @param existingSchema the schema currently persisted in the Paimon catalog
* @param newSchema the new schema descriptor generated by the current Fluss cluster
* @return true if the schemas are identical, disregarding the precision of the system timestamp
*/
/**
 * Check if the {@code existingSchema} is compatible with {@code newSchema} by ignoring the
 * precision difference of the system column {@code __timestamp}.
 *
 * <p>This is crucial for backward compatibility during cluster upgrades or configuration
 * changes (e.g., transitioning from precision 6 to 3). Without this relaxed check, users would
 * be unable to re-enable lake synchronization for existing tables if the cluster-wide default
 * timestamp precision has evolved.
 *
 * @param existingSchema the schema currently persisted in the Paimon catalog
 * @param newSchema the new schema descriptor generated by the current Fluss cluster
 * @return true if the schemas are identical, disregarding the precision of the system timestamp
 */
private static boolean equalIgnoreSystemColumnTimestampPrecision(
        Schema existingSchema, Schema newSchema) {
    List<DataField> existingFields = new ArrayList<>(existingSchema.fields());
    // Guard against a field-less schema: get(size() - 1) below would throw
    // IndexOutOfBoundsException. The caller only invokes this method after the plain
    // equals() check failed, so returning false here is the same outcome as comparing.
    if (existingFields.isEmpty()) {
        return false;
    }
    // The __timestamp system column is appended as the last field (see SYSTEM_COLUMNS setup).
    DataField systemTimestampField = existingFields.get(existingFields.size() - 1);
    // Only normalize when the existing column uses the legacy default precision
    // (TIMESTAMP(6) WITH LOCAL TIME ZONE); rewrite it to the current millisecond
    // representation so the subsequent equals() ignores the precision difference.
    if (systemTimestampField.name().equals(TIMESTAMP_COLUMN_NAME)
            && systemTimestampField
                    .type()
                    .equalsIgnoreFieldId(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())) {
        existingFields.set(
                existingFields.size() - 1,
                new DataField(
                        systemTimestampField.id(),
                        systemTimestampField.name(),
                        DataTypes.TIMESTAMP_LTZ_MILLIS(),
                        systemTimestampField.description()));
    }
    existingSchema = existingSchema.copy(RowType.of(existingFields.toArray(new DataField[0])));
    return existingSchema.equals(newSchema);
}

private static void removeChangeableOptions(Map<String, String> options) {
options.entrySet()
.removeIf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ void testCreateLakeEnabledTable() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"log_c1",
Expand Down Expand Up @@ -206,7 +206,7 @@ void testCreateLakeEnabledTable() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"log_c1",
Expand Down Expand Up @@ -245,7 +245,7 @@ void testCreateLakeEnabledTable() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"pk_c1",
Expand Down Expand Up @@ -288,7 +288,7 @@ void testCreateLakeEnabledTable() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"c1",
Expand Down Expand Up @@ -355,7 +355,7 @@ void testCreateLakeEnabledTableWithAllTypes() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"log_c1",
Expand Down Expand Up @@ -456,8 +456,8 @@ void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
.isInstanceOf(LakeTableAlreadyExistException.class)
.hasMessage(
"The table `fluss`.`log_table_with_exist_lake_table` already exists in Paimon catalog, but the table schema is not compatible. "
+ "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
+ "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
+ "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ "Please first drop the table in Paimon catalog or use a new table name.");

// create log table with different fields will throw exception
Expand All @@ -475,8 +475,8 @@ void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
.isInstanceOf(LakeTableAlreadyExistException.class)
.hasMessage(
"The table `fluss`.`log_table_with_exist_lake_table` already exists in Paimon catalog, but the table schema is not compatible. "
+ "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
+ "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
+ "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
+ "Please first drop the table in Paimon catalog or use a new table name.");

// add an insignificant option to Paimon table will be ok
Expand Down Expand Up @@ -615,7 +615,7 @@ void testAlterLakeEnabledLogTable() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"log_c1",
Expand Down Expand Up @@ -711,7 +711,7 @@ void testAlterLakeEnabledTableProperties() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"c1",
Expand Down Expand Up @@ -742,7 +742,7 @@ void testAlterLakeEnabledTableProperties() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"c1",
Expand Down Expand Up @@ -832,7 +832,7 @@ void testEnableLakeTableAfterAlterTableProperties() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"c1",
Expand Down Expand Up @@ -885,7 +885,7 @@ void testAlterLakeEnabledTableSchema() throws Exception {
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
},
new String[] {
"c1",
Expand Down Expand Up @@ -927,6 +927,50 @@ void testAlterLakeEnabledTableSchema() throws Exception {
assertThat(alteredRowType.getField("c3").description()).isEqualTo("c3 comment");
}

@Test
void testEnableLakeTableWithLegacySystemTimestampColumn() throws Exception {
    TablePath path = TablePath.of(DATABASE, "timestamp_precision_compat");
    TableDescriptor descriptor =
            TableDescriptor.builder()
                    .schema(Schema.newBuilder().column("c1", DataTypes.INT()).build())
                    .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
                    .build();
    admin.createTable(path, descriptor, false).get();

    // Rewrite the system timestamp column back to TIMESTAMP(6) WITH LOCAL TIME ZONE
    // directly in the Paimon catalog to simulate a table created by a legacy cluster.
    Identifier identifier = Identifier.create(DATABASE, path.getTableName());
    paimonCatalog.alterTable(
            identifier,
            SchemaChange.updateColumnType(
                    TIMESTAMP_COLUMN_NAME,
                    org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()),
            false);

    // Switch the data lake off first...
    admin.alterTable(
                    path,
                    Collections.singletonList(
                            TableChange.set(
                                    ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false")),
                    false)
            .get();
    assertThat(admin.getTableInfo(path).get().getTableConfig().isDataLakeEnabled())
            .isFalse();

    // ...then re-enable it: the precision difference on __timestamp must not be rejected.
    admin.alterTable(
                    path,
                    Collections.singletonList(
                            TableChange.set(
                                    ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")),
                    false)
            .get();
    assertThat(admin.getTableInfo(path).get().getTableConfig().isDataLakeEnabled())
            .isTrue();
}

private void verifyPaimonTable(
Table paimonTable,
TableDescriptor flussTable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ private void verifyLogTableRecordsMultiPartition(
// check system columns: __bucket, __offset, __timestamp
assertThat(actualRow.getInt(4)).isEqualTo(expectBucket);
assertThat(actualRow.getLong(5)).isEqualTo(expectRecord.logOffset());
assertThat(actualRow.getTimestamp(6, 6).getMillisecond())
assertThat(actualRow.getTimestamp(6, 3).getMillisecond())
.isEqualTo(expectRecord.timestamp());
}
assertThat(actualRecords.hasNext()).isFalse();
Expand Down Expand Up @@ -468,7 +468,7 @@ private void verifyLogTableRecordsThreePartition(
// check system columns: __bucket, __offset, __timestamp
assertThat(actualRow.getInt(5)).isEqualTo(expectBucket);
assertThat(actualRow.getLong(6)).isEqualTo(expectRecord.logOffset());
assertThat(actualRow.getTimestamp(7, 6).getMillisecond())
assertThat(actualRow.getTimestamp(7, 3).getMillisecond())
.isEqualTo(expectRecord.timestamp());
}
assertThat(actualRecords.hasNext()).isFalse();
Expand Down Expand Up @@ -496,7 +496,7 @@ private void verifyTableRecords(
// check system columns: __bucket, __offset, __timestamp
assertThat(actualRow.getInt(3)).isEqualTo(expectBucket);
assertThat(actualRow.getLong(4)).isEqualTo(expectRecord.logOffset());
assertThat(actualRow.getTimestamp(5, 6).getMillisecond())
assertThat(actualRow.getTimestamp(5, 3).getMillisecond())
.isEqualTo(expectRecord.timestamp());
}
assertThat(actualRecords.hasNext()).isFalse();
Expand Down Expand Up @@ -814,8 +814,7 @@ private void doCreatePaimonTable(TablePath tablePath, Schema.Builder paimonSchem
throws Exception {
paimonSchemaBuilder.column(BUCKET_COLUMN_NAME, DataTypes.INT());
paimonSchemaBuilder.column(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
paimonSchemaBuilder.column(
TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
paimonSchemaBuilder.column(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_LTZ_MILLIS());
paimonSchemaBuilder.option(
CoreOptions.COMMIT_CALLBACKS.key(),
PaimonLakeCommitter.PaimonCommitCallback.class.getName());
Expand Down