Skip to content

Commit 0b19122

Browse files
kiszk authored and cloud-fan committed
[SPARK-23762][SQL] UTF8StringBuilder uses MemoryBlock
## What changes were proposed in this pull request?

This PR uses `MemoryBlock` in `UTF8StringBuilder`. In general, there are two advantages to using `MemoryBlock`:

1. It provides clean API calls rather than using a Java array or `PlatformMemory`.
2. It improves the runtime performance of memory access compared with using `Object`.

## How was this patch tested?

Added `UTF8StringBuilderSuite`.

Author: Kazuaki Ishizaki <[email protected]>

Closes apache#20871 from kiszk/SPARK-23762.
1 parent 6a2289e commit 0b19122

File tree

2 files changed

+56
-21
lines changed

2 files changed

+56
-21
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java

Lines changed: 14 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,8 @@
1919

2020
import org.apache.spark.unsafe.Platform;
2121
import org.apache.spark.unsafe.array.ByteArrayMethods;
22+
import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
23+
import org.apache.spark.unsafe.memory.MemoryBlock;
2224
import org.apache.spark.unsafe.types.UTF8String;
2325

2426
/**
@@ -29,50 +31,41 @@ public class UTF8StringBuilder {
2931

3032
private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
3133

32-
private byte[] buffer;
33-
private int cursor = Platform.BYTE_ARRAY_OFFSET;
34+
private ByteArrayMemoryBlock buffer;
35+
private int length = 0;
3436

3537
public UTF8StringBuilder() {
3638
// Since initial buffer size is 16 in `StringBuilder`, we set the same size here
37-
this.buffer = new byte[16];
39+
this.buffer = new ByteArrayMemoryBlock(16);
3840
}
3941

4042
// Grows the buffer by at least `neededSize`
4143
private void grow(int neededSize) {
42-
if (neededSize > ARRAY_MAX - totalSize()) {
44+
if (neededSize > ARRAY_MAX - length) {
4345
throw new UnsupportedOperationException(
4446
"Cannot grow internal buffer by size " + neededSize + " because the size after growing " +
4547
"exceeds size limitation " + ARRAY_MAX);
4648
}
47-
final int length = totalSize() + neededSize;
48-
if (buffer.length < length) {
49-
int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
50-
final byte[] tmp = new byte[newLength];
51-
Platform.copyMemory(
52-
buffer,
53-
Platform.BYTE_ARRAY_OFFSET,
54-
tmp,
55-
Platform.BYTE_ARRAY_OFFSET,
56-
totalSize());
49+
final int requestedSize = length + neededSize;
50+
if (buffer.size() < requestedSize) {
51+
int newLength = requestedSize < ARRAY_MAX / 2 ? requestedSize * 2 : ARRAY_MAX;
52+
final ByteArrayMemoryBlock tmp = new ByteArrayMemoryBlock(newLength);
53+
MemoryBlock.copyMemory(buffer, tmp, length);
5754
buffer = tmp;
5855
}
5956
}
6057

61-
private int totalSize() {
62-
return cursor - Platform.BYTE_ARRAY_OFFSET;
63-
}
64-
6558
public void append(UTF8String value) {
6659
grow(value.numBytes());
67-
value.writeToMemory(buffer, cursor);
68-
cursor += value.numBytes();
60+
value.writeToMemory(buffer.getByteArray(), length + Platform.BYTE_ARRAY_OFFSET);
61+
length += value.numBytes();
6962
}
7063

7164
public void append(String value) {
7265
append(UTF8String.fromString(value));
7366
}
7467

7568
public UTF8String build() {
76-
return UTF8String.fromBytes(buffer, 0, totalSize());
69+
return UTF8String.fromBytes(buffer.getByteArray(), 0, length);
7770
}
7871
}
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.expressions.codegen
19+
20+
import org.apache.spark.SparkFunSuite
21+
import org.apache.spark.unsafe.types.UTF8String
22+
23+
class UTF8StringBuilderSuite extends SparkFunSuite {
24+
25+
test("basic test") {
26+
val sb = new UTF8StringBuilder()
27+
assert(sb.build() === UTF8String.EMPTY_UTF8)
28+
29+
sb.append("")
30+
assert(sb.build() === UTF8String.EMPTY_UTF8)
31+
32+
sb.append("abcd")
33+
assert(sb.build() === UTF8String.fromString("abcd"))
34+
35+
sb.append(UTF8String.fromString("1234"))
36+
assert(sb.build() === UTF8String.fromString("abcd1234"))
37+
38+
// expect to grow an internal buffer
39+
sb.append(UTF8String.fromString("efgijk567890"))
40+
assert(sb.build() === UTF8String.fromString("abcd1234efgijk567890"))
41+
}
42+
}

0 commit comments

Comments
 (0)