Skip to content

Commit 57deef6

Browse files
authored
Introduce CharArray pooling for InputStream decoding (#2100)
* Add benchmark * Introduce pooling of CharArray for InputStream decoding: by default, 16k buffer is allocated for char-related operations, and it may create a non-trivial GC pressure for small objects decoding * The estimated performance improvement is tens of percents, reaching 70% on our ASCII benchmarks Fixes #1893
1 parent b749dd1 commit 57deef6

File tree

11 files changed

+169
-43
lines changed

11 files changed

+169
-43
lines changed

benchmark/src/jmh/kotlin/kotlinx/benchmarks/json/JacksonComparisonBenchmark.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import kotlinx.serialization.json.okio.encodeToBufferedSink
88
import okio.blackholeSink
99
import okio.buffer
1010
import org.openjdk.jmh.annotations.*
11+
import java.io.ByteArrayInputStream
1112
import java.io.OutputStream
1213
import java.util.concurrent.*
1314

@@ -75,6 +76,7 @@ open class JacksonComparisonBenchmark {
7576
}
7677

7778
private val stringData = Json.encodeToString(DefaultPixelEvent.serializer(), data)
79+
private val utf8BytesData = stringData.toByteArray()
7880

7981
@Serializable
8082
private class SmallDataClass(val id: Int, val name: String)
@@ -96,6 +98,9 @@ open class JacksonComparisonBenchmark {
9698
@Benchmark
9799
fun kotlinToStream() = Json.encodeToStream(DefaultPixelEvent.serializer(), data, devNullStream)
98100

101+
@Benchmark
102+
fun kotlinFromStream() = Json.decodeFromStream(DefaultPixelEvent.serializer(), ByteArrayInputStream(utf8BytesData))
103+
99104
@Benchmark
100105
fun kotlinToOkio() = Json.encodeToBufferedSink(DefaultPixelEvent.serializer(), data, devNullSink)
101106

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package kotlinx.serialization.json
2+
3+
import kotlinx.coroutines.*
4+
import kotlinx.serialization.Serializable
5+
import kotlinx.serialization.builtins.*
6+
import org.junit.Test
7+
import java.io.ByteArrayInputStream
8+
import java.io.ByteArrayOutputStream
9+
import kotlin.random.*
10+
import kotlin.test.*
11+
12+
// Stresses out that JSON decoded in parallel does not interfere (mostly via caching of various buffers)
13+
class JsonConcurrentStressTest : JsonTestBase() {
14+
private val charset = "ABCDEFGHIJKLMNOPQRSTUVWXTZabcdefghiklmnopqrstuvwxyz0123456789"
15+
16+
@Test
17+
fun testDecodeInParallelSimpleList() = doTest(100) { mode ->
18+
val value = (1..10000).map { Random.nextDouble() }
19+
val string = Json.encodeToString(ListSerializer(Double.serializer()), value, mode)
20+
assertEquals(value, Json.decodeFromString(ListSerializer(Double.serializer()), string, mode))
21+
}
22+
23+
@Serializable
24+
data class Foo(val s: String, val f: Foo?)
25+
26+
@Test
27+
fun testDecodeInParallelListOfPojo() = doTest(1_000) { mode ->
28+
val value = (1..100).map {
29+
val randomString = getRandomString()
30+
val nestedFoo = Foo("null抢\u000E鋽윝䑜厼\uF70A紲ᢨ䣠null⛾䉻嘖緝ᯧnull쎶\u0005null" + randomString, null)
31+
Foo(getRandomString(), nestedFoo)
32+
}
33+
val string = Json.encodeToString(ListSerializer(Foo.serializer()), value, mode)
34+
assertEquals(value, Json.decodeFromString(ListSerializer(Foo.serializer()), string, mode))
35+
}
36+
37+
@Test
38+
fun testDecodeInParallelPojo() = doTest(100_000) { mode ->
39+
val randomString = getRandomString()
40+
val nestedFoo = Foo("null抢\u000E鋽윝䑜厼\uF70A紲ᢨ䣠null⛾䉻嘖緝ᯧnull쎶\u0005null" + randomString, null)
41+
val randomFoo = Foo(getRandomString(), nestedFoo)
42+
val string = Json.encodeToString(Foo.serializer(), randomFoo, mode)
43+
assertEquals(randomFoo, Json.decodeFromString(Foo.serializer(), string, mode))
44+
}
45+
46+
@Test
47+
fun testDecodeInParallelSequencePojo() = runBlocking<Unit> {
48+
for (i in 1 until 1_000) {
49+
launch(Dispatchers.Default) {
50+
val values = (1..100).map {
51+
val randomString = getRandomString()
52+
val nestedFoo = Foo("null抢\u000E鋽윝䑜厼\uF70A紲ᢨ䣠null⛾䉻嘖緝ᯧnull쎶\u0005null" + randomString, null)
53+
Foo(getRandomString(), nestedFoo)
54+
}
55+
val baos = ByteArrayOutputStream()
56+
for (value in values) {
57+
Json.encodeToStream(Foo.serializer(), value, baos)
58+
}
59+
val bais = ByteArrayInputStream(baos.toByteArray())
60+
assertEquals(values, Json.decodeToSequence(bais, Foo.serializer()).toList())
61+
}
62+
}
63+
}
64+
65+
private fun getRandomString() = (1..Random.nextInt(0, charset.length)).map { charset[it] }.joinToString(separator = "")
66+
67+
private fun doTest(iterations: Int, block: (JsonTestingMode) -> Unit) {
68+
runBlocking<Unit> {
69+
for (i in 1 until iterations) {
70+
launch(Dispatchers.Default) {
71+
parametrizedTest {
72+
block(it)
73+
}
74+
}
75+
}
76+
}
77+
}
78+
}

formats/json-tests/jvmTest/src/kotlinx/serialization/json/ParallelJsonStressTest.kt

Lines changed: 0 additions & 22 deletions
This file was deleted.
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
* Copyright 2017-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
package kotlinx.serialization.json.internal
5+
6+
internal expect object CharArrayPoolBatchSize {
7+
fun take(): CharArray
8+
fun release(array: CharArray)
9+
}

formats/json/commonMain/src/kotlinx/serialization/json/internal/JsonStreams.kt

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,14 @@ internal fun <T> Json.decodeByReader(
3434
reader: SerialReader
3535
): T {
3636
val lexer = ReaderJsonLexer(reader)
37-
val input = StreamingJsonDecoder(this, WriteMode.OBJ, lexer, deserializer.descriptor, null)
38-
val result = input.decodeSerializableValue(deserializer)
39-
lexer.expectEof()
40-
return result
37+
try {
38+
val input = StreamingJsonDecoder(this, WriteMode.OBJ, lexer, deserializer.descriptor, null)
39+
val result = input.decodeSerializableValue(deserializer)
40+
lexer.expectEof()
41+
return result
42+
} finally {
43+
lexer.release()
44+
}
4145
}
4246

4347
@PublishedApi
@@ -47,7 +51,7 @@ internal fun <T> Json.decodeToSequenceByReader(
4751
deserializer: DeserializationStrategy<T>,
4852
format: DecodeSequenceMode = DecodeSequenceMode.AUTO_DETECT
4953
): Sequence<T> {
50-
val lexer = ReaderJsonLexer(reader)
54+
val lexer = ReaderJsonLexer(reader, CharArray(BATCH_SIZE)) // Unpooled buffer due to lazy nature of sequence
5155
val iter = JsonIterator(format, this, lexer, deserializer)
5256
return Sequence { iter }.constrainOnce()
5357
}

formats/json/commonMain/src/kotlinx/serialization/json/internal/lexer/JsonLexer.kt

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ private const val DEFAULT_THRESHOLD = 128
1414
* For some reason this hand-rolled implementation is faster than
1515
* fun ArrayAsSequence(s: CharArray): CharSequence = java.nio.CharBuffer.wrap(s, 0, length)
1616
*/
17-
internal class ArrayAsSequence(val buffer: CharArray) : CharSequence {
17+
internal class ArrayAsSequence(private val buffer: CharArray) : CharSequence {
1818
override var length: Int = buffer.size
1919

2020
override fun get(index: Int): Char = buffer[index]
@@ -34,11 +34,11 @@ internal class ArrayAsSequence(val buffer: CharArray) : CharSequence {
3434

3535
internal class ReaderJsonLexer(
3636
private val reader: SerialReader,
37-
charsBuffer: CharArray = CharArray(BATCH_SIZE)
37+
private val buffer: CharArray = CharArrayPoolBatchSize.take()
3838
) : AbstractJsonLexer() {
3939
private var threshold: Int = DEFAULT_THRESHOLD // chars
4040

41-
override val source: ArrayAsSequence = ArrayAsSequence(charsBuffer)
41+
override val source: ArrayAsSequence = ArrayAsSequence(buffer)
4242

4343
init {
4444
preload(0)
@@ -177,4 +177,8 @@ internal class ReaderJsonLexer(
177177

178178
// Can be carefully implemented but postponed for now
179179
override fun consumeLeadingMatchingValue(keyToMatch: String, isLenient: Boolean): String? = null
180+
181+
fun release() {
182+
CharArrayPoolBatchSize.release(buffer)
183+
}
180184
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* Copyright 2017-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
package kotlinx.serialization.json.internal
5+
6+
7+
internal actual object CharArrayPoolBatchSize {
8+
9+
actual fun take(): CharArray = CharArray(BATCH_SIZE)
10+
11+
actual fun release(array: CharArray) = Unit
12+
}
Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1+
/*
2+
* Copyright 2017-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
14
package kotlinx.serialization.json.internal
25

36
import java.util.concurrent.*
47

5-
internal object CharArrayPool {
8+
internal open class CharArrayPoolBase {
69
private val arrays = ArrayDeque<CharArray>()
710
private var charsTotal = 0
11+
812
/*
913
* Not really documented kill switch as a workaround for potential
1014
* (unlikely) problems with memory consumptions.
@@ -13,20 +17,38 @@ internal object CharArrayPool {
1317
System.getProperty("kotlinx.serialization.json.pool.size").toIntOrNull()
1418
}.getOrNull() ?: 1024 * 1024 // 2 MB seems to be a reasonable constraint, (1M of chars)
1519

16-
fun take(): CharArray {
20+
protected fun take(size: Int): CharArray {
1721
/*
1822
* Initially the pool is empty, so an instance will be allocated
1923
* and the pool will be populated in the 'release'
2024
*/
2125
val candidate = synchronized(this) {
2226
arrays.removeLastOrNull()?.also { charsTotal -= it.size }
2327
}
24-
return candidate ?: CharArray(128)
28+
return candidate ?: CharArray(size)
2529
}
2630

27-
fun release(array: CharArray) = synchronized(this) {
31+
protected fun releaseImpl(array: CharArray): Unit = synchronized(this) {
2832
if (charsTotal + array.size >= MAX_CHARS_IN_POOL) return@synchronized
2933
charsTotal += array.size
3034
arrays.addLast(array)
3135
}
3236
}
37+
38+
internal object CharArrayPool : CharArrayPoolBase() {
39+
fun take(): CharArray = super.take(128)
40+
41+
// Can release array of an arbitrary size
42+
fun release(array: CharArray) = releaseImpl(array)
43+
}
44+
45+
// Pools char arrays of size 16K
46+
internal actual object CharArrayPoolBatchSize : CharArrayPoolBase() {
47+
48+
actual fun take(): CharArray = super.take(BATCH_SIZE)
49+
50+
actual fun release(array: CharArray) {
51+
require(array.size == BATCH_SIZE) { "Inconsistent internal invariant: unexpected array size ${array.size}" }
52+
releaseImpl(array)
53+
}
54+
}

formats/json/jvmMain/src/kotlinx/serialization/json/internal/JvmJsonStreams.kt

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,14 @@ internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWr
6868
0.toByte() -> {
6969
charArray[sz++] = ch.toChar()
7070
}
71+
7172
1.toByte() -> {
7273
val escapedString = ESCAPE_STRINGS[ch]!!
7374
sz = ensureTotalCapacity(sz, escapedString.length)
7475
escapedString.toCharArray(charArray, sz, 0, escapedString.length)
7576
sz += escapedString.length
7677
}
78+
7779
else -> {
7880
charArray[sz] = '\\'
7981
charArray[sz + 1] = marker.toInt().toChar()
@@ -204,34 +206,38 @@ internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWr
204206
}
205207
}
206208

207-
/*
208-
Sources taken from okio library with minor changes, see https://github.com/square/okio
209-
*/
209+
/**
210+
* Sources taken from okio library with minor changes, see https://github.com/square/okio
211+
*/
210212
private fun writeUtf8CodePoint(codePoint: Int) {
211213
when {
212214
codePoint < 0x80 -> {
213215
// Emit a 7-bit code point with 1 byte.
214216
ensure(1)
215217
write(codePoint)
216218
}
219+
217220
codePoint < 0x800 -> {
218221
// Emit a 11-bit code point with 2 bytes.
219222
ensure(2)
220223
write(codePoint shr 6 or 0xc0) // 110xxxxx
221224
write(codePoint and 0x3f or 0x80) // 10xxxxxx
222225
}
226+
223227
codePoint in 0xd800..0xdfff -> {
224228
// Emit a replacement character for a partial surrogate.
225229
ensure(1)
226230
write('?'.code)
227231
}
232+
228233
codePoint < 0x10000 -> {
229234
// Emit a 16-bit code point with 3 bytes.
230235
ensure(3)
231236
write(codePoint shr 12 or 0xe0) // 1110xxxx
232237
write(codePoint shr 6 and 0x3f or 0x80) // 10xxxxxx
233238
write(codePoint and 0x3f or 0x80) // 10xxxxxx
234239
}
240+
235241
codePoint <= 0x10ffff -> {
236242
// Emit a 21-bit code point with 4 bytes.
237243
ensure(4)
@@ -240,18 +246,16 @@ internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWr
240246
write(codePoint shr 6 and 0x3f or 0x80) // 10xxyyyy
241247
write(codePoint and 0x3f or 0x80) // 10yyyyyy
242248
}
249+
243250
else -> {
244251
throw JsonEncodingException("Unexpected code point: $codePoint")
245252
}
246253
}
247254
}
248255
}
249256

250-
internal class JavaStreamSerialReader(
251-
stream: InputStream,
252-
charset: Charset = Charsets.UTF_8
253-
) : SerialReader {
254-
private val reader = stream.reader(charset)
257+
internal class JavaStreamSerialReader(stream: InputStream) : SerialReader {
258+
private val reader = stream.reader(Charsets.UTF_8)
255259

256260
override fun read(buffer: CharArray, bufferOffset: Int, count: Int): Int {
257261
return reader.read(buffer, bufferOffset, count)
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
* Copyright 2017-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
package kotlinx.serialization.json.internal
5+
6+
internal actual object CharArrayPoolBatchSize {
7+
actual fun take(): CharArray = CharArray(BATCH_SIZE)
8+
actual fun release(array: CharArray) = Unit
9+
}

0 commit comments

Comments
 (0)