Skip to content

Commit 50d658d

Browse files
authored
[iceberg] Improved ByteBuffer string conversion for Iceberg manifests (#5008)
1 parent 5135d15 commit 50d658d

File tree

3 files changed

+299
-1
lines changed

3 files changed

+299
-1
lines changed

paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,19 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) {
7373
case VARCHAR:
7474
CharBuffer buffer = CharBuffer.wrap(value.toString());
7575
try {
76-
return ENCODER.get().encode(buffer);
76+
ByteBuffer encoded = ENCODER.get().encode(buffer);
77+
// ByteBuffer and CharBuffer allocate space based on capacity
78+
// not actual content length. so we need to create a new ByteBuffer
79+
// with the exact length of the encoded content
80+
// to avoid padding the output with \u0000
81+
if (encoded.limit() != encoded.capacity()) {
82+
ByteBuffer exact = ByteBuffer.allocate(encoded.limit());
83+
encoded.position(0);
84+
exact.put(encoded);
85+
exact.flip();
86+
return exact;
87+
}
88+
return encoded;
7789
} catch (CharacterCodingException e) {
7890
throw new RuntimeException("Failed to encode value as UTF-8: " + value, e);
7991
}

paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@
5353
import org.apache.paimon.types.RowKind;
5454
import org.apache.paimon.types.RowType;
5555

56+
import org.apache.avro.file.DataFileReader;
57+
import org.apache.avro.file.SeekableFileInput;
58+
import org.apache.avro.generic.GenericData;
59+
import org.apache.avro.generic.GenericDatumReader;
60+
import org.apache.avro.generic.GenericRecord;
5661
import org.apache.hadoop.conf.Configuration;
5762
import org.apache.iceberg.catalog.TableIdentifier;
5863
import org.apache.iceberg.data.IcebergGenerics;
@@ -63,7 +68,10 @@
6368
import org.junit.jupiter.api.Test;
6469
import org.junit.jupiter.api.io.TempDir;
6570

71+
import java.io.File;
6672
import java.math.BigDecimal;
73+
import java.nio.ByteBuffer;
74+
import java.nio.charset.StandardCharsets;
6775
import java.time.LocalDate;
6876
import java.time.LocalDateTime;
6977
import java.util.ArrayList;
@@ -595,6 +603,103 @@ public void testNestedTypes() throws Exception {
595603
"Record(2, {20=[Record(cherry, 200), Record(pear, 201)]})");
596604
}
597605

606+
@Test
607+
public void testStringPartitionNullPadding() throws Exception {
608+
RowType rowType =
609+
RowType.of(
610+
new DataType[] {DataTypes.INT(), DataTypes.VARCHAR(20)},
611+
new String[] {"k", "country"});
612+
FileStoreTable table =
613+
createPaimonTable(
614+
rowType,
615+
Collections.singletonList("country"),
616+
Collections.singletonList("k"),
617+
-1);
618+
619+
String commitUser = UUID.randomUUID().toString();
620+
TableWriteImpl<?> write = table.newWrite(commitUser);
621+
TableCommitImpl commit = table.newCommit(commitUser);
622+
623+
write.write(GenericRow.of(1, BinaryString.fromString("Switzerland")), 1);
624+
write.write(GenericRow.of(2, BinaryString.fromString("Australia")), 1);
625+
write.write(GenericRow.of(3, BinaryString.fromString("Brazil")), 1);
626+
write.write(GenericRow.of(4, BinaryString.fromString("Grand Duchy of Luxembourg")), 1);
627+
commit.commit(1, write.prepareCommit(false, 1));
628+
assertThat(getIcebergResult())
629+
.containsExactlyInAnyOrder(
630+
"Record(1, Switzerland)",
631+
"Record(2, Australia)",
632+
"Record(3, Brazil)",
633+
"Record(4, Grand Duchy of Luxembourg)");
634+
635+
FileIO fileIO = table.fileIO();
636+
IcebergMetadata metadata =
637+
IcebergMetadata.fromPath(
638+
fileIO, new Path(table.location(), "metadata/v1.metadata.json"));
639+
640+
IcebergPathFactory pathFactory =
641+
new IcebergPathFactory(new Path(table.location(), "metadata"));
642+
IcebergManifestList manifestList = IcebergManifestList.create(table, pathFactory);
643+
String currentSnapshotManifest = metadata.currentSnapshot().manifestList();
644+
645+
File snapShotAvroFile = new File(currentSnapshotManifest);
646+
String expectedPartitionSummary =
647+
"[{\"contains_null\": false, \"contains_nan\": false, \"lower_bound\": \"Australia\", \"upper_bound\": \"Switzerland\"}]";
648+
try (DataFileReader<GenericRecord> dataFileReader =
649+
new DataFileReader<>(
650+
new SeekableFileInput(snapShotAvroFile), new GenericDatumReader<>())) {
651+
while (dataFileReader.hasNext()) {
652+
GenericRecord record = dataFileReader.next();
653+
String partitionSummary = record.get("partitions").toString();
654+
assertThat(partitionSummary).doesNotContain("\\u0000");
655+
assertThat(partitionSummary).isEqualTo(expectedPartitionSummary);
656+
}
657+
}
658+
659+
String tableManifest = manifestList.read(snapShotAvroFile.getName()).get(0).manifestPath();
660+
661+
try (DataFileReader<GenericRecord> dataFileReader =
662+
new DataFileReader<>(
663+
new SeekableFileInput(new File(tableManifest)),
664+
new GenericDatumReader<>())) {
665+
666+
while (dataFileReader.hasNext()) {
667+
GenericRecord record = dataFileReader.next();
668+
GenericRecord dataFile = (GenericRecord) record.get("data_file");
669+
670+
// Check lower bounds
671+
GenericData.Array<?> lowerBounds =
672+
(GenericData.Array<?>) dataFile.get("lower_bounds");
673+
if (lowerBounds != null) {
674+
for (Object bound : lowerBounds) {
675+
GenericRecord boundRecord = (GenericRecord) bound;
676+
int key = (Integer) boundRecord.get("key");
677+
if (key == 1) { // key = 1 is the partition key
678+
ByteBuffer value = (ByteBuffer) boundRecord.get("value");
679+
String boundValue = new String(value.array(), StandardCharsets.UTF_8);
680+
assertThat(boundValue).doesNotContain("\u0000");
681+
}
682+
}
683+
}
684+
685+
// Check upper bounds
686+
GenericData.Array<?> upperBounds =
687+
(GenericData.Array<?>) dataFile.get("upper_bounds");
688+
if (upperBounds != null) {
689+
for (Object bound : upperBounds) {
690+
GenericRecord boundRecord = (GenericRecord) bound;
691+
int key = (Integer) boundRecord.get("key");
692+
if (key == 1) { // key = 1 is the partition key
693+
ByteBuffer value = (ByteBuffer) boundRecord.get("value");
694+
String boundValue = new String(value.array(), StandardCharsets.UTF_8);
695+
assertThat(boundValue).doesNotContain("\u0000");
696+
}
697+
}
698+
}
699+
}
700+
}
701+
}
702+
598703
// ------------------------------------------------------------------------
599704
// Random Tests
600705
// ------------------------------------------------------------------------
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.iceberg.manifest;
20+
21+
import org.apache.paimon.types.DataTypes;
22+
23+
import org.junit.jupiter.api.DisplayName;
24+
import org.junit.jupiter.api.Test;
25+
import org.junit.jupiter.params.ParameterizedTest;
26+
import org.junit.jupiter.params.provider.Arguments;
27+
import org.junit.jupiter.params.provider.MethodSource;
28+
29+
import java.nio.ByteBuffer;
30+
import java.nio.charset.StandardCharsets;
31+
import java.util.stream.Stream;
32+
33+
import static org.assertj.core.api.Assertions.assertThat;
34+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
35+
36+
class IcebergConversionsVarcharTest {
37+
38+
@Test
39+
void testEmptyString() {
40+
String empty = "";
41+
ByteBuffer result = IcebergConversions.toByteBuffer(DataTypes.VARCHAR(10), empty);
42+
String decodedString = new String(result.array(), StandardCharsets.UTF_8);
43+
assertThat(result.array()).isEmpty();
44+
assertThat(empty).isEqualTo(decodedString);
45+
}
46+
47+
@Test
48+
void testNullHandling() {
49+
assertThatThrownBy(() -> IcebergConversions.toByteBuffer(DataTypes.VARCHAR(10), null))
50+
.isInstanceOf(NullPointerException.class);
51+
}
52+
53+
@ParameterizedTest
54+
@MethodSource("provideSpecialStrings")
55+
@DisplayName("Test special string cases")
56+
void testSpecialStrings(String input) {
57+
ByteBuffer result = IcebergConversions.toByteBuffer(DataTypes.VARCHAR(100), input);
58+
String decoded = new String(result.array(), 0, result.limit(), StandardCharsets.UTF_8);
59+
assertThat(decoded).isEqualTo(input);
60+
}
61+
62+
private static Stream<Arguments> provideSpecialStrings() {
63+
return Stream.of(
64+
Arguments.of("Hello\u0000World"), // Embedded null
65+
Arguments.of("\n\r\t"), // Control characters
66+
Arguments.of(" "), // Single space
67+
Arguments.of(" "), // Multiple spaces
68+
Arguments.of("①②③"), // Unicode numbers
69+
Arguments.of("🌟🌞🌝"), // Emojis
70+
Arguments.of("Hello\uD83D\uDE00World"), // Surrogate pairs
71+
Arguments.of("\uFEFF"), // Byte Order Mark
72+
Arguments.of("Hello\\World"), // Backslashes
73+
Arguments.of("Hello\"World"), // Quotes
74+
Arguments.of("Hello'World"), // Single quotes
75+
Arguments.of("Hello\bWorld"), // Backspace
76+
Arguments.of("Hello\fWorld") // Form feed
77+
);
78+
}
79+
80+
@ParameterizedTest
81+
@MethodSource("provideLongStrings")
82+
void testLongStrings(String input) {
83+
ByteBuffer result =
84+
IcebergConversions.toByteBuffer(DataTypes.VARCHAR(input.length()), input);
85+
String decoded = new String(result.array(), 0, result.limit(), StandardCharsets.UTF_8);
86+
assertThat(decoded).isEqualTo(input).hasSize(input.length());
87+
}
88+
89+
private static Stream<Arguments> provideLongStrings() {
90+
return Stream.of(
91+
Arguments.of(createString(1)),
92+
Arguments.of(createString(10)),
93+
Arguments.of(createString(100)),
94+
Arguments.of(createString(1000)),
95+
Arguments.of(createString(10000)));
96+
}
97+
98+
private static String createString(int length) {
99+
StringBuilder sb = new StringBuilder(length);
100+
for (int i = 0; i < length; i++) {
101+
sb.append('a');
102+
}
103+
return sb.toString();
104+
}
105+
106+
@Test
107+
void testMultiByteCharacters() {
108+
String[] inputs = {
109+
"中文", // Chinese
110+
"한글", // Korean
111+
"日本語", // Japanese
112+
"🌟", // Emoji (4 bytes)
113+
"Café", // Latin-1 Supplement
114+
"Привет", // Cyrillic
115+
"שָׁלוֹם", // Hebrew with combining marks
116+
"ᄀᄁᄂᄃᄄ", // Hangul Jamo
117+
"बहुत बढ़िया", // Devanagari
118+
"العربية" // Arabic
119+
};
120+
121+
for (String input : inputs) {
122+
ByteBuffer result =
123+
IcebergConversions.toByteBuffer(DataTypes.VARCHAR(input.length() * 4), input);
124+
String decoded = new String(result.array(), 0, result.limit(), StandardCharsets.UTF_8);
125+
assertThat(decoded).isEqualTo(input);
126+
assertThat(result.limit()).isGreaterThanOrEqualTo(input.length());
127+
}
128+
}
129+
130+
@Test
131+
void testBufferProperties() {
132+
String input = "Hello, World!";
133+
ByteBuffer result =
134+
IcebergConversions.toByteBuffer(DataTypes.VARCHAR(input.length()), input);
135+
136+
assertThat(result.limit()).isEqualTo(result.array().length);
137+
assertThat(containsTrailingZeros(result)).isFalse();
138+
}
139+
140+
@Test
141+
void testConcurrentAccess() throws InterruptedException {
142+
int threadCount = 10;
143+
Thread[] threads = new Thread[threadCount];
144+
String[] inputs = new String[threadCount];
145+
ByteBuffer[] results = new ByteBuffer[threadCount];
146+
147+
for (int i = 0; i < threadCount; i++) {
148+
final int index = i;
149+
inputs[index] = "Thread" + index;
150+
threads[index] =
151+
new Thread(
152+
() -> {
153+
results[index] =
154+
IcebergConversions.toByteBuffer(
155+
DataTypes.VARCHAR(inputs[index].length()),
156+
inputs[index]);
157+
});
158+
threads[index].start();
159+
}
160+
161+
for (Thread thread : threads) {
162+
thread.join();
163+
}
164+
165+
for (int i = 0; i < threadCount; i++) {
166+
String decoded =
167+
new String(results[i].array(), 0, results[i].limit(), StandardCharsets.UTF_8);
168+
assertThat(decoded).isEqualTo(inputs[i]);
169+
}
170+
}
171+
172+
private boolean containsTrailingZeros(ByteBuffer buffer) {
173+
byte[] array = buffer.array();
174+
for (int i = buffer.limit(); i < array.length; i++) {
175+
if (array[i] != 0) {
176+
return true;
177+
}
178+
}
179+
return false;
180+
}
181+
}

0 commit comments

Comments
 (0)