Skip to content

Commit 144fab7

Browse files
committed
perf(model-server): transfer model objects as plain text instead of JSON
VersionDelta is basically just a map of objects and their SHA hashes. This can be serialized more efficiently with just a stream of strings separated by \n. Using flows/sequences/streams is more memory efficient, because the data can directly be streamed to the client without having to create this large JSON object. This should also fix the timeout issues, because the server already starts responding while still iterating the over the model.
1 parent c374e54 commit 144fab7

File tree

8 files changed

+217
-56
lines changed

8 files changed

+217
-56
lines changed

api/model-server.yaml

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ paths:
107107
"404":
108108
$ref: '#/components/responses/404'
109109
"200":
110-
description: OK
110+
$ref: '#/components/responses/versionDelta'
111111
# content:
112112
# '*/*':
113113
# schema:
@@ -137,11 +137,7 @@ paths:
137137
# required: true
138138
responses:
139139
"200":
140-
$ref: '#/components/responses/200'
141-
# content:
142-
# '*/*':
143-
# schema:
144-
# type: object
140+
$ref: '#/components/responses/versionDelta'
145141
/v2/repositories/{repository}/branches/{branch}/hash:
146142
get:
147143
operationId: getRepositoryBranchHash
@@ -247,11 +243,7 @@ paths:
247243
"500":
248244
$ref: '#/components/responses/500'
249245
"200":
250-
description: OK
251-
content:
252-
'*/*':
253-
schema:
254-
type: object
246+
$ref: '#/components/responses/versionDelta'
255247
/v2/repositories/{repository}/branches/{branch}/pollHash:
256248
get:
257249
operationId: pollRepositoryBranchHash
@@ -321,11 +313,7 @@ paths:
321313
"500":
322314
$ref: '#/components/responses/500'
323315
"200":
324-
description: OK
325-
content:
326-
'*/*':
327-
schema:
328-
type: object
316+
$ref: '#/components/responses/versionDelta'
329317
/v2/repositories/{repository}/versions/{versionHash}:
330318
get:
331319
operationId: getRepositoryVersionHash
@@ -414,11 +402,7 @@ paths:
414402
"404":
415403
$ref: '#/components/responses/404'
416404
"200":
417-
description: OK
418-
content:
419-
'*/*':
420-
schema:
421-
type: object
405+
$ref: '#/components/responses/versionDelta'
422406
/v2/versions/{versionHash}/history/{oldestVersionHash}:
423407
get:
424408
operationId: getOldestVersionHashForVersion
@@ -599,7 +583,18 @@ components:
599583
text/plain:
600584
schema:
601585
type: string
602-
586+
"versionDelta":
587+
description: OK
588+
content:
589+
'application/x-modelix-objects':
590+
schema:
591+
type: string
592+
'application/json':
593+
schema:
594+
type: object
595+
'text/plain':
596+
schema:
597+
type: string
603598
schemas:
604599
VersionDelta:
605600
title: VersionDelta

gradle/libs.versions.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ ktor-client-websockets = { group = "io.ktor", name = "ktor-client-websockets", v
7373
ktor-client-js = { group = "io.ktor", name = "ktor-client-js", version.ref = "ktor" }
7474
ktor-client-auth = { group = "io.ktor", name = "ktor-client-auth", version.ref = "ktor" }
7575

76+
ktor-serialization = { group = "io.ktor", name = "ktor-serialization", version.ref = "ktor" }
7677
ktor-serialization-json = { group = "io.ktor", name = "ktor-serialization-kotlinx-json", version.ref = "ktor" }
7778

7879
keycloak-authz-client = { group = "org.keycloak", name = "keycloak-authz-client", version = "23.0.3" }

model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt

Lines changed: 89 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,28 @@ import io.ktor.client.plugins.HttpTimeout
2121
import io.ktor.client.plugins.ResponseException
2222
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
2323
import io.ktor.client.plugins.expectSuccess
24+
import io.ktor.client.request.HttpRequestBuilder
2425
import io.ktor.client.request.get
2526
import io.ktor.client.request.post
2627
import io.ktor.client.request.put
2728
import io.ktor.client.request.setBody
29+
import io.ktor.client.statement.HttpResponse
30+
import io.ktor.client.statement.bodyAsChannel
2831
import io.ktor.client.statement.bodyAsText
2932
import io.ktor.http.ContentType
33+
import io.ktor.http.HttpHeaders
3034
import io.ktor.http.HttpStatusCode
3135
import io.ktor.http.URLBuilder
3236
import io.ktor.http.appendPathSegments
3337
import io.ktor.http.contentType
3438
import io.ktor.http.takeFrom
3539
import io.ktor.serialization.kotlinx.json.json
40+
import io.ktor.utils.io.readUTF8Line
3641
import kotlinx.coroutines.CoroutineScope
42+
import kotlinx.coroutines.flow.Flow
43+
import kotlinx.coroutines.flow.emptyFlow
44+
import kotlinx.coroutines.flow.flow
3745
import kotlinx.coroutines.launch
38-
import kotlinx.serialization.json.Json
3946
import org.modelix.kotlin.utils.DeprecationInfo
4047
import org.modelix.model.IVersion
4148
import org.modelix.model.api.IIdGenerator
@@ -54,6 +61,8 @@ import org.modelix.model.operations.OTBranch
5461
import org.modelix.model.persistent.HashUtil
5562
import org.modelix.model.persistent.MapBasedStore
5663
import org.modelix.model.server.api.v2.VersionDelta
64+
import org.modelix.model.server.api.v2.VersionDeltaStream
65+
import org.modelix.model.server.api.v2.asStream
5766
import org.modelix.modelql.client.ModelQLClient
5867
import org.modelix.modelql.core.IMonoStep
5968
import kotlin.time.Duration
@@ -117,9 +126,9 @@ class ModelClientV2(
117126
takeFrom(baseUrl)
118127
appendPathSegmentsEncodingSlash("repositories", repository.id, "init")
119128
}
129+
useVersionStreamFormat()
120130
}
121-
val delta = response.body<VersionDelta>()
122-
return createVersion(null, delta)
131+
return createVersion(null, response.readVersionDelta())
123132
}
124133

125134
override suspend fun listRepositories(): List<RepositoryId> {
@@ -151,9 +160,9 @@ class ModelClientV2(
151160
parameters["lastKnown"] = (baseVersion as CLVersion).getContentHash()
152161
}
153162
}
163+
useVersionStreamFormat()
154164
}
155-
val delta = Json.decodeFromString<VersionDelta>(response.bodyAsText())
156-
return createVersion(baseVersion as CLVersion?, delta)
165+
return createVersion(baseVersion as CLVersion?, response.readVersionDelta())
157166
}
158167

159168
override suspend fun loadVersion(
@@ -169,9 +178,9 @@ class ModelClientV2(
169178
parameters["lastKnown"] = (baseVersion as CLVersion).getContentHash()
170179
}
171180
}
181+
useVersionStreamFormat()
172182
}
173-
val delta = Json.decodeFromString<VersionDelta>(response.bodyAsText())
174-
return createVersion(baseVersion as CLVersion?, delta)
183+
return createVersion(baseVersion as CLVersion?, response.readVersionDelta())
175184
}
176185

177186
override suspend fun push(branch: BranchReference, version: IVersion, baseVersion: IVersion?): IVersion {
@@ -193,11 +202,11 @@ class ModelClientV2(
193202
takeFrom(baseUrl)
194203
appendPathSegmentsEncodingSlash("repositories", branch.repositoryId.id, "branches", branch.branchName)
195204
}
205+
useVersionStreamFormat()
196206
contentType(ContentType.Application.Json)
197207
setBody(delta)
198208
}
199-
val mergedVersionDelta = response.body<VersionDelta>()
200-
return createVersion(version, mergedVersionDelta)
209+
return createVersion(version, response.readVersionDelta())
201210
}
202211

203212
private suspend fun uploadObjects(repository: RepositoryId, objects: Sequence<Pair<String, String>>) {
@@ -224,8 +233,9 @@ class ModelClientV2(
224233
parameters["lastKnown"] = lastKnownVersion.hash
225234
}
226235
}
236+
useVersionStreamFormat()
227237
}
228-
val receivedVersion = createVersion(lastKnownVersion, response.body())
238+
val receivedVersion = createVersion(lastKnownVersion, response.readVersionDelta())
229239
LOG.debug { "${clientId.toString(16)}.pull($branch, $lastKnownVersion) -> $receivedVersion" }
230240
return receivedVersion
231241
}
@@ -237,11 +247,12 @@ class ModelClientV2(
237247
takeFrom(baseUrl)
238248
appendPathSegmentsEncodingSlash("repositories", branch.repositoryId.id, "branches", branch.branchName)
239249
}
250+
useVersionStreamFormat()
240251
}
241252

242253
val receivedVersion = when (response.status) {
243254
HttpStatusCode.NotFound -> null
244-
HttpStatusCode.OK -> createVersion(null, response.body())
255+
HttpStatusCode.OK -> createVersion(null, response.readVersionDelta())
245256
else -> throw ResponseException(response, response.bodyAsText())
246257
}
247258
LOG.debug { "${clientId.toString(16)}.pullIfExists($branch) -> $receivedVersion" }
@@ -284,8 +295,9 @@ class ModelClientV2(
284295
parameters["lastKnown"] = lastKnownVersion.hash
285296
}
286297
}
298+
useVersionStreamFormat()
287299
}
288-
val receivedVersion = createVersion(lastKnownVersion, response.body())
300+
val receivedVersion = createVersion(lastKnownVersion, response.readVersionDelta())
289301
LOG.debug { "${clientId.toString(16)}.poll($branch, $lastKnownVersion) -> $receivedVersion" }
290302
return receivedVersion
291303
}
@@ -310,24 +322,78 @@ class ModelClientV2(
310322
httpClient.close()
311323
}
312324

313-
private fun createVersion(baseVersion: CLVersion?, delta: VersionDelta): CLVersion {
325+
private suspend fun HttpResponse.readVersionDelta(): VersionDeltaStream {
326+
return if (contentType()?.match(VersionDeltaStream.CONTENT_TYPE) == true) {
327+
val content = bodyAsChannel()
328+
val versionHash = checkNotNull(content.readUTF8Line()) { "No objects received" }
329+
val versionObject = content.readUTF8Line()
330+
return if (versionObject == null) {
331+
VersionDeltaStream(versionHash, emptyFlow())
332+
} else {
333+
VersionDeltaStream(
334+
versionHash,
335+
flow {
336+
emit(versionHash to versionObject)
337+
while (true) {
338+
val key = content.readUTF8Line() ?: break
339+
val value = checkNotNull(content.readUTF8Line()) { "Object missing for hash $key" }
340+
emit(key to value)
341+
}
342+
},
343+
)
344+
}
345+
} else {
346+
body<VersionDelta>().asStream()
347+
}
348+
}
349+
350+
private suspend fun createVersion(baseVersion: CLVersion?, delta: VersionDeltaStream): CLVersion {
351+
delta.getObjectsAsFlow().collect {
352+
HashUtil.checkObjectHash(it.first, it.second)
353+
store.keyValueStore.put(it.first, it.second)
354+
}
355+
return if (baseVersion == null) {
356+
CLVersion(delta.versionHash, store)
357+
} else if (delta.versionHash == baseVersion.getContentHash()) {
358+
baseVersion
359+
} else {
360+
require(baseVersion.store == store) { "baseVersion was not created by this client" }
361+
CLVersion(delta.versionHash, store)
362+
}
363+
}
364+
365+
private suspend fun createVersion(baseVersion: CLVersion?, delta: Flow<String>): CLVersion {
366+
var firstHash: String? = null
367+
var isHash = true
368+
var lastHash: String? = null
369+
delta.collect {
370+
if (isHash) {
371+
lastHash = it
372+
if (firstHash == null) {
373+
firstHash = it
374+
}
375+
} else {
376+
val value = it
377+
store.keyValueStore.put(lastHash!!, value)
378+
}
379+
isHash = !isHash
380+
}
381+
val versionHash = checkNotNull(firstHash) { "No objects received" }
382+
314383
return if (baseVersion == null) {
315-
CLVersion(
316-
delta.versionHash,
317-
store.also { it.keyValueStore.putAll(delta.getAllObjects()) },
318-
)
319-
} else if (delta.versionHash == baseVersion.hash) {
384+
CLVersion(versionHash, store)
385+
} else if (versionHash == baseVersion.getContentHash()) {
320386
baseVersion
321387
} else {
322388
require(baseVersion.store == store) { "baseVersion was not created by this client" }
323-
store.keyValueStore.putAll(delta.getAllObjects())
324-
CLVersion(
325-
delta.versionHash,
326-
baseVersion.store,
327-
)
389+
CLVersion(versionHash, store)
328390
}
329391
}
330392

393+
private fun HttpRequestBuilder.useVersionStreamFormat() {
394+
headers.set(HttpHeaders.Accept, VersionDeltaStream.CONTENT_TYPE.toString())
395+
}
396+
331397
companion object {
332398
private val LOG = mu.KotlinLogging.logger {}
333399
fun builder(): ModelClientV2Builder = ModelClientV2PlatformSpecificBuilder()

model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLVersion.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ private fun trackAccessedEntries(store: IKeyValueStore, body: (IDeserializingKey
398398
}
399399

400400
private class AccessTrackingStore(val store: IKeyValueStore) : IKeyValueStore {
401-
val accessedEntries: MutableMap<String, String?> = HashMap()
401+
val accessedEntries: MutableMap<String, String?> = LinkedHashMap()
402402

403403
override fun newBulkQuery(deserializingCache: IDeserializingKeyValueStore): IBulkQuery {
404404
return store.newBulkQuery(deserializingCache)

model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/HashUtil.kt

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,17 @@ object HashUtil {
4040

4141
fun checkObjectHashes(entries: Map<String, String?>) {
4242
for (entry in entries) {
43-
val value = entry.value ?: continue
44-
if (!isSha256(entry.key)) continue
45-
val computedHash = sha256(value)
46-
val providedHash = entry.key
47-
require(computedHash == providedHash) {
48-
val bytes = value.encodeToByteArray(throwOnInvalidSequence = true)
49-
"Provided hash $providedHash doesn't match the computed hash $computedHash for value: $value\n Value as ByteArray$bytes"
50-
}
43+
checkObjectHash(entry.key, entry.value)
44+
}
45+
}
46+
47+
fun checkObjectHash(providedHash: String, value: String?) {
48+
if (value == null) return
49+
if (!isSha256(providedHash)) return
50+
val computedHash = sha256(value)
51+
require(computedHash == providedHash) {
52+
val bytes = value.encodeToByteArray(throwOnInvalidSequence = true)
53+
"Provided hash $providedHash doesn't match the computed hash $computedHash for value: $value\n Value as ByteArray$bytes"
5154
}
5255
}
5356
}

model-server-api/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ kotlin {
2626
implementation(kotlin("stdlib-common"))
2727
implementation(libs.kotlin.logging)
2828
implementation(libs.kotlin.serialization.json)
29+
api(libs.kotlin.coroutines.core)
30+
api(libs.ktor.serialization)
2931
// implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
3032

3133
// implementation("io.ktor:ktor-client-core:$ktorVersion")

model-server-api/src/commonMain/kotlin/org/modelix/model/server/api/v2/VersionDelta.kt

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
*/
1414
package org.modelix.model.server.api.v2
1515

16+
import io.ktor.http.ContentType
17+
import kotlinx.coroutines.flow.Flow
18+
import kotlinx.coroutines.flow.asFlow
19+
import kotlinx.coroutines.flow.emptyFlow
1620
import kotlinx.serialization.Serializable
1721

1822
@Serializable
@@ -23,3 +27,34 @@ class VersionDelta(
2327
val objects: Set<String> = emptySet(),
2428
val objectsMap: Map<String, String> = emptyMap(),
2529
)
30+
31+
fun VersionDelta.asStream(): VersionDeltaStream {
32+
require(objects.isEmpty()) { "Legacy serialization not supported" }
33+
return VersionDeltaStream(
34+
versionHash = versionHash,
35+
objectsFlow = null,
36+
objectsSequence = objectsMap.asSequence().map { it.key to it.value },
37+
)
38+
}
39+
40+
class VersionDeltaStream(
41+
val versionHash: String,
42+
val objectsFlow: Flow<Pair<String, String>>? = null,
43+
val objectsSequence: Sequence<Pair<String, String>>? = null,
44+
) {
45+
companion object {
46+
val CONTENT_TYPE = ContentType("application", "x-modelix-objects")
47+
}
48+
49+
fun getObjectsAsFlow(): Flow<Pair<String, String>> {
50+
return objectsFlow ?: objectsSequence?.asFlow() ?: emptyFlow()
51+
}
52+
}
53+
54+
suspend fun <K, V> Flow<Pair<K, V>>.toMap(): Map<K, V> {
55+
val map = LinkedHashMap<K, V>()
56+
collect {
57+
map[it.first] = it.second
58+
}
59+
return map
60+
}

0 commit comments

Comments
 (0)