Skip to content

Commit 86551fa

Browse files
[fluss-client] Support Complex Data Types on the Java Typed API (NestedRow/ROW)
- Add ROW type support in PojoToRowConverter (POJO -> GenericRow) - Add ROW type support in RowToPojoConverter (InternalRow -> POJO) - Add ROW type validation in ConverterCommons - Add ROW element support in PojoTypeToFlussTypeConverter (for Array/Map) - Add ROW element support in FlussArrayToPojoArray (for Array<ROW>) - Add comprehensive tests for nested ROW, deeply nested ROW, ROW with Array/Map fields, Array<ROW>, Map<K, ROW>, null handling, and validation - Update java-client.md documentation with ROW type mapping and usage examples
1 parent 0825d68 commit 86551fa

File tree

8 files changed

+560
-3
lines changed

8 files changed

+560
-3
lines changed

fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,22 @@ static void validateCompatibility(DataType fieldType, PojoType.Property prop) {
133133
return;
134134
}
135135

136+
if (typeRoot == DataTypeRoot.ROW) {
137+
// ROW type maps to a nested POJO. The POJO class must be a valid POJO (public class
138+
// with public default constructor). Detailed field-level validation is deferred to
139+
// the nested PojoToRowConverter / RowToPojoConverter.
140+
if (actual.isPrimitive()
141+
|| actual.isArray()
142+
|| Collection.class.isAssignableFrom(actual)
143+
|| Map.class.isAssignableFrom(actual)) {
144+
throw new IllegalArgumentException(
145+
String.format(
146+
"Field '%s' must be a POJO class for ROW type, got %s",
147+
prop.name, actual.getName()));
148+
}
149+
return;
150+
}
151+
136152
Set<Class<?>> supported = SUPPORTED_TYPES.get(fieldType.getTypeRoot());
137153
if (supported == null) {
138154
throw new UnsupportedOperationException(

fluss-client/src/main/java/org/apache/fluss/client/converter/FlussArrayToPojoArray.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020

2121
import org.apache.fluss.row.Decimal;
2222
import org.apache.fluss.row.InternalArray;
23+
import org.apache.fluss.row.InternalRow;
2324
import org.apache.fluss.types.ArrayType;
2425
import org.apache.fluss.types.DataType;
2526
import org.apache.fluss.types.DataTypeChecks;
2627
import org.apache.fluss.types.DecimalType;
2728
import org.apache.fluss.types.MapType;
29+
import org.apache.fluss.types.RowType;
2830

2931
import java.lang.reflect.Array;
3032
import java.time.Instant;
@@ -169,6 +171,24 @@ private static ElementConverter buildElementConverter(
169171
return (arr, i) ->
170172
new FlussMapToPojoMap(arr.getMap(i), (MapType) elementType, fieldName)
171173
.convertMap();
174+
case ROW:
175+
{
176+
RowType nestedRowType = (RowType) elementType;
177+
int nestedFieldCount = nestedRowType.getFieldCount();
178+
if (pojoType == Object.class) {
179+
// When the target type is unknown (e.g. ROW values in a Map),
180+
// return InternalRow directly
181+
return (arr, i) -> arr.getRow(i, nestedFieldCount);
182+
}
183+
@SuppressWarnings("unchecked")
184+
RowToPojoConverter<Object> nestedConverter =
185+
RowToPojoConverter.of(
186+
(Class<Object>) pojoType, nestedRowType, nestedRowType);
187+
return (arr, i) -> {
188+
InternalRow nestedRow = arr.getRow(i, nestedFieldCount);
189+
return nestedRow == null ? null : nestedConverter.fromRow(nestedRow);
190+
};
191+
}
172192
default:
173193
throw new UnsupportedOperationException(
174194
String.format(

fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,18 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
157157
new PojoMapToFlussMap(
158158
(Map<?, ?>) prop.read(obj), (MapType) fieldType, prop.name)
159159
.convertMap();
160+
case ROW:
161+
{
162+
RowType nestedRowType = (RowType) fieldType;
163+
@SuppressWarnings("unchecked")
164+
PojoToRowConverter<Object> nestedConverter =
165+
PojoToRowConverter.of(
166+
(Class<Object>) prop.type, nestedRowType, nestedRowType);
167+
return (obj) -> {
168+
Object nested = prop.read(obj);
169+
return nested == null ? null : nestedConverter.toRow(nested);
170+
};
171+
}
160172
default:
161173
throw new UnsupportedOperationException(
162174
String.format(

fluss-client/src/main/java/org/apache/fluss/client/converter/PojoTypeToFlussTypeConverter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.fluss.types.DataTypeChecks;
2727
import org.apache.fluss.types.DecimalType;
2828
import org.apache.fluss.types.MapType;
29+
import org.apache.fluss.types.RowType;
2930

3031
import javax.annotation.Nullable;
3132

@@ -260,6 +261,15 @@ private static int truncateNanos(int nanos, int precision) {
260261
case MAP:
261262
return new PojoMapToFlussMap((Map<?, ?>) obj, (MapType) elementType, fieldName)
262263
.convertMap();
264+
case ROW:
265+
{
266+
RowType nestedRowType = (RowType) elementType;
267+
@SuppressWarnings("unchecked")
268+
PojoToRowConverter<Object> nestedConverter =
269+
PojoToRowConverter.of(
270+
(Class<Object>) obj.getClass(), nestedRowType, nestedRowType);
271+
return nestedConverter.toRow(obj);
272+
}
263273
default:
264274
throw new UnsupportedOperationException(
265275
String.format(

fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,19 @@ private static RowToField createRowReader(DataType fieldType, PojoType.Property
196196
return (row, pos) ->
197197
new FlussMapToPojoMap(row.getMap(pos), (MapType) fieldType, prop.name)
198198
.convertMap();
199+
case ROW:
200+
{
201+
RowType nestedRowType = (RowType) fieldType;
202+
int nestedFieldCount = nestedRowType.getFieldCount();
203+
@SuppressWarnings("unchecked")
204+
RowToPojoConverter<Object> nestedConverter =
205+
RowToPojoConverter.of(
206+
(Class<Object>) prop.type, nestedRowType, nestedRowType);
207+
return (row, pos) -> {
208+
InternalRow nestedRow = row.getRow(pos, nestedFieldCount);
209+
return nestedRow == null ? null : nestedConverter.fromRow(nestedRow);
210+
};
211+
}
199212
default:
200213
throw new UnsupportedOperationException(
201214
String.format(

0 commit comments

Comments
 (0)