Skip to content

Commit 2670d49

Browse files
committed
fix(model-server): deadlock caused by non-existing lock ordering
1 parent 9d36c70 commit 2670d49

27 files changed

+1036
-392
lines changed

model-server/src/main/kotlin/org/modelix/model/server/handlers/HealthApiImpl.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@ class HealthApiImpl(
2525

2626
private fun isHealthy(): Boolean {
2727
val store = stores.getGlobalStoreClient()
28-
val value = toLong(store[HEALTH_KEY]) + 1
29-
store.put(HEALTH_KEY, java.lang.Long.toString(value))
30-
return toLong(store[HEALTH_KEY]) >= value
28+
return store.getTransactionManager().runWrite {
29+
val value = toLong(store[HEALTH_KEY]) + 1
30+
store.put(HEALTH_KEY, java.lang.Long.toString(value))
31+
toLong(store[HEALTH_KEY]) >= value
32+
}
3133
}
3234

3335
private fun toLong(value: String?): Long {

model-server/src/main/kotlin/org/modelix/model/server/handlers/IRepositoriesManager.kt

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import org.modelix.model.lazy.CLVersion
66
import org.modelix.model.lazy.IDeserializingKeyValueStore
77
import org.modelix.model.lazy.RepositoryId
88
import org.modelix.model.server.store.IStoreClient
9+
import org.modelix.model.server.store.ITransactionManager
910
import org.modelix.model.server.store.StoreManager
1011

1112
interface IRepositoriesManager {
@@ -18,25 +19,23 @@ interface IRepositoriesManager {
1819
* If the server ID was created previously but is only stored under a legacy database key,
1920
* it also gets stored under the current and all legacy database keys.
2021
*/
21-
suspend fun maybeInitAndGetSeverId(): String
22+
fun maybeInitAndGetSeverId(): String
2223
fun getRepositories(): Set<RepositoryId>
23-
suspend fun createRepository(repositoryId: RepositoryId, userName: String?, useRoleIds: Boolean = true, legacyGlobalStorage: Boolean = false): CLVersion
24-
suspend fun removeRepository(repository: RepositoryId): Boolean
24+
fun createRepository(repositoryId: RepositoryId, userName: String?, useRoleIds: Boolean = true, legacyGlobalStorage: Boolean = false): CLVersion
25+
fun removeRepository(repository: RepositoryId): Boolean
2526

2627
fun getBranches(repositoryId: RepositoryId): Set<BranchReference>
2728

28-
suspend fun removeBranches(repository: RepositoryId, branchNames: Set<String>)
29-
3029
/**
3130
* Same as [removeBranches] but blocking.
3231
* Caller is expected to execute it outside the request thread.
3332
*/
34-
fun removeBranchesBlocking(repository: RepositoryId, branchNames: Set<String>)
35-
suspend fun getVersion(branch: BranchReference): CLVersion?
36-
suspend fun getVersion(repository: RepositoryId, versionHash: String): CLVersion?
37-
suspend fun getVersionHash(branch: BranchReference): String?
33+
fun removeBranches(repository: RepositoryId, branchNames: Set<String>)
34+
fun getVersion(branch: BranchReference): CLVersion?
35+
fun getVersion(repository: RepositoryId, versionHash: String): CLVersion?
36+
fun getVersionHash(branch: BranchReference): String?
3837
suspend fun pollVersionHash(branch: BranchReference, lastKnown: String?): String
39-
suspend fun mergeChanges(branch: BranchReference, newVersionHash: String): String
38+
fun mergeChanges(branch: BranchReference, newVersionHash: String): String
4039

4140
/**
4241
* Same as [mergeChanges] but blocking.
@@ -54,14 +53,15 @@ interface IRepositoriesManager {
5453
fun isIsolated(repository: RepositoryId): Boolean?
5554

5655
fun getStoreManager(): StoreManager
56+
fun getTransactionManager(): ITransactionManager
5757
}
5858

5959
fun IRepositoriesManager.getBranchNames(repositoryId: RepositoryId): Set<String> {
6060
return getBranches(repositoryId).map { it.branchName }.toSet()
6161
}
6262

63-
fun IRepositoriesManager.getStoreClient(repository: RepositoryId?): IStoreClient {
64-
return getStoreManager().getStoreClient(repository?.takeIf { isIsolated(it) ?: false })
63+
fun IRepositoriesManager.getStoreClient(repository: RepositoryId?, immutable: Boolean): IStoreClient {
64+
return getStoreManager().getStoreClient(repository?.takeIf { isIsolated(it) ?: false }, immutable)
6565
}
6666

6767
fun IRepositoriesManager.getAsyncStore(repository: RepositoryId?): IAsyncObjectStore {

model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import io.ktor.server.resources.put
1313
import io.ktor.server.response.respondText
1414
import io.ktor.server.routing.routing
1515
import io.ktor.util.pipeline.PipelineContext
16-
import kotlinx.coroutines.runBlocking
1716
import kotlinx.html.br
1817
import kotlinx.html.div
1918
import kotlinx.html.h1
@@ -31,7 +30,8 @@ import org.modelix.model.server.ModelServerPermissionSchema
3130
import org.modelix.model.server.store.ObjectInRepository
3231
import org.modelix.model.server.store.StoreManager
3332
import org.modelix.model.server.store.pollEntry
34-
import org.modelix.model.server.store.runTransactionSuspendable
33+
import org.modelix.model.server.store.runReadIO
34+
import org.modelix.model.server.store.runWriteIO
3535
import org.modelix.model.server.templates.PageWithMenuBar
3636
import java.io.IOException
3737
import java.util.*
@@ -61,7 +61,7 @@ class KeyValueLikeModelServer(
6161
// request to initialize it lazily, would make the code less robust.
6262
// Each change in the logic of RepositoriesManager#maybeInitAndGetSeverId would need
6363
// the special conditions in the affected requests to be updated.
64-
runBlocking { repositoriesManager.maybeInitAndGetSeverId() }
64+
repositoriesManager.getTransactionManager().runWrite { repositoriesManager.maybeInitAndGetSeverId() }
6565
application.apply {
6666
modelServerModule()
6767
}
@@ -89,7 +89,7 @@ class KeyValueLikeModelServer(
8989
get<Paths.getKeyGet> {
9090
val key = call.parameters["key"]!!
9191
checkKeyPermission(key, EPermissionType.READ)
92-
val value = stores.getGlobalKeyValueStore()[key]
92+
val value = runRead { stores.getGlobalStoreClient()[key] }
9393
respondValue(key, value)
9494
}
9595
get<Paths.pollKeyGet> {
@@ -106,21 +106,23 @@ class KeyValueLikeModelServer(
106106
post<Paths.counterKeyPost> {
107107
val key = call.parameters["key"]!!
108108
checkKeyPermission(key, EPermissionType.WRITE)
109-
val value = stores.getGlobalStoreClient().generateId(key)
109+
val value = stores.getGlobalStoreClient(false).generateId(key)
110110
call.respondText(text = value.toString())
111111
}
112112

113113
get<Paths.getRecursivelyKeyGet> {
114114
val key = call.parameters["key"]!!
115115
checkKeyPermission(key, EPermissionType.READ)
116-
call.respondText(collect(key, this).toString(2), contentType = ContentType.Application.Json)
116+
call.respondText(runRead { collect(key, this) }.toString(2), contentType = ContentType.Application.Json)
117117
}
118118

119119
put<Paths.putKeyPut> {
120120
val key = call.parameters["key"]!!
121121
val value = call.receiveText()
122122
try {
123-
putEntries(mapOf(key to value))
123+
runWrite {
124+
putEntries(mapOf(key to value))
125+
}
124126
call.respondText("OK")
125127
} catch (e: NotFoundException) {
126128
throw HttpException(HttpStatusCode.NotFound, title = "Not found", details = e.message, cause = e)
@@ -139,7 +141,9 @@ class KeyValueLikeModelServer(
139141
}
140142
entries = sortByDependency(entries)
141143
try {
142-
putEntries(entries)
144+
runWrite {
145+
putEntries(entries)
146+
}
143147
call.respondText(entries.size.toString() + " entries written")
144148
} catch (e: NotFoundException) {
145149
throw HttpException(HttpStatusCode.NotFound, title = "Not found", details = e.message, cause = e)
@@ -158,7 +162,7 @@ class KeyValueLikeModelServer(
158162
checkKeyPermission(key, EPermissionType.READ)
159163
keys.add(key)
160164
}
161-
val values = stores.getGlobalStoreClient().getAll(keys)
165+
val values = runRead { stores.getGlobalStoreClient(false).getAll(keys) }
162166
for (i in keys.indices) {
163167
val respEntry = JSONObject()
164168
respEntry.put("key", keys[i])
@@ -210,7 +214,7 @@ class KeyValueLikeModelServer(
210214
if (callContext != null) {
211215
keys.forEach { callContext.checkKeyPermission(it, EPermissionType.READ) }
212216
}
213-
val values = stores.getGlobalStoreClient().getAll(keys)
217+
val values = stores.getGlobalStoreClient(false).getAll(keys)
214218
for (i in keys.indices) {
215219
val key = keys[i]
216220
val value = values[i]
@@ -240,7 +244,7 @@ class KeyValueLikeModelServer(
240244
return result
241245
}
242246

243-
private suspend fun CallContext.putEntries(newEntries: Map<String, String?>) {
247+
private fun CallContext.putEntries(newEntries: Map<String, String?>) {
244248
val referencedKeys: MutableSet<String> = HashSet()
245249
for ((key, value) in newEntries) {
246250
checkKeyPermission(key, EPermissionType.WRITE)
@@ -300,17 +304,15 @@ class KeyValueLikeModelServer(
300304
// We could try to move the objects later, but since this API is deprecated, it's not worth the effort.
301305
}
302306

303-
stores.getGlobalStoreClient().runTransactionSuspendable {
304-
stores.genericStore.putAll(hashedObjects.mapKeys { ObjectInRepository.global(it.key) })
305-
stores.genericStore.putAll(userDefinedEntries.mapKeys { ObjectInRepository.global(it.key) })
306-
for ((branch, value) in branchChanges) {
307-
if (value == null) {
308-
checkPermission(ModelServerPermissionSchema.branch(branch).delete)
309-
repositoriesManager.removeBranchesBlocking(branch.repositoryId, setOf(branch.branchName))
310-
} else {
311-
checkPermission(ModelServerPermissionSchema.branch(branch).push)
312-
repositoriesManager.mergeChangesBlocking(branch, value)
313-
}
307+
stores.genericStore.putAll(hashedObjects.mapKeys { ObjectInRepository.global(it.key) })
308+
stores.genericStore.putAll(userDefinedEntries.mapKeys { ObjectInRepository.global(it.key) })
309+
for ((branch, value) in branchChanges) {
310+
if (value == null) {
311+
checkPermission(ModelServerPermissionSchema.branch(branch).delete)
312+
repositoriesManager.removeBranches(branch.repositoryId, setOf(branch.branchName))
313+
} else {
314+
checkPermission(ModelServerPermissionSchema.branch(branch).push)
315+
repositoriesManager.mergeChangesBlocking(branch, value)
314316
}
315317
}
316318
}
@@ -363,4 +365,12 @@ class KeyValueLikeModelServer(
363365
else -> unknown()
364366
}
365367
}
368+
369+
private suspend fun <R> runRead(body: () -> R): R {
370+
return repositoriesManager.getTransactionManager().runReadIO(body)
371+
}
372+
373+
private suspend fun <R> runWrite(body: () -> R): R {
374+
return repositoriesManager.getTransactionManager().runWriteIO(body)
375+
}
366376
}

model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ import org.modelix.model.server.api.v2.VersionDelta
4949
import org.modelix.model.server.api.v2.VersionDeltaStream
5050
import org.modelix.model.server.api.v2.VersionDeltaStreamV2
5151
import org.modelix.model.server.store.StoreManager
52+
import org.modelix.model.server.store.runReadIO
53+
import org.modelix.model.server.store.runWriteIO
5254
import org.modelix.modelql.core.IMemoizationPersistence
5355
import org.modelix.modelql.core.IStepOutput
5456
import org.modelix.modelql.core.MonoUnboundQuery
@@ -89,16 +91,15 @@ class ModelReplicationServer(
8991

9092
override suspend fun PipelineContext<Unit, ApplicationCall>.getRepositories() {
9193
call.respondText(
92-
repositoriesManager.getRepositories()
94+
runRead { repositoriesManager.getRepositories() }
9395
.filter { call.hasPermission(ModelServerPermissionSchema.repository(it).list) }
9496
.joinToString("\n") { it.id },
9597
)
9698
}
9799

98100
override suspend fun PipelineContext<Unit, ApplicationCall>.getRepositoryBranches(repository: String) {
99101
call.respondText(
100-
repositoriesManager
101-
.getBranchNames(repositoryId(repository))
102+
runRead { repositoriesManager.getBranchNames(repositoryId(repository)) }
102103
.filter { call.hasPermission(ModelServerPermissionSchema.repository(repository).branch(it).list) }
103104
.joinToString("\n"),
104105
)
@@ -111,7 +112,9 @@ class ModelReplicationServer(
111112
) {
112113
checkPermission(ModelServerPermissionSchema.repository(repository).branch(branch).pull)
113114
val branchRef = repositoryId(repository).getBranchReference(branch)
114-
val versionHash = repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef)
115+
val versionHash = runRead {
116+
repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef)
117+
}
115118
call.respondDelta(RepositoryId(repository), versionHash, lastKnown)
116119
}
117120

@@ -122,7 +125,7 @@ class ModelReplicationServer(
122125
) {
123126
checkPermission(ModelServerPermissionSchema.repository(repository).branch(branch).pull)
124127
val branchRef = repositoryId(repository).getBranchReference(branch)
125-
val versionHash = repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef)
128+
val versionHash = runRead { repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef) }
126129
call.respond(BranchV1(branch, versionHash))
127130
}
128131

@@ -138,11 +141,13 @@ class ModelReplicationServer(
138141

139142
checkPermission(ModelServerPermissionSchema.repository(repositoryId).branch(branch).delete)
140143

141-
if (!repositoriesManager.getBranchNames(repositoryId).contains(branch)) {
142-
throw BranchNotFoundException(branch, repositoryId.id)
143-
}
144+
runWrite {
145+
if (!repositoriesManager.getBranchNames(repositoryId).contains(branch)) {
146+
throw BranchNotFoundException(branch, repositoryId.id)
147+
}
144148

145-
repositoriesManager.removeBranches(repositoryId, setOf(branch))
149+
repositoriesManager.removeBranches(repositoryId, setOf(branch))
150+
}
146151

147152
call.respond(HttpStatusCode.NoContent)
148153
}
@@ -153,7 +158,7 @@ class ModelReplicationServer(
153158
) {
154159
checkPermission(ModelServerPermissionSchema.repository(repository).branch(branch).pull)
155160
val branchRef = repositoryId(repository).getBranchReference(branch)
156-
val versionHash = repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef)
161+
val versionHash = runRead { repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef) }
157162
call.respondText(versionHash)
158163
}
159164

@@ -163,19 +168,23 @@ class ModelReplicationServer(
163168
legacyGlobalStorage: Boolean?,
164169
) {
165170
checkPermission(ModelServerPermissionSchema.repository(repository).create)
166-
val initialVersion = repositoriesManager.createRepository(
167-
repositoryId(repository),
168-
call.getUserName(),
169-
useRoleIds ?: true,
170-
legacyGlobalStorage ?: false,
171-
)
171+
val initialVersion = runWrite {
172+
repositoriesManager.createRepository(
173+
repositoryId(repository),
174+
call.getUserName(),
175+
useRoleIds ?: true,
176+
legacyGlobalStorage ?: false,
177+
)
178+
}
172179
call.respondDelta(RepositoryId(repository), initialVersion.getContentHash(), null)
173180
}
174181

175182
override suspend fun PipelineContext<Unit, ApplicationCall>.deleteRepository(repository: String) {
176183
checkPermission(ModelServerPermissionSchema.repository(repository).delete)
177184

178-
val foundAndDeleted = repositoriesManager.removeRepository(repositoryId(repository))
185+
val foundAndDeleted = runWrite {
186+
repositoriesManager.removeRepository(repositoryId(repository))
187+
}
179188
if (foundAndDeleted) {
180189
call.respond(HttpStatusCode.NoContent)
181190
} else {
@@ -191,8 +200,10 @@ class ModelReplicationServer(
191200
val branchRef = repositoryId(repository).getBranchReference(branch)
192201
val deltaFromClient = call.receive<VersionDelta>()
193202
deltaFromClient.checkObjectHashes()
194-
repositoriesManager.getStoreClient(RepositoryId(repository)).putAll(deltaFromClient.getAllObjects())
195-
val mergedHash = repositoriesManager.mergeChanges(branchRef, deltaFromClient.versionHash)
203+
repositoriesManager.getStoreClient(RepositoryId(repository), true).putAll(deltaFromClient.getAllObjects())
204+
val mergedHash = runWrite {
205+
repositoriesManager.mergeChanges(branchRef, deltaFromClient.versionHash)
206+
}
196207
call.respondDelta(RepositoryId(repository), mergedHash, deltaFromClient.versionHash)
197208
}
198209

@@ -217,7 +228,7 @@ class ModelReplicationServer(
217228
}
218229

219230
val objects = withContext(Dispatchers.IO) {
220-
repositoriesManager.getStoreClient(RepositoryId(repository)).getAll(keys)
231+
repositoriesManager.getStoreClient(RepositoryId(repository), true).getAll(keys)
221232
}
222233

223234
for (entry in objects) {
@@ -260,7 +271,7 @@ class ModelReplicationServer(
260271
) {
261272
val branchRef = repositoryId(repository).getBranchReference(branchName)
262273
checkPermission(ModelServerPermissionSchema.branch(branchRef).query)
263-
val version = repositoriesManager.getVersion(branchRef) ?: throw BranchNotFoundException(branchRef)
274+
val version = runRead { repositoriesManager.getVersion(branchRef) ?: throw BranchNotFoundException(branchRef) }
264275
LOG.trace("Running query on {} @ {}", branchRef, version)
265276
val initialTree = version.getTree()
266277

@@ -290,7 +301,9 @@ class ModelReplicationServer(
290301
baseVersion = version,
291302
operations = ops.map { it.getOriginalOp() }.toTypedArray(),
292303
)
293-
repositoriesManager.mergeChanges(branchRef, newVersion.getContentHash())
304+
runWrite {
305+
repositoriesManager.mergeChanges(branchRef, newVersion.getContentHash())
306+
}
294307
}
295308
}
296309
})
@@ -330,7 +343,7 @@ class ModelReplicationServer(
330343
}
331344

332345
withContext(Dispatchers.IO) {
333-
repositoriesManager.getStoreClient(RepositoryId(repository)).putAll(entries, true)
346+
repositoriesManager.getStoreClient(RepositoryId(repository), true).putAll(entries, true)
334347
}
335348
call.respondText("${entries.size} objects received")
336349
}
@@ -341,7 +354,7 @@ class ModelReplicationServer(
341354
lastKnown: String?,
342355
) {
343356
checkPermission(ModelServerPermissionSchema.legacyGlobalObjects.read)
344-
if (stores.getGlobalStoreClient()[versionHash] == null) {
357+
if (runRead { stores.getGlobalStoreClient()[versionHash] } == null) {
345358
throw VersionNotFoundException(versionHash)
346359
}
347360
call.respondDelta(null, versionHash, lastKnown)
@@ -402,6 +415,14 @@ class ModelReplicationServer(
402415
}
403416
}
404417
}
418+
419+
private suspend fun <R> runRead(body: () -> R): R {
420+
return repositoriesManager.getTransactionManager().runReadIO(body)
421+
}
422+
423+
private suspend fun <R> runWrite(body: () -> R): R {
424+
return repositoriesManager.getTransactionManager().runWriteIO(body)
425+
}
405426
}
406427

407428
/**

0 commit comments

Comments
 (0)