Skip to content

Commit 28fd007

Browse files
author
Oleksandr Dzhychko
committed
feat(model-server): add a version delta stream format that enables to detect incomplete responses
Clients can still use the old format, but they might not detect incomplete streams.
1 parent 35cc6fc commit 28fd007

File tree

6 files changed

+359
-5
lines changed

6 files changed

+359
-5
lines changed

model-server-api/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ kotlin {
3939
}
4040
val commonTest by getting {
4141
dependencies {
42-
// implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:$kotlinCoroutinesVersion")
42+
implementation(libs.kotlin.coroutines.test)
4343
implementation(kotlin("test"))
4444
}
4545
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright (c) 2024.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.modelix.model.server.api.v2
18+
19+
import io.ktor.http.ContentType
20+
import io.ktor.utils.io.ByteReadChannel
21+
import io.ktor.utils.io.ByteWriteChannel
22+
import io.ktor.utils.io.readUTF8Line
23+
import io.ktor.utils.io.writeStringUtf8
24+
import kotlinx.coroutines.flow.Flow
25+
import kotlinx.coroutines.flow.flow
26+
27+
/**
28+
* In comparison to the previous format for version deltas,
29+
* this format is structured so that incompletely sent data can be detected.
30+
*
31+
* Detecting incomplete data is a workaround for:
32+
* - https://youtrack.jetbrains.com/issue/KTOR-6905/The-client-reads-incomplete-streamed-responses-without-failing
33+
* In this case partial data from a request might be read
34+
* without throwing an exception or indicating it with a correct return value
35+
* - https://youtrack.jetbrains.com/issue/KTOR-4862/Ktor-hangs-if-exception-occurs-during-write-response-body
36+
* Because the Ktor server does not close the connection when an exception occurs while writing a body,
37+
* we always close the connection even if not all data has been sent yet (see [ByteWriteChannel.useClosingWithoutCause]).
38+
*
39+
* The format sends redundant hashes because of previous bugs encountered with SHA1 calculation.
40+
* See https://github.com/modelix/modelix.core/pull/213/commits/a412bc97765426fcc81db0c55516c65b8679641b
41+
*/
42+
class VersionDeltaStreamV2(
43+
val versionHash: String,
44+
val hashesWithDeltaObject: Flow<Pair<String, String>>,
45+
) {
46+
companion object {
47+
class IncompleteData(message: String) : RuntimeException(message)
48+
49+
val CONTENT_TYPE = ContentType("application", "x-modelix-objects-v2")
50+
private const val NEW_LINE_IN_VERSION_DELTA_STREAM_V2 = "\n"
51+
52+
/**
53+
* Magic bytes (as strings) that indicate the end of one data line (`$`) and the end of all streaming data (`~`).
54+
* Use `$`/`~` because it:
55+
* (1) encodes as a single byte in UTF8 and therefore cannot be sent partially.
56+
* (2) is not the first character of a serialized value (see [IKVValue.serialize]).
57+
* (3) is a character that would be URL-encoded in object data.
58+
*/
59+
private const val MAGIC_BYTE_FOR_END_OF_DATA_LINE = "$"
60+
private const val MAGIC_BYTE_FOR_END_OF_VERSION_DELTA = "~"
61+
62+
private suspend fun encodeLine(output: ByteWriteChannel, line: String) {
63+
output.writeStringUtf8(line)
64+
output.writeStringUtf8(NEW_LINE_IN_VERSION_DELTA_STREAM_V2)
65+
output.writeStringUtf8(MAGIC_BYTE_FOR_END_OF_DATA_LINE)
66+
output.writeStringUtf8(NEW_LINE_IN_VERSION_DELTA_STREAM_V2)
67+
}
68+
69+
suspend fun encodeVersionDeltaStreamV2(
70+
output: ByteWriteChannel,
71+
versionHash: String,
72+
hashesWithDeltaObject: Flow<Pair<String, String>>,
73+
) {
74+
encodeLine(output, versionHash)
75+
hashesWithDeltaObject.collect { (hash, deltaObject) ->
76+
encodeLine(output, hash)
77+
encodeLine(output, deltaObject)
78+
}
79+
output.writeStringUtf8(MAGIC_BYTE_FOR_END_OF_VERSION_DELTA)
80+
}
81+
82+
private suspend fun decodeLine(input: ByteReadChannel): String? {
83+
val dataLine = input.readUTF8Line()
84+
when (dataLine) {
85+
null -> throw IncompleteData("Missing data line")
86+
MAGIC_BYTE_FOR_END_OF_VERSION_DELTA -> return null
87+
}
88+
val endOfDataLine = input.readUTF8Line()
89+
if (endOfDataLine != MAGIC_BYTE_FOR_END_OF_DATA_LINE) {
90+
throw IncompleteData("Missing end of data line [dataLine=`$dataLine`] [endOfDataLine=`$endOfDataLine`]")
91+
}
92+
return dataLine
93+
}
94+
95+
/**
96+
* @throws IncompleteData if data is detected to be incomplete
97+
*/
98+
suspend fun decodeVersionDeltaStreamV2(input: ByteReadChannel): VersionDeltaStreamV2 {
99+
val versionHash = decodeLine(input) ?: throw IncompleteData("Version hash missing.")
100+
val hashesWithDeltaObject = flow {
101+
while (true) {
102+
val hash = decodeLine(input) ?: break
103+
val deltaObject = decodeLine(input) ?: throw IncompleteData("Missing delta object.")
104+
emit(hash to deltaObject)
105+
}
106+
}
107+
return VersionDeltaStreamV2(versionHash, hashesWithDeltaObject)
108+
}
109+
}
110+
}
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
* Copyright (c) 2024.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.modelix.model.server.api.v2
18+
19+
import io.ktor.util.cio.use
20+
import io.ktor.util.toByteArray
21+
import io.ktor.utils.io.ByteChannel
22+
import io.ktor.utils.io.writeFully
23+
import io.ktor.utils.io.writeStringUtf8
24+
import kotlinx.coroutines.flow.emptyFlow
25+
import kotlinx.coroutines.flow.flowOf
26+
import kotlinx.coroutines.flow.toList
27+
import kotlinx.coroutines.test.runTest
28+
import org.modelix.model.server.api.v2.VersionDeltaStreamV2.Companion.IncompleteData
29+
import org.modelix.model.server.api.v2.VersionDeltaStreamV2.Companion.decodeVersionDeltaStreamV2
30+
import org.modelix.model.server.api.v2.VersionDeltaStreamV2.Companion.encodeVersionDeltaStreamV2
31+
import kotlin.test.Test
32+
import kotlin.test.assertEquals
33+
import kotlin.test.assertFailsWith
34+
35+
class VersionDeltaStreamV2Test {
36+
37+
@Test
38+
fun parsesStreamWithoutObjects() = runTest {
39+
val channel = ByteChannel()
40+
channel.use {
41+
encodeVersionDeltaStreamV2(channel, "CTVRw*a6KXJ4o7uzGlp-kUosxpyRf4fUpHnLokG9T86A", emptyFlow())
42+
}
43+
val versionDeltaStream = decodeVersionDeltaStreamV2(channel)
44+
45+
assertEquals("CTVRw*a6KXJ4o7uzGlp-kUosxpyRf4fUpHnLokG9T86A", versionDeltaStream.versionHash)
46+
assertEquals(emptyList(), versionDeltaStream.hashesWithDeltaObject.toList())
47+
}
48+
49+
@Test
50+
fun parsesStreamWithObjects() = runTest {
51+
val channel = ByteChannel()
52+
channel.use {
53+
encodeVersionDeltaStreamV2(
54+
channel,
55+
"CTVRw*a6KXJ4o7uzGlp-kUosxpyRf4fUpHnLokG9T86A",
56+
flowOf(
57+
"r7k0y*p0mmIhhD46RvqLsmTEGuBQvAf9hw7aN0IzihLc" to "L/100000017/xioDt*mnraICBf48DpWkvvtl2KuPixWn1p7yteYQ3XSg",
58+
"CTVRw*a6KXJ4o7uzGlp-kUosxpyRf4fUpHnLokG9T86A" to "1/%00/0/%00///",
59+
),
60+
)
61+
}
62+
63+
val versionDeltaStream = decodeVersionDeltaStreamV2(channel)
64+
65+
assertEquals("CTVRw*a6KXJ4o7uzGlp-kUosxpyRf4fUpHnLokG9T86A", versionDeltaStream.versionHash)
66+
val expectedObjects = listOf(
67+
"r7k0y*p0mmIhhD46RvqLsmTEGuBQvAf9hw7aN0IzihLc" to "L/100000017/xioDt*mnraICBf48DpWkvvtl2KuPixWn1p7yteYQ3XSg",
68+
"CTVRw*a6KXJ4o7uzGlp-kUosxpyRf4fUpHnLokG9T86A" to "1/%00/0/%00///",
69+
)
70+
assertEquals(expectedObjects, versionDeltaStream.hashesWithDeltaObject.toList())
71+
}
72+
73+
@Test
74+
fun failsToParseEmptyStream() = runTest {
75+
val data = ""
76+
val channel = ByteChannel()
77+
channel.use {
78+
writeStringUtf8(data)
79+
}
80+
81+
val exception = assertFailsWith<IncompleteData> {
82+
decodeVersionDeltaStreamV2(channel).hashesWithDeltaObject.toList()
83+
}
84+
assertEquals("Missing data line", exception.message)
85+
}
86+
87+
@Test
88+
fun failsToParseIncompleteLine() = runTest {
89+
val data = "CTVRw*a6KXJ4o7uzGlp"
90+
val channel = ByteChannel()
91+
channel.use {
92+
writeStringUtf8(data)
93+
}
94+
95+
val exception = assertFailsWith<IncompleteData> {
96+
decodeVersionDeltaStreamV2(channel).hashesWithDeltaObject.toList()
97+
}
98+
assertEquals(
99+
"Missing end of data line [dataLine=`CTVRw*a6KXJ4o7uzGlp`] [endOfDataLine=`null`]",
100+
exception.message,
101+
)
102+
}
103+
104+
@Test
105+
fun failsToParseStreamWithWrongEndOfDataLine() = runTest {
106+
val data = "CTVRw*a6KXJ4o7uzGlp\n%"
107+
val channel = ByteChannel()
108+
channel.use {
109+
writeStringUtf8(data)
110+
}
111+
112+
val exception = assertFailsWith<IncompleteData> {
113+
decodeVersionDeltaStreamV2(channel).hashesWithDeltaObject.toList()
114+
}
115+
assertEquals(
116+
"Missing end of data line [dataLine=`CTVRw*a6KXJ4o7uzGlp`] [endOfDataLine=`%`]",
117+
exception.message,
118+
)
119+
}
120+
121+
@Test
122+
fun failsToParseStreamWithoutVersionHash() = runTest {
123+
val data = "~"
124+
val channel = ByteChannel()
125+
channel.use {
126+
writeStringUtf8(data)
127+
}
128+
129+
val exception = assertFailsWith<IncompleteData> {
130+
decodeVersionDeltaStreamV2(channel).hashesWithDeltaObject.toList()
131+
}
132+
assertEquals("Version hash missing.", exception.message)
133+
}
134+
135+
@Test
136+
fun failsParsingStreamWithMissingDataAfterVersionHash() = runTest {
137+
val data = "CTVRw*a6KXJ4o7uzGlp-kUosxpyRf4fUpHnLokG9T86A\n$\n"
138+
val channel = ByteChannel()
139+
channel.use {
140+
writeStringUtf8(data)
141+
}
142+
143+
val exception = assertFailsWith<IncompleteData> {
144+
decodeVersionDeltaStreamV2(channel).hashesWithDeltaObject.toList()
145+
}
146+
assertEquals("Missing data line", exception.message)
147+
}
148+
149+
@Test
150+
fun failsParsingStreamWithMissingObject() = runTest {
151+
val data = "CTVRw*a6KXJ4o7uzGlp-kUosxpyRf4fUpHnLokG9T86A\n$\n" +
152+
"r7k0y*p0mmIhhD46RvqLsmTEGuBQvAf9hw7aN0IzihLc\n$\n" +
153+
"L/100000017/xioDt*mnraICBf48DpWkvvtl2KuPixWn1p7yteYQ3XSg\n$\n" +
154+
"CTVRw*a6KXJ4o7uzGlp-kUosxpyRf4fUpHnLokG9T86A\n$\n" +
155+
"~"
156+
157+
val channel = ByteChannel()
158+
channel.use {
159+
writeStringUtf8(data)
160+
}
161+
162+
val emittedObjects = mutableListOf<Pair<String, String>>()
163+
val exception = assertFailsWith<IncompleteData> {
164+
decodeVersionDeltaStreamV2(channel).hashesWithDeltaObject.collect(emittedObjects::add)
165+
}
166+
assertEquals("Missing delta object.", exception.message)
167+
assertEquals(
168+
listOf("r7k0y*p0mmIhhD46RvqLsmTEGuBQvAf9hw7aN0IzihLc" to "L/100000017/xioDt*mnraICBf48DpWkvvtl2KuPixWn1p7yteYQ3XSg"),
169+
emittedObjects,
170+
)
171+
}
172+
173+
@Test
174+
fun failsToParseAnySubstringOfAValidEncoding() = runTest {
175+
val channel = ByteChannel()
176+
channel.use {
177+
encodeVersionDeltaStreamV2(
178+
channel,
179+
"CTVRw*a6KXJ4o7uzGlp-kUosxpyRf4fUpHnLokG9T86A",
180+
flowOf(
181+
"r7k0y*p0mmIhhD46RvqLsmTEGuBQvAf9hw7aN0IzihLc" to "L/100000017/xioDt*mnraICBf48DpWkvvtl2KuPixWn1p7yteYQ3XSg",
182+
"CTVRw*a6KXJ4o7uzGlp-kUosxpyRf4fUpHnLokG9T86A" to "1/%00/0/%00///",
183+
),
184+
)
185+
}
186+
val fullByteArray = channel.toByteArray()
187+
188+
for (newSize in fullByteArray.indices) {
189+
val subByteArray = fullByteArray.copyOf(newSize)
190+
val subChannel = ByteChannel()
191+
subChannel.use {
192+
subChannel.writeFully(subByteArray)
193+
}
194+
195+
assertFailsWith<IncompleteData> {
196+
decodeVersionDeltaStreamV2(subChannel).hashesWithDeltaObject.toList()
197+
}
198+
}
199+
}
200+
}

model-server-openapi/specifications/model-server.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,9 @@ components:
603603
'application/x-modelix-objects':
604604
schema:
605605
type: string
606+
'application/x-modelix-objects-v2':
607+
schema:
608+
type: string
606609
'application/json':
607610
schema:
608611
type: object

model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ import org.modelix.model.operations.OTBranch
6565
import org.modelix.model.persistent.HashUtil
6666
import org.modelix.model.server.api.v2.VersionDelta
6767
import org.modelix.model.server.api.v2.VersionDeltaStream
68+
import org.modelix.model.server.api.v2.VersionDeltaStreamV2
6869
import org.modelix.model.server.store.IStoreClient
6970
import org.modelix.model.server.store.LocalModelClient
7071
import org.modelix.modelql.server.ModelQLServer
@@ -365,12 +366,14 @@ class ModelReplicationServer(
365366

366367
private suspend fun ApplicationCall.respondDelta(versionHash: String, baseVersionHash: String?) {
367368
val expectedTypes = request.acceptItems().map { ContentType.parse(it.value) }
368-
return if (expectedTypes.any { it.match(VersionDeltaStream.CONTENT_TYPE) }) {
369-
respondDeltaAsObjectStream(versionHash, baseVersionHash, false)
369+
return if (expectedTypes.any { it.match(VersionDeltaStreamV2.CONTENT_TYPE) }) {
370+
respondDeltaAsObjectStreamV2(versionHash, baseVersionHash)
371+
} else if (expectedTypes.any { it.match(VersionDeltaStream.CONTENT_TYPE) }) {
372+
respondDeltaAsObjectStreamV1(versionHash, baseVersionHash, false)
370373
} else if (expectedTypes.any { it.match(ContentType.Application.Json) }) {
371374
respondDeltaAsJson(versionHash, baseVersionHash)
372375
} else {
373-
respondDeltaAsObjectStream(versionHash, baseVersionHash, true)
376+
respondDeltaAsObjectStreamV1(versionHash, baseVersionHash, true)
374377
}
375378
}
376379

@@ -383,7 +386,7 @@ class ModelReplicationServer(
383386
respond(delta)
384387
}
385388

386-
private suspend fun ApplicationCall.respondDeltaAsObjectStream(
389+
private suspend fun ApplicationCall.respondDeltaAsObjectStreamV1(
387390
versionHash: String,
388391
baseVersionHash: String?,
389392
plainText: Boolean,
@@ -406,6 +409,15 @@ class ModelReplicationServer(
406409
}
407410
}
408411
}
412+
413+
private suspend fun ApplicationCall.respondDeltaAsObjectStreamV2(versionHash: String, baseVersionHash: String?) {
414+
val objectData = repositoriesManager.computeDelta(versionHash, baseVersionHash)
415+
respondBytesWriter(VersionDeltaStreamV2.CONTENT_TYPE) {
416+
this.useClosingWithoutCause {
417+
VersionDeltaStreamV2.encodeVersionDeltaStreamV2(this, versionHash, objectData.asFlow())
418+
}
419+
}
420+
}
409421
}
410422

411423
/**

0 commit comments

Comments
 (0)