Skip to content

Commit 19e7e52

Browse files
author
Oleksandr Dzhychko
authored
Merge pull request #650 from modelix/MODELIX-830-detect-incomplete-version-delta-streams
fix(model-client): detect incomplete version delta streams
2 parents 2b34bda + 5328bdb commit 19e7e52

File tree

9 files changed

+430
-29
lines changed

9 files changed

+430
-29
lines changed

model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ import org.modelix.model.persistent.HashUtil
6565
import org.modelix.model.persistent.MapBasedStore
6666
import org.modelix.model.server.api.v2.VersionDelta
6767
import org.modelix.model.server.api.v2.VersionDeltaStream
68+
import org.modelix.model.server.api.v2.VersionDeltaStreamV2
6869
import org.modelix.model.server.api.v2.asStream
6970
import org.modelix.modelql.client.ModelQLClient
7071
import org.modelix.modelql.core.IMonoStep
@@ -491,32 +492,47 @@ private fun URLBuilder.appendPathSegmentsEncodingSlash(vararg components: String
491492
fun VersionDelta.getAllObjects(): Map<String, String> = objectsMap + objects.associateBy { HashUtil.sha256(it) }
492493

493494
suspend fun HttpResponse.readVersionDelta(): VersionDeltaStream {
494-
return if (contentType()?.match(VersionDeltaStream.CONTENT_TYPE) == true) {
495-
val content = bodyAsChannel()
496-
val versionHash = checkNotNull(content.readUTF8Line()) { "No objects received" }
497-
val versionObject = content.readUTF8Line()
498-
return if (versionObject == null) {
499-
VersionDeltaStream(versionHash, emptyFlow())
500-
} else {
501-
VersionDeltaStream(
502-
versionHash,
503-
flow {
504-
emit(versionHash to versionObject)
505-
while (true) {
506-
val key = content.readUTF8Line() ?: break
507-
val value = checkNotNull(content.readUTF8Line()) { "Object missing for hash $key" }
508-
emit(key to value)
509-
}
510-
},
511-
)
512-
}
495+
val parsedContentType = contentType()
496+
return if (parsedContentType?.match(VersionDeltaStreamV2.CONTENT_TYPE) == true) {
497+
return readVersionDeltaStreamV2()
498+
} else if (parsedContentType?.match(VersionDeltaStream.CONTENT_TYPE) == true) {
499+
return readVersionDeltaStreamV1()
513500
} else {
514501
body<VersionDelta>().asStream()
515502
}
516503
}
517504

505+
private suspend fun HttpResponse.readVersionDeltaStreamV1(): VersionDeltaStream {
506+
val content = bodyAsChannel()
507+
val versionHash = checkNotNull(content.readUTF8Line()) { "No objects received" }
508+
val versionObject = content.readUTF8Line()
509+
return if (versionObject == null) {
510+
VersionDeltaStream(versionHash, emptyFlow())
511+
} else {
512+
VersionDeltaStream(
513+
versionHash,
514+
flow {
515+
emit(versionHash to versionObject)
516+
while (true) {
517+
val key = content.readUTF8Line() ?: break
518+
val value = checkNotNull(content.readUTF8Line()) { "Object missing for hash $key" }
519+
emit(key to value)
520+
}
521+
},
522+
)
523+
}
524+
}
525+
526+
private suspend fun HttpResponse.readVersionDeltaStreamV2(): VersionDeltaStream {
527+
val content = bodyAsChannel()
528+
val decodeVersionDeltaStreamV2 = VersionDeltaStreamV2.decodeVersionDeltaStreamV2(content)
529+
return VersionDeltaStream(decodeVersionDeltaStreamV2.versionHash, decodeVersionDeltaStreamV2.hashesWithDeltaObject)
530+
}
531+
518532
fun HttpRequestBuilder.useVersionStreamFormat() {
519-
headers.set(HttpHeaders.Accept, VersionDeltaStream.CONTENT_TYPE.toString())
533+
headers[HttpHeaders.Accept] = VersionDeltaStreamV2.CONTENT_TYPE.toString()
534+
// Add CONTENT_TYPE_VERSION_DELTA_V1 so that newer clients cant talk with older servers.
535+
headers.append(HttpHeaders.Accept, VersionDeltaStream.CONTENT_TYPE.toString())
520536
}
521537

522538
/**

model-client/src/commonTest/kotlin/org/modelix/model/client2/ModelClientV2Test.kt

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,17 @@ package org.modelix.model.client2
1717

1818
import io.ktor.client.HttpClient
1919
import io.ktor.client.engine.mock.MockEngine
20+
import io.ktor.client.engine.mock.respond
2021
import io.ktor.client.engine.mock.respondError
22+
import io.ktor.http.HttpHeaders
2123
import io.ktor.http.HttpStatusCode
24+
import io.ktor.http.headersOf
2225
import kotlinx.coroutines.CancellationException
2326
import kotlinx.coroutines.test.runTest
27+
import org.modelix.model.lazy.RepositoryId
28+
import org.modelix.model.server.api.v2.VersionDeltaStreamV2
2429
import kotlin.test.Test
30+
import kotlin.test.assertEquals
2531
import kotlin.test.assertFailsWith
2632

2733
class ModelClientV2Test {
@@ -38,8 +44,30 @@ class ModelClientV2Test {
3844
.url(url)
3945
.build()
4046
modelClient.close()
41-
assertFailsWith<CancellationException>("Parent job is Completed") {
47+
val exception = assertFailsWith<CancellationException> {
4248
modelClient.init()
4349
}
50+
assertEquals("Parent job is Completed", exception.message)
51+
}
52+
53+
@Test
54+
fun detectIncompleteDataInVersionDeltaStreamV2() = runTest {
55+
val incompleteData = "CTVRw*a6KXJ4o7uzGlp-kUosxpyRf4f"
56+
val repositoryId = RepositoryId("aRepositoryId")
57+
val branchRef = repositoryId.getBranchReference("main")
58+
val url = "http://localhost/v2"
59+
val mockEngine = MockEngine { requestData ->
60+
assertEquals(VersionDeltaStreamV2.CONTENT_TYPE.toString(), requestData.headers[HttpHeaders.Accept])
61+
respond(incompleteData, HttpStatusCode.OK, headersOf(HttpHeaders.ContentType, VersionDeltaStreamV2.CONTENT_TYPE.toString()))
62+
}
63+
val httpClient = HttpClient(mockEngine)
64+
val modelClient = ModelClientV2.builder()
65+
.client(httpClient)
66+
.url(url)
67+
.build()
68+
69+
assertFailsWith<VersionDeltaStreamV2.Companion.IncompleteData> {
70+
modelClient.pull(branchRef, null)
71+
}
4472
}
4573
}

model-client/src/jvmTest/kotlin/org/modelix/model/client2/ModelClientV2JvmTest.kt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import io.ktor.http.HttpStatusCode
2222
import kotlinx.coroutines.CancellationException
2323
import kotlinx.coroutines.test.runTest
2424
import kotlin.test.Test
25+
import kotlin.test.assertEquals
2526
import kotlin.test.assertFailsWith
2627

2728
class ModelClientV2JvmTest {
@@ -40,13 +41,15 @@ class ModelClientV2JvmTest {
4041
// Implementing `close` allow to use `.use` method.
4142
modelClient.use {
4243
}
43-
assertFailsWith<CancellationException>("Parent job is Completed") {
44+
val firstException = assertFailsWith<CancellationException> {
4445
modelClient.init()
4546
}
46-
// `Closable` implies, that `.close` method is idempotent.
47+
assertEquals("Parent job is Completed", firstException.message)
48+
// `Closable` implies that `.close` method is idempotent.
4749
modelClient.close()
48-
assertFailsWith<CancellationException>("Parent job is Completed") {
50+
val secondException = assertFailsWith<CancellationException> {
4951
modelClient.init()
5052
}
53+
assertEquals("Parent job is Completed", secondException.message)
5154
}
5255
}

model-server-api/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ kotlin {
3939
}
4040
val commonTest by getting {
4141
dependencies {
42-
// implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:$kotlinCoroutinesVersion")
42+
implementation(libs.kotlin.coroutines.test)
4343
implementation(kotlin("test"))
4444
}
4545
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright (c) 2024.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.modelix.model.server.api.v2
18+
19+
import io.ktor.http.ContentType
20+
import io.ktor.utils.io.ByteReadChannel
21+
import io.ktor.utils.io.ByteWriteChannel
22+
import io.ktor.utils.io.readUTF8Line
23+
import io.ktor.utils.io.writeStringUtf8
24+
import kotlinx.coroutines.flow.Flow
25+
import kotlinx.coroutines.flow.flow
26+
27+
/**
28+
* In comparison to the previous format for version deltas,
29+
* this format is structured so that incompletely sent data can be detected.
30+
*
31+
* Detecting incomplete data is a workaround for:
32+
* - https://youtrack.jetbrains.com/issue/KTOR-6905/The-client-reads-incomplete-streamed-responses-without-failing
33+
* In this case partial data from a request might be read
34+
* without throwing an exception or indicating it with a correct return value
35+
* - https://youtrack.jetbrains.com/issue/KTOR-4862/Ktor-hangs-if-exception-occurs-during-write-response-body
36+
* Because Ktor server does not close the connection when an exception occurs while writing a body
37+
* we always close the connection even if not all data was yet send (see [ByteWriteChannel.useClosingWithoutCause]).
38+
*
39+
* The format sends redundant hashes because of previous bugs encountered with SHA1 calculation.
40+
* See https://github.com/modelix/modelix.core/pull/213/commits/a412bc97765426fcc81db0c55516c65b8679641b
41+
*/
42+
class VersionDeltaStreamV2(
43+
val versionHash: String,
44+
val hashesWithDeltaObject: Flow<Pair<String, String>>,
45+
) {
46+
companion object {
47+
class IncompleteData(message: String) : RuntimeException(message)
48+
49+
val CONTENT_TYPE = ContentType("application", "x-modelix-objects-v2")
50+
private const val NEW_LINE_IN_VERSION_DELTA_STREAM_V2 = "\n"
51+
52+
/**
53+
* Magic byte (as string) that indicates the end of one data line/all streaming data.
54+
* Use `$`/`~` because it:
55+
* (1) encodes as a single byte in UTF8 and therefore cannot be sent partially.
56+
* (2) is not a first character in serialized value (see. [IKVValue.serialize]).
57+
* (3) it is a char that would be URL-encoded in object data.
58+
*/
59+
private const val MAGIC_BYTE_FOR_END_OF_DATA_LINE = "$"
60+
private const val MAGIC_BYTE_FOR_END_OF_VERSION_DELTA = "~"
61+
62+
private suspend fun encodeLine(output: ByteWriteChannel, line: String) {
63+
output.writeStringUtf8(line)
64+
output.writeStringUtf8(NEW_LINE_IN_VERSION_DELTA_STREAM_V2)
65+
output.writeStringUtf8(MAGIC_BYTE_FOR_END_OF_DATA_LINE)
66+
output.writeStringUtf8(NEW_LINE_IN_VERSION_DELTA_STREAM_V2)
67+
}
68+
69+
suspend fun encodeVersionDeltaStreamV2(
70+
output: ByteWriteChannel,
71+
versionHash: String,
72+
hashesWithDeltaObject: Flow<Pair<String, String>>,
73+
) {
74+
encodeLine(output, versionHash)
75+
hashesWithDeltaObject.collect { (hash, deltaObject) ->
76+
encodeLine(output, hash)
77+
encodeLine(output, deltaObject)
78+
}
79+
output.writeStringUtf8(MAGIC_BYTE_FOR_END_OF_VERSION_DELTA)
80+
}
81+
82+
private suspend fun decodeLine(input: ByteReadChannel): String? {
83+
val dataLine = input.readUTF8Line()
84+
when (dataLine) {
85+
null -> throw IncompleteData("Missing data line")
86+
MAGIC_BYTE_FOR_END_OF_VERSION_DELTA -> return null
87+
}
88+
val endOfDataLine = input.readUTF8Line()
89+
if (endOfDataLine != MAGIC_BYTE_FOR_END_OF_DATA_LINE) {
90+
throw IncompleteData("Missing end of data line [dataLine=`$dataLine`] [endOfDataLine=`$endOfDataLine`]")
91+
}
92+
return dataLine
93+
}
94+
95+
/**
96+
* @throws IncompleteData if data is detected to be incomplete
97+
*/
98+
suspend fun decodeVersionDeltaStreamV2(input: ByteReadChannel): VersionDeltaStreamV2 {
99+
val versionHash = decodeLine(input) ?: throw IncompleteData("Version hash missing.")
100+
val hashesWithDeltaObject = flow {
101+
while (true) {
102+
val hash = decodeLine(input) ?: break
103+
val deltaObject = decodeLine(input) ?: throw IncompleteData("Missing delta object.")
104+
emit(hash to deltaObject)
105+
}
106+
}
107+
return VersionDeltaStreamV2(versionHash, hashesWithDeltaObject)
108+
}
109+
}
110+
}

0 commit comments

Comments
 (0)