Skip to content

Commit 2a10185

Browse files
committed
fix(model-server): @RequiresTransaction annotation to help avoiding MissingTransactionException
1 parent 2670d49 commit 2a10185

32 files changed

+529
-255
lines changed

model-server/src/main/kotlin/org/modelix/model/server/Main.kt

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ import org.modelix.model.server.handlers.ui.RepositoryOverview
5858
import org.modelix.model.server.store.IgniteStoreClient
5959
import org.modelix.model.server.store.InMemoryStoreClient
6060
import org.modelix.model.server.store.IsolatingStore
61+
import org.modelix.model.server.store.RequiresTransaction
6162
import org.modelix.model.server.store.forGlobalRepository
6263
import org.modelix.model.server.store.loadDump
6364
import org.modelix.model.server.store.writeDump
@@ -149,9 +150,12 @@ object Main {
149150
}
150151
var i = 0
151152
val globalStoreClient = storeClient.forGlobalRepository()
152-
while (i < cmdLineArgs.setValues.size) {
153-
globalStoreClient.put(cmdLineArgs.setValues[i], cmdLineArgs.setValues[i + 1])
154-
i += 2
153+
@OptIn(RequiresTransaction::class)
154+
globalStoreClient.getTransactionManager().runWrite {
155+
while (i < cmdLineArgs.setValues.size) {
156+
globalStoreClient.put(cmdLineArgs.setValues[i], cmdLineArgs.setValues[i + 1])
157+
i += 2
158+
}
155159
}
156160
val repositoriesManager = RepositoriesManager(storeClient)
157161
val modelServer = KeyValueLikeModelServer(repositoriesManager)

model-server/src/main/kotlin/org/modelix/model/server/handlers/HealthApiImpl.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import io.ktor.server.application.call
77
import io.ktor.server.response.respondText
88
import io.ktor.util.pipeline.PipelineContext
99
import org.modelix.model.server.handlers.KeyValueLikeModelServer.Companion.PROTECTED_PREFIX
10+
import org.modelix.model.server.store.RequiresTransaction
1011
import org.modelix.model.server.store.StoreManager
1112

1213
class HealthApiImpl(
@@ -25,6 +26,7 @@ class HealthApiImpl(
2526

2627
private fun isHealthy(): Boolean {
2728
val store = stores.getGlobalStoreClient()
29+
@OptIn(RequiresTransaction::class)
2830
return store.getTransactionManager().runWrite {
2931
val value = toLong(store[HEALTH_KEY]) + 1
3032
store.put(HEALTH_KEY, java.lang.Long.toString(value))

model-server/src/main/kotlin/org/modelix/model/server/handlers/IRepositoriesManager.kt

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import org.modelix.model.lazy.IDeserializingKeyValueStore
77
import org.modelix.model.lazy.RepositoryId
88
import org.modelix.model.server.store.IStoreClient
99
import org.modelix.model.server.store.ITransactionManager
10+
import org.modelix.model.server.store.RequiresTransaction
1011
import org.modelix.model.server.store.StoreManager
1112

1213
interface IRepositoriesManager {
@@ -19,29 +20,38 @@ interface IRepositoriesManager {
1920
* If the server ID was created previously but is only stored under a legacy database key,
2021
* it also gets stored under the current and all legacy database keys.
2122
*/
23+
@RequiresTransaction
2224
fun maybeInitAndGetSeverId(): String
25+
26+
@RequiresTransaction
2327
fun getRepositories(): Set<RepositoryId>
28+
29+
@RequiresTransaction
2430
fun createRepository(repositoryId: RepositoryId, userName: String?, useRoleIds: Boolean = true, legacyGlobalStorage: Boolean = false): CLVersion
31+
32+
@RequiresTransaction
2533
fun removeRepository(repository: RepositoryId): Boolean
2634

35+
@RequiresTransaction
2736
fun getBranches(repositoryId: RepositoryId): Set<BranchReference>
2837

2938
/**
3039
* Same as [removeBranches] but blocking.
3140
* Caller is expected to execute it outside the request thread.
3241
*/
42+
@RequiresTransaction
3343
fun removeBranches(repository: RepositoryId, branchNames: Set<String>)
44+
45+
@RequiresTransaction
3446
fun getVersion(branch: BranchReference): CLVersion?
3547
fun getVersion(repository: RepositoryId, versionHash: String): CLVersion?
48+
49+
@RequiresTransaction
3650
fun getVersionHash(branch: BranchReference): String?
3751
suspend fun pollVersionHash(branch: BranchReference, lastKnown: String?): String
38-
fun mergeChanges(branch: BranchReference, newVersionHash: String): String
3952

40-
/**
41-
* Same as [mergeChanges] but blocking.
42-
* Caller is expected to execute it outside the request thread.
43-
*/
44-
fun mergeChangesBlocking(branch: BranchReference, newVersionHash: String): String
53+
@RequiresTransaction
54+
fun mergeChanges(branch: BranchReference, newVersionHash: String): String
4555
suspend fun computeDelta(repository: RepositoryId?, versionHash: String, baseVersionHash: String?): ObjectData
4656

4757
/**
@@ -56,6 +66,7 @@ interface IRepositoriesManager {
5666
fun getTransactionManager(): ITransactionManager
5767
}
5868

69+
@RequiresTransaction
5970
fun IRepositoriesManager.getBranchNames(repositoryId: RepositoryId): Set<String> {
6071
return getBranches(repositoryId).map { it.branchName }.toSet()
6172
}

model-server/src/main/kotlin/org/modelix/model/server/handlers/IdsApiImpl.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import io.ktor.server.routing.routing
99
import io.ktor.util.pipeline.PipelineContext
1010
import org.modelix.authorization.getUserName
1111
import org.modelix.authorization.requiresLogin
12+
import org.modelix.model.server.store.RequiresTransaction
13+
import org.modelix.model.server.store.runReadIO
1214

1315
/**
1416
* Implementation of the REST API that is responsible for handling client and server IDs.
@@ -24,7 +26,11 @@ class IdsApiImpl(
2426
//
2527
// Functionally, it does not matter if the server ID is created eagerly or lazily,
2628
// as long as the same server ID is returned from the same server.
27-
val serverId = repositoriesManager.maybeInitAndGetSeverId()
29+
val serverId =
30+
@OptIn(RequiresTransaction::class)
31+
repositoriesManager.getTransactionManager().runReadIO {
32+
repositoriesManager.maybeInitAndGetSeverId()
33+
}
2834
call.respondText(serverId)
2935
}
3036

model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.modelix.model.lazy.BranchReference
2828
import org.modelix.model.persistent.HashUtil
2929
import org.modelix.model.server.ModelServerPermissionSchema
3030
import org.modelix.model.server.store.ObjectInRepository
31+
import org.modelix.model.server.store.RequiresTransaction
3132
import org.modelix.model.server.store.StoreManager
3233
import org.modelix.model.server.store.pollEntry
3334
import org.modelix.model.server.store.runReadIO
@@ -61,6 +62,7 @@ class KeyValueLikeModelServer(
6162
// request to initialize it lazily, would make the code less robust.
6263
// Each change in the logic of RepositoriesManager#maybeInitAndGetSeverId would need
6364
// the special conditions in the affected requests to be updated.
65+
@OptIn(RequiresTransaction::class)
6466
repositoriesManager.getTransactionManager().runWrite { repositoriesManager.maybeInitAndGetSeverId() }
6567
application.apply {
6668
modelServerModule()
@@ -89,6 +91,7 @@ class KeyValueLikeModelServer(
8991
get<Paths.getKeyGet> {
9092
val key = call.parameters["key"]!!
9193
checkKeyPermission(key, EPermissionType.READ)
94+
@OptIn(RequiresTransaction::class)
9295
val value = runRead { stores.getGlobalStoreClient()[key] }
9396
respondValue(key, value)
9497
}
@@ -113,13 +116,15 @@ class KeyValueLikeModelServer(
113116
get<Paths.getRecursivelyKeyGet> {
114117
val key = call.parameters["key"]!!
115118
checkKeyPermission(key, EPermissionType.READ)
119+
@OptIn(RequiresTransaction::class)
116120
call.respondText(runRead { collect(key, this) }.toString(2), contentType = ContentType.Application.Json)
117121
}
118122

119123
put<Paths.putKeyPut> {
120124
val key = call.parameters["key"]!!
121125
val value = call.receiveText()
122126
try {
127+
@OptIn(RequiresTransaction::class)
123128
runWrite {
124129
putEntries(mapOf(key to value))
125130
}
@@ -141,6 +146,7 @@ class KeyValueLikeModelServer(
141146
}
142147
entries = sortByDependency(entries)
143148
try {
149+
@OptIn(RequiresTransaction::class)
144150
runWrite {
145151
putEntries(entries)
146152
}
@@ -162,6 +168,7 @@ class KeyValueLikeModelServer(
162168
checkKeyPermission(key, EPermissionType.READ)
163169
keys.add(key)
164170
}
171+
@OptIn(RequiresTransaction::class)
165172
val values = runRead { stores.getGlobalStoreClient(false).getAll(keys) }
166173
for (i in keys.indices) {
167174
val respEntry = JSONObject()
@@ -203,6 +210,7 @@ class KeyValueLikeModelServer(
203210
return sorted
204211
}
205212

213+
@RequiresTransaction
206214
fun collect(rootKey: String, callContext: CallContext?): JSONArray {
207215
val result = JSONArray()
208216
val processed: MutableSet<String> = HashSet()
@@ -244,6 +252,7 @@ class KeyValueLikeModelServer(
244252
return result
245253
}
246254

255+
@RequiresTransaction
247256
private fun CallContext.putEntries(newEntries: Map<String, String?>) {
248257
val referencedKeys: MutableSet<String> = HashSet()
249258
for ((key, value) in newEntries) {
@@ -312,7 +321,7 @@ class KeyValueLikeModelServer(
312321
repositoriesManager.removeBranches(branch.repositoryId, setOf(branch.branchName))
313322
} else {
314323
checkPermission(ModelServerPermissionSchema.branch(branch).push)
315-
repositoriesManager.mergeChangesBlocking(branch, value)
324+
repositoriesManager.mergeChanges(branch, value)
316325
}
317326
}
318327
}

model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import org.modelix.model.server.api.v2.ImmutableObjectsStream
4848
import org.modelix.model.server.api.v2.VersionDelta
4949
import org.modelix.model.server.api.v2.VersionDeltaStream
5050
import org.modelix.model.server.api.v2.VersionDeltaStreamV2
51+
import org.modelix.model.server.store.RequiresTransaction
5152
import org.modelix.model.server.store.StoreManager
5253
import org.modelix.model.server.store.runReadIO
5354
import org.modelix.model.server.store.runWriteIO
@@ -91,6 +92,7 @@ class ModelReplicationServer(
9192

9293
override suspend fun PipelineContext<Unit, ApplicationCall>.getRepositories() {
9394
call.respondText(
95+
@OptIn(RequiresTransaction::class)
9496
runRead { repositoriesManager.getRepositories() }
9597
.filter { call.hasPermission(ModelServerPermissionSchema.repository(it).list) }
9698
.joinToString("\n") { it.id },
@@ -99,6 +101,7 @@ class ModelReplicationServer(
99101

100102
override suspend fun PipelineContext<Unit, ApplicationCall>.getRepositoryBranches(repository: String) {
101103
call.respondText(
104+
@OptIn(RequiresTransaction::class)
102105
runRead { repositoriesManager.getBranchNames(repositoryId(repository)) }
103106
.filter { call.hasPermission(ModelServerPermissionSchema.repository(repository).branch(it).list) }
104107
.joinToString("\n"),
@@ -112,6 +115,8 @@ class ModelReplicationServer(
112115
) {
113116
checkPermission(ModelServerPermissionSchema.repository(repository).branch(branch).pull)
114117
val branchRef = repositoryId(repository).getBranchReference(branch)
118+
119+
@OptIn(RequiresTransaction::class)
115120
val versionHash = runRead {
116121
repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef)
117122
}
@@ -125,7 +130,11 @@ class ModelReplicationServer(
125130
) {
126131
checkPermission(ModelServerPermissionSchema.repository(repository).branch(branch).pull)
127132
val branchRef = repositoryId(repository).getBranchReference(branch)
128-
val versionHash = runRead { repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef) }
133+
134+
@OptIn(RequiresTransaction::class)
135+
val versionHash = runRead {
136+
repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef)
137+
}
129138
call.respond(BranchV1(branch, versionHash))
130139
}
131140

@@ -141,6 +150,7 @@ class ModelReplicationServer(
141150

142151
checkPermission(ModelServerPermissionSchema.repository(repositoryId).branch(branch).delete)
143152

153+
@OptIn(RequiresTransaction::class)
144154
runWrite {
145155
if (!repositoriesManager.getBranchNames(repositoryId).contains(branch)) {
146156
throw BranchNotFoundException(branch, repositoryId.id)
@@ -158,7 +168,11 @@ class ModelReplicationServer(
158168
) {
159169
checkPermission(ModelServerPermissionSchema.repository(repository).branch(branch).pull)
160170
val branchRef = repositoryId(repository).getBranchReference(branch)
161-
val versionHash = runRead { repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef) }
171+
172+
@OptIn(RequiresTransaction::class)
173+
val versionHash = runRead {
174+
repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef)
175+
}
162176
call.respondText(versionHash)
163177
}
164178

@@ -168,6 +182,7 @@ class ModelReplicationServer(
168182
legacyGlobalStorage: Boolean?,
169183
) {
170184
checkPermission(ModelServerPermissionSchema.repository(repository).create)
185+
@OptIn(RequiresTransaction::class)
171186
val initialVersion = runWrite {
172187
repositoriesManager.createRepository(
173188
repositoryId(repository),
@@ -182,6 +197,7 @@ class ModelReplicationServer(
182197
override suspend fun PipelineContext<Unit, ApplicationCall>.deleteRepository(repository: String) {
183198
checkPermission(ModelServerPermissionSchema.repository(repository).delete)
184199

200+
@OptIn(RequiresTransaction::class)
185201
val foundAndDeleted = runWrite {
186202
repositoriesManager.removeRepository(repositoryId(repository))
187203
}
@@ -200,7 +216,9 @@ class ModelReplicationServer(
200216
val branchRef = repositoryId(repository).getBranchReference(branch)
201217
val deltaFromClient = call.receive<VersionDelta>()
202218
deltaFromClient.checkObjectHashes()
219+
@OptIn(RequiresTransaction::class) // no transactions required for immutable store
203220
repositoriesManager.getStoreClient(RepositoryId(repository), true).putAll(deltaFromClient.getAllObjects())
221+
@OptIn(RequiresTransaction::class)
204222
val mergedHash = runWrite {
205223
repositoriesManager.mergeChanges(branchRef, deltaFromClient.versionHash)
206224
}
@@ -228,6 +246,7 @@ class ModelReplicationServer(
228246
}
229247

230248
val objects = withContext(Dispatchers.IO) {
249+
@OptIn(RequiresTransaction::class) // no transactions required for immutable store
231250
repositoriesManager.getStoreClient(RepositoryId(repository), true).getAll(keys)
232251
}
233252

@@ -271,6 +290,7 @@ class ModelReplicationServer(
271290
) {
272291
val branchRef = repositoryId(repository).getBranchReference(branchName)
273292
checkPermission(ModelServerPermissionSchema.branch(branchRef).query)
293+
@OptIn(RequiresTransaction::class)
274294
val version = runRead { repositoriesManager.getVersion(branchRef) ?: throw BranchNotFoundException(branchRef) }
275295
LOG.trace("Running query on {} @ {}", branchRef, version)
276296
val initialTree = version.getTree()
@@ -301,6 +321,7 @@ class ModelReplicationServer(
301321
baseVersion = version,
302322
operations = ops.map { it.getOriginalOp() }.toTypedArray(),
303323
)
324+
@OptIn(RequiresTransaction::class)
304325
runWrite {
305326
repositoriesManager.mergeChanges(branchRef, newVersion.getContentHash())
306327
}
@@ -343,6 +364,7 @@ class ModelReplicationServer(
343364
}
344365

345366
withContext(Dispatchers.IO) {
367+
@OptIn(RequiresTransaction::class) // no transactions required for immutable store
346368
repositoriesManager.getStoreClient(RepositoryId(repository), true).putAll(entries, true)
347369
}
348370
call.respondText("${entries.size} objects received")
@@ -354,6 +376,7 @@ class ModelReplicationServer(
354376
lastKnown: String?,
355377
) {
356378
checkPermission(ModelServerPermissionSchema.legacyGlobalObjects.read)
379+
@OptIn(RequiresTransaction::class)
357380
if (runRead { stores.getGlobalStoreClient()[versionHash] } == null) {
358381
throw VersionNotFoundException(versionHash)
359382
}

0 commit comments

Comments
 (0)