Skip to content

Commit 99ca38b

Browse files
author
Oleksandr Dzhychko
authored
Merge pull request #534 from modelix/fix/send-all-objects-when-channel-becomes-full
fix(model-server): RepositoriesManager.computeDelta should wait for channel to have capacity
2 parents 4d83d2d + 4316835 commit 99ca38b

File tree

2 files changed

+18
-10
lines changed

2 files changed

+18
-10
lines changed

model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import kotlinx.coroutines.flow.asFlow
1818
import kotlinx.coroutines.flow.channelFlow
1919
import kotlinx.coroutines.flow.emptyFlow
2020
import kotlinx.coroutines.flow.map
21+
import kotlinx.coroutines.runBlocking
2122
import kotlinx.datetime.Clock
2223
import org.apache.commons.collections4.map.LRUMap
2324
import org.modelix.model.IKeyValueStore
@@ -30,7 +31,6 @@ import org.modelix.model.api.IdGeneratorDummy
3031
import org.modelix.model.api.PBranch
3132
import org.modelix.model.api.runSynchronized
3233
import org.modelix.model.lazy.BranchReference
33-
import org.modelix.model.lazy.BulkQuery
3434
import org.modelix.model.lazy.CLTree
3535
import org.modelix.model.lazy.CLVersion
3636
import org.modelix.model.lazy.IDeserializingKeyValueStore
@@ -46,6 +46,7 @@ import java.lang.ref.SoftReference
4646
import java.util.UUID
4747

4848
class RepositoriesManager(val client: LocalModelClient) {
49+
4950
init {
5051
migrateLegacyRepositoriesList()
5152
}
@@ -246,19 +247,23 @@ class RepositoriesManager(val client: LocalModelClient) {
246247
// we have to do this to not emit objects more than once.
247248
val seenHashes = mutableSetOf<String>()
248249
fun emitObjects(entry: KVEntryReference<*>) {
250+
if (seenHashes.contains(entry.getHash())) return
251+
seenHashes.add(entry.getHash())
249252
bulkQuery.get(entry).onSuccess {
250-
channel.trySend(entry.getHash() to it!!.serialize())
251-
for (referencedEntry in it.getReferencedEntries()) {
252-
val wasSeenBefore = !seenHashes.add(referencedEntry.getHash())
253-
// Do not emit the object if we already emitted it.
254-
if (!wasSeenBefore) {
255-
emitObjects(referencedEntry)
256-
}
253+
val value = checkNotNull(it) { "No value received for ${entry.getHash()}" }
254+
// Use `send` instead of `trySend`,
255+
// because `trySend` fails if the channel capacity is full.
256+
// This might happen if the data is produced faster than consumed.
257+
// A better solution would be to have bulk queries which itself are asynchronous
258+
// but doing that needs more consideration.
259+
runBlocking { channel.send(entry.getHash() to value.serialize()) }
260+
for (referencedEntry in value.getReferencedEntries()) {
261+
emitObjects(referencedEntry)
257262
}
258263
}
259264
}
260265
emitObjects(KVEntryReference(versionHash, CPVersion.DESERIALIZER))
261-
(bulkQuery as? BulkQuery)?.process()
266+
bulkQuery.process()
262267
}
263268
}
264269

model-server/src/test/kotlin/org/modelix/model/server/ModelClientV2Test.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,10 @@ class ModelClientV2Test {
196196
val repositoryId = RepositoryId("repo1")
197197
val branchId = repositoryId.getBranchReference("my-branch")
198198
modelClientForArrange.runWrite(branchId) { root ->
199-
root.addNewChild("aChild", -1, null as IConceptReference?)
199+
// Creating many children makes the flow emitting many values at once.
200+
repeat(100) {
201+
root.addNewChild("aChild", -1, null as IConceptReference?)
202+
}
200203
}
201204

202205
// Act

0 commit comments

Comments
 (0)