diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java index a97bffac04..b5a03a1caf 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java @@ -530,7 +530,7 @@ Map prepareFetchLogRequests() { assert projection != null; reqForTable .setProjectionPushdownEnabled(true) - .setProjectedFields(projection.getProjectionIdInOrder()); + .setProjectedFields(projection.getProjectionInOrder()); } else { reqForTable.setProjectionPushdownEnabled(false); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java index b50bc3622a..ebcae05c51 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java @@ -123,7 +123,7 @@ private Projection sanityProjection(@Nullable int[] projectedFields, TableInfo t + tableRowType); } } - return Projection.of(projectedFields, tableInfo.getSchema()); + return Projection.of(projectedFields); } else { return null; } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index a7fb7e6ac0..aaf06592e5 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -71,6 +71,7 @@ import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.ServerTags; +import org.apache.fluss.types.DataTypeChecks; import 
org.apache.fluss.types.DataTypes; import org.junit.jupiter.api.BeforeEach; @@ -394,7 +395,12 @@ void testAlterTableColumn() throws Exception { admin.alterTable( tablePath, - Collections.singletonList( + Arrays.asList( + TableChange.addColumn( + "nested_row", + DataTypes.ROW(DataTypes.STRING(), DataTypes.INT()), + "new nested column", + TableChange.ColumnPosition.last()), TableChange.addColumn( "c1", DataTypes.STRING(), @@ -408,23 +414,28 @@ void testAlterTableColumn() throws Exception { .primaryKey("id") .fromColumns( Arrays.asList( + new Schema.Column("id", DataTypes.INT(), "person id", 0), new Schema.Column( - "id", DataTypes.INT(), "person id", (short) 0), + "name", DataTypes.STRING(), "person name", 1), + new Schema.Column("age", DataTypes.INT(), "person age", 2), new Schema.Column( - "name", - DataTypes.STRING(), - "person name", - (short) 1), + "nested_row", + DataTypes.ROW( + DataTypes.FIELD( + "f0", DataTypes.STRING(), 4), + DataTypes.FIELD("f1", DataTypes.INT(), 5)), + "new nested column", + 3), new Schema.Column( - "age", DataTypes.INT(), "person age", (short) 2), - new Schema.Column( - "c1", - DataTypes.STRING(), - "new column c1", - (short) 3))) + "c1", DataTypes.STRING(), "new column c1", 6))) .build(); SchemaInfo schemaInfo = admin.getTableSchema(tablePath).get(); assertThat(schemaInfo).isEqualTo(new SchemaInfo(expectedSchema, 2)); + // test field_id of rowType + assertThat( + DataTypeChecks.equalsWithFieldId( + schemaInfo.getSchema().getRowType(), expectedSchema.getRowType())) + .isTrue(); } @Test diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java index 81463004c3..9f50fe59e4 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java +++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java @@ -174,7 +174,7 @@ void testProjection(LogFormat logFormat, byte magic) throws Exception { long fetchOffset = 0L; int bucketId = 0; // records for 0-10. TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId); - Projection projection = Projection.of(new int[] {0, 2}, schema); + Projection projection = Projection.of(new int[] {0, 2}); MemoryLogRecords memoryLogRecords; if (logFormat == LogFormat.ARROW) { memoryLogRecords = genRecordsWithProjection(DATA2, projection, magic); @@ -210,7 +210,7 @@ void testProjection(LogFormat logFormat, byte magic) throws Exception { // test projection reorder. defaultCompletedFetch = makeCompletedFetch( - tb, resultForBucket0, fetchOffset, Projection.of(new int[] {2, 0}, schema)); + tb, resultForBucket0, fetchOffset, Projection.of(new int[] {2, 0})); scanRecords = defaultCompletedFetch.fetchRecords(8); assertThat(scanRecords.size()).isEqualTo(8); for (int i = 0; i < scanRecords.size(); i++) { @@ -427,7 +427,7 @@ private MemoryLogRecords genRecordsWithProjection( DATA2_TABLE_ID, testingSchemaGetter, DEFAULT_COMPRESSION, - projection.getProjectionIdInOrder()); + projection.getProjectionInOrder()); ByteBuffer buffer = toByteBuffer( fileLogProjection diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java index edc47a7ecb..f3022c91b4 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java @@ -226,10 +226,7 @@ void testProjection(String format) throws Exception { createFileLogRecords(tableBucket, DATA2_PHYSICAL_TABLE_PATH, DATA2, logFormat); RemoteCompletedFetch completedFetch = makeCompletedFetch( - tableBucket, - 
fileLogRecords, - fetchOffset, - Projection.of(new int[] {0, 2}, schema)); + tableBucket, fileLogRecords, fetchOffset, Projection.of(new int[] {0, 2})); List scanRecords = completedFetch.fetchRecords(8); List expectedObjects = @@ -255,10 +252,7 @@ void testProjection(String format) throws Exception { completedFetch = makeCompletedFetch( - tableBucket, - fileLogRecords, - fetchOffset, - Projection.of(new int[] {2, 0}, schema)); + tableBucket, fileLogRecords, fetchOffset, Projection.of(new int[] {2, 0})); scanRecords = completedFetch.fetchRecords(8); assertThat(scanRecords.size()).isEqualTo(8); for (int i = 0; i < scanRecords.size(); i++) { diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java index 63066849b5..21a3a3c93f 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java @@ -19,9 +19,12 @@ import org.apache.fluss.annotation.PublicEvolving; import org.apache.fluss.annotation.PublicStable; +import org.apache.fluss.types.ArrayType; import org.apache.fluss.types.DataField; import org.apache.fluss.types.DataType; import org.apache.fluss.types.DataTypeRoot; +import org.apache.fluss.types.MapType; +import org.apache.fluss.types.ReassignFieldId; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.EncodingUtils; import org.apache.fluss.utils.StringUtils; @@ -75,7 +78,8 @@ private Schema( @Nullable PrimaryKey primaryKey, int highestFieldId, List autoIncrementColumnNames) { - this.columns = normalizeColumns(columns, primaryKey, autoIncrementColumnNames); + this.columns = + normalizeColumns(columns, primaryKey, autoIncrementColumnNames, highestFieldId); this.primaryKey = primaryKey; this.autoIncrementColumnNames = autoIncrementColumnNames; // pre-create the row type as it is the most frequently used part of the schema @@ -85,7 +89,9 @@ private Schema( .map( column -> new 
DataField( - column.getName(), column.getDataType())) + column.getName(), + column.getDataType(), + column.columnId)) .collect(Collectors.toList())); this.highestFieldId = highestFieldId; } @@ -248,34 +254,32 @@ public Builder fromSchema(Schema schema) { return this; } - public Builder highestFieldId(int highestFieldId) { - this.highestFieldId = new AtomicInteger(highestFieldId); - return this; - } - - public Builder fromRowType(RowType rowType) { - checkNotNull(rowType, "rowType must not be null."); - final List fieldDataTypes = rowType.getChildren(); - final List fieldNames = rowType.getFieldNames(); - IntStream.range(0, fieldDataTypes.size()) - .forEach(i -> column(fieldNames.get(i), fieldDataTypes.get(i))); - return this; - } - - /** Adopts the given field names and field data types as physical columns of the schema. */ - public Builder fromFields( - List fieldNames, List fieldDataTypes) { - checkNotNull(fieldNames, "Field names must not be null."); - checkNotNull(fieldDataTypes, "Field data types must not be null."); - checkArgument( - fieldNames.size() == fieldDataTypes.size(), - "Field names and field data types must have the same length."); - IntStream.range(0, fieldNames.size()) - .forEach(i -> column(fieldNames.get(i), fieldDataTypes.get(i))); - return this; - } - - /** Adopts all columns from the given list. */ + /** + * Adopts all columns from the given list. + * + *

This method directly uses the columns as-is, preserving their existing column IDs and + * all nested field IDs within their data types (e.g., field IDs in {@link RowType}, {@link + * ArrayType}, {@link MapType}). No field ID reassignment will occur. + * + *

This behavior is different from {@link #column(String, DataType)}, which automatically + * assigns new column IDs and reassigns all nested field IDs to ensure global uniqueness. + * + *

Use this method when: + * + *

    + *
  • Loading existing schema from storage where IDs are already assigned + *
  • Preserving schema identity during schema evolution + *
  • Reconstructing schema from serialized format + *
+ * + *

Note: All input columns must either have column IDs set or none of them should have + * column IDs. Mixed states are not allowed. + * + * @param inputColumns the list of columns to adopt + * @return this builder for fluent API + * @throws IllegalStateException if columns have inconsistent column ID states (some set, + * some not set) + */ public Builder fromColumns(List inputColumns) { boolean nonSetColumnId = inputColumns.stream() @@ -289,9 +293,9 @@ public Builder fromColumns(List inputColumns) { if (allSetColumnId) { columns.addAll(inputColumns); + List allFieldIds = collectAllFieldIds(inputColumns); highestFieldId = - new AtomicInteger( - columns.stream().mapToInt(Column::getColumnId).max().orElse(-1)); + new AtomicInteger(allFieldIds.stream().max(Integer::compareTo).orElse(-1)); } else { // if all columnId is not set, this maybe from old version schema. Just use its // position as columnId. @@ -310,6 +314,56 @@ public Builder fromColumns(List inputColumns) { return this; } + public Builder highestFieldId(int highestFieldId) { + this.highestFieldId = new AtomicInteger(highestFieldId); + return this; + } + + /** + * Adopts the field names and data types from the given {@link RowType} as physical columns + * of the schema. + * + *

This method internally calls {@link #column(String, DataType)} for each field, which + * means: The original field IDs in the RowType will be ignored and replaced with new ones. + * If you need to preserve existing field IDs, use {@link #fromColumns(List)} or {@link + * #fromSchema(Schema)} instead. + * + * @param rowType the row type to adopt fields from + * @return this builder for fluent API + */ + public Builder fromRowType(RowType rowType) { + checkNotNull(rowType, "rowType must not be null."); + final List fieldDataTypes = rowType.getChildren(); + final List fieldNames = rowType.getFieldNames(); + IntStream.range(0, fieldDataTypes.size()) + .forEach(i -> column(fieldNames.get(i), fieldDataTypes.get(i))); + return this; + } + + /** + * Adopts the given field names and field data types as physical columns of the schema. + * + *

This method internally calls {@link #column(String, DataType)} for each field, which + * means: The original field IDs in the RowType will be ignored and replaced with new ones. + * If you need to preserve existing field IDs, use {@link #fromColumns(List)} or {@link + * #fromSchema(Schema)} instead. + * + * @param fieldNames the list of field names + * @param fieldDataTypes the list of field data types + * @return this builder for fluent API + */ + public Builder fromFields( + List fieldNames, List fieldDataTypes) { + checkNotNull(fieldNames, "Field names must not be null."); + checkNotNull(fieldDataTypes, "Field data types must not be null."); + checkArgument( + fieldNames.size() == fieldDataTypes.size(), + "Field names and field data types must have the same length."); + IntStream.range(0, fieldNames.size()) + .forEach(i -> column(fieldNames.get(i), fieldDataTypes.get(i))); + return this; + } + /** * Declares a column that is appended to this schema. * @@ -317,13 +371,23 @@ public Builder fromColumns(List inputColumns) { * and the order of fields in the data. Thus, columns represent the payload that is read * from and written to an external system. * + *

Note: If the data type contains nested types (e.g., {@link RowType}, {@link + * ArrayType}, {@link MapType}), all nested field IDs will be automatically reassigned to + * ensure global uniqueness. This is essential for schema evolution support. If you need to + * preserve existing field IDs, use {@link #fromColumns(List)} or {@link + * #fromSchema(Schema)} instead. + * * @param columnName column name + * @param dataType column data type + * @return this builder for fluent API */ public Builder column(String columnName, DataType dataType) { checkNotNull(columnName, "Column name must not be null."); checkNotNull(dataType, "Data type must not be null."); - columns.add( - new Column(columnName, dataType, null, highestFieldId.incrementAndGet(), null)); + int id = highestFieldId.incrementAndGet(); + // Reassign field id especially for nested types. + DataType reassignDataType = ReassignFieldId.reassign(dataType, highestFieldId); + columns.add(new Column(columnName, reassignDataType, null, id, null)); return this; } @@ -346,13 +410,10 @@ public Builder column(String columnName, DataType dataType, AggFunction aggFunct checkNotNull(dataType, "Data type must not be null."); checkNotNull(aggFunction, "Aggregation function must not be null."); - columns.add( - new Column( - columnName, - dataType, - null, - highestFieldId.incrementAndGet(), - aggFunction)); + int id = highestFieldId.incrementAndGet(); + // Reassign field id especially for nested types. + DataType reassignDataType = ReassignFieldId.reassign(dataType, highestFieldId); + columns.add(new Column(columnName, reassignDataType, null, id, aggFunction)); return this; } @@ -441,17 +502,6 @@ public Builder enableAutoIncrement(String columnName) { /** Returns an instance of an {@link Schema}. 
*/ public Schema build() { - Integer maximumColumnId = - columns.stream().map(Column::getColumnId).max(Integer::compareTo).orElse(0); - - checkState( - columns.isEmpty() || highestFieldId.get() >= maximumColumnId, - "Highest field id must be greater than or equal to the maximum column id."); - - checkState( - columns.stream().map(Column::getColumnId).distinct().count() == columns.size(), - "Column ids must be unique."); - return new Schema(columns, primaryKey, highestFieldId.get(), autoIncrementColumnNames); } } @@ -629,8 +679,10 @@ public int hashCode() { private static List normalizeColumns( List columns, @Nullable PrimaryKey primaryKey, - List autoIncrementColumnNames) { + List autoIncrementColumnNames, + int highestFieldId) { + checkFieldIds(columns, highestFieldId); List columnNames = columns.stream().map(Column::getName).collect(Collectors.toList()); @@ -728,4 +780,74 @@ public static RowType getKeyRowType(Schema schema, int[] keyIndexes) { } return new RowType(keyRowFields); } + + /** + * Validates field IDs in the schema, including both top-level column IDs and nested field IDs. + * + *

This method performs the following checks: + * + *

    + *
  • Ensures all top-level column IDs are unique + *
  • Ensures all field IDs (including nested fields in ROW, ARRAY, MAP types) are globally + * unique + *
  • Verifies that the highest field ID is greater than or equal to all existing field IDs + *
+ * + * @param columns the list of columns to validate + * @param highestFieldId the highest field ID that should be greater than or equal to all field + * IDs + * @throws IllegalStateException if any validation fails + */ + private static void checkFieldIds(List columns, int highestFieldId) { + + // Collect all field IDs (including nested fields) for validation + List allFieldIds = collectAllFieldIds(columns); + + // Validate all field IDs (including nested fields) are unique + long uniqueFieldIdsCount = allFieldIds.stream().distinct().count(); + checkState( + uniqueFieldIdsCount == allFieldIds.size(), + "All field IDs (including nested fields) must be unique. Found %s unique IDs but expected %s.", + uniqueFieldIdsCount, + allFieldIds.size()); + + // Validate the highest field ID is greater than or equal to all field IDs + Integer maximumFieldId = allFieldIds.stream().max(Integer::compareTo).orElse(-1); + checkState( + columns.isEmpty() || highestFieldId >= maximumFieldId, + "Highest field ID (%s) must be greater than or equal to the maximum field ID (%s) including nested fields. Current columns is %s", + highestFieldId, + maximumFieldId, + columns); + } + + /** + * Recursively collects all field IDs from a data type, including nested fields in ROW, ARRAY, + * and MAP types. 
+ */ + private static List collectAllFieldIds(List columns) { + List allFieldIds = new ArrayList<>(); + for (Column column : columns) { + allFieldIds.add(column.getColumnId()); + collectAllFieldIds(column.getDataType(), allFieldIds); + } + return allFieldIds; + } + + private static void collectAllFieldIds(DataType dataType, List fieldIds) { + if (dataType instanceof RowType) { + RowType rowType = (RowType) dataType; + for (DataField field : rowType.getFields()) { + fieldIds.add(field.getFieldId()); + collectAllFieldIds(field.getType(), fieldIds); + } + } else if (dataType instanceof ArrayType) { + ArrayType arrayType = (ArrayType) dataType; + collectAllFieldIds(arrayType.getElementType(), fieldIds); + } else if (dataType instanceof MapType) { + MapType mapType = (MapType) dataType; + collectAllFieldIds(mapType.getKeyType(), fieldIds); + collectAllFieldIds(mapType.getValueType(), fieldIds); + } + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java index 9f227adb12..4717271c24 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java @@ -51,9 +51,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; -import java.util.HashMap; import java.util.List; -import java.util.Map; import static org.apache.fluss.record.DefaultLogRecordBatch.APPEND_ONLY_FLAG_MASK; import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET; @@ -101,7 +99,7 @@ public class FileLogProjection { private SchemaGetter schemaGetter; private long tableId; private ArrowCompressionInfo compressionInfo; - private int[] selectedFieldIds; + private int[] selectedFieldPositions; public FileLogProjection(ProjectionPushdownCache projectionsCache) { this.projectionsCache = projectionsCache; @@ -117,11 +115,11 @@ public void setCurrentProjection( long 
tableId, SchemaGetter schemaGetter, ArrowCompressionInfo compressionInfo, - int[] selectedFieldIds) { + int[] selectedFieldPositions) { this.tableId = tableId; this.schemaGetter = schemaGetter; this.compressionInfo = compressionInfo; - this.selectedFieldIds = selectedFieldIds; + this.selectedFieldPositions = selectedFieldPositions; } /** @@ -405,19 +403,17 @@ ByteBuffer getLogHeaderBuffer() { private ProjectionInfo getOrCreateProjectionInfo(short schemaId) { ProjectionInfo cachedProjection = - projectionsCache.getProjectionInfo(tableId, schemaId, selectedFieldIds); + projectionsCache.getProjectionInfo(tableId, schemaId, selectedFieldPositions); if (cachedProjection == null) { - cachedProjection = createProjectionInfo(schemaId, selectedFieldIds); + cachedProjection = createProjectionInfo(schemaId, selectedFieldPositions); projectionsCache.setProjectionInfo( - tableId, schemaId, selectedFieldIds, cachedProjection); + tableId, schemaId, selectedFieldPositions, cachedProjection); } return cachedProjection; } - private ProjectionInfo createProjectionInfo(short schemaId, int[] selectedFieldIds) { + private ProjectionInfo createProjectionInfo(short schemaId, int[] selectedFieldPositions) { org.apache.fluss.metadata.Schema schema = schemaGetter.getSchema(schemaId); - int[] selectedFieldPositions = - selectedFieldPositions(schemaGetter.getSchema(schemaId), selectedFieldIds); RowType rowType = schema.getRowType(); // initialize the projection util information @@ -460,37 +456,6 @@ private ProjectionInfo createProjectionInfo(short schemaId, int[] selectedFieldI selectedFieldPositions); } - int[] selectedFieldPositions(org.apache.fluss.metadata.Schema schema, int[] projectedFields) { - Map columnIdPositions = new HashMap<>(); - List columnIds = schema.getColumnIds(); - for (int i = 0; i < columnIds.size(); i++) { - columnIdPositions.put(columnIds.get(i), i); - } - - int prev = -1; - int[] selectedFieldPositions = new int[projectedFields.length]; - for (int i = 0; i < 
projectedFields.length; i++) { - int fieldId = projectedFields[i]; - Integer position = columnIdPositions.get(fieldId); - if (position == null) { - throw new InvalidColumnProjectionException( - String.format( - "Projected field id %s is not contained in %s", - fieldId, columnIds)); - } - - selectedFieldPositions[i] = position; - if (position < prev) { - throw new InvalidColumnProjectionException( - "The projection indexes should be in field order, but is " - + Arrays.toString(projectedFields)); - } - - prev = position; - } - return selectedFieldPositions; - } - /** Projection pushdown information for a specific schema and selected fields. */ public static final class ProjectionInfo { final BitSet nodesProjection; diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java index 6d3ee99af9..2b0f695ecf 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java @@ -75,17 +75,14 @@ public static LogRecordReadContext createReadContext( int schemaId = tableInfo.getSchemaId(); if (projection == null) { // set a default dummy projection to simplify code - projection = - Projection.of( - IntStream.range(0, rowType.getFieldCount()).toArray(), - tableInfo.getSchema()); + projection = Projection.of(IntStream.range(0, rowType.getFieldCount()).toArray()); } if (logFormat == LogFormat.ARROW) { if (readFromRemote) { // currently, for remote read, arrow log doesn't support projection pushdown, // so set the rowType as is. 
- int[] selectedFields = projection.getProjectionPositions(); + int[] selectedFields = projection.getProjection(); return createArrowReadContext( rowType, schemaId, selectedFields, false, schemaGetter); } else { @@ -101,10 +98,10 @@ public static LogRecordReadContext createReadContext( schemaGetter); } } else if (logFormat == LogFormat.INDEXED) { - int[] selectedFields = projection.getProjectionPositions(); + int[] selectedFields = projection.getProjection(); return createIndexedReadContext(rowType, schemaId, selectedFields, schemaGetter); } else if (logFormat == LogFormat.COMPACTED) { - int[] selectedFields = projection.getProjectionPositions(); + int[] selectedFields = projection.getProjection(); return createCompactedRowReadContext(rowType, schemaId, selectedFields); } else { throw new IllegalArgumentException("Unsupported log format: " + logFormat); @@ -145,6 +142,17 @@ public static LogRecordReadContext createArrowReadContext( return createArrowReadContext(rowType, schemaId, selectedFields, false, schemaGetter); } + @VisibleForTesting + public static LogRecordReadContext createArrowReadContext( + RowType rowType, + int schemaId, + SchemaGetter schemaGetter, + boolean projectionPushDowned) { + int[] selectedFields = IntStream.range(0, rowType.getFieldCount()).toArray(); + return createArrowReadContext( + rowType, schemaId, selectedFields, projectionPushDowned, schemaGetter); + } + /** * Creates a LogRecordReadContext for INDEXED log format. 
* diff --git a/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java b/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java index a2d3da1ae9..71f12192b2 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java @@ -49,8 +49,9 @@ public ProjectionPushdownCache() { } @Nullable - public ProjectionInfo getProjectionInfo(long tableId, short schemaId, int[] selectedColumnIds) { - ProjectionKey key = new ProjectionKey(tableId, schemaId, selectedColumnIds); + public ProjectionInfo getProjectionInfo( + long tableId, short schemaId, int[] selectedFieldPositions) { + ProjectionKey key = new ProjectionKey(tableId, schemaId, selectedFieldPositions); return projectionCache.getIfPresent(key); } diff --git a/fluss-common/src/main/java/org/apache/fluss/types/DataField.java b/fluss-common/src/main/java/org/apache/fluss/types/DataField.java index c4f935680d..296f6215b7 100644 --- a/fluss-common/src/main/java/org/apache/fluss/types/DataField.java +++ b/fluss-common/src/main/java/org/apache/fluss/types/DataField.java @@ -47,14 +47,25 @@ public class DataField implements Serializable { private final @Nullable String description; - public DataField(String name, DataType type, @Nullable String description) { + private final int fieldId; + + public DataField(String name, DataType type, @Nullable String description, int fieldId) { this.name = checkNotNull(name, "Field name must not be null."); this.type = checkNotNull(type, "Field type must not be null."); this.description = description; + this.fieldId = fieldId; + } + + public DataField(String name, DataType type, Integer fieldId) { + this(name, type, null, fieldId); } public DataField(String name, DataType type) { - this(name, type, null); + this(name, type, -1); + } + + public DataField(String name, DataType type, @Nullable String description) { + this(name, type, 
description, -1); } public String getName() { @@ -65,12 +76,16 @@ public DataType getType() { return type; } + public int getFieldId() { + return fieldId; + } + public Optional getDescription() { return Optional.ofNullable(description); } public DataField copy() { - return new DataField(name, type.copy(), description, fieldId); } public String asSummaryString() { @@ -90,6 +105,9 @@ public boolean equals(Object o) { return false; } DataField rowField = (DataField) o; + // ignore field id in equality check, because field id is not part of type definition, + // use RowType#equalsWithFieldId (DataTypeChecks.equalsWithFieldId) when field ids must be compared. + // we may ignore description too in the future. return name.equals(rowField.name) && type.equals(rowField.type) && Objects.equals(description, rowField.description); diff --git a/fluss-common/src/main/java/org/apache/fluss/types/DataTypeChecks.java b/fluss-common/src/main/java/org/apache/fluss/types/DataTypeChecks.java index 76e399568c..15ec476a31 100644 --- a/fluss-common/src/main/java/org/apache/fluss/types/DataTypeChecks.java +++ b/fluss-common/src/main/java/org/apache/fluss/types/DataTypeChecks.java @@ -57,6 +57,11 @@ public static List getFieldTypes(DataType dataType) { return dataType.accept(FIELD_TYPES_EXTRACTOR); } + /** Checks whether two data types are equal including field ids for row types. */ + public static boolean equalsWithFieldId(DataType original, DataType that) { + return that.accept(new DataTypeEqualsWithFieldId(original)); + } + private DataTypeChecks() { // no instantiation } @@ -155,4 +160,38 @@ public List visit(RowType rowType) { return rowType.getFieldTypes(); } } + + private static class DataTypeEqualsWithFieldId extends DataTypeDefaultVisitor { + private final DataType original; + + private DataTypeEqualsWithFieldId(DataType original) { + this.original = original; + } + + @Override + public Boolean visit(RowType that) { + if (!original.equals(that)) { + return false; + } + + // compare field ids. 
+ List originalFields = ((RowType) original).getFields(); + List thatFields = that.getFields(); + for (int i = 0; i < that.getFieldCount(); i++) { + DataField originalField = originalFields.get(i); + DataField thatField = thatFields.get(i); + if (originalField.getFieldId() != thatField.getFieldId() + || !equalsWithFieldId(originalField.getType(), thatField.getType())) { + return false; + } + } + + return true; + } + + @Override + protected Boolean defaultMethod(DataType that) { + return original.equals(that); + } + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/types/DataTypes.java b/fluss-common/src/main/java/org/apache/fluss/types/DataTypes.java index 8993b146f3..d0bc0d6278 100644 --- a/fluss-common/src/main/java/org/apache/fluss/types/DataTypes.java +++ b/fluss-common/src/main/java/org/apache/fluss/types/DataTypes.java @@ -301,16 +301,39 @@ public static MapType MAP(DataType keyType, DataType valueType) { return new MapType(keyType, valueType); } - /** Field definition with field name and data type. */ public static DataField FIELD(String name, DataType type) { return new DataField(name, type); } - /** Field definition with field name, data type, and a description. */ + /** + * Creates a field definition with field name, data type, and field ID. + * + * @param name the field name + * @param type the field data type + * @param fieldId the field ID for schema evolution + * @return a new data field without description + */ + public static DataField FIELD(String name, DataType type, int fieldId) { + return new DataField(name, type, fieldId); + } + public static DataField FIELD(String name, DataType type, String description) { return new DataField(name, type, description); } + /** + * Creates a field definition with field name, data type, description, and field ID. 
+ * + * @param name the field name + * @param type the field data type + * @param description the field description + * @param fieldId the field ID for schema evolution + * @return a new data field with all properties + */ + public static DataField FIELD(String name, DataType type, String description, int fieldId) { + return new DataField(name, type, description, fieldId); + } + /** * Data type of a sequence of fields. A field consists of a field name, field type, and an * optional description. The most specific type of a row of a table is a row type. In this case, diff --git a/fluss-common/src/main/java/org/apache/fluss/types/ReassignFieldId.java b/fluss-common/src/main/java/org/apache/fluss/types/ReassignFieldId.java new file mode 100644 index 0000000000..3c095f6dfd --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/types/ReassignFieldId.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.types; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** Visitor that recursively reassigns field IDs in nested data types using a provided counter. 
*/ +public class ReassignFieldId extends DataTypeDefaultVisitor { + + private final AtomicInteger highestFieldId; + + public ReassignFieldId(AtomicInteger highestFieldId) { + this.highestFieldId = highestFieldId; + } + + public static DataType reassign(DataType input, AtomicInteger highestFieldId) { + return input.accept(new ReassignFieldId(highestFieldId)); + } + + @Override + public DataType visit(ArrayType arrayType) { + return new ArrayType(arrayType.isNullable(), arrayType.getElementType().accept(this)); + } + + @Override + public DataType visit(MapType mapType) { + return new MapType( + mapType.isNullable(), + mapType.getKeyType().accept(this), + mapType.getValueType().accept(this)); + } + + @Override + public DataType visit(RowType rowType) { + List originalDataFields = rowType.getFields(); + + List dataFields = new ArrayList<>(); + for (int i = 0; i < rowType.getFieldCount(); i++) { + DataField dataField = originalDataFields.get(i); + int id = highestFieldId.incrementAndGet(); + DataType dataType = dataField.getType().accept(this); + dataFields.add( + new DataField( + dataField.getName(), + dataType, + dataField.getDescription().orElse(null), + id)); + } + + return new RowType(rowType.isNullable(), dataFields); + } + + @Override + protected DataType defaultMethod(DataType dataType) { + return dataType; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/types/RowType.java b/fluss-common/src/main/java/org/apache/fluss/types/RowType.java index 090fe96553..fe2cfae051 100644 --- a/fluss-common/src/main/java/org/apache/fluss/types/RowType.java +++ b/fluss-common/src/main/java/org/apache/fluss/types/RowType.java @@ -167,6 +167,11 @@ public boolean equals(Object o) { return fields.equals(rowType.fields); } + /** Checks whether two data types are equal including field ids for row types. 
*/ + public boolean equalsWithFieldId(DataType other) { + return DataTypeChecks.equalsWithFieldId(this, other); + } + @Override public int hashCode() { return Objects.hash(super.hashCode(), fields); diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java b/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java index 7c392b0554..c16c6b10b8 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java @@ -17,11 +17,9 @@ package org.apache.fluss.utils; -import org.apache.fluss.metadata.Schema; import org.apache.fluss.types.RowType; import java.util.Arrays; -import java.util.List; import static org.apache.fluss.utils.Preconditions.checkState; @@ -30,14 +28,12 @@ * projection includes both reducing the accessible fields and reordering them. Currently, this only * supports top-level projection. Nested projection will be supported in the future. * - *

For example, given a row with fields [a, b, c, d, e] with id [0, 1, 3, 4, 5], a projection [2, - * 0, 3] will project the row to [c, a, d] with id [3, 0, 4]. The projection indexes are 0-based. + *

For example, given a row with fields [a, b, c, d, e], a projection [2, 0, 3] will project the + * row to [c, a, d]. The projection indexes are 0-based. * *

    - *
  • The {@link #projectionPositions} indexes will be [2, 0, 3] - *
  • The {@link #projectionPositions} id will be [3, 0, 4] - *
  • The {@link #projectionIdsInOrder} indexes will be [0, 3, 4] - *
  • The {@link #projectionPositionsOrderById} indexes will be [0, 2, 3] + *
  • The {@link #projection} indexes will be [2, 0, 3] + *
  • The {@link #projectionInOrder} indexes will be [0, 2, 3] *
  • The {@link #reorderingIndexes} indexes will be [1, 0, 2] *
* @@ -45,76 +41,42 @@ */ public class Projection { /** the projection indexes including both selected fields and reordering them. */ - final int[] projectionPositions; + final int[] projection; /** the projection indexes that only select fields but not reordering them. */ - final int[] projectionPositionsOrderById; - /** the projection indexes that only select fields but not reordering them. */ - final int[] projectionIdsInOrder; - /** - * the indexes to reorder the fields of {@link #projectionPositionsOrderById} to {@link - * #projectionPositions}. - */ + final int[] projectionInOrder; + /** the indexes to reorder the fields of {@link #projectionInOrder} to {@link #projection}. */ final int[] reorderingIndexes; /** the flag to indicate whether reordering is needed. */ final boolean reorderingNeeded; - public static Projection of(int[] indexes, Schema schema) { - int[] ids = new int[indexes.length]; - List columnIds = schema.getColumnIds(); - for (int i = 0; i < indexes.length; i++) { - ids[i] = columnIds.get(indexes[i]); - } - - return new Projection(indexes, ids); + /** Create a {@link Projection} of the provided {@code indexes}. */ + public static Projection of(int[] indexes) { + return new Projection(indexes); } - /** - * Create a {@link Projection} of the provided {@code indexes} and {@code projectionIds}. - * - *

Typically, {@code projectionIndexes} and {@code projectionIds} are the same. But when - * removing a middle column or reordering columns, they won't be the same. In this case, when - * the schema on the fluss server side differs from the schema on the client side during user - * queries, using {@code projectionIds} ensures correctness by mapping to the actual server-side - * column positions. - * - * @param projectionIndexes the indexes of the projection, which is used for user to read data - * from client side. These indexes are based on the schema visible to the client side and - * are used to parse the data returned by the fluss server. - * @param projectionIds the ids of the projection, which is used for fluss server scan. These - * ids are based on the actual schema on the fluss server side and are pushed down to the - * server for querying. - */ - private Projection(int[] projectionIndexes, int[] projectionIds) { - checkState( - projectionIds.length == projectionIndexes.length, - "The number of ids must be equal to the number of indexes."); - this.projectionPositions = projectionIndexes; - - // reorder the projection id for lookup. 
- this.projectionIdsInOrder = Arrays.copyOf(projectionIds, projectionIds.length); - Arrays.sort(projectionIdsInOrder); - this.reorderingNeeded = !Arrays.equals(projectionIds, projectionIdsInOrder); - this.reorderingIndexes = new int[projectionPositions.length]; - this.projectionPositionsOrderById = new int[projectionPositions.length]; - for (int i = 0; i < projectionIds.length; i++) { - int index = Arrays.binarySearch(projectionIdsInOrder, projectionIds[i]); + private Projection(int[] projection) { + this.projection = projection; + this.projectionInOrder = Arrays.copyOf(projection, projection.length); + Arrays.sort(projectionInOrder); + this.reorderingNeeded = !Arrays.equals(projection, projectionInOrder); + this.reorderingIndexes = new int[projection.length]; + for (int i = 0; i < projection.length; i++) { + int index = Arrays.binarySearch(projectionInOrder, projection[i]); checkState(index >= 0, "The projection index is invalid."); reorderingIndexes[i] = index; - projectionPositionsOrderById[i] = projectionIndexes[index]; } } public RowType projectInOrder(RowType rowType) { - return rowType.project(projectionPositionsOrderById); + return rowType.project(projectionInOrder); } - public int[] getProjectionPositions() { - return projectionPositions; + public int[] getProjection() { + return projection; } - /** The id of the projection, which is used for fluss server scan. 
*/ - public int[] getProjectionIdInOrder() { - return projectionIdsInOrder; + public int[] getProjectionInOrder() { + return projectionInOrder; } public boolean isReorderingNeeded() { diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java b/fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java index 9bcd3262b9..12e6fb1c54 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java @@ -70,6 +70,7 @@ public class DataTypeJsonSerde implements JsonSerializer, JsonDeserial static final String FIELD_NAME_FIELDS = "fields"; static final String FIELD_NAME_FIELD_NAME = "name"; static final String FIELD_NAME_FIELD_TYPE = "field_type"; + static final String FIELD_NAME_FIELD_ID = "field_id"; static final String FIELD_NAME_FIELD_DESCRIPTION = "description"; @Override @@ -190,6 +191,8 @@ private static void serializeRow(RowType dataType, JsonGenerator jsonGenerator) jsonGenerator.writeStringField( FIELD_NAME_FIELD_DESCRIPTION, dataField.getDescription().get()); } + + jsonGenerator.writeNumberField(FIELD_NAME_FIELD_ID, dataField.getFieldId()); jsonGenerator.writeEndObject(); } jsonGenerator.writeEndArray(); @@ -291,18 +294,23 @@ private static DataType deserializeMap(JsonNode dataTypeNode) { private static DataType deserializeRow(JsonNode dataTypeNode) { final ArrayNode fieldNodes = (ArrayNode) dataTypeNode.get(FIELD_NAME_FIELDS); final List fields = new ArrayList<>(); + for (JsonNode fieldNode : fieldNodes) { final String fieldName = fieldNode.get(FIELD_NAME_FIELD_NAME).asText(); final DataType fieldType = DataTypeJsonSerde.INSTANCE.deserialize(fieldNode.get(FIELD_NAME_FIELD_TYPE)); - final String fieldDescription; - if (fieldNode.has(FIELD_NAME_FIELD_DESCRIPTION)) { - fieldDescription = fieldNode.get(FIELD_NAME_FIELD_DESCRIPTION).asText(); - } else { - fieldDescription = null; - } - fields.add(new 
DataField(fieldName, fieldType, fieldDescription)); + final String fieldDescription = + fieldNode.has(FIELD_NAME_FIELD_DESCRIPTION) + ? fieldNode.get(FIELD_NAME_FIELD_DESCRIPTION).asText() + : null; + + final int fieldId = + fieldNode.has(FIELD_NAME_FIELD_ID) + ? fieldNode.get(FIELD_NAME_FIELD_ID).asInt() + : -1; + fields.add(new DataField(fieldName, fieldType, fieldDescription, fieldId)); } + return new RowType(fields); } } diff --git a/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java index 32ba62b069..82e2fd76ec 100644 --- a/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java @@ -18,10 +18,13 @@ package org.apache.fluss.metadata; import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; import org.junit.jupiter.api.Test; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -122,6 +125,140 @@ void testAutoIncrementColumnSchema() { .hasMessage("Auto increment column can only be used in primary-key table."); } + @Test + void testReassignFieldId() { + // Schema.Builder.column will reassign field id in flatten order. 
+ Schema schema = + Schema.newBuilder() + .column("f0", DataTypes.STRING().copy(false)) + .column( + "f1", + DataTypes.ROW( + DataTypes.FIELD("n0", DataTypes.TINYINT(), 0), + DataTypes.FIELD("n1", DataTypes.STRING(), 1), + DataTypes.FIELD( + "n2", + DataTypes.ROW( + DataTypes.FIELD( + "m1", DataTypes.TINYINT(), 0)), + 2))) + .column( + "f2", + DataTypes.ROW( + DataTypes.FIELD("n0", DataTypes.TINYINT(), 0), + DataTypes.FIELD("n1", DataTypes.STRING(), 1))) + .column("f3", DataTypes.STRING()) + .primaryKey("f0") + .build(); + assertThat(schema.getColumnIds()).containsExactly(0, 1, 6, 9); + RowType expectedType = + new RowType( + true, + Arrays.asList( + DataTypes.FIELD("f0", DataTypes.STRING().copy(false), 0), + DataTypes.FIELD( + "f1", + DataTypes.ROW( + DataTypes.FIELD("n0", DataTypes.TINYINT(), 2), + DataTypes.FIELD("n1", DataTypes.STRING(), 3), + DataTypes.FIELD( + "n2", + DataTypes.ROW( + DataTypes.FIELD( + "m1", + DataTypes.TINYINT(), + 5)), + 4)), + 1), + DataTypes.FIELD( + "f2", + DataTypes.ROW( + DataTypes.FIELD("n0", DataTypes.TINYINT(), 7), + DataTypes.FIELD("n1", DataTypes.STRING(), 8)), + 6), + DataTypes.FIELD("f3", DataTypes.STRING(), 9))); + assertThat(schema.getRowType().equalsWithFieldId(expectedType)).isTrue(); + + // Schema.Builder.fromColumns won't reassign field id. + List columns = + Arrays.asList( + new Schema.Column("f0", DataTypes.STRING().copy(false), null, 0), + new Schema.Column( + "f1", + DataTypes.ROW( + DataTypes.FIELD("n0", DataTypes.TINYINT(), 0), + DataTypes.FIELD("n1", DataTypes.STRING(), 1), + DataTypes.FIELD( + "n2", + DataTypes.ROW( + DataTypes.FIELD( + "m1", DataTypes.TINYINT(), 1)), + 2)), + null, + 1), + new Schema.Column( + "f2", + DataTypes.ROW( + DataTypes.FIELD("n0", DataTypes.TINYINT(), 0), + DataTypes.FIELD("n1", DataTypes.STRING(), 1)), + null, + 2)); + assertThatThrownBy(() -> Schema.newBuilder().fromColumns(columns).build()) + .hasMessageContaining( + "All field IDs (including nested fields) must be unique. 
Found 3 unique IDs but expected 9"); + List columns2 = + Arrays.asList( + new Schema.Column("f0", DataTypes.STRING().copy(false), null, 0), + new Schema.Column( + "f1", + DataTypes.ROW( + DataTypes.FIELD("n0", DataTypes.TINYINT(), 6), + DataTypes.FIELD("n1", DataTypes.STRING(), 7), + DataTypes.FIELD( + "n2", + DataTypes.ROW( + DataTypes.FIELD( + "m1", DataTypes.TINYINT(), 11)), + 8)), + null, + 1), + new Schema.Column( + "f2", + DataTypes.ROW( + DataTypes.FIELD("n0", DataTypes.TINYINT(), 9), + DataTypes.FIELD("n1", DataTypes.STRING(), 10)), + null, + 2)); + schema = Schema.newBuilder().fromColumns(columns2).build(); + assertThat(schema.getColumnIds()).containsExactly(0, 1, 2); + expectedType = + new RowType( + true, + Arrays.asList( + DataTypes.FIELD("f0", DataTypes.STRING().copy(false), 0), + DataTypes.FIELD( + "f1", + DataTypes.ROW( + DataTypes.FIELD("n0", DataTypes.TINYINT(), 6), + DataTypes.FIELD("n1", DataTypes.STRING(), 7), + DataTypes.FIELD( + "n2", + DataTypes.ROW( + DataTypes.FIELD( + "m1", + DataTypes.TINYINT(), + 11)), + 8)), + 1), + DataTypes.FIELD( + "f2", + DataTypes.ROW( + DataTypes.FIELD("n0", DataTypes.TINYINT(), 9), + DataTypes.FIELD("n1", DataTypes.STRING(), 10)), + 2))); + assertThat(schema.getRowType().equalsWithFieldId(expectedType)).isTrue(); + } + @Test void testSchemaBuilderColumnWithAggFunction() { Schema schema = diff --git a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java index 9a27089ac1..b85e4db32f 100644 --- a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java @@ -134,7 +134,7 @@ void testIllegalSetCurrentProjection() throws Exception { new int[] {3}, recordsOfData2RowType.sizeInBytes())) .isInstanceOf(InvalidColumnProjectionException.class) - .hasMessage("Projected field id 3 is not contained in [0, 1, 2]"); + 
.hasMessage("Projected fields [3] is out of bound for schema with 3 fields."); assertThatThrownBy( () -> @@ -162,6 +162,51 @@ void testIllegalSetCurrentProjection() throws Exception { "The projection indexes should not contain duplicated fields, but is [0, 0, 0]"); } + @Test + void testProjectionOldDataWithNewSchema() throws Exception { + // Currently, we only support add column at last. + short schemaId = 1; + try (FileLogRecords records = + createFileLogRecords( + schemaId, LOG_MAGIC_VALUE_V1, TestData.DATA1_ROW_TYPE, TestData.DATA1)) { + + ProjectionPushdownCache cache = new ProjectionPushdownCache(); + FileLogProjection projection = new FileLogProjection(cache); + assertThat( + doProjection( + 2L, + 2, + projection, + records, + new int[] {1}, + records.sizeInBytes())) + .containsExactly( + new Object[] {"a"}, + new Object[] {"b"}, + new Object[] {"c"}, + new Object[] {"d"}, + new Object[] {"e"}, + new Object[] {"f"}, + new Object[] {"g"}, + new Object[] {"h"}, + new Object[] {"i"}, + new Object[] {"j"}); + + assertThatThrownBy( + () -> + doProjection( + 1L, + 2, + projection, + records, + new int[] {0, 2}, + records.sizeInBytes())) + .isInstanceOf(InvalidColumnProjectionException.class) + .hasMessage( + "Projected fields [0, 2] is out of bound for schema with 2 fields."); + } + } + static Stream projectedFieldsArgs() { return Stream.of( Arguments.of((Object) new int[] {0}, LOG_MAGIC_VALUE_V0, (short) 1), @@ -420,7 +465,7 @@ private List doProjection( private List doProjection( long tableId, - short schemaId, + int schemaId, FileLogProjection projection, FileLogRecords fileLogRecords, int[] projectedFields, @@ -437,7 +482,7 @@ private List doProjection( List results = new ArrayList<>(); long expectedOffset = 0L; try (LogRecordReadContext context = - createArrowReadContext(projectedType, schemaId, testingSchemaGetter)) { + createArrowReadContext(projectedType, schemaId, testingSchemaGetter, true)) { for (LogRecordBatch batch : project.batches()) { try 
(CloseableIterator records = batch.records(context)) { while (records.hasNext()) { diff --git a/fluss-common/src/test/java/org/apache/fluss/types/DataTypesTest.java b/fluss-common/src/test/java/org/apache/fluss/types/DataTypesTest.java index e9f0dc8cc5..309f0fabd0 100644 --- a/fluss-common/src/test/java/org/apache/fluss/types/DataTypesTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/types/DataTypesTest.java @@ -608,6 +608,16 @@ void testRowType() { new DataField("a1", new IntType(), "column a1"), new DataField("b", new CharType(5), "column b")))); + // test ignore field_id + dataTypeBaseAssert( + rowType, + true, + "ROW<`a` INT 'column a', `b` CHAR(5) 'column b'>", + new RowType( + Arrays.asList( + new DataField("a1", new IntType(), "column a1"), + new DataField("b", new CharType(5), "column b")))); + rowType = new RowType( false, @@ -654,7 +664,7 @@ void testRowType() { new RowType( Arrays.asList( new DataField("a1", new IntType(), "column a1"), - new DataField("b", new CharType(5))))); + new DataField("b", new CharType(5), 1)))); rowType = RowType.of(DataTypes.CHAR(1), DataTypes.CHAR(2)); dataTypeBaseAssert( diff --git a/fluss-common/src/test/java/org/apache/fluss/utils/ProjectionTest.java b/fluss-common/src/test/java/org/apache/fluss/utils/ProjectionTest.java index 48fc2ac670..4f85e95166 100644 --- a/fluss-common/src/test/java/org/apache/fluss/utils/ProjectionTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/utils/ProjectionTest.java @@ -17,7 +17,6 @@ package org.apache.fluss.utils; -import org.apache.fluss.metadata.Schema; import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.ProjectedRow; @@ -33,15 +32,8 @@ class ProjectionTest { @Test void testProjection() { - Schema schema = - Schema.newBuilder() - .column("f0", DataTypes.INT()) - .column("f1", DataTypes.BIGINT()) - .column("f2", DataTypes.STRING()) - .column("f3", DataTypes.DOUBLE()) - .build(); - Projection projection = 
Projection.of(new int[] {2, 0, 3}, schema); - assertThat(projection.getProjectionIdInOrder()).isEqualTo(new int[] {0, 2, 3}); + Projection projection = Projection.of(new int[] {2, 0, 3}); + assertThat(projection.getProjectionInOrder()).isEqualTo(new int[] {0, 2, 3}); RowType rowType = projection.projectInOrder( @@ -53,21 +45,21 @@ void testProjection() { assertThat(rowType) .isEqualTo( DataTypes.ROW( - DataTypes.FIELD("f0", DataTypes.INT()), - DataTypes.FIELD("f2", DataTypes.STRING()), - DataTypes.FIELD("f3", DataTypes.DOUBLE()))); + DataTypes.FIELD("f0", DataTypes.INT(), 0), + DataTypes.FIELD("f2", DataTypes.STRING(), 2), + DataTypes.FIELD("f3", DataTypes.DOUBLE(), 3))); assertThat(projection.isReorderingNeeded()).isTrue(); assertThat(projection.getReorderingIndexes()).isEqualTo(new int[] {1, 0, 2}); assertThat(rowType.project(projection.getReorderingIndexes())) .isEqualTo( DataTypes.ROW( - DataTypes.FIELD("f2", DataTypes.STRING()), - DataTypes.FIELD("f0", DataTypes.INT()), - DataTypes.FIELD("f3", DataTypes.DOUBLE()))); + DataTypes.FIELD("f2", DataTypes.STRING(), 2), + DataTypes.FIELD("f0", DataTypes.INT(), 0), + DataTypes.FIELD("f3", DataTypes.DOUBLE(), 3))); GenericRow row = GenericRow.of(0, 1L, BinaryString.fromString("2"), 3.0d); - ProjectedRow p1 = ProjectedRow.from(projection.getProjectionIdInOrder()); + ProjectedRow p1 = ProjectedRow.from(projection.getProjectionInOrder()); p1.replaceRow(row); ProjectedRow p2 = ProjectedRow.from(projection.getReorderingIndexes()); p2.replaceRow(p1); diff --git a/fluss-common/src/test/java/org/apache/fluss/utils/json/ColumnJsonSerdeTest.java b/fluss-common/src/test/java/org/apache/fluss/utils/json/ColumnJsonSerdeTest.java index 716e043475..eba8159fb5 100644 --- a/fluss-common/src/test/java/org/apache/fluss/utils/json/ColumnJsonSerdeTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/utils/json/ColumnJsonSerdeTest.java @@ -20,11 +20,13 @@ import org.apache.fluss.metadata.Schema; import 
org.apache.fluss.types.DataTypes; import org.apache.fluss.types.IntType; +import org.apache.fluss.types.RowType; import org.junit.jupiter.api.Test; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import static org.assertj.core.api.Assertions.assertThat; @@ -36,11 +38,21 @@ protected ColumnJsonSerdeTest() { @Override protected Schema.Column[] createObjects() { - Schema.Column[] columns = new Schema.Column[4]; + Schema.Column[] columns = new Schema.Column[5]; columns[0] = new Schema.Column("a", DataTypes.STRING()); columns[1] = new Schema.Column("b", DataTypes.INT(), "hello b"); columns[2] = new Schema.Column("c", new IntType(false), "hello c"); columns[3] = new Schema.Column("d", new IntType(false), "hello c", (short) 2); + columns[4] = + new Schema.Column( + "e", + new RowType( + true, + Arrays.asList( + DataTypes.FIELD("f", DataTypes.STRING()), + DataTypes.FIELD("g", DataTypes.STRING(), 1))), + "hello c", + (short) 2); return columns; } @@ -50,7 +62,8 @@ protected String[] expectedJsons() { "{\"name\":\"a\",\"data_type\":{\"type\":\"STRING\"},\"id\":-1}", "{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"hello b\",\"id\":-1}", "{\"name\":\"c\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"hello c\",\"id\":-1}", - "{\"name\":\"d\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"hello c\",\"id\":2}" + "{\"name\":\"d\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"hello c\",\"id\":2}", + "{\"name\":\"e\",\"data_type\":{\"type\":\"ROW\",\"fields\":[{\"name\":\"f\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":-1},{\"name\":\"g\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":1}]},\"comment\":\"hello c\",\"id\":2}" }; } diff --git a/fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java b/fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java index 82d6ca4e8b..20e338f605 
100644 --- a/fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java @@ -24,6 +24,8 @@ import org.apache.fluss.types.BytesType; import org.apache.fluss.types.CharType; import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypeChecks; +import org.apache.fluss.types.DataTypes; import org.apache.fluss.types.DateType; import org.apache.fluss.types.DecimalType; import org.apache.fluss.types.DoubleType; @@ -38,10 +40,15 @@ import org.apache.fluss.types.TimestampType; import org.apache.fluss.types.TinyIntType; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import static org.assertj.core.api.Assertions.assertThat; + /** Test for {@link DataTypeJsonSerde}. */ public class DataTypeJsonSerdeTest extends JsonSerdeTestBase { @@ -49,6 +56,12 @@ public class DataTypeJsonSerdeTest extends JsonSerdeTestBase { super(DataTypeJsonSerde.INSTANCE); } + @Override + protected void assertEquals(DataType actual, DataType expected) { + // compare with field_id. 
+ assertThat(DataTypeChecks.equalsWithFieldId(actual, expected)).isTrue(); + } + @Override protected DataType[] createObjects() { final List types = @@ -77,7 +90,12 @@ protected DataType[] createObjects() { new LocalZonedTimestampType(3), new ArrayType(new IntType(false)), new MapType(new BigIntType(false), new IntType(false)), - RowType.of(new BigIntType(), new IntType(false), new StringType())); + new RowType( + true, + Arrays.asList( + DataTypes.FIELD("f0", new BigIntType(), null), + DataTypes.FIELD("f1", new IntType(false), null, 1), + DataTypes.FIELD("f2", new StringType(), null, 2)))); final List allTypes = new ArrayList<>(); // consider nullable @@ -139,8 +157,27 @@ protected String[] expectedJsons() { "{\"type\":\"ARRAY\",\"nullable\":false,\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}", "{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}", "{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}", - "{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}", - "{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}", + "{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":-1},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}", + 
"{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":-1},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}" }; } + + @Test + void testJsonLackOfFieldId() { + // some fields with field_id while others without field_id. + String testJsonWithInconsistencyFieldId = + "{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}"; + DataType dataType = + JsonSerdeUtils.readValue( + testJsonWithInconsistencyFieldId.getBytes(StandardCharsets.UTF_8), + DataTypeJsonSerde.INSTANCE); + assertThat(dataType).isInstanceOf(RowType.class); + assertEquals( + dataType, + DataTypes.ROW( + DataTypes.FIELD("f0", DataTypes.BIGINT()), + DataTypes.FIELD("f1", DataTypes.INT().copy(false), 1), + DataTypes.FIELD("f2", DataTypes.STRING(), 2)) + .copy(false)); + } } diff --git a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java index 018214b3fb..54c5f69ea3 100644 --- a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java @@ -79,6 +79,18 @@ public class SchemaJsonSerdeTest extends JsonSerdeTestBase { .enableAutoIncrement("b") .build(); + static final Schema SCHEMA_5 = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .withComment("a is first column") + .column( + "b", + DataTypes.ROW( + DataTypes.FIELD("c", DataTypes.INT(), "a is first column", 0), + DataTypes.FIELD("d", DataTypes.INT(), "a is first column", 1))) + .withComment("b is second column") + .build(); + static final String 
SCHEMA_JSON_0 = "{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"highest_field_id\":2}"; static final String SCHEMA_JSON_1 = @@ -90,6 +102,9 @@ public class SchemaJsonSerdeTest extends JsonSerdeTestBase { static final String SCHEMA_JSON_4 = "{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"b is second column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c is third column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}"; + static final String SCHEMA_JSON_5 = + "{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"a is first column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"ROW\",\"fields\":[{\"name\":\"c\",\"field_type\":{\"type\":\"INTEGER\"},\"description\":\"a is first column\",\"field_id\":2},{\"name\":\"d\",\"field_type\":{\"type\":\"INTEGER\"},\"description\":\"a is first column\",\"field_id\":3}]},\"comment\":\"b is second column\",\"id\":1}],\"highest_field_id\":3}"; + static final Schema SCHEMA_WITH_AGG = Schema.newBuilder() .column("product_id", DataTypes.BIGINT().copy(false)) @@ -112,9 +127,18 @@ public class SchemaJsonSerdeTest extends JsonSerdeTestBase { super(SchemaJsonSerde.INSTANCE); } + @Override + protected void assertEquals(Schema actual, Schema expected) { + assertThat(actual).isEqualTo(expected); + // compare field ids. 
+ assertThat(actual.getRowType().equalsWithFieldId(expected.getRowType())).isTrue(); + } + @Override protected Schema[] createObjects() { - return new Schema[] {SCHEMA_0, SCHEMA_1, SCHEMA_2, SCHEMA_3, SCHEMA_4, SCHEMA_WITH_AGG}; + return new Schema[] { + SCHEMA_0, SCHEMA_1, SCHEMA_2, SCHEMA_3, SCHEMA_4, SCHEMA_5, SCHEMA_WITH_AGG + }; } @Override @@ -125,6 +149,7 @@ protected String[] expectedJsons() { SCHEMA_JSON_1, SCHEMA_JSON_3, SCHEMA_JSON_4, + SCHEMA_JSON_5, SCHEMA_JSON_WITH_AGG }; } @@ -132,13 +157,12 @@ protected String[] expectedJsons() { @Test void testCompatibilityFromJsonLackOfColumnId() { String[] jsons = jsonLackOfColumnId(); - Schema[] expectedSchema = new Schema[] {SCHEMA_0, SCHEMA_1, SCHEMA_2, SCHEMA_3}; + Schema[] expectedSchema = new Schema[] {SCHEMA_0, SCHEMA_1, SCHEMA_2, SCHEMA_3, SCHEMA_5}; for (int i = 0; i < jsons.length; i++) { - assertThat( - JsonSerdeUtils.readValue( - jsons[i].getBytes(StandardCharsets.UTF_8), - SchemaJsonSerde.INSTANCE)) - .isEqualTo(expectedSchema[i]); + assertEquals( + JsonSerdeUtils.readValue( + jsons[i].getBytes(StandardCharsets.UTF_8), SchemaJsonSerde.INSTANCE), + expectedSchema[i]); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java index 3088c9d308..9aef6c931b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java @@ -198,16 +198,16 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable catalogBa } // first build schema with physical columns - schemBuilder.fromColumns( - resolvedSchema.getColumns().stream() - .filter(Column::isPhysical) - .map( - column -> - new Schema.Column( - column.getName(), - FlinkConversions.toFlussType(column.getDataType()), - column.getComment().orElse(null))) - 
.collect(Collectors.toList())); + resolvedSchema.getColumns().stream() + .filter(Column::isPhysical) + .forEachOrdered( + column -> { + schemBuilder + .column( + column.getName(), + FlinkConversions.toFlussType(column.getDataType())) + .withComment(column.getComment().orElse(null)); + }); // convert some flink options to fluss table configs. Map storageProperties = diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java index 0c44329865..70d581fda3 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java @@ -17,7 +17,11 @@ package org.apache.fluss.flink.sink; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.table.Table; import org.apache.fluss.exception.InvalidTableException; +import org.apache.fluss.metadata.TablePath; import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.flink.api.common.RuntimeExecutionMode; @@ -46,6 +50,7 @@ import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Integration tests for Array type support in Flink connector. */ @@ -592,4 +597,74 @@ void testExceptionsForComplexTypesUsage() { "Bucket key column 'info' has unsupported data type ROW<`name` STRING, `age` INT>. 
" + "Currently, bucket key column does not support types: [ARRAY, MAP, ROW]."); } + + @Test + void testProjectionAndAddColumnInLogTable() throws Exception { + tEnv.executeSql( + "create table row_log_test (" + + "id int, " + + "simple_row row, " + + "nested_row row, v string>, " + + "array_of_rows array>, " + + "data string " + + ") with ('bucket.num' = '3')"); + + tEnv.executeSql( + "INSERT INTO row_log_test VALUES " + + "(1, ROW(10, 'hello'), ROW(20, ROW(30, 'nested'), 'row1'), " + + "ARRAY[ROW(1, 'a'), ROW(2, 'b')], 'aa'), " + + "(2, ROW(40, 'world'), ROW(50, ROW(60, 'test'), 'row2'), " + + "ARRAY[ROW(3, 'c')], 'bb')") + .await(); + + CloseableIterator rowIter = tEnv.executeSql("select * from row_log_test").collect(); + List expectedRows = + Arrays.asList( + "+I[1, +I[10, hello], +I[20, +I[30, nested], row1], [+I[1, a], +I[2, b]], aa]", + "+I[2, +I[40, world], +I[50, +I[60, test], row2], [+I[3, c]], bb]"); + assertResultsIgnoreOrder(rowIter, expectedRows, true); + + // Currently, flink not supported push down nested row projection because + // FlinkTableSource.supportsNestedProjection returns false. + // Todo: support nested row projection pushdown in + // https://github.com/apache/fluss/issues/2311 later. 
+ String s = + tEnv.explainSql("select id, simple_row.a, nested_row.y.z, data from row_log_test"); + assertThat(s) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, row_log_test, project=[id, simple_row, nested_row, data]]]"); + rowIter = + tEnv.executeSql("select id, simple_row.a, nested_row.y.z, data from row_log_test") + .collect(); + expectedRows = Arrays.asList("+I[1, 10, 30, aa]", "+I[2, 40, 60, bb]"); + assertResultsIgnoreOrder(rowIter, expectedRows, true); + // Test add columns + tEnv.executeSql( + "alter table row_log_test add (" + + "simple_row2 row, " + + "nested_row2 row, v string>, " + + "array_of_rows2 array>)"); + + tEnv.executeSql( + "INSERT INTO row_log_test VALUES " + + "(1, ROW(10, 'hello'), ROW(20, ROW(30, 'nested'), 'row1'), ARRAY[ROW(1, 'a'), ROW(2, 'b')], 'aa', ROW(10, 'hello'), ROW(20, ROW(30, 'nested'), 'row1'), ARRAY[ROW(1, 'a'), ROW(2, 'b')])," + + "(2, ROW(40, 'world'), ROW(50, ROW(60, 'test'), 'row2'), ARRAY[ROW(3, 'c')], 'bb', ROW(40, 'world'), ROW(50, ROW(60, 'test'), 'row2'), ARRAY[ROW(3, 'c')])") + .await(); + rowIter = tEnv.executeSql("select * from row_log_test").collect(); + expectedRows = + Arrays.asList( + "+I[1, +I[10, hello], +I[20, +I[30, nested], row1], [+I[1, a], +I[2, b]], aa, null, null, null]", + "+I[2, +I[40, world], +I[50, +I[60, test], row2], [+I[3, c]], bb, null, null, null]", + "+I[1, +I[10, hello], +I[20, +I[30, nested], row1], [+I[1, a], +I[2, b]], aa, +I[10, hello], +I[20, +I[30, nested], row1], [+I[1, a], +I[2, b]]]", + "+I[2, +I[40, world], +I[50, +I[60, test], row2], [+I[3, c]], bb, +I[40, world], +I[50, +I[60, test], row2], [+I[3, c]]]"); + assertResultsIgnoreOrder(rowIter, expectedRows, true); + try (Connection conn = + ConnectionFactory.createConnection( + FLUSS_CLUSTER_EXTENSION.getClientConfig()); + Table table = conn.getTable(TablePath.of(DEFAULT_DB, "row_log_test"))) { + // check field id + org.apache.fluss.metadata.Schema schema = table.getTableInfo().getSchema(); + 
assertThat(schema.getColumnIds()).containsExactly(0, 1, 4, 10, 13, 14, 17, 23); + } + } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java index fffb0c8785..ad142a6c5d 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java @@ -24,7 +24,9 @@ import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypeChecks; import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.api.Schema; @@ -55,6 +57,7 @@ import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER; import static org.apache.fluss.flink.utils.CatalogTableTestUtils.addOptions; import static org.apache.fluss.flink.utils.CatalogTableTestUtils.checkEqualsIgnoreSchema; +import static org.apache.fluss.types.DataTypes.FIELD; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -85,8 +88,8 @@ void testTypeConversion() { DataTypes.ARRAY(DataTypes.STRING()), DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()), DataTypes.ROW( - DataTypes.FIELD("a", DataTypes.STRING().copy(false)), - DataTypes.FIELD("b", DataTypes.INT()))); + FIELD("a", DataTypes.STRING().copy(false)), + FIELD("b", DataTypes.INT()))); // flink types List flinkTypes = @@ -133,6 +136,13 @@ void testTypeConversion() { } assertThat(actualFlussTypes).isEqualTo(flussTypes); + // check the field id of rowType. 
+ assertThat( + DataTypeChecks.equalsWithFieldId( + actualFlussTypes.get(actualFlussTypes.size() - 1), + flussTypes.get(flinkTypes.size() - 1))) + .isTrue(); + // test conversion for data types not supported in Fluss assertThatThrownBy(() -> FlinkConversions.toFlussType(VARCHAR(10))) .isInstanceOf(UnsupportedOperationException.class) @@ -158,6 +168,34 @@ void testTableConversion() { Column.physical( "order_id", org.apache.flink.table.api.DataTypes.STRING().notNull()), + Column.physical( + "item", + org.apache.flink.table.api.DataTypes.ROW( + org.apache.flink.table.api.DataTypes.FIELD( + "item_id", + org.apache.flink.table.api.DataTypes + .STRING()), + org.apache.flink.table.api.DataTypes.FIELD( + "item_price", + org.apache.flink.table.api.DataTypes + .STRING()), + org.apache.flink.table.api.DataTypes.FIELD( + "item_details", + org.apache.flink.table.api.DataTypes.ROW( + org.apache.flink.table.api.DataTypes + .FIELD( + "category", + org.apache.flink + .table.api + .DataTypes + .STRING()), + org.apache.flink.table.api.DataTypes + .FIELD( + "specifications", + org.apache.flink + .table.api + .DataTypes + .STRING()))))), Column.physical( "orig_ts", org.apache.flink.table.api.DataTypes.TIMESTAMP()), @@ -181,18 +219,37 @@ void testTableConversion() { String expectFlussTableString = "TableDescriptor{schema=(" + "order_id STRING NOT NULL," + + "item ROW<`item_id` STRING, `item_price` STRING, `item_details` ROW<`category` STRING, `specifications` STRING>>," + "orig_ts TIMESTAMP(6)," + "CONSTRAINT PK_order_id PRIMARY KEY (order_id)" + "), comment='test comment', partitionKeys=[], " + "tableDistribution={bucketKeys=[order_id] bucketCount=null}, " + "properties={}, " - + "customProperties={schema.watermark.0.strategy.expr=orig_ts, " - + "schema.2.expr=orig_ts, schema.2.data-type=TIMESTAMP(3), " + + "customProperties={" + + "schema.3.data-type=TIMESTAMP(3), " + + "schema.watermark.0.strategy.expr=orig_ts, " + + "schema.3.name=compute_ts, " + "schema.watermark.0.rowtime=orig_ts, 
" + "schema.watermark.0.strategy.data-type=TIMESTAMP(3), " + "k1=v1, k2=v2, " - + "schema.2.name=compute_ts}}"; + + "schema.3.expr=orig_ts}}"; assertThat(flussTable.toString()).isEqualTo(expectFlussTableString); + assertThat(flussTable.getSchema().getColumnIds()).containsExactly(0, 1, 7); + // check the nested row column "item" + org.apache.fluss.metadata.Schema.Column column = flussTable.getSchema().getColumns().get(1); + assertThat(column.getName()).isEqualTo("item"); + assertThat(column.getDataType()).isInstanceOf(RowType.class); + RowType rowType = (RowType) column.getDataType(); + assertThat(rowType.getFields()) + .containsExactly( + FIELD("item_id", DataTypes.STRING(), 2), + FIELD("item_price", DataTypes.STRING(), 3), + FIELD( + "item_details", + DataTypes.ROW( + FIELD("category", DataTypes.STRING(), 5), + FIELD("specifications", DataTypes.STRING(), 6)), + 4)); // test convert fluss table to flink table TablePath tablePath = TablePath.of("db", "table"); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java index 3162e4bed1..eff79605a4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java @@ -21,6 +21,8 @@ import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.ReassignFieldId; import java.util.ArrayList; import java.util.HashMap; @@ -106,12 +108,11 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) { "Column " + addColumn.getName() + " must be nullable."); } + int columnId = highestFieldId.incrementAndGet(); + DataType dataType = ReassignFieldId.reassign(addColumn.getDataType(), highestFieldId); + Schema.Column newColumn = - new Schema.Column( - 
addColumn.getName(), - addColumn.getDataType(), - addColumn.getComment(), - (byte) highestFieldId.incrementAndGet()); + new Schema.Column(addColumn.getName(), dataType, addColumn.getComment(), columnId); columns.add(newColumn); existedColumns.put(newColumn.getName(), newColumn); return this; diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java index 8f12e43c47..f21d5bdd86 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java @@ -63,7 +63,9 @@ import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.BucketAssignment; import org.apache.fluss.server.zk.data.TableAssignment; +import org.apache.fluss.types.DataTypeChecks; import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; import org.apache.fluss.utils.json.DataTypeJsonSerde; import org.apache.fluss.utils.json.JsonSerdeUtils; @@ -699,7 +701,19 @@ void testSchemaEvolution() throws Exception { pbTableMetadata.getCreatedTime(), pbTableMetadata.getModifiedTime()); List columns = tableInfo.getSchema().getColumns(); - assertThat(columns.size()).isEqualTo(3); + assertThat(columns.size()).isEqualTo(4); + assertThat(tableInfo.getSchema().getColumnIds()).containsExactly(0, 1, 2, 5); + // check nested row's field_id. 
+ assertThat(columns.get(2).getName()).isEqualTo("new_nested_column"); + assertThat( + DataTypeChecks.equalsWithFieldId( + columns.get(2).getDataType(), + new RowType( + true, + Arrays.asList( + DataTypes.FIELD("f0", DataTypes.STRING(), 3), + DataTypes.FIELD("f1", DataTypes.INT(), 4))))) + .isTrue(); } private void checkBucketMetadata(int expectBucketCount, List bucketMetadata) { @@ -830,6 +844,17 @@ private static List alterTableProperties( private static List alterTableAddColumns() { List addColumns = new ArrayList<>(); + PbAddColumn newNestedColumn = new PbAddColumn(); + newNestedColumn + .setColumnName("new_nested_column") + .setDataTypeJson( + JsonSerdeUtils.writeValueAsBytes( + DataTypes.ROW(DataTypes.STRING(), DataTypes.INT()), + DataTypeJsonSerde.INSTANCE)) + .setComment("new_nested_column") + .setColumnPositionType(0); + addColumns.add(newNestedColumn); + PbAddColumn newColumn = new PbAddColumn(); newColumn .setColumnName("new_column") @@ -839,6 +864,7 @@ private static List alterTableAddColumns() { .setComment("new_column") .setColumnPositionType(0); addColumns.add(newColumn); + return addColumns; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java index 01cf2701a4..5a28f9e2be 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java @@ -330,7 +330,7 @@ void testFetchLog(byte recordBatchMagic) throws Exception { tableId, 0, Errors.INVALID_COLUMN_PROJECTION.code(), - "Projected field id 2 is not contained in [0, 1]"); + "Projected fields [2, 3] is out of bound for schema with 2 fields."); } @Test