Skip to content

Commit 8949e01

Browse files
ZouxxyyJingsongLi
authored andcommitted
[core] Use write null for uncompact decimal and timestamp in InternalRowSerialize (#5483)
1 parent 715d9a0 commit 8949e01

File tree

2 files changed

+51
-1
lines changed

2 files changed

+51
-1
lines changed

paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,20 @@
2424
import org.apache.paimon.data.BinaryRowWriter;
2525
import org.apache.paimon.data.BinaryWriter;
2626
import org.apache.paimon.data.BinaryWriter.ValueSetter;
27+
import org.apache.paimon.data.Decimal;
2728
import org.apache.paimon.data.GenericRow;
2829
import org.apache.paimon.data.InternalRow;
2930
import org.apache.paimon.data.InternalRow.FieldGetter;
3031
import org.apache.paimon.data.NestedRow;
32+
import org.apache.paimon.data.Timestamp;
3133
import org.apache.paimon.io.DataInputView;
3234
import org.apache.paimon.io.DataOutputView;
3335
import org.apache.paimon.types.DataType;
36+
import org.apache.paimon.types.DataTypeChecks;
37+
import org.apache.paimon.types.DecimalType;
38+
import org.apache.paimon.types.LocalZonedTimestampType;
3439
import org.apache.paimon.types.RowType;
40+
import org.apache.paimon.types.TimestampType;
3541

3642
import java.io.IOException;
3743
import java.util.Arrays;
@@ -46,6 +52,7 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow
4652
private final Serializer[] fieldSerializers;
4753
private final FieldGetter[] fieldGetters;
4854
private final ValueSetter[] valueSetters;
55+
private final boolean[] writeNulls;
4956

5057
private transient BinaryRow reuseRow;
5158
private transient BinaryRowWriter reuseWriter;
@@ -70,11 +77,18 @@ public InternalRowSerializer(DataType[] types, Serializer<?>[] fieldSerializers)
7077
this.binarySerializer = new BinaryRowSerializer(types.length);
7178
this.fieldGetters = new FieldGetter[types.length];
7279
this.valueSetters = new ValueSetter[types.length];
80+
this.writeNulls = new boolean[types.length];
7381
for (int i = 0; i < types.length; i++) {
7482
DataType type = types[i];
7583
fieldGetters[i] = InternalRow.createFieldGetter(type, i);
7684
// pass serializer to avoid infinite loop
7785
valueSetters[i] = BinaryWriter.createValueSetter(type, fieldSerializers[i]);
86+
// see reference: org.apache.paimon.codegen.GenerateUtils.binaryWriterWriteNull
87+
if (type instanceof DecimalType) {
88+
writeNulls[i] = !Decimal.isCompact(DataTypeChecks.getPrecision(type));
89+
} else if (type instanceof TimestampType || type instanceof LocalZonedTimestampType) {
90+
writeNulls[i] = !Timestamp.isCompact(DataTypeChecks.getPrecision(type));
91+
}
7892
}
7993
}
8094

@@ -157,7 +171,7 @@ public BinaryRow toBinaryRow(InternalRow row) {
157171
reuseWriter.writeRowKind(row.getRowKind());
158172
for (int i = 0; i < types.length; i++) {
159173
Object field = fieldGetters[i].getFieldOrNull(row);
160-
if (field == null) {
174+
if (field == null && !writeNulls[i]) {
161175
reuseWriter.setNullAt(i);
162176
} else {
163177
valueSetters[i].setValue(reuseWriter, i, field);

paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,17 @@
1818

1919
package org.apache.paimon.table.sink;
2020

21+
import org.apache.paimon.data.BinaryRow;
2122
import org.apache.paimon.data.GenericRow;
2223
import org.apache.paimon.data.InternalRow;
24+
import org.apache.paimon.data.serializer.InternalRowSerializer;
2325
import org.apache.paimon.schema.TableSchema;
2426
import org.apache.paimon.types.DataField;
27+
import org.apache.paimon.types.DecimalType;
2528
import org.apache.paimon.types.IntType;
29+
import org.apache.paimon.types.LocalZonedTimestampType;
2630
import org.apache.paimon.types.RowType;
31+
import org.apache.paimon.types.TimestampType;
2732

2833
import org.junit.jupiter.api.Test;
2934

@@ -32,9 +37,11 @@
3237
import java.util.HashMap;
3338
import java.util.List;
3439
import java.util.Map;
40+
import java.util.concurrent.ThreadLocalRandom;
3541

3642
import static org.apache.paimon.CoreOptions.BUCKET;
3743
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
44+
import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode;
3845
import static org.assertj.core.api.Assertions.assertThat;
3946
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4047

@@ -71,6 +78,30 @@ public void testIllegalBucket() {
7178
.hasMessageContaining("Num bucket is illegal");
7279
}
7380

81+
@Test
82+
public void testUnCompactDecimalAndTimestampNullValueBucketNumber() {
83+
GenericRow row = GenericRow.of(null, null, null, 1);
84+
int bucketNum = ThreadLocalRandom.current().nextInt(1, Integer.MAX_VALUE);
85+
86+
RowType rowType =
87+
new RowType(
88+
Arrays.asList(
89+
new DataField(0, "d", new DecimalType(38, 18)),
90+
new DataField(1, "ltz", new LocalZonedTimestampType()),
91+
new DataField(2, "ntz", new TimestampType()),
92+
new DataField(3, "k", new IntType())));
93+
94+
String[] bucketColsToTest = {"d", "ltz", "ntz"};
95+
for (String bucketCol : bucketColsToTest) {
96+
FixedBucketRowKeyExtractor extractor = extractor(rowType, "", bucketCol, "", bucketNum);
97+
BinaryRow binaryRow =
98+
new InternalRowSerializer(rowType.project(bucketCol)).toBinaryRow(row);
99+
assertThat(bucket(extractor, row))
100+
.isEqualTo(
101+
KeyAndBucketExtractor.bucket(bucketKeyHashCode(binaryRow), bucketNum));
102+
}
103+
}
104+
74105
private int bucket(FixedBucketRowKeyExtractor extractor, InternalRow row) {
75106
extractor.setRecord(row);
76107
return extractor.bucket();
@@ -92,6 +123,11 @@ private FixedBucketRowKeyExtractor extractor(
92123
new DataField(0, "a", new IntType()),
93124
new DataField(1, "b", new IntType()),
94125
new DataField(2, "c", new IntType())));
126+
return extractor(rowType, partK, bk, pk, numBucket);
127+
}
128+
129+
private FixedBucketRowKeyExtractor extractor(
130+
RowType rowType, String partK, String bk, String pk, int numBucket) {
95131
List<DataField> fields = TableSchema.newFields(rowType);
96132
Map<String, String> options = new HashMap<>();
97133
options.put(BUCKET_KEY.key(), bk);

0 commit comments

Comments
 (0)