Skip to content

Commit 5328bdb

Browse files
author
Oleksandr Dzhychko
committed
fix(model-client): detect incomplete version delta streams
Use a new version of the delta stream format that enables the client to properly detect incomplete delta version streams.
1 parent 28fd007 commit 5328bdb

File tree

3 files changed

+71
-24
lines changed

3 files changed

+71
-24
lines changed

model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ import org.modelix.model.persistent.HashUtil
6565
import org.modelix.model.persistent.MapBasedStore
6666
import org.modelix.model.server.api.v2.VersionDelta
6767
import org.modelix.model.server.api.v2.VersionDeltaStream
68+
import org.modelix.model.server.api.v2.VersionDeltaStreamV2
6869
import org.modelix.model.server.api.v2.asStream
6970
import org.modelix.modelql.client.ModelQLClient
7071
import org.modelix.modelql.core.IMonoStep
@@ -491,32 +492,47 @@ private fun URLBuilder.appendPathSegmentsEncodingSlash(vararg components: String
491492
fun VersionDelta.getAllObjects(): Map<String, String> = objectsMap + objects.associateBy { HashUtil.sha256(it) }
492493

493494
suspend fun HttpResponse.readVersionDelta(): VersionDeltaStream {
494-
return if (contentType()?.match(VersionDeltaStream.CONTENT_TYPE) == true) {
495-
val content = bodyAsChannel()
496-
val versionHash = checkNotNull(content.readUTF8Line()) { "No objects received" }
497-
val versionObject = content.readUTF8Line()
498-
return if (versionObject == null) {
499-
VersionDeltaStream(versionHash, emptyFlow())
500-
} else {
501-
VersionDeltaStream(
502-
versionHash,
503-
flow {
504-
emit(versionHash to versionObject)
505-
while (true) {
506-
val key = content.readUTF8Line() ?: break
507-
val value = checkNotNull(content.readUTF8Line()) { "Object missing for hash $key" }
508-
emit(key to value)
509-
}
510-
},
511-
)
512-
}
495+
val parsedContentType = contentType()
496+
return if (parsedContentType?.match(VersionDeltaStreamV2.CONTENT_TYPE) == true) {
497+
return readVersionDeltaStreamV2()
498+
} else if (parsedContentType?.match(VersionDeltaStream.CONTENT_TYPE) == true) {
499+
return readVersionDeltaStreamV1()
513500
} else {
514501
body<VersionDelta>().asStream()
515502
}
516503
}
517504

505+
private suspend fun HttpResponse.readVersionDeltaStreamV1(): VersionDeltaStream {
506+
val content = bodyAsChannel()
507+
val versionHash = checkNotNull(content.readUTF8Line()) { "No objects received" }
508+
val versionObject = content.readUTF8Line()
509+
return if (versionObject == null) {
510+
VersionDeltaStream(versionHash, emptyFlow())
511+
} else {
512+
VersionDeltaStream(
513+
versionHash,
514+
flow {
515+
emit(versionHash to versionObject)
516+
while (true) {
517+
val key = content.readUTF8Line() ?: break
518+
val value = checkNotNull(content.readUTF8Line()) { "Object missing for hash $key" }
519+
emit(key to value)
520+
}
521+
},
522+
)
523+
}
524+
}
525+
526+
private suspend fun HttpResponse.readVersionDeltaStreamV2(): VersionDeltaStream {
527+
val content = bodyAsChannel()
528+
val decodeVersionDeltaStreamV2 = VersionDeltaStreamV2.decodeVersionDeltaStreamV2(content)
529+
return VersionDeltaStream(decodeVersionDeltaStreamV2.versionHash, decodeVersionDeltaStreamV2.hashesWithDeltaObject)
530+
}
531+
518532
fun HttpRequestBuilder.useVersionStreamFormat() {
519-
headers.set(HttpHeaders.Accept, VersionDeltaStream.CONTENT_TYPE.toString())
533+
headers[HttpHeaders.Accept] = VersionDeltaStreamV2.CONTENT_TYPE.toString()
534+
// Add CONTENT_TYPE_VERSION_DELTA_V1 so that newer clients cant talk with older servers.
535+
headers.append(HttpHeaders.Accept, VersionDeltaStream.CONTENT_TYPE.toString())
520536
}
521537

522538
/**

model-client/src/commonTest/kotlin/org/modelix/model/client2/ModelClientV2Test.kt

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,17 @@ package org.modelix.model.client2
1717

1818
import io.ktor.client.HttpClient
1919
import io.ktor.client.engine.mock.MockEngine
20+
import io.ktor.client.engine.mock.respond
2021
import io.ktor.client.engine.mock.respondError
22+
import io.ktor.http.HttpHeaders
2123
import io.ktor.http.HttpStatusCode
24+
import io.ktor.http.headersOf
2225
import kotlinx.coroutines.CancellationException
2326
import kotlinx.coroutines.test.runTest
27+
import org.modelix.model.lazy.RepositoryId
28+
import org.modelix.model.server.api.v2.VersionDeltaStreamV2
2429
import kotlin.test.Test
30+
import kotlin.test.assertEquals
2531
import kotlin.test.assertFailsWith
2632

2733
class ModelClientV2Test {
@@ -38,8 +44,30 @@ class ModelClientV2Test {
3844
.url(url)
3945
.build()
4046
modelClient.close()
41-
assertFailsWith<CancellationException>("Parent job is Completed") {
47+
val exception = assertFailsWith<CancellationException> {
4248
modelClient.init()
4349
}
50+
assertEquals("Parent job is Completed", exception.message)
51+
}
52+
53+
@Test
54+
fun detectIncompleteDataInVersionDeltaStreamV2() = runTest {
55+
val incompleteData = "CTVRw*a6KXJ4o7uzGlp-kUosxpyRf4f"
56+
val repositoryId = RepositoryId("aRepositoryId")
57+
val branchRef = repositoryId.getBranchReference("main")
58+
val url = "http://localhost/v2"
59+
val mockEngine = MockEngine { requestData ->
60+
assertEquals(VersionDeltaStreamV2.CONTENT_TYPE.toString(), requestData.headers[HttpHeaders.Accept])
61+
respond(incompleteData, HttpStatusCode.OK, headersOf(HttpHeaders.ContentType, VersionDeltaStreamV2.CONTENT_TYPE.toString()))
62+
}
63+
val httpClient = HttpClient(mockEngine)
64+
val modelClient = ModelClientV2.builder()
65+
.client(httpClient)
66+
.url(url)
67+
.build()
68+
69+
assertFailsWith<VersionDeltaStreamV2.Companion.IncompleteData> {
70+
modelClient.pull(branchRef, null)
71+
}
4472
}
4573
}

model-client/src/jvmTest/kotlin/org/modelix/model/client2/ModelClientV2JvmTest.kt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import io.ktor.http.HttpStatusCode
2222
import kotlinx.coroutines.CancellationException
2323
import kotlinx.coroutines.test.runTest
2424
import kotlin.test.Test
25+
import kotlin.test.assertEquals
2526
import kotlin.test.assertFailsWith
2627

2728
class ModelClientV2JvmTest {
@@ -40,13 +41,15 @@ class ModelClientV2JvmTest {
4041
// Implementing `close` allow to use `.use` method.
4142
modelClient.use {
4243
}
43-
assertFailsWith<CancellationException>("Parent job is Completed") {
44+
val firstException = assertFailsWith<CancellationException> {
4445
modelClient.init()
4546
}
46-
// `Closable` implies, that `.close` method is idempotent.
47+
assertEquals("Parent job is Completed", firstException.message)
48+
// `Closable` implies that `.close` method is idempotent.
4749
modelClient.close()
48-
assertFailsWith<CancellationException>("Parent job is Completed") {
50+
val secondException = assertFailsWith<CancellationException> {
4951
modelClient.init()
5052
}
53+
assertEquals("Parent job is Completed", secondException.message)
5154
}
5255
}

0 commit comments

Comments
 (0)