Skip to content

Commit 45ee54e

Browse files
committed
[common] Add field_id for Nested Row.
1 parent 9180257 commit 45ee54e

File tree

5 files changed

+7
-13
lines changed

5 files changed

+7
-13
lines changed

fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -344,8 +344,6 @@ public Builder fromRowType(RowType rowType) {
344344
* Adopts the given field names and field data types as physical columns of the schema.
345345
*
346346
* <p>This method internally calls {@link #column(String, DataType)} for each field, which
347-
*
348-
* <p>This method internally calls {@link #column(String, DataType)} for each field, which
349347
* means: The original field IDs in the RowType will be ignored and replaced with new ones.
350348
* If you need to preserve existing field IDs, use {@link #fromColumns(List)} or {@link
351349
* #fromSchema(Schema)} instead.

fluss-common/src/main/java/org/apache/fluss/types/ReassignFieldId.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.util.List;
2222
import java.util.concurrent.atomic.AtomicInteger;
2323

24-
/** Reassign field id by given field id. */
24+
/** Visitor that recursively reassigns field IDs in nested data types using a provided counter. */
2525
public class ReassignFieldId extends DataTypeDefaultVisitor<DataType> {
2626

2727
private final AtomicInteger highestFieldId;

fluss-common/src/main/java/org/apache/fluss/types/RowType.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -250,10 +250,6 @@ private static List<DataField> validateFields(List<DataField> fields) {
250250
"Field IDs must be either all -1 or all non-negative. "
251251
+ "Mixed field IDs are not allowed.");
252252
} else {
253-
// All fields have non-negative IDs
254-
if (fieldIds.stream().anyMatch(id -> id < 0)) {
255-
throw new IllegalArgumentException("Field ID must not be negative.");
256-
}
257253
final Set<Integer> duplicateIds =
258254
fieldIds.stream()
259255
.filter(id -> Collections.frequency(fieldIds, id) > 1)

fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ protected String[] expectedJsons() {
143143
"{\"type\":\"TIMESTAMP_WITH_LOCAL_TIME_ZONE\",\"nullable\":false,\"precision\":3}",
144144
"{\"type\":\"ARRAY\",\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
145145
"{\"type\":\"ARRAY\",\"nullable\":false,\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
146-
"{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\"},\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
147-
"{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\"},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
146+
"{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
147+
"{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
148148
"{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":0},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}",
149149
"{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":0},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}"
150150
};
@@ -200,8 +200,8 @@ void testCompatibilityFromJsonLackOfFieldId() {
200200
"{\"type\":\"TIMESTAMP_WITH_LOCAL_TIME_ZONE\",\"nullable\":false,\"precision\":3}",
201201
"{\"type\":\"ARRAY\",\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
202202
"{\"type\":\"ARRAY\",\"nullable\":false,\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
203-
"{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\"},\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
204-
"{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\"},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
203+
"{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
204+
"{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
205205
"{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
206206
"{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
207207
};

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -625,8 +625,8 @@ void testProjectionAndAddColumnInLogTable() throws Exception {
625625

626626
// Currently, flink not supported push down nested row projection because
627627
// FlinkTableSource.supportsNestedProjection returns false.
628-
// Todo: supported nested row projection down in https://github.com/apache/fluss/issues/2311
629-
// later.
628+
// Todo: support nested row projection pushdown in
629+
// https://github.com/apache/fluss/issues/2311 later.
630630
String s = tEnv.explainSql("select id, simple_row.a, nested_row.y.z from row_log_test");
631631
assertThat(s)
632632
.contains(

0 commit comments

Comments
 (0)