13
13
*/
14
14
package org.modelix.model.server.handlers
15
15
16
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
22
26
import kotlinx.datetime.Clock
23
27
import org.apache.commons.collections4.map.LRUMap
@@ -29,8 +33,6 @@ import org.modelix.model.api.IReadTransaction
29
33
import org.modelix.model.api.ITree
30
34
import org.modelix.model.api.IdGeneratorDummy
31
35
import org.modelix.model.api.PBranch
32
- import org.modelix.model.api.runSynchronized
33
- import org.modelix.model.client2.checkObjectHashes
34
36
import org.modelix.model.lazy.BranchReference
35
37
import org.modelix.model.lazy.CLTree
36
38
import org.modelix.model.lazy.CLVersion
@@ -46,6 +48,7 @@ import org.modelix.model.server.store.IStoreClient
46
48
import org.modelix.model.server.store.LocalModelClient
47
49
import org.modelix.model.server.store.pollEntry
48
50
import org.modelix.model.server.store.runTransactionSuspendable
51
+ import org.slf4j.LoggerFactory
49
52
import java.lang.ref.SoftReference
50
53
import java.util.UUID
51
54
@@ -258,58 +261,52 @@ class RepositoriesManager(val client: LocalModelClient) {
258
261
? : throw IllegalStateException (" No version found for branch '${branch.branchName} ' in repository '${branch.repositoryId} '" )
259
262
}
260
263
261
// Shared cache of version deltas; avoids recomputing the same delta for concurrent requests.
private val versionDeltaCache = VersionDeltaCache(client.storeCache)

/**
 * Computes the object data a client needs to get from [baseVersionHash] to [versionHash].
 *
 * @param versionHash hash of the version the client wants to reach
 * @param baseVersionHash hash of the version the client already has, or null if it has none
 * @return the (possibly cached) delta, or all objects of the version when no base is given
 */
suspend fun computeDelta(versionHash: String, baseVersionHash: String?): ObjectData {
    // Same version on both sides: nothing needs to be transferred.
    if (versionHash == baseVersionHash) return ObjectData.empty
    // Without a base version there is no delta computation happening,
    // so there is nothing worth caching; stream all objects instead.
    if (baseVersionHash == null) return allObjectDataAsFlow(versionHash)
    return versionDeltaCache.getOrComputeDelta(versionHash, baseVersionHash)
}
281
274
282
275
private fun allObjectDataAsFlow (versionHash : String ): ObjectDataFlow {
283
276
val hashObjectFlow = channelFlow {
284
- val version = CLVersion (versionHash, objectStore)
285
- // Use a bulk query to make as few request to the underlying store as possible.
286
- val bulkQuery = objectStore.newBulkQuery()
287
- // It is unsatisfactory that we have to keep already emitted hashes in memory.
288
- // But without changing the underlying model,
289
- // we have to do this to not emit objects more than once.
290
- val seenHashes = mutableSetOf<String >()
291
- fun emitObjects (entry : KVEntryReference <* >) {
292
- if (seenHashes.contains(entry.getHash())) return
293
- seenHashes.add(entry.getHash())
294
- bulkQuery.get(entry).onSuccess {
295
- val value = checkNotNull(it) { " No value received for ${entry.getHash()} " }
296
- // Use `send` instead of `trySend`,
297
- // because `trySend` fails if the channel capacity is full.
298
- // This might happen if the data is produced faster than consumed.
299
- // A better solution would be to have bulk queries which itself are asynchronous
300
- // but doing that needs more consideration.
301
- runBlocking {
302
- // Maybe we should avoid Flow<Pair<String, String>> and use Flow<String>.
303
- // This needs profiling.
304
- channel.send(entry.getHash() to value.serialize())
305
- }
306
- for (referencedEntry in value.getReferencedEntries()) {
307
- emitObjects(referencedEntry)
277
+ // Our bulk query is blocking, therefor we explicitly launch it on one of the Dispatchers.IO.
278
+ // Without it, the consumer could accidentally start the flow on this thread and block it.
279
+ launch(Dispatchers .IO ) {
280
+ // Use a bulk query to make as few request to the underlying store as possible.
281
+ val bulkQuery = objectStore.newBulkQuery()
282
+ // It is unsatisfactory that we have to keep already emitted hashes in memory.
283
+ // But without changing the underlying model,
284
+ // we have to do this to not emit objects more than once.
285
+ val seenHashes = mutableSetOf<String >()
286
+ fun emitObjects (entry : KVEntryReference <* >) {
287
+ if (seenHashes.contains(entry.getHash())) return
288
+ seenHashes.add(entry.getHash())
289
+ bulkQuery.get(entry).onSuccess {
290
+ val value = checkNotNull(it) { " No value received for ${entry.getHash()} " }
291
+ // Use `send` instead of `trySend`,
292
+ // because `trySend` fails if the channel capacity is full.
293
+ // This might happen if the data is produced faster than consumed.
294
+ // A better solution would be to have bulk queries which itself are asynchronous
295
+ // but doing that needs more consideration.
296
+ runBlocking {
297
+ // Maybe we should avoid Flow<Pair<String, String>> and use Flow<String>.
298
+ // This needs profiling
299
+ channel.send(entry.getHash() to value.serialize())
300
+ }
301
+ for (referencedEntry in value.getReferencedEntries()) {
302
+ emitObjects(referencedEntry)
303
+ }
308
304
}
309
305
}
306
+ emitObjects(KVEntryReference (versionHash, CPVersion .DESERIALIZER ))
307
+ LOG .debug(" Starting to bulk query all objects." )
308
+ bulkQuery.process()
310
309
}
311
- emitObjects(KVEntryReference (versionHash, CPVersion .DESERIALIZER ))
312
- bulkQuery.process()
313
310
}
314
311
val checkedHashObjectFlow = hashObjectFlow.checkObjectHashes()
315
312
val objectData = ObjectDataFlow (checkedHashObjectFlow)
@@ -347,6 +344,7 @@ class RepositoriesManager(val client: LocalModelClient) {
347
344
}
348
345
349
346
companion object {
347
+ private val LOG = LoggerFactory .getLogger(RepositoriesManager ::class .java)
350
348
const val KEY_PREFIX = " :v2"
351
349
private const val REPOSITORIES_LIST_KEY = " $KEY_PREFIX :repositories"
352
350
const val LEGACY_SERVER_ID_KEY = " repositoryId"
@@ -382,3 +380,38 @@ class ObjectDataFlow(private val hashObjectFlow: Flow<Pair<String, String>>) : O
382
380
/**
 * Verifies every (hash, serialized object) pair flowing through: the hash must match
 * the serialized data, otherwise the flow fails with the exception thrown by
 * [HashUtil.checkObjectHash]. Emitted values pass through unchanged.
 */
private fun Flow<Pair<String, String>>.checkObjectHashes(): Flow<Pair<String, String>> =
    onEach { (hash, serialized) -> HashUtil.checkObjectHash(hash, serialized) }
383
+
384
/**
 * Caches computed deltas between two versions so that repeated or concurrent requests
 * for the same (version, base version) pair compute the delta only once.
 *
 * Values are held via [SoftReference] because deltas can be very large and should be
 * reclaimable under memory pressure.
 */
class VersionDeltaCache(val store: IDeserializingKeyValueStore) {

    // Dedicated scope for delta computations. Dispatchers.IO because the underlying
    // store access is blocking.
    private val cacheScope = CoroutineScope(Dispatchers.IO)

    // Guarded by synchronized(cacheMap). Values are Deferred so that a delta currently
    // being computed for one request can be awaited by other requests.
    private val cacheMap = LRUMap<Pair<String, String?>, SoftReference<Deferred<ObjectDataMap>>>(10)

    /**
     * Returns the delta between [versionHash] and [baseVersionHash], computing and
     * caching it if it is not cached yet.
     */
    suspend fun getOrComputeDelta(versionHash: String, baseVersionHash: String): ObjectDataMap {
        val deferredDelta = synchronized(cacheMap) {
            val key = versionHash to baseVersionHash
            val existingDeferredDelta = cacheMap[key]?.get()
            if (existingDeferredDelta != null) {
                LOG.debug("Version delta found in cache for {}.", key)
                existingDeferredDelta
            } else {
                LOG.debug("Version delta not found in cache for {}.", key)
                val version = CLVersion(versionHash, store)
                val baseVersion = CLVersion(baseVersionHash, store)
                // Start lazily so the potentially expensive computation runs on first
                // await(), OUTSIDE this synchronized block. The previous
                // runBlocking(Dispatchers.IO) { async { ... } } waited for the async
                // child to complete before returning, i.e. the whole delta was computed
                // while holding the lock, serializing all cache users and blocking a
                // thread inside a suspend function.
                val newDeferredDelta = cacheScope.async(start = CoroutineStart.LAZY) {
                    LOG.debug("Computing version delta for {}.", key)
                    val result = ObjectDataMap(version.computeDelta(baseVersion))
                    LOG.debug("Computed version delta for {}.", key)
                    result
                }
                cacheMap[key] = SoftReference(newDeferredDelta)
                newDeferredDelta
            }
        }
        // Awaiting outside the lock triggers (or joins) the actual computation.
        return deferredDelta.await()
    }

    companion object {
        private val LOG = LoggerFactory.getLogger(VersionDeltaCache::class.java)
    }
}
0 commit comments