Skip to content

Commit 02f643c

Browse files
authored
Further improve stream decoding performance (#2101)
* Handroll CharsetReader implementation * Pool byte arrays for charset decoding Further improves #1893
1 parent ada7d2a commit 02f643c

File tree

5 files changed

+180
-42
lines changed

5 files changed

+180
-42
lines changed

formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,12 @@ public fun <T> Json.decodeFromStream(
5656
deserializer: DeserializationStrategy<T>,
5757
stream: InputStream
5858
): T {
59-
return decodeByReader(deserializer, JavaStreamSerialReader(stream))
59+
val reader = JavaStreamSerialReader(stream)
60+
try {
61+
return decodeByReader(deserializer, reader)
62+
} finally {
63+
reader.release()
64+
}
6065
}
6166

6267
/**

formats/json/jvmMain/src/kotlinx/serialization/json/internal/CharArrayPool.kt renamed to formats/json/jvmMain/src/kotlinx/serialization/json/internal/ArrayPools.kt

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,18 @@
33
*/
44
package kotlinx.serialization.json.internal
55

6-
import java.util.concurrent.*
6+
/*
 * Threshold the array pools in this file compare their retained total against
 * before accepting a released array.
 *
 * It can be overridden with the `kotlinx.serialization.json.pool.size` system property:
 * an intentionally undocumented kill switch that works around potential (unlikely)
 * problems with memory consumption. The default is used when the property is absent,
 * is not a valid integer, or cannot be read at all.
 */
private val MAX_CHARS_IN_POOL: Int = runCatching {
    // Safe call: an unset property yields null directly instead of relying on a swallowed NPE.
    System.getProperty("kotlinx.serialization.json.pool.size")?.toIntOrNull()
}.getOrNull() ?: (2 * 1024 * 1024)
713

814
internal open class CharArrayPoolBase {
915
private val arrays = ArrayDeque<CharArray>()
1016
private var charsTotal = 0
1117

12-
/*
13-
* Not really documented kill switch as a workaround for potential
14-
* (unlikely) problems with memory consumptions.
15-
*/
16-
private val MAX_CHARS_IN_POOL = runCatching {
17-
System.getProperty("kotlinx.serialization.json.pool.size").toIntOrNull()
18-
}.getOrNull() ?: 1024 * 1024 // 2 MB seems to be a reasonable constraint, (1M of chars)
19-
2018
protected fun take(size: Int): CharArray {
2119
/*
2220
* Initially the pool is empty, so an instance will be allocated
@@ -52,3 +50,40 @@ internal actual object CharArrayPoolBatchSize : CharArrayPoolBase() {
5250
releaseImpl(array)
5351
}
5452
}
53+
54+
// Byte array pool
55+
56+
/**
 * Base class for pools of reusable byte arrays.
 *
 * The footprint of the retained arrays is tracked in char-sized (two-byte) units, so that
 * it can be compared with [MAX_CHARS_IN_POOL], which, judging by its name, is expressed in chars.
 * Every access to the pool state happens inside `synchronized(this)`.
 */
internal open class ByteArrayPoolBase {
    private val arrays = ArrayDeque<ByteArray>()

    // Footprint of the arrays currently held by the pool, in char-sized (2-byte) units.
    private var pooledCharUnits = 0

    /**
     * Returns the most recently released array if the pool holds one,
     * otherwise allocates a new array of [size] bytes.
     *
     * A pooled array is handed out as is: its length is not checked against [size].
     */
    protected fun take(size: Int): ByteArray {
        /*
         * Initially the pool is empty, so an instance will be allocated
         * and the pool will be populated in the 'release'
         */
        val candidate = synchronized(this) {
            arrays.removeLastOrNull()?.also { pooledCharUnits -= charUnits(it) }
        }
        return candidate ?: ByteArray(size)
    }

    /**
     * Offers [array] to the pool. The array is silently dropped (left to the garbage collector)
     * when retaining it would reach [MAX_CHARS_IN_POOL].
     */
    protected fun releaseImpl(array: ByteArray): Unit = synchronized(this) {
        val cost = charUnits(array)
        // The admission check and the bookkeeping use the same unit; previously the check
        // added the raw byte count to a total that was kept in halved units.
        if (pooledCharUnits + cost >= MAX_CHARS_IN_POOL) return@synchronized
        pooledCharUnits += cost
        arrays.addLast(array)
    }

    // Size of the array expressed in char-sized units (two bytes per unit, rounded down).
    private fun charUnits(array: ByteArray): Int = array.size / 2
}
77+
78+
/**
 * Pool of byte arrays whose freshly allocated instances are 8 KiB long.
 */
internal object ByteArrayPool8k : ByteArrayPoolBase() {
    // 8 KiB, as the name of the pool states (the literal used to be 8196).
    private const val ARRAY_SIZE = 8 * 1024

    /** Returns a pooled array, or a new [ARRAY_SIZE]-byte array when the pool is empty. */
    fun take(): ByteArray = super.take(ARRAY_SIZE)

    /** Hands [array] back to the pool so that a later [take] can reuse it. */
    fun release(array: ByteArray) = releaseImpl(array)
}
83+
84+
85+
/**
 * Pool of small byte arrays: a freshly allocated instance is 512 bytes long.
 */
internal object ByteArrayPool : ByteArrayPoolBase() {
    /** Hands out a pooled array or, when the pool is empty, a new 512-byte one. */
    fun take(): ByteArray {
        return super.take(512)
    }

    /** Gives [array] back to the pool for later reuse. */
    fun release(array: ByteArray) {
        releaseImpl(array)
    }
}

formats/json/jvmMain/src/kotlinx/serialization/json/internal/ByteArrayPool.kt

Lines changed: 0 additions & 30 deletions
This file was deleted.
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package kotlinx.serialization.json.internal
2+
3+
import java.io.*
4+
import java.nio.*
5+
import java.nio.charset.*
6+
7+
/**
 * Decodes bytes read from [inputStream] into chars using [charset].
 *
 * Raw bytes are buffered in an array borrowed from [ByteArrayPool8k]; malformed and
 * unmappable input is replaced instead of being reported as an error.
 *
 * The stream itself is never closed by this class. Call [release] once reading is finished
 * to return the byte buffer to the pool; the reader should not be used afterwards because
 * the pool may hand the same array to another consumer.
 */
internal class CharsetReader(
    private val inputStream: InputStream,
    private val charset: Charset
) {
    private val decoder: CharsetDecoder
    private val byteBuffer: ByteBuffer

    // Surrogate-handling in cases when a single char is requested, but two were read
    private var hasLeftoverPotentiallySurrogateChar = false
    private var leftoverChar = 0.toChar()

    init {
        decoder = charset.newDecoder()
            .onMalformedInput(CodingErrorAction.REPLACE)
            .onUnmappableCharacter(CodingErrorAction.REPLACE)
        byteBuffer = ByteBuffer.wrap(ByteArrayPool8k.take())
        byteBuffer.flip() // Make empty
    }

    /**
     * Reads up to [length] chars into [array], starting at index [offset].
     *
     * @return the number of chars stored, `0` when [length] is `0`,
     * or `-1` when the end of the stream is reached and nothing was stored.
     * @throws IllegalArgumentException when [offset] and [length] do not describe a range inside [array].
     */
    @Suppress("NAME_SHADOWING")
    fun read(array: CharArray, offset: Int, length: Int): Int {
        if (length == 0) return 0
        require(offset in 0 until array.size && length >= 0 && offset + length <= array.size) {
            "Unexpected arguments: $offset, $length, ${array.size}"
        }

        var offset = offset
        var length = length
        // Number of chars already stored from the leftover slot: either 0 or 1.
        var leftoverCount = 0
        if (hasLeftoverPotentiallySurrogateChar) {
            array[offset] = leftoverChar
            offset++
            length--
            hasLeftoverPotentiallySurrogateChar = false
            leftoverCount = 1
            if (length == 0) return leftoverCount
        }
        if (length == 1) {
            // Treat single-character array reads just like read()
            val c = oneShotReadSlowPath()
            if (c == -1) return if (leftoverCount == 0) -1 else leftoverCount
            array[offset] = c.toChar()
            return leftoverCount + 1
        }
        val decoded = doRead(array, offset, length)
        // At end of stream doRead reports -1. Adding it to the leftover count would produce 0
        // and silently lose the leftover char that has already been stored in the array.
        return if (decoded < 0) {
            if (leftoverCount == 0) -1 else leftoverCount
        } else {
            decoded + leftoverCount
        }
    }

    /**
     * Decodes buffered bytes into `array[offset until offset + length]`, refilling the byte
     * buffer from the stream as needed.
     *
     * @return the number of chars decoded, or `-1` if none were produced.
     */
    private fun doRead(array: CharArray, offset: Int, length: Int): Int {
        var charBuffer = CharBuffer.wrap(array, offset, length)
        if (charBuffer.position() != 0) {
            // Re-base the buffer so that position() counts the chars produced by this call.
            charBuffer = charBuffer.slice()
        }
        var isEof = false
        while (true) {
            val cr = decoder.decode(byteBuffer, charBuffer, isEof)
            if (cr.isUnderflow) {
                if (isEof) break
                if (!charBuffer.hasRemaining()) break
                val n = fillByteBuffer()
                if (n < 0) {
                    isEof = true
                    if (charBuffer.position() == 0 && !byteBuffer.hasRemaining()) break
                    decoder.reset()
                }
                continue
            }
            if (cr.isOverflow) {
                assert(charBuffer.position() > 0)
                break
            }
            cr.throwException()
        }
        if (isEof) decoder.reset()
        return if (charBuffer.position() == 0) -1
        else charBuffer.position()
    }

    /**
     * Moves the undecoded bytes to the start of the buffer and appends bytes from the stream.
     *
     * @return the number of bytes available for decoding afterwards,
     * or a negative value when the stream reported its end.
     */
    private fun fillByteBuffer(): Int {
        byteBuffer.compact()
        try {
            // Read from the input stream, and then update the buffer
            val limit = byteBuffer.limit()
            val position = byteBuffer.position()
            val remaining = if (position <= limit) limit - position else 0
            val bytesRead = inputStream.read(byteBuffer.array(), byteBuffer.arrayOffset() + position, remaining)
            if (bytesRead < 0) return bytesRead
            byteBuffer.position(position + bytesRead)
        } finally {
            // Switch back to draining mode on every path, including end of stream and exceptions.
            byteBuffer.flip()
        }
        return byteBuffer.remaining()
    }

    /**
     * Produces a single char by reading two chars at once and parking the second one,
     * if any, in the leftover slot.
     *
     * @return the char code, or `-1` at the end of the stream.
     */
    private fun oneShotReadSlowPath(): Int {
        // Return the leftover char, if there is one
        if (hasLeftoverPotentiallySurrogateChar) {
            hasLeftoverPotentiallySurrogateChar = false
            return leftoverChar.code
        }

        val array = CharArray(2)
        val charsRead = read(array, 0, 2)
        return when (charsRead) {
            -1 -> -1
            1 -> array[0].code
            2 -> {
                leftoverChar = array[1]
                hasLeftoverPotentiallySurrogateChar = true
                array[0].code
            }
            else -> error("Unreachable state: $charsRead")
        }
    }

    /**
     * Returns the backing byte array to [ByteArrayPool8k]. The underlying stream is left open.
     */
    fun release() {
        ByteArrayPool8k.release(byteBuffer.array())
    }
}

formats/json/jvmMain/src/kotlinx/serialization/json/internal/JvmJsonStreams.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package kotlinx.serialization.json.internal
22

33
import java.io.InputStream
44
import java.io.OutputStream
5-
import java.nio.charset.Charset
65

76
internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWriter {
87
private val buffer = ByteArrayPool.take()
@@ -255,9 +254,14 @@ internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWr
255254
}
256255

257256
internal class JavaStreamSerialReader(stream: InputStream) : SerialReader {
258-
private val reader = stream.reader(Charsets.UTF_8)
257+
// NB: not closed on purpose, it is the responsibility of the caller
258+
private val reader = CharsetReader(stream, Charsets.UTF_8)
259259

260260
override fun read(buffer: CharArray, bufferOffset: Int, count: Int): Int {
261261
return reader.read(buffer, bufferOffset, count)
262262
}
263+
264+
fun release() {
265+
reader.release()
266+
}
263267
}

0 commit comments

Comments
 (0)