Skip to content

Commit 45f7d53

Browse files
author
Oleksandr Dzhychko
authored
Merge pull request #563 from modelix/fix/compute-delta-outside-request-thread
fix(model-server): do not execute long-running repository operation on request threads
2 parents d560134 + 79af748 commit 45f7d53

File tree

7 files changed

+202
-75
lines changed

7 files changed

+202
-75
lines changed

model-server/src/main/kotlin/org/modelix/model/server/handlers/HistoryHandler.kt

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ class HistoryHandler(val client: IModelClient, private val repositoriesManager:
6868
val params = call.request.queryParameters
6969
val limit = toInt(params["limit"], 500)
7070
val skip = toInt(params["skip"], 0)
71+
val latestVersion = repositoriesManager.getVersion(branch)
72+
checkNotNull(latestVersion) { "Branch not found: $branch" }
7173
call.respondHtmlTemplate(PageWithMenuBar("repos/", "../../..")) {
7274
headContent {
7375
style {
@@ -80,7 +82,7 @@ class HistoryHandler(val client: IModelClient, private val repositoriesManager:
8082
repositoryPageStyle()
8183
}
8284
bodyContent {
83-
buildRepositoryPage(branch, params["head"], skip, limit)
85+
buildRepositoryPage(branch, latestVersion, params["head"], skip, limit)
8486
}
8587
}
8688
}
@@ -105,7 +107,7 @@ class HistoryHandler(val client: IModelClient, private val repositoriesManager:
105107
}
106108
}
107109

108-
fun revert(repositoryAndBranch: BranchReference, from: String?, to: String?, author: String?) {
110+
suspend fun revert(repositoryAndBranch: BranchReference, from: String?, to: String?, author: String?) {
109111
val version = repositoriesManager.getVersion(repositoryAndBranch) ?: throw RuntimeException("Branch doesn't exist: $repositoryAndBranch")
110112
val branch = OTBranch(PBranch(version.tree, client.idGenerator), client.idGenerator, client.storeCache!!)
111113
branch.runWriteT { t ->
@@ -160,8 +162,13 @@ class HistoryHandler(val client: IModelClient, private val repositoriesManager:
160162
}
161163
}
162164

163-
private fun FlowContent.buildRepositoryPage(repositoryAndBranch: BranchReference, headHash: String?, skip: Int, limit: Int) {
164-
val latestVersion = repositoriesManager.getVersion(repositoryAndBranch) ?: throw RuntimeException("Branch not found: $repositoryAndBranch")
165+
private fun FlowContent.buildRepositoryPage(
166+
repositoryAndBranch: BranchReference,
167+
latestVersion: CLVersion,
168+
headHash: String?,
169+
skip: Int,
170+
limit: Int,
171+
) {
165172
val headVersion = if (headHash == null || headHash.length == 0) latestVersion else CLVersion(headHash, client.storeCache!!)
166173
var rowIndex = 0
167174
h1 {

model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import io.ktor.server.resources.put
2727
import io.ktor.server.response.respondText
2828
import io.ktor.server.routing.routing
2929
import io.ktor.util.pipeline.PipelineContext
30+
import kotlinx.coroutines.runBlocking
3031
import kotlinx.html.br
3132
import kotlinx.html.div
3233
import kotlinx.html.h1
@@ -48,12 +49,12 @@ import org.modelix.model.lazy.RepositoryId
4849
import org.modelix.model.persistent.HashUtil
4950
import org.modelix.model.server.store.IStoreClient
5051
import org.modelix.model.server.store.pollEntry
52+
import org.modelix.model.server.store.runTransactionSuspendable
5153
import org.modelix.model.server.templates.PageWithMenuBar
5254
import org.slf4j.LoggerFactory
5355
import java.io.IOException
5456
import java.util.*
5557
import java.util.regex.Pattern
56-
import kotlin.collections.LinkedHashMap
5758

5859
val PERMISSION_MODEL_SERVER = "model-server".asResource()
5960
val MODEL_SERVER_ENTRY = KeycloakResourceType("model-server-entry", KeycloakScope.READ_WRITE_DELETE)
@@ -85,7 +86,7 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
8586
// request to initialize it lazily, would make the code less robust.
8687
// Each change in the logic of RepositoriesManager#maybeInitAndGetSeverId would need
8788
// the special conditions in the affected requests to be updated.
88-
repositoriesManager.maybeInitAndGetSeverId()
89+
runBlocking { repositoriesManager.maybeInitAndGetSeverId() }
8990
application.apply {
9091
modelServerModule()
9192
}
@@ -111,7 +112,11 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
111112
if (isHealthy()) {
112113
call.respondText(text = "healthy", contentType = ContentType.Text.Plain, status = HttpStatusCode.OK)
113114
} else {
114-
call.respondText(text = "not healthy", contentType = ContentType.Text.Plain, status = HttpStatusCode.InternalServerError)
115+
call.respondText(
116+
text = "not healthy",
117+
contentType = ContentType.Text.Plain,
118+
status = HttpStatusCode.InternalServerError,
119+
)
115120
}
116121
}
117122
get<Paths.getHeaders> {
@@ -283,7 +288,7 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
283288
return result
284289
}
285290

286-
protected fun CallContext.putEntries(newEntries: Map<String, String?>) {
291+
protected suspend fun CallContext.putEntries(newEntries: Map<String, String?>) {
287292
val referencedKeys: MutableSet<String> = HashSet()
288293
for ((key, value) in newEntries) {
289294
checkKeyPermission(key, EPermissionType.WRITE)
@@ -316,18 +321,23 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
316321
HashUtil.isSha256(key) -> {
317322
hashedObjects[key] = value ?: throw IllegalArgumentException("No value provided for $key")
318323
}
324+
319325
BranchReference.tryParseBranch(key) != null -> {
320326
branchChanges[BranchReference.tryParseBranch(key)!!] = value
321327
}
328+
322329
key.startsWith(PROTECTED_PREFIX) -> {
323330
throw NoPermissionException("Access to keys starting with '$PROTECTED_PREFIX' is only permitted to the model server itself.")
324331
}
332+
325333
key.startsWith(RepositoriesManager.KEY_PREFIX) -> {
326334
throw NoPermissionException("Access to keys starting with '${RepositoriesManager.KEY_PREFIX}' is only permitted to the model server itself.")
327335
}
336+
328337
key == RepositoriesManager.LEGACY_SERVER_ID_KEY || key == RepositoriesManager.LEGACY_SERVER_ID_KEY2 -> {
329338
throw NoPermissionException("'$key' is read-only.")
330339
}
340+
331341
else -> {
332342
userDefinedEntries[key] = value
333343
}
@@ -336,14 +346,14 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
336346

337347
HashUtil.checkObjectHashes(hashedObjects)
338348

339-
repositoriesManager.client.store.runTransaction {
349+
repositoriesManager.client.store.runTransactionSuspendable {
340350
storeClient.putAll(hashedObjects)
341351
storeClient.putAll(userDefinedEntries)
342352
for ((branch, value) in branchChanges) {
343353
if (value == null) {
344-
repositoriesManager.removeBranches(branch.repositoryId, setOf(branch.branchName))
354+
repositoriesManager.removeBranchesBlocking(branch.repositoryId, setOf(branch.branchName))
345355
} else {
346-
repositoriesManager.mergeChanges(branch, value)
356+
repositoriesManager.mergeChangesBlocking(branch, value)
347357
}
348358
}
349359
}
@@ -365,7 +375,10 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
365375
if (key.startsWith(RepositoriesManager.KEY_PREFIX)) {
366376
throw NoPermissionException("Access to keys starting with '${RepositoriesManager.KEY_PREFIX}' is only permitted to the model server itself.")
367377
}
368-
if ((key == RepositoriesManager.LEGACY_SERVER_ID_KEY || key == RepositoriesManager.LEGACY_SERVER_ID_KEY2) && type.includes(EPermissionType.WRITE)) {
378+
if ((key == RepositoriesManager.LEGACY_SERVER_ID_KEY || key == RepositoriesManager.LEGACY_SERVER_ID_KEY2) && type.includes(
379+
EPermissionType.WRITE,
380+
)
381+
) {
369382
throw NoPermissionException("'$key' is read-only.")
370383
}
371384
if (HashUtil.isSha256(key)) {

model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import kotlinx.coroutines.Dispatchers
3939
import kotlinx.coroutines.Job
4040
import kotlinx.coroutines.flow.Flow
4141
import kotlinx.coroutines.flow.flow
42-
import kotlinx.coroutines.flow.onEach
4342
import kotlinx.coroutines.flow.onEmpty
4443
import kotlinx.coroutines.flow.withIndex
4544
import kotlinx.coroutines.withContext
@@ -61,7 +60,6 @@ import org.modelix.model.operations.OTBranch
6160
import org.modelix.model.persistent.HashUtil
6261
import org.modelix.model.server.api.v2.VersionDelta
6362
import org.modelix.model.server.api.v2.VersionDeltaStream
64-
import org.modelix.model.server.api.v2.toMap
6563
import org.modelix.model.server.store.IStoreClient
6664
import org.modelix.model.server.store.LocalModelClient
6765
import org.modelix.modelql.server.ModelQLServer
@@ -231,9 +229,8 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
231229
val delta = VersionDelta(
232230
newVersionHash,
233231
lastVersionHash,
234-
objectsMap = repositoriesManager.computeDelta(newVersionHash, lastVersionHash).toMap(),
232+
objectsMap = repositoriesManager.computeDelta(newVersionHash, lastVersionHash).asMap(),
235233
)
236-
delta.checkObjectHashes()
237234
send(Json.encodeToString(delta))
238235
lastVersionHash = newVersionHash
239236
}
@@ -370,16 +367,14 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
370367
val delta = VersionDelta(
371368
versionHash,
372369
baseVersionHash,
373-
objectsMap = repositoriesManager.computeDelta(versionHash, baseVersionHash).toMap(),
370+
objectsMap = repositoriesManager.computeDelta(versionHash, baseVersionHash).asMap(),
374371
)
375-
delta.checkObjectHashes()
376372
respond(delta)
377373
}
378374

379375
private suspend fun ApplicationCall.respondDeltaAsObjectStream(versionHash: String, baseVersionHash: String?, plainText: Boolean) {
380376
respondTextWriter(contentType = if (plainText) ContentType.Text.Plain else VersionDeltaStream.CONTENT_TYPE) {
381-
repositoriesManager.computeDelta(versionHash, baseVersionHash)
382-
.checkObjectHashes()
377+
repositoriesManager.computeDelta(versionHash, baseVersionHash).asFlow()
383378
.flatten()
384379
.withSeparator("\n")
385380
.onEmpty { emit(versionHash) }
@@ -410,7 +405,3 @@ private fun Flow<String>.withSeparator(separator: String) = flow {
410405
emit(it)
411406
}
412407
}
413-
414-
private fun <V : String?> Flow<Pair<String, V>>.checkObjectHashes(): Flow<Pair<String, V>> {
415-
return onEach { HashUtil.checkObjectHash(it.first, it.second) }
416-
}

0 commit comments

Comments
 (0)