Skip to content

Commit 0122c24

Browse files
committed
Refactor bucket functions.
1 parent f201bbf commit 0122c24

File tree

7 files changed

+523
-196
lines changed

7 files changed

+523
-196
lines changed

paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,20 @@
2424
import org.apache.paimon.data.BinaryRowWriter;
2525
import org.apache.paimon.data.BinaryWriter;
2626
import org.apache.paimon.data.BinaryWriter.ValueSetter;
27+
import org.apache.paimon.data.Decimal;
2728
import org.apache.paimon.data.GenericRow;
2829
import org.apache.paimon.data.InternalRow;
2930
import org.apache.paimon.data.InternalRow.FieldGetter;
3031
import org.apache.paimon.data.NestedRow;
32+
import org.apache.paimon.data.Timestamp;
3133
import org.apache.paimon.io.DataInputView;
3234
import org.apache.paimon.io.DataOutputView;
3335
import org.apache.paimon.types.DataType;
36+
import org.apache.paimon.types.DataTypeChecks;
37+
import org.apache.paimon.types.DecimalType;
38+
import org.apache.paimon.types.LocalZonedTimestampType;
3439
import org.apache.paimon.types.RowType;
40+
import org.apache.paimon.types.TimestampType;
3541

3642
import java.io.IOException;
3743
import java.util.Arrays;
@@ -46,10 +52,20 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow
4652
private final Serializer[] fieldSerializers;
4753
private final FieldGetter[] fieldGetters;
4854
private final ValueSetter[] valueSetters;
55+
private final boolean[] writeNulls;
4956

5057
private transient BinaryRow reuseRow;
5158
private transient BinaryRowWriter reuseWriter;
5259

60+
public InternalRowSerializer(RowType rowType, boolean isBucketKeySerializer) {
61+
this(
62+
rowType.getFieldTypes().toArray(new DataType[0]),
63+
rowType.getFieldTypes().stream()
64+
.map(InternalSerializers::create)
65+
.toArray(Serializer[]::new),
66+
isBucketKeySerializer);
67+
}
68+
5369
public InternalRowSerializer(RowType rowType) {
5470
this(
5571
rowType.getFieldTypes().toArray(new DataType[0]),
@@ -65,16 +81,34 @@ public InternalRowSerializer(DataType... types) {
6581
}
6682

6783
public InternalRowSerializer(DataType[] types, Serializer<?>[] fieldSerializers) {
84+
this(types, fieldSerializers, false);
85+
}
86+
87+
private InternalRowSerializer(
88+
DataType[] types, Serializer<?>[] fieldSerializers, boolean isBucketKeySerializer) {
6889
this.types = types;
6990
this.fieldSerializers = fieldSerializers;
7091
this.binarySerializer = new BinaryRowSerializer(types.length);
7192
this.fieldGetters = new FieldGetter[types.length];
7293
this.valueSetters = new ValueSetter[types.length];
94+
this.writeNulls = new boolean[types.length];
95+
Arrays.fill(writeNulls, true);
7396
for (int i = 0; i < types.length; i++) {
7497
DataType type = types[i];
7598
fieldGetters[i] = InternalRow.createFieldGetter(type, i);
7699
// pass serializer to avoid infinite loop
77100
valueSetters[i] = BinaryWriter.createValueSetter(type, fieldSerializers[i]);
101+
102+
// BucketKey projection uses different write logic for nulls of non-compact types,
103+
// reference: org.apache.paimon.codegen.GenerateUtils.binaryWriterWriteNull
104+
if (isBucketKeySerializer) {
105+
if (type instanceof DecimalType) {
106+
writeNulls[i] = Decimal.isCompact(DataTypeChecks.getPrecision(type));
107+
} else if (type instanceof TimestampType
108+
|| type instanceof LocalZonedTimestampType) {
109+
writeNulls[i] = Timestamp.isCompact(DataTypeChecks.getPrecision(type));
110+
}
111+
}
78112
}
79113
}
80114

@@ -157,7 +191,7 @@ public BinaryRow toBinaryRow(InternalRow row) {
157191
reuseWriter.writeRowKind(row.getRowKind());
158192
for (int i = 0; i < types.length; i++) {
159193
Object field = fieldGetters[i].getFieldOrNull(row);
160-
if (field == null) {
194+
if (field == null && writeNulls[i]) {
161195
reuseWriter.setNullAt(i);
162196
} else {
163197
valueSetters[i].setValue(reuseWriter, i, field);

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,13 @@
4040
import org.apache.spark.sql.types.TimestampNTZType;
4141
import org.apache.spark.sql.types.TimestampType;
4242

43+
import java.io.Serializable;
4344
import java.math.BigDecimal;
4445

4546
/** Wrapper to fetch values from the Spark internal row. */
46-
public class SparkInternalRowWrapper implements InternalRow {
47+
public class SparkInternalRowWrapper implements InternalRow, Serializable {
4748

48-
private org.apache.spark.sql.catalyst.InternalRow internalRow;
49+
private transient org.apache.spark.sql.catalyst.InternalRow internalRow;
4950
private final int length;
5051
private final int rowKindIdx;
5152
private final StructType structType;

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/functions/PaimonFunctions.java

Lines changed: 0 additions & 169 deletions
This file was deleted.

0 commit comments

Comments
 (0)