Skip to content

Commit dc5c5c1

Browse files
authored
Merge pull request #558 from modelix/MODELIX-801-2
MODELIX-801 Parallel queries to different branches seem to create a deadlock
2 parents e942564 + 03614e9 commit dc5c5c1

File tree

5 files changed

+52
-30
lines changed

5 files changed

+52
-30
lines changed

model-datastructure/src/jvmMain/kotlin/org/modelix/model/InMemoryModel.kt

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ package org.modelix.model
1818

1919
import gnu.trove.map.TLongObjectMap
2020
import gnu.trove.map.hash.TLongObjectHashMap
21+
import kotlinx.coroutines.CompletableDeferred
2122
import kotlinx.coroutines.CoroutineScope
23+
import kotlinx.coroutines.Deferred
2224
import kotlinx.coroutines.Dispatchers
23-
import kotlinx.coroutines.Job
24-
import kotlinx.coroutines.launch
25+
import kotlinx.coroutines.async
26+
import kotlinx.coroutines.cancel
2527
import org.modelix.model.api.ConceptReference
2628
import org.modelix.model.api.IBranch
2729
import org.modelix.model.api.IConcept
@@ -42,52 +44,66 @@ import org.modelix.model.lazy.NonCachingObjectStore
4244
import org.modelix.model.persistent.CPHamtNode
4345
import org.modelix.model.persistent.CPNode
4446
import org.modelix.model.persistent.CPNodeRef
47+
import java.util.Collections
4548
import kotlin.system.measureTimeMillis
4649
import kotlin.time.Duration.Companion.milliseconds
4750
import kotlin.time.DurationUnit
4851

4952
private val LOG = mu.KotlinLogging.logger { }
5053

51-
class InMemoryModelLoader(val model: IncrementalInMemoryModel) {
52-
private val coroutineScope = CoroutineScope(Dispatchers.IO)
53-
private var modelLoadingJob: Job? = null
54-
55-
/**
56-
* Should be called repeatedly by a readiness probe until it returns true.
57-
*
58-
* @return true if the model is done loading
59-
*/
60-
@Synchronized
61-
fun loadModelAsync(tree: CLTree): Boolean {
62-
if (model.getLoadedModel()?.loadedMapRef?.getHash() == tree.nodesMap!!.hash) return true
63-
if (modelLoadingJob?.isActive != true) {
64-
modelLoadingJob = coroutineScope.launch {
65-
try {
66-
model.getModel(tree)
67-
} catch (ex: Throwable) {
68-
LOG.error(ex) { "Failed loading model ${tree.hash}" }
54+
class InMemoryModelLoader(val incrementalModel: IncrementalInMemoryModel, val coroutineScope: CoroutineScope) {
55+
private val treeHash2modelLoadJob = Collections.synchronizedMap(HashMap<String, Deferred<InMemoryModel>>())
56+
57+
fun getModel(tree: CLTree): Deferred<InMemoryModel> {
58+
val loadedModel = incrementalModel.getLoadedModel()
59+
if (loadedModel != null && loadedModel.loadedMapRef.getHash() == tree.nodesMap?.hash) return CompletableDeferred(loadedModel)
60+
61+
return synchronized(treeHash2modelLoadJob) {
62+
val activeJobs = treeHash2modelLoadJob.values.toList()
63+
val loadJob = treeHash2modelLoadJob.getOrPut(tree.hash) {
64+
coroutineScope.async {
65+
// There should only be one active loading job, because we want to reuse as much data as possible
66+
// from a previously loaded model, so we have to wait for its completion.
67+
// This also limits the number of thread used from the IO dispatcher.
68+
activeJobs.forEach { it.join() }
69+
70+
// This is a long-running method that should be executed only once for a new tree version.
71+
// It's executed on the IO dispatcher, because it's not a suspendable function and blocks
72+
// the thread.
73+
incrementalModel.getModel(tree)
6974
}
7075
}
76+
77+
// cleanup finished jobs
78+
treeHash2modelLoadJob -= treeHash2modelLoadJob.entries.filter { !it.value.isActive }.map { it.key }.toSet()
79+
80+
loadJob
7181
}
72-
return false
7382
}
7483
}
7584

7685
class InMemoryModels {
77-
private val models = HashMap<String, InMemoryModelLoader>()
86+
private val coroutineScope = CoroutineScope(Dispatchers.IO)
87+
private val branchId2modelLoader = Collections.synchronizedMap(HashMap<String, InMemoryModelLoader>())
7888

79-
@Synchronized
80-
fun getModel(id: String) = models.getOrPut(id) { InMemoryModelLoader(IncrementalInMemoryModel()) }
89+
fun dispose() {
90+
coroutineScope.cancel("disposed")
91+
}
8192

82-
fun getModel(tree: CLTree) = getModel(tree.getId()).model.getModel(tree)
93+
private fun getModelLoader(branchId: String): InMemoryModelLoader {
94+
return synchronized(branchId2modelLoader) {
95+
branchId2modelLoader.getOrPut(branchId) { InMemoryModelLoader(IncrementalInMemoryModel(), coroutineScope) }
96+
}
97+
}
8398

84-
fun loadModelAsync(tree: CLTree) = getModel(tree.getId()).loadModelAsync(tree)
99+
fun getModel(tree: CLTree): Deferred<InMemoryModel> {
100+
return getModelLoader(tree.getId()).getModel(tree)
101+
}
85102
}
86103

87104
class IncrementalInMemoryModel {
88105
private var lastModel: InMemoryModel? = null
89106

90-
@Synchronized
91107
fun getModel(tree: CLTree): InMemoryModel {
92108
val reusable = lastModel?.takeIf { it.branchId == tree.getId() }
93109
val newModel = if (reusable == null) {

model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
9999
?.getBranchReference(System.getenv("MODELIX_SERVER_MODELQL_WARMUP_BRANCH"))
100100
if (branchRef != null) {
101101
val version = repositoriesManager.getVersion(branchRef)
102-
if (!repositoriesManager.inMemoryModels.loadModelAsync(version!!.getTree())) {
102+
if (repositoriesManager.inMemoryModels.getModel(version!!.getTree()).isActive) {
103103
call.respondText(
104104
status = HttpStatusCode.ServiceUnavailable,
105105
text = "Waiting for version $version to be loaded into memory",

model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
280280
if (writeAccess) {
281281
branch.getRootNode() to branch.getArea()
282282
} else {
283-
val model = repositoriesManager.inMemoryModels.getModel(initialTree)
283+
val model = repositoriesManager.inMemoryModels.getModel(initialTree).await()
284284
model.getNode(ITree.ROOT_ID) to model.getArea()
285285
}
286286
}, {

model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ class RepositoriesManager(val client: LocalModelClient) {
5656
private val objectStore: IDeserializingKeyValueStore get() = client.storeCache
5757
val inMemoryModels = InMemoryModels()
5858

59+
fun dispose() {
60+
// TODO find instance creations and add a dispose() call if needed. Whoever creates an instance is responsible
61+
// for its lifecycle.
62+
inMemoryModels.dispose()
63+
}
64+
5965
fun generateClientId(repositoryId: RepositoryId): Long {
6066
return client.store.generateId("$KEY_PREFIX:${repositoryId.id}:clientId")
6167
}

modelql-server/src/main/kotlin/org/modelix/modelql/server/ModelQLServer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class ModelQLServer private constructor(val rootNodeProvider: () -> INode?, val
8080
handleCall(call, { rootNode to area }, {})
8181
}
8282

83-
suspend fun handleCall(call: ApplicationCall, input: (write: Boolean) -> Pair<INode, IArea>, afterQueryExecution: () -> Unit = {}) {
83+
suspend fun handleCall(call: ApplicationCall, input: suspend (write: Boolean) -> Pair<INode, IArea>, afterQueryExecution: () -> Unit = {}) {
8484
try {
8585
val serializedQuery = call.receiveText()
8686
val json = UntypedModelQL.json

0 commit comments

Comments
 (0)