Skip to content

Commit 905563e

Browse files
[Feature] Support for VARIANT data type
1 parent 96aee59 commit 905563e

File tree

64 files changed

+858
-10
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+858
-10
lines changed

fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.client.converter;
1919

2020
import org.apache.fluss.row.GenericRow;
21+
import org.apache.fluss.row.Variant;
2122
import org.apache.fluss.types.DataType;
2223
import org.apache.fluss.types.DataTypeChecks;
2324
import org.apache.fluss.types.DecimalType;
@@ -119,6 +120,8 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
119120
case BINARY:
120121
case BYTES:
121122
return prop::read;
123+
case VARIANT:
124+
return (obj) -> convertVariantValue(prop, prop.read(obj));
122125
case CHAR:
123126
case STRING:
124127
return (obj) ->
@@ -165,6 +168,28 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
165168
}
166169
}
167170

171+
/**
172+
* Converts a byte array value from a POJO property to a Variant. If the value is already a
173+
* Variant, it is returned directly. If it is a byte[], it is interpreted as the combined format
174+
* [4-byte value length (big-endian)][value][metadata] and converted to a Variant.
175+
*/
176+
private static @Nullable Variant convertVariantValue(
177+
PojoType.Property prop, @Nullable Object v) {
178+
if (v == null) {
179+
return null;
180+
}
181+
if (v instanceof Variant) {
182+
return (Variant) v;
183+
}
184+
if (v instanceof byte[]) {
185+
return Variant.bytesToVariant((byte[]) v);
186+
}
187+
throw new IllegalArgumentException(
188+
String.format(
189+
"Field %s is not a byte[] or Variant. Cannot convert to Variant.",
190+
prop.name));
191+
}
192+
168193
private interface FieldToRow {
169194
Object readAndConvert(Object pojo) throws Exception;
170195
}

fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.row.InternalArray;
2121
import org.apache.fluss.row.InternalRow;
22+
import org.apache.fluss.row.Variant;
2223
import org.apache.fluss.types.ArrayType;
2324
import org.apache.fluss.types.DataType;
2425
import org.apache.fluss.types.DataTypeChecks;
@@ -140,6 +141,8 @@ private static RowToField createRowReader(DataType fieldType, PojoType.Property
140141
case BINARY:
141142
case BYTES:
142143
return InternalRow::getBytes;
144+
case VARIANT:
145+
return (row, pos) -> Variant.variantToBytes(row.getVariant(pos));
143146
case DECIMAL:
144147
DecimalType decimalType = (DecimalType) fieldType;
145148
return (row, pos) ->

fluss-common/src/main/java/org/apache/fluss/row/AbstractBinaryWriter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,22 @@ public void writeRow(int pos, InternalRow value, RowSerializer serializer) {
104104
pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
105105
}
106106

107+
@Override
108+
public void writeVariant(int pos, Variant variant) {
109+
byte[] value = variant.value();
110+
byte[] metadata = variant.metadata();
111+
int totalSize = 4 + value.length + metadata.length;
112+
byte[] combined = new byte[totalSize];
113+
// Write value length as big-endian 4-byte integer
114+
combined[0] = (byte) ((value.length >> 24) & 0xFF);
115+
combined[1] = (byte) ((value.length >> 16) & 0xFF);
116+
combined[2] = (byte) ((value.length >> 8) & 0xFF);
117+
combined[3] = (byte) (value.length & 0xFF);
118+
System.arraycopy(value, 0, combined, 4, value.length);
119+
System.arraycopy(metadata, 0, combined, 4 + value.length, metadata.length);
120+
writeBytes(pos, combined);
121+
}
122+
107123
@Override
108124
public void writeChar(int pos, BinaryString value, int length) {
109125
// TODO: currently, we encoding CHAR(length) as the same with STRING, the length info can

fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.lang.reflect.Array;
2626

2727
import static org.apache.fluss.memory.MemoryUtils.UNSAFE;
28+
import static org.apache.fluss.row.Variant.bytesToVariant;
2829
import static org.apache.fluss.utils.Preconditions.checkArgument;
2930

3031
/**
@@ -82,6 +83,7 @@ public static int calculateFixLengthPartSize(DataType type) {
8283
case STRING:
8384
case BINARY:
8485
case BYTES:
86+
case VARIANT:
8587
case DECIMAL:
8688
case BIGINT:
8789
case DOUBLE:
@@ -233,6 +235,12 @@ public byte[] getBytes(int pos) {
233235
return BinarySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndSize);
234236
}
235237

238+
@Override
239+
public Variant getVariant(int pos) {
240+
byte[] bytes = getBytes(pos);
241+
return bytesToVariant(bytes);
242+
}
243+
236244
@Override
237245
public TimestampNtz getTimestampNtz(int pos, int precision) {
238246
assertIndexIsValid(pos);

fluss-common/src/main/java/org/apache/fluss/row/BinaryArrayWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ public static NullSetter createNullSetter(DataType elementType) {
213213
case STRING:
214214
case BINARY:
215215
case BYTES:
216+
case VARIANT:
216217
case DECIMAL:
217218
case BIGINT:
218219
case TIMESTAMP_WITHOUT_TIME_ZONE:

fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ public interface BinaryWriter {
8484

8585
void writeRow(int pos, InternalRow value, RowSerializer serializer);
8686

87+
void writeVariant(int pos, Variant value);
88+
8789
/** Finally, complete write to set real size to binary. */
8890
void complete();
8991

@@ -135,6 +137,8 @@ static BinaryWriter.ValueWriter createNotNullValueWriter(
135137
writer.writeBinary(pos, (byte[]) value, binaryLength);
136138
case BYTES:
137139
return (writer, pos, value) -> writer.writeBytes(pos, (byte[]) value);
140+
case VARIANT:
141+
return (writer, pos, value) -> writer.writeVariant(pos, (Variant) value);
138142
case DECIMAL:
139143
final int decimalPrecision = getPrecision(elementType);
140144
return (writer, pos, value) ->

fluss-common/src/main/java/org/apache/fluss/row/DataGetters.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ public interface DataGetters {
9292
/** Returns the binary value at the given position. */
9393
byte[] getBytes(int pos);
9494

95+
/** Returns the variant value at the given position. */
96+
Variant getVariant(int pos);
97+
9598
/** Returns the array value at the given position. */
9699
InternalArray getArray(int pos);
97100

fluss-common/src/main/java/org/apache/fluss/row/GenericArray.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,11 @@ public byte[] getBytes(int pos) {
213213
return (byte[]) getObject(pos);
214214
}
215215

216+
@Override
217+
public Variant getVariant(int pos) {
218+
return (Variant) getObject(pos);
219+
}
220+
216221
@Override
217222
public InternalArray getArray(int pos) {
218223
return (InternalArray) getObject(pos);

fluss-common/src/main/java/org/apache/fluss/row/GenericRow.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,11 @@ public byte[] getBytes(int pos) {
166166
return (byte[]) this.fields[pos];
167167
}
168168

169+
@Override
170+
public Variant getVariant(int pos) {
171+
return (Variant) this.fields[pos];
172+
}
173+
169174
@Override
170175
public InternalArray getArray(int pos) {
171176
return (InternalArray) this.fields[pos];
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.row;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
import java.io.Serializable;
23+
import java.util.Arrays;
24+
import java.util.Objects;
25+
26+
/**
27+
* An internal data structure implementing {@link Variant}.
28+
*
29+
* <p>A Variant consists of two byte arrays:
30+
*
31+
* <ul>
32+
* <li><b>value</b>: the binary-encoded variant value (header + data), supports nested JSON
33+
* structures.
34+
* <li><b>metadata</b>: the string dictionary (version number + deduplicated list of all object
35+
* key names).
36+
* </ul>
37+
*
38+
* @since 0.9
39+
*/
40+
@PublicEvolving
41+
public final class GenericVariant implements Variant, Serializable {
42+
43+
private static final long serialVersionUID = 1L;
44+
45+
private final byte[] value;
46+
private final byte[] metadata;
47+
48+
public GenericVariant(byte[] value, byte[] metadata) {
49+
this.value = value;
50+
this.metadata = metadata;
51+
}
52+
53+
@Override
54+
public byte[] value() {
55+
return value;
56+
}
57+
58+
@Override
59+
public byte[] metadata() {
60+
return metadata;
61+
}
62+
63+
@Override
64+
public long sizeInBytes() {
65+
return value.length + metadata.length;
66+
}
67+
68+
@Override
69+
public Variant copy() {
70+
return new GenericVariant(
71+
Arrays.copyOf(value, value.length), Arrays.copyOf(metadata, metadata.length));
72+
}
73+
74+
@Override
75+
public boolean equals(Object o) {
76+
if (this == o) {
77+
return true;
78+
}
79+
if (o == null || getClass() != o.getClass()) {
80+
return false;
81+
}
82+
GenericVariant that = (GenericVariant) o;
83+
return Objects.deepEquals(value, that.value) && Objects.deepEquals(metadata, that.metadata);
84+
}
85+
86+
@Override
87+
public int hashCode() {
88+
return Objects.hash(Arrays.hashCode(value), Arrays.hashCode(metadata));
89+
}
90+
91+
@Override
92+
public String toString() {
93+
return "GenericVariant{value="
94+
+ Arrays.toString(value)
95+
+ ", metadata="
96+
+ Arrays.toString(metadata)
97+
+ "}";
98+
}
99+
}

0 commit comments

Comments
 (0)