Skip to content

Commit cd5829f

Browse files
authored
[core] Use write null for uncompact decimal and timestamp in InternalRowSerialize (#5483)
1 parent 94e5265 commit cd5829f

File tree

2 files changed

+51
-1
lines changed

2 files changed

+51
-1
lines changed

paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,20 @@
2424
import org.apache.paimon.data.BinaryRowWriter;
2525
import org.apache.paimon.data.BinaryWriter;
2626
import org.apache.paimon.data.BinaryWriter.ValueSetter;
27+
import org.apache.paimon.data.Decimal;
2728
import org.apache.paimon.data.GenericRow;
2829
import org.apache.paimon.data.InternalRow;
2930
import org.apache.paimon.data.InternalRow.FieldGetter;
3031
import org.apache.paimon.data.NestedRow;
32+
import org.apache.paimon.data.Timestamp;
3133
import org.apache.paimon.io.DataInputView;
3234
import org.apache.paimon.io.DataOutputView;
3335
import org.apache.paimon.types.DataType;
36+
import org.apache.paimon.types.DataTypeChecks;
37+
import org.apache.paimon.types.DecimalType;
38+
import org.apache.paimon.types.LocalZonedTimestampType;
3439
import org.apache.paimon.types.RowType;
40+
import org.apache.paimon.types.TimestampType;
3541

3642
import java.io.IOException;
3743
import java.util.Arrays;
@@ -46,6 +52,7 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow
4652
private final Serializer[] fieldSerializers;
4753
private final FieldGetter[] fieldGetters;
4854
private final ValueSetter[] valueSetters;
55+
private final boolean[] writeNulls;
4956

5057
private transient BinaryRow reuseRow;
5158
private transient BinaryRowWriter reuseWriter;
@@ -70,11 +77,18 @@ public InternalRowSerializer(DataType[] types, Serializer<?>[] fieldSerializers)
7077
this.binarySerializer = new BinaryRowSerializer(types.length);
7178
this.fieldGetters = new FieldGetter[types.length];
7279
this.valueSetters = new ValueSetter[types.length];
80+
this.writeNulls = new boolean[types.length];
7381
for (int i = 0; i < types.length; i++) {
7482
DataType type = types[i];
7583
fieldGetters[i] = InternalRow.createFieldGetter(type, i);
7684
// pass serializer to avoid infinite loop
7785
valueSetters[i] = BinaryWriter.createValueSetter(type, fieldSerializers[i]);
86+
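// writeNulls[i] == true means a null field is still handed to the value setter instead of
// calling setNullAt; this is enabled only for non-compact decimal and timestamp types.
// See reference: org.apache.paimon.codegen.GenerateUtils.binaryWriterWriteNull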
87+
if (type instanceof DecimalType) {
88+
writeNulls[i] = !Decimal.isCompact(DataTypeChecks.getPrecision(type));
89+
} else if (type instanceof TimestampType || type instanceof LocalZonedTimestampType) {
90+
writeNulls[i] = !Timestamp.isCompact(DataTypeChecks.getPrecision(type));
91+
}
7892
}
7993
}
8094

@@ -157,7 +171,7 @@ public BinaryRow toBinaryRow(InternalRow row) {
157171
reuseWriter.writeRowKind(row.getRowKind());
158172
for (int i = 0; i < types.length; i++) {
159173
Object field = fieldGetters[i].getFieldOrNull(row);
160-
if (field == null) {
174+
if (field == null && !writeNulls[i]) {
161175
reuseWriter.setNullAt(i);
162176
} else {
163177
valueSetters[i].setValue(reuseWriter, i, field);

paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,17 @@
1818

1919
package org.apache.paimon.table.sink;
2020

21+
import org.apache.paimon.data.BinaryRow;
2122
import org.apache.paimon.data.GenericRow;
2223
import org.apache.paimon.data.InternalRow;
24+
import org.apache.paimon.data.serializer.InternalRowSerializer;
2325
import org.apache.paimon.schema.TableSchema;
2426
import org.apache.paimon.types.DataField;
27+
import org.apache.paimon.types.DecimalType;
2528
import org.apache.paimon.types.IntType;
29+
import org.apache.paimon.types.LocalZonedTimestampType;
2630
import org.apache.paimon.types.RowType;
31+
import org.apache.paimon.types.TimestampType;
2732

2833
import org.junit.jupiter.api.Test;
2934

@@ -32,9 +37,11 @@
3237
import java.util.HashMap;
3338
import java.util.List;
3439
import java.util.Map;
40+
import java.util.concurrent.ThreadLocalRandom;
3541

3642
import static org.apache.paimon.CoreOptions.BUCKET;
3743
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
44+
import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode;
3845
import static org.assertj.core.api.Assertions.assertThat;
3946
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4047

@@ -70,6 +77,30 @@ public void testIllegalBucket() {
7077
assertThatThrownBy(() -> bucket(extractor("", "", "a", -1), row));
7178
}
7279

80+
/**
 * Checks that a null bucket-key value of decimal / timestamp type yields the same bucket from
 * {@code FixedBucketRowKeyExtractor} as from hashing the {@code BinaryRow} produced by
 * {@code InternalRowSerializer} for the projected bucket-key row type.
 *
 * <p>Per the test name, the decimal and timestamp types used here are expected to be the
 * "non-compact" variants (NOTE(review): the default precisions of the no-arg timestamp type
 * constructors are not visible here; confirm they are non-compact).
 */
@Test
81+
public void testUnCompactDecimalAndTimestampNullValueBucketNumber() {
82+
// The first three fields (the candidate bucket keys) are null; only the int field has a value.
GenericRow row = GenericRow.of(null, null, null, 1);
83+
// Random bucket count in [1, Integer.MAX_VALUE); the assertion below must hold for any value.
int bucketNum = ThreadLocalRandom.current().nextInt(1, Integer.MAX_VALUE);
84+
85+
RowType rowType =
86+
new RowType(
87+
Arrays.asList(
88+
new DataField(0, "d", new DecimalType(38, 18)),
89+
new DataField(1, "ltz", new LocalZonedTimestampType()),
90+
new DataField(2, "ntz", new TimestampType()),
91+
new DataField(3, "k", new IntType())));
92+
93+
String[] bucketColsToTest = {"d", "ltz", "ntz"};
94+
// Use each column in turn as the single bucket key.
for (String bucketCol : bucketColsToTest) {
95+
FixedBucketRowKeyExtractor extractor = extractor(rowType, "", bucketCol, "", bucketNum);
96+
// Serialize with a serializer built for the one-column projected row type; it is applied
// to the full row, whose leading fields are all null.
BinaryRow binaryRow =
97+
new InternalRowSerializer(rowType.project(bucketCol)).toBinaryRow(row);
98+
// The extractor's bucket must match the bucket computed from the serialized row's hash.
assertThat(bucket(extractor, row))
99+
.isEqualTo(
100+
KeyAndBucketExtractor.bucket(bucketKeyHashCode(binaryRow), bucketNum));
101+
}
102+
}
103+
73104
private int bucket(FixedBucketRowKeyExtractor extractor, InternalRow row) {
74105
extractor.setRecord(row);
75106
return extractor.bucket();
@@ -91,6 +122,11 @@ private FixedBucketRowKeyExtractor extractor(
91122
new DataField(0, "a", new IntType()),
92123
new DataField(1, "b", new IntType()),
93124
new DataField(2, "c", new IntType())));
125+
return extractor(rowType, partK, bk, pk, numBucket);
126+
}
127+
128+
private FixedBucketRowKeyExtractor extractor(
129+
RowType rowType, String partK, String bk, String pk, int numBucket) {
94130
List<DataField> fields = TableSchema.newFields(rowType);
95131
Map<String, String> options = new HashMap<>();
96132
options.put(BUCKET_KEY.key(), bk);

0 commit comments

Comments
 (0)