Skip to content

Commit 03614e9

Browse files
committed
fix(model-server): InMemoryModel was blocking the ktor request handler thread pool
The model was loaded by a synchronized method, and all HTTP requests blocked their threads while waiting for the result. Since the default pool is constrained in size, it was running out of threads for handling other requests, such as the /health endpoint. Replaced the synchronized method with coroutine features that don't block the threads.
1 parent 9a49a08 commit 03614e9

File tree

5 files changed

+52
-30
lines changed

5 files changed

+52
-30
lines changed

model-datastructure/src/jvmMain/kotlin/org/modelix/model/InMemoryModel.kt

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ package org.modelix.model
1818

1919
import gnu.trove.map.TLongObjectMap
2020
import gnu.trove.map.hash.TLongObjectHashMap
21+
import kotlinx.coroutines.CompletableDeferred
2122
import kotlinx.coroutines.CoroutineScope
23+
import kotlinx.coroutines.Deferred
2224
import kotlinx.coroutines.Dispatchers
23-
import kotlinx.coroutines.Job
24-
import kotlinx.coroutines.launch
25+
import kotlinx.coroutines.async
26+
import kotlinx.coroutines.cancel
2527
import org.modelix.model.api.ConceptReference
2628
import org.modelix.model.api.IBranch
2729
import org.modelix.model.api.IConcept
@@ -42,52 +44,66 @@ import org.modelix.model.lazy.NonCachingObjectStore
4244
import org.modelix.model.persistent.CPHamtNode
4345
import org.modelix.model.persistent.CPNode
4446
import org.modelix.model.persistent.CPNodeRef
47+
import java.util.Collections
4548
import kotlin.system.measureTimeMillis
4649
import kotlin.time.Duration.Companion.milliseconds
4750
import kotlin.time.DurationUnit
4851

4952
private val LOG = mu.KotlinLogging.logger { }
5053

51-
class InMemoryModelLoader(val model: IncrementalInMemoryModel) {
52-
private val coroutineScope = CoroutineScope(Dispatchers.IO)
53-
private var modelLoadingJob: Job? = null
54-
55-
/**
56-
* Should be called repeatedly by a readiness probe until it returns true.
57-
*
58-
* @return true if the model is done loading
59-
*/
60-
@Synchronized
61-
fun loadModelAsync(tree: CLTree): Boolean {
62-
if (model.getLoadedModel()?.loadedMapRef?.getHash() == tree.nodesMap!!.hash) return true
63-
if (modelLoadingJob?.isActive != true) {
64-
modelLoadingJob = coroutineScope.launch {
65-
try {
66-
model.getModel(tree)
67-
} catch (ex: Throwable) {
68-
LOG.error(ex) { "Failed loading model ${tree.hash}" }
54+
class InMemoryModelLoader(val incrementalModel: IncrementalInMemoryModel, val coroutineScope: CoroutineScope) {
55+
private val treeHash2modelLoadJob = Collections.synchronizedMap(HashMap<String, Deferred<InMemoryModel>>())
56+
57+
fun getModel(tree: CLTree): Deferred<InMemoryModel> {
58+
val loadedModel = incrementalModel.getLoadedModel()
59+
if (loadedModel != null && loadedModel.loadedMapRef.getHash() == tree.nodesMap?.hash) return CompletableDeferred(loadedModel)
60+
61+
return synchronized(treeHash2modelLoadJob) {
62+
val activeJobs = treeHash2modelLoadJob.values.toList()
63+
val loadJob = treeHash2modelLoadJob.getOrPut(tree.hash) {
64+
coroutineScope.async {
65+
// There should only be one active loading job, because we want to reuse as much data as possible
66+
// from a previously loaded model, so we have to wait for its completion.
67+
// This also limits the number of threads used from the IO dispatcher.
68+
activeJobs.forEach { it.join() }
69+
70+
// This is a long-running method that should be executed only once for a new tree version.
71+
// It's executed on the IO dispatcher, because it's not a suspendable function and blocks
72+
// the thread.
73+
incrementalModel.getModel(tree)
6974
}
7075
}
76+
77+
// cleanup finished jobs
78+
treeHash2modelLoadJob -= treeHash2modelLoadJob.entries.filter { !it.value.isActive }.map { it.key }.toSet()
79+
80+
loadJob
7181
}
72-
return false
7382
}
7483
}
7584

7685
class InMemoryModels {
77-
private val models = HashMap<String, InMemoryModelLoader>()
86+
private val coroutineScope = CoroutineScope(Dispatchers.IO)
87+
private val branchId2modelLoader = Collections.synchronizedMap(HashMap<String, InMemoryModelLoader>())
7888

79-
@Synchronized
80-
fun getModel(id: String) = models.getOrPut(id) { InMemoryModelLoader(IncrementalInMemoryModel()) }
89+
fun dispose() {
90+
coroutineScope.cancel("disposed")
91+
}
8192

82-
fun getModel(tree: CLTree) = getModel(tree.getId()).model.getModel(tree)
93+
private fun getModelLoader(branchId: String): InMemoryModelLoader {
94+
return synchronized(branchId2modelLoader) {
95+
branchId2modelLoader.getOrPut(branchId) { InMemoryModelLoader(IncrementalInMemoryModel(), coroutineScope) }
96+
}
97+
}
8398

84-
fun loadModelAsync(tree: CLTree) = getModel(tree.getId()).loadModelAsync(tree)
99+
fun getModel(tree: CLTree): Deferred<InMemoryModel> {
100+
return getModelLoader(tree.getId()).getModel(tree)
101+
}
85102
}
86103

87104
class IncrementalInMemoryModel {
88105
private var lastModel: InMemoryModel? = null
89106

90-
@Synchronized
91107
fun getModel(tree: CLTree): InMemoryModel {
92108
val reusable = lastModel?.takeIf { it.branchId == tree.getId() }
93109
val newModel = if (reusable == null) {

model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
9999
?.getBranchReference(System.getenv("MODELIX_SERVER_MODELQL_WARMUP_BRANCH"))
100100
if (branchRef != null) {
101101
val version = repositoriesManager.getVersion(branchRef)
102-
if (!repositoriesManager.inMemoryModels.loadModelAsync(version!!.getTree())) {
102+
if (repositoriesManager.inMemoryModels.getModel(version!!.getTree()).isActive) {
103103
call.respondText(
104104
status = HttpStatusCode.ServiceUnavailable,
105105
text = "Waiting for version $version to be loaded into memory",

model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
280280
if (writeAccess) {
281281
branch.getRootNode() to branch.getArea()
282282
} else {
283-
val model = repositoriesManager.inMemoryModels.getModel(initialTree)
283+
val model = repositoriesManager.inMemoryModels.getModel(initialTree).await()
284284
model.getNode(ITree.ROOT_ID) to model.getArea()
285285
}
286286
}, {

model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ class RepositoriesManager(val client: LocalModelClient) {
5656
private val objectStore: IDeserializingKeyValueStore get() = client.storeCache
5757
val inMemoryModels = InMemoryModels()
5858

59+
fun dispose() {
60+
// TODO find instance creations and add a dispose() call if needed. Whoever creates an instance is responsible
61+
// for its lifecycle.
62+
inMemoryModels.dispose()
63+
}
64+
5965
fun generateClientId(repositoryId: RepositoryId): Long {
6066
return client.store.generateId("$KEY_PREFIX:${repositoryId.id}:clientId")
6167
}

modelql-server/src/main/kotlin/org/modelix/modelql/server/ModelQLServer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class ModelQLServer private constructor(val rootNodeProvider: () -> INode?, val
8080
handleCall(call, { rootNode to area }, {})
8181
}
8282

83-
suspend fun handleCall(call: ApplicationCall, input: (write: Boolean) -> Pair<INode, IArea>, afterQueryExecution: () -> Unit = {}) {
83+
suspend fun handleCall(call: ApplicationCall, input: suspend (write: Boolean) -> Pair<INode, IArea>, afterQueryExecution: () -> Unit = {}) {
8484
try {
8585
val serializedQuery = call.receiveText()
8686
val json = UntypedModelQL.json

0 commit comments

Comments
 (0)