13
13
*/
14
14
package org.modelix.model.server.handlers
15
15
16
+ import kotlinx.coroutines.flow.Flow
17
+ import kotlinx.coroutines.flow.asFlow
18
+ import kotlinx.coroutines.flow.emptyFlow
19
+ import kotlinx.coroutines.flow.flow
20
+ import kotlinx.coroutines.flow.map
16
21
import kotlinx.datetime.Clock
17
22
import org.apache.commons.collections4.map.LRUMap
18
23
import org.modelix.model.VersionMerger
@@ -25,12 +30,14 @@ import org.modelix.model.api.runSynchronized
25
30
import org.modelix.model.lazy.BranchReference
26
31
import org.modelix.model.lazy.CLTree
27
32
import org.modelix.model.lazy.CLVersion
33
+ import org.modelix.model.lazy.KVEntryReference
28
34
import org.modelix.model.lazy.RepositoryId
29
35
import org.modelix.model.lazy.computeDelta
30
36
import org.modelix.model.metameta.MetaModelBranch
31
37
import org.modelix.model.server.store.IStoreClient
32
38
import org.modelix.model.server.store.LocalModelClient
33
39
import org.modelix.model.server.store.pollEntry
40
+ import java.lang.ref.SoftReference
34
41
import java.util.UUID
35
42
36
43
class RepositoriesManager (val client : LocalModelClient ) {
@@ -200,18 +207,35 @@ class RepositoriesManager(val client: LocalModelClient) {
200
207
? : throw IllegalStateException (" No version found for branch '${branch.branchName} ' in repository '${branch.repositoryId} '" )
201
208
}
202
209
203
- private val deltaCache = LRUMap <Pair <String , String ?>, Lazy <Map <String , String >>>(10 )
204
- fun computeDelta (versionHash : String , baseVersionHash : String? ): Map <String , String > {
205
- return runSynchronized(deltaCache) {
206
- deltaCache.getOrPut(versionHash to baseVersionHash) {
207
- // lazy { ... } allows to run the computation without locking deltaCache
208
- lazy {
209
- val version = CLVersion (versionHash, client.storeCache)
210
- val baseVersion = baseVersionHash?.let { CLVersion (it, client.storeCache) }
211
- version.computeDelta(baseVersion)
210
+ private val deltaCache = LRUMap <Pair <String , String ?>, SoftReference <Lazy <Map <String , String >>>>(10 )
211
+ fun computeDelta (versionHash : String , baseVersionHash : String? ): Flow <Pair <String , String >> {
212
+ if (versionHash == baseVersionHash) return emptyFlow()
213
+ if (baseVersionHash == null ) {
214
+ // no need to cache anything if there is no delta computation happening
215
+ return flow {
216
+ suspend fun emitObjects (entry : KVEntryReference <* >) {
217
+ emit(entry.getHash() to client.get(entry.getHash())!! )
218
+ for (referencedEntry in entry.getValue(client.storeCache).getReferencedEntries()) {
219
+ emitObjects(referencedEntry)
220
+ }
212
221
}
222
+
223
+ emit(versionHash to client.get(versionHash)!! )
224
+ val version = CLVersion (versionHash, client.storeCache)
225
+ emitObjects(version.treeHash!! )
213
226
}
214
- }.value
227
+ }
228
+
229
+ return runSynchronized(deltaCache) {
230
+ val key = versionHash to baseVersionHash
231
+ deltaCache.get(key)?.get() ? : lazy {
232
+ // lazy { ... } allows to run the computation without locking deltaCache
233
+ // SoftReference because deltas can be very large
234
+ val version = CLVersion (versionHash, client.storeCache)
235
+ val baseVersion = CLVersion (baseVersionHash, client.storeCache)
236
+ version.computeDelta(baseVersion)
237
+ }.also { deltaCache[key] = SoftReference (it) }
238
+ }.value.entries.asFlow().map { it.toPair() }
215
239
}
216
240
217
241
private fun branchKey (branch : BranchReference ): String {
0 commit comments