Skip to content

Commit 8a30b45

Browse files
author
Sergey Mashkov
committed
IO: add read until delimiter tests to cover more corner-cases
1 parent 2c3a0dc commit 8a30b45

File tree

3 files changed

+340
-22
lines changed

3 files changed

+340
-22
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -978,7 +978,7 @@ internal class ByteBufferChannel(
978978
if (!rc) {
979979
if (closed != null || state === ReadWriteBufferState.Terminated) return visitor(TerminatedLookAhead)
980980
result = visitor(this)
981-
if (!state.idle) {
981+
if (!state.idle && state !== ReadWriteBufferState.Terminated) {
982982
restoreStateAfterRead()
983983
tryTerminate()
984984
}

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/Delimited.kt

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@ suspend fun ByteReadChannel.readUntilDelimiter(delimiter: ByteBuffer, dst: ByteB
3232
} while (dst.hasRemaining() && !endFound)
3333
}
3434

35-
if (!dst.hasRemaining() || endFound) return copied
36-
37-
return readUntilDelimiterSuspend(delimiter, dst, copied)
35+
return when {
36+
copied == 0 && isClosedForRead -> -1
37+
!dst.hasRemaining() || endFound -> copied
38+
else -> readUntilDelimiterSuspend(delimiter, dst, copied)
39+
}
3840
}
3941

4042
suspend fun ByteReadChannel.skipDelimiter(delimiter: ByteBuffer) {
@@ -62,18 +64,21 @@ private suspend fun ByteReadChannel.readUntilDelimiterSuspend(delimiter: ByteBuf
6264
require(delimiter !== dst)
6365
require(copied0 >= 0)
6466

65-
return lookAheadSuspend {
66-
var endFound = false
67+
var endFound = false
68+
val copied = lookAheadSuspend {
6769
var copied = copied0
6870

6971
do {
7072
awaitAtLeast(1)
7173
val rc = tryCopyUntilDelimiter(delimiter, dst)
7274
if (rc == 0) {
73-
if (request(0, delimiter.remaining())?.startsWith(delimiter, 0) == true) {
75+
if (startsWithDelimiter(delimiter) == delimiter.remaining()) {
76+
endFound = true
7477
break
7578
}
76-
if (!isClosedForRead) {
79+
if (isClosedForWrite) {
80+
break
81+
} else {
7782
awaitAtLeast(delimiter.remaining())
7883
continue
7984
}
@@ -88,6 +93,12 @@ private suspend fun ByteReadChannel.readUntilDelimiterSuspend(delimiter: ByteBuf
8893

8994
copied
9095
}
96+
97+
return when {
98+
copied > 0 && isClosedForWrite && !endFound -> copied + readAvailable(dst).coerceAtLeast(0)
99+
copied == 0 && isClosedForRead -> -1
100+
else -> copied
101+
}
91102
}
92103

93104
/**
@@ -102,17 +113,24 @@ private fun LookAheadSession.tryCopyUntilDelimiter(delimiter: ByteBuffer, dst: B
102113
val found = minOf(buffer.remaining() - index, delimiter.remaining())
103114
val notKnown = delimiter.remaining() - found
104115

105-
val next = if (notKnown > 0) request(index + found, notKnown) else null
106-
if (next != null) {
107-
if (next.startsWith(delimiter, found)) {
108-
endFound = true
109-
dst.putLimited(buffer, buffer.position() + index)
116+
if (notKnown == 0) {
117+
endFound = true
118+
dst.putLimited(buffer, buffer.position() + index)
119+
} else {
120+
val remembered = buffer.duplicate()
121+
val next = request(index + found, 1)
122+
if (next == null) {
123+
dst.putLimited(remembered, remembered.position() + index)
124+
} else if (next.startsWith(delimiter, found)) {
125+
if (next.remaining() >= notKnown) {
126+
endFound = true
127+
dst.putLimited(remembered, remembered.position() + index)
128+
} else {
129+
dst.putLimited(remembered, remembered.position() + index)
130+
}
110131
} else {
111-
dst.putLimited(buffer, buffer.position() + index + 1)
132+
dst.putLimited(remembered, remembered.position() + index + 1)
112133
}
113-
} else {
114-
endFound = notKnown == 0
115-
dst.putLimited(buffer, buffer.position() + index)
116134
}
117135
} else {
118136
dst.putAtMost(buffer)
@@ -123,18 +141,29 @@ private fun LookAheadSession.tryCopyUntilDelimiter(delimiter: ByteBuffer, dst: B
123141
}
124142

125143
private fun LookAheadSession.tryEnsureDelimiter(delimiter: ByteBuffer): Int {
144+
val found = startsWithDelimiter(delimiter)
145+
if (found == -1) throw IOException("Failed to skip delimiter: actual bytes differ from delimiter bytes")
146+
if (found < delimiter.remaining()) return found
147+
148+
consumed(delimiter.remaining())
149+
return delimiter.remaining()
150+
}
151+
152+
/**
153+
* @return Number of bytes of the delimiter found (possibly 0 if no bytes available yet) or -1 if it doesn't start
154+
*/
155+
private fun LookAheadSession.startsWithDelimiter(delimiter: ByteBuffer): Int {
126156
val buffer = request(0, 1) ?: return 0
127157
val index = buffer.indexOfPartial(delimiter)
128-
if (index != 0) throw IOException("Failed to skip delimiter: actual bytes differ from delimiter bytes")
158+
if (index != 0) return -1
129159

130160
val found = minOf(buffer.remaining() - index, delimiter.remaining())
131161
val notKnown = delimiter.remaining() - found
132162

133163
if (notKnown > 0) {
134164
val next = request(index + found, notKnown) ?: return found
135-
if (!next.startsWith(delimiter, found)) throw IOException("Failed to skip delimiter: actual bytes differ from delimiter bytes")
165+
if (!next.startsWith(delimiter, found)) return -1
136166
}
137167

138-
consumed(delimiter.remaining())
139168
return delimiter.remaining()
140169
}

0 commit comments

Comments
 (0)