Skip to content

Commit 4800396

Browse files
author
Oleksandr Dzhychko
committed
perf(model-server): avoid converting object maps to object flows and vice versa
1 parent 0d19abf commit 4800396

File tree

2 files changed

+77
-46
lines changed

2 files changed

+77
-46
lines changed

model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import kotlinx.coroutines.Dispatchers
3939
import kotlinx.coroutines.Job
4040
import kotlinx.coroutines.flow.Flow
4141
import kotlinx.coroutines.flow.flow
42-
import kotlinx.coroutines.flow.onEach
4342
import kotlinx.coroutines.flow.onEmpty
4443
import kotlinx.coroutines.flow.withIndex
4544
import kotlinx.coroutines.withContext
@@ -61,7 +60,6 @@ import org.modelix.model.operations.OTBranch
6160
import org.modelix.model.persistent.HashUtil
6261
import org.modelix.model.server.api.v2.VersionDelta
6362
import org.modelix.model.server.api.v2.VersionDeltaStream
64-
import org.modelix.model.server.api.v2.toMap
6563
import org.modelix.model.server.store.IStoreClient
6664
import org.modelix.model.server.store.LocalModelClient
6765
import org.modelix.modelql.server.ModelQLServer
@@ -231,9 +229,8 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
231229
val delta = VersionDelta(
232230
newVersionHash,
233231
lastVersionHash,
234-
objectsMap = repositoriesManager.computeDelta(newVersionHash, lastVersionHash).toMap(),
232+
objectsMap = repositoriesManager.computeDelta(newVersionHash, lastVersionHash).asMap(),
235233
)
236-
delta.checkObjectHashes()
237234
send(Json.encodeToString(delta))
238235
lastVersionHash = newVersionHash
239236
}
@@ -370,16 +367,14 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
370367
val delta = VersionDelta(
371368
versionHash,
372369
baseVersionHash,
373-
objectsMap = repositoriesManager.computeDelta(versionHash, baseVersionHash).toMap(),
370+
objectsMap = repositoriesManager.computeDelta(versionHash, baseVersionHash).asMap(),
374371
)
375-
delta.checkObjectHashes()
376372
respond(delta)
377373
}
378374

379375
private suspend fun ApplicationCall.respondDeltaAsObjectStream(versionHash: String, baseVersionHash: String?, plainText: Boolean) {
380376
respondTextWriter(contentType = if (plainText) ContentType.Text.Plain else VersionDeltaStream.CONTENT_TYPE) {
381-
repositoriesManager.computeDelta(versionHash, baseVersionHash)
382-
.checkObjectHashes()
377+
repositoriesManager.computeDelta(versionHash, baseVersionHash).asFlow()
383378
.flatten()
384379
.withSeparator("\n")
385380
.onEmpty { emit(versionHash) }
@@ -410,7 +405,3 @@ private fun Flow<String>.withSeparator(separator: String) = flow {
410405
emit(it)
411406
}
412407
}
413-
414-
private fun <V : String?> Flow<Pair<String, V>>.checkObjectHashes(): Flow<Pair<String, V>> {
415-
return onEach { HashUtil.checkObjectHash(it.first, it.second) }
416-
}

model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt

Lines changed: 74 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ package org.modelix.model.server.handlers
1616
import kotlinx.coroutines.flow.Flow
1717
import kotlinx.coroutines.flow.asFlow
1818
import kotlinx.coroutines.flow.channelFlow
19-
import kotlinx.coroutines.flow.emptyFlow
2019
import kotlinx.coroutines.flow.map
20+
import kotlinx.coroutines.flow.onEach
2121
import kotlinx.coroutines.runBlocking
2222
import kotlinx.datetime.Clock
2323
import org.apache.commons.collections4.map.LRUMap
@@ -30,6 +30,7 @@ import org.modelix.model.api.ITree
3030
import org.modelix.model.api.IdGeneratorDummy
3131
import org.modelix.model.api.PBranch
3232
import org.modelix.model.api.runSynchronized
33+
import org.modelix.model.client2.checkObjectHashes
3334
import org.modelix.model.lazy.BranchReference
3435
import org.modelix.model.lazy.CLTree
3536
import org.modelix.model.lazy.CLVersion
@@ -39,6 +40,8 @@ import org.modelix.model.lazy.RepositoryId
3940
import org.modelix.model.lazy.computeDelta
4041
import org.modelix.model.metameta.MetaModelBranch
4142
import org.modelix.model.persistent.CPVersion
43+
import org.modelix.model.persistent.HashUtil
44+
import org.modelix.model.server.api.v2.toMap
4245
import org.modelix.model.server.store.IStoreClient
4346
import org.modelix.model.server.store.LocalModelClient
4447
import org.modelix.model.server.store.pollEntry
@@ -255,39 +258,12 @@ class RepositoriesManager(val client: LocalModelClient) {
255258
?: throw IllegalStateException("No version found for branch '${branch.branchName}' in repository '${branch.repositoryId}'")
256259
}
257260

258-
private val deltaCache = LRUMap<Pair<String, String?>, SoftReference<Lazy<Map<String, String>>>>(10)
259-
fun computeDelta(versionHash: String, baseVersionHash: String?): Flow<Pair<String, String>> {
260-
if (versionHash == baseVersionHash) return emptyFlow()
261+
private val deltaCache = LRUMap<Pair<String, String?>, SoftReference<Lazy<ObjectDataMap>>>(10)
262+
fun computeDelta(versionHash: String, baseVersionHash: String?): ObjectData {
263+
if (versionHash == baseVersionHash) return ObjectData.empty
261264
if (baseVersionHash == null) {
262265
// no need to cache anything if there is no delta computation happening
263-
264-
return channelFlow {
265-
val version = CLVersion(versionHash, objectStore)
266-
// Use a bulk query to make as few requests to the underlying store as possible.
267-
val bulkQuery = objectStore.newBulkQuery()
268-
// It is unsatisfactory that we have to keep already emitted hashes in memory.
269-
// But without changing the underlying model,
270-
// we have to do this to not emit objects more than once.
271-
val seenHashes = mutableSetOf<String>()
272-
fun emitObjects(entry: KVEntryReference<*>) {
273-
if (seenHashes.contains(entry.getHash())) return
274-
seenHashes.add(entry.getHash())
275-
bulkQuery.get(entry).onSuccess {
276-
val value = checkNotNull(it) { "No value received for ${entry.getHash()}" }
277-
// Use `send` instead of `trySend`,
278-
// because `trySend` fails if the channel capacity is full.
279-
// This might happen if the data is produced faster than consumed.
280-
// A better solution would be to have bulk queries that are themselves asynchronous
281-
// but doing that needs more consideration.
282-
runBlocking { channel.send(entry.getHash() to value.serialize()) }
283-
for (referencedEntry in value.getReferencedEntries()) {
284-
emitObjects(referencedEntry)
285-
}
286-
}
287-
}
288-
emitObjects(KVEntryReference(versionHash, CPVersion.DESERIALIZER))
289-
bulkQuery.process()
290-
}
266+
return allObjectDataAsFlow(versionHash)
291267
}
292268

293269
return runSynchronized(deltaCache) {
@@ -297,9 +273,47 @@ class RepositoriesManager(val client: LocalModelClient) {
297273
// SoftReference because deltas can be very large
298274
val version = CLVersion(versionHash, client.storeCache)
299275
val baseVersion = CLVersion(baseVersionHash, client.storeCache)
300-
version.computeDelta(baseVersion)
276+
val objectsMap = version.computeDelta(baseVersion)
277+
ObjectDataMap(objectsMap)
301278
}.also { deltaCache[key] = SoftReference(it) }
302-
}.value.entries.asFlow().map { it.toPair() }
279+
}.value
280+
}
281+
282+
private fun allObjectDataAsFlow(versionHash: String): ObjectDataFlow {
283+
val hashObjectFlow = channelFlow {
284+
val version = CLVersion(versionHash, objectStore)
285+
// Use a bulk query to make as few requests to the underlying store as possible.
286+
val bulkQuery = objectStore.newBulkQuery()
287+
// It is unsatisfactory that we have to keep already emitted hashes in memory.
288+
// But without changing the underlying model,
289+
// we have to do this to not emit objects more than once.
290+
val seenHashes = mutableSetOf<String>()
291+
fun emitObjects(entry: KVEntryReference<*>) {
292+
if (seenHashes.contains(entry.getHash())) return
293+
seenHashes.add(entry.getHash())
294+
bulkQuery.get(entry).onSuccess {
295+
val value = checkNotNull(it) { "No value received for ${entry.getHash()}" }
296+
// Use `send` instead of `trySend`,
297+
// because `trySend` fails if the channel capacity is full.
298+
// This might happen if the data is produced faster than consumed.
299+
// A better solution would be to have bulk queries that are themselves asynchronous
300+
// but doing that needs more consideration.
301+
runBlocking {
302+
// Maybe we should avoid Flow<Pair<String, String>> and use Flow<String>.
303+
// This needs profiling.
304+
channel.send(entry.getHash() to value.serialize())
305+
}
306+
for (referencedEntry in value.getReferencedEntries()) {
307+
emitObjects(referencedEntry)
308+
}
309+
}
310+
}
311+
emitObjects(KVEntryReference(versionHash, CPVersion.DESERIALIZER))
312+
bulkQuery.process()
313+
}
314+
val checkedHashObjectFlow = hashObjectFlow.checkObjectHashes()
315+
val objectData = ObjectDataFlow(checkedHashObjectFlow)
316+
return objectData
303317
}
304318

305319
private fun branchKey(branch: BranchReference): String {
@@ -342,3 +356,29 @@ class RepositoriesManager(val client: LocalModelClient) {
342356
}
343357

344358
class RepositoryAlreadyExistsException(val name: String) : IllegalStateException("Repository '$name' already exists")
359+
360+
sealed interface ObjectData {
361+
suspend fun asMap(): Map<String, String>
362+
fun asFlow(): Flow<Pair<String, String>>
363+
364+
companion object {
365+
val empty = ObjectDataMap(emptyMap())
366+
}
367+
}
368+
369+
class ObjectDataMap(private val byHashObjects: Map<String, String>) : ObjectData {
370+
init {
371+
HashUtil.checkObjectHashes(byHashObjects)
372+
}
373+
override suspend fun asMap(): Map<String, String> = byHashObjects
374+
override fun asFlow(): Flow<Pair<String, String>> = byHashObjects.entries.asFlow().map { it.toPair() }
375+
}
376+
377+
class ObjectDataFlow(private val hashObjectFlow: Flow<Pair<String, String>>) : ObjectData {
378+
override suspend fun asMap(): Map<String, String> = hashObjectFlow.toMap()
379+
override fun asFlow(): Flow<Pair<String, String>> = hashObjectFlow
380+
}
381+
382+
private fun Flow<Pair<String, String>>.checkObjectHashes(): Flow<Pair<String, String>> {
383+
return onEach { HashUtil.checkObjectHash(it.first, it.second) }
384+
}

0 commit comments

Comments
 (0)