@@ -13,7 +13,6 @@ import io.ktor.server.resources.put
13
13
import io.ktor.server.response.respondText
14
14
import io.ktor.server.routing.routing
15
15
import io.ktor.util.pipeline.PipelineContext
16
- import kotlinx.coroutines.runBlocking
17
16
import kotlinx.html.br
18
17
import kotlinx.html.div
19
18
import kotlinx.html.h1
@@ -29,9 +28,11 @@ import org.modelix.model.lazy.BranchReference
29
28
import org.modelix.model.persistent.HashUtil
30
29
import org.modelix.model.server.ModelServerPermissionSchema
31
30
import org.modelix.model.server.store.ObjectInRepository
31
+ import org.modelix.model.server.store.RequiresTransaction
32
32
import org.modelix.model.server.store.StoreManager
33
33
import org.modelix.model.server.store.pollEntry
34
- import org.modelix.model.server.store.runTransactionSuspendable
34
+ import org.modelix.model.server.store.runReadIO
35
+ import org.modelix.model.server.store.runWriteIO
35
36
import org.modelix.model.server.templates.PageWithMenuBar
36
37
import java.io.IOException
37
38
import java.util.*
@@ -61,7 +62,8 @@ class KeyValueLikeModelServer(
61
62
// request to initialize it lazily, would make the code less robust.
62
63
// Each change in the logic of RepositoriesManager#maybeInitAndGetSeverId would need
63
64
// the special conditions in the affected requests to be updated.
64
- runBlocking { repositoriesManager.maybeInitAndGetSeverId() }
65
+ @OptIn(RequiresTransaction ::class )
66
+ repositoriesManager.getTransactionManager().runWrite { repositoriesManager.maybeInitAndGetSeverId() }
65
67
application.apply {
66
68
modelServerModule()
67
69
}
@@ -89,7 +91,8 @@ class KeyValueLikeModelServer(
89
91
get<Paths .getKeyGet> {
90
92
val key = call.parameters[" key" ]!!
91
93
checkKeyPermission(key, EPermissionType .READ )
92
- val value = stores.getGlobalKeyValueStore()[key]
94
+ @OptIn(RequiresTransaction ::class )
95
+ val value = runRead { stores.getGlobalStoreClient()[key] }
93
96
respondValue(key, value)
94
97
}
95
98
get<Paths .pollKeyGet> {
@@ -106,21 +109,25 @@ class KeyValueLikeModelServer(
106
109
post<Paths .counterKeyPost> {
107
110
val key = call.parameters[" key" ]!!
108
111
checkKeyPermission(key, EPermissionType .WRITE )
109
- val value = stores.getGlobalStoreClient().generateId(key)
112
+ val value = stores.getGlobalStoreClient(false ).generateId(key)
110
113
call.respondText(text = value.toString())
111
114
}
112
115
113
116
get<Paths .getRecursivelyKeyGet> {
114
117
val key = call.parameters[" key" ]!!
115
118
checkKeyPermission(key, EPermissionType .READ )
116
- call.respondText(collect(key, this ).toString(2 ), contentType = ContentType .Application .Json )
119
+ @OptIn(RequiresTransaction ::class )
120
+ call.respondText(runRead { collect(key, this ) }.toString(2 ), contentType = ContentType .Application .Json )
117
121
}
118
122
119
123
put<Paths .putKeyPut> {
120
124
val key = call.parameters[" key" ]!!
121
125
val value = call.receiveText()
122
126
try {
123
- putEntries(mapOf (key to value))
127
+ @OptIn(RequiresTransaction ::class )
128
+ runWrite {
129
+ putEntries(mapOf (key to value))
130
+ }
124
131
call.respondText(" OK" )
125
132
} catch (e: NotFoundException ) {
126
133
throw HttpException (HttpStatusCode .NotFound , title = " Not found" , details = e.message, cause = e)
@@ -139,7 +146,10 @@ class KeyValueLikeModelServer(
139
146
}
140
147
entries = sortByDependency(entries)
141
148
try {
142
- putEntries(entries)
149
+ @OptIn(RequiresTransaction ::class )
150
+ runWrite {
151
+ putEntries(entries)
152
+ }
143
153
call.respondText(entries.size.toString() + " entries written" )
144
154
} catch (e: NotFoundException ) {
145
155
throw HttpException (HttpStatusCode .NotFound , title = " Not found" , details = e.message, cause = e)
@@ -158,7 +168,8 @@ class KeyValueLikeModelServer(
158
168
checkKeyPermission(key, EPermissionType .READ )
159
169
keys.add(key)
160
170
}
161
- val values = stores.getGlobalStoreClient().getAll(keys)
171
+ @OptIn(RequiresTransaction ::class )
172
+ val values = runRead { stores.getGlobalStoreClient(false ).getAll(keys) }
162
173
for (i in keys.indices) {
163
174
val respEntry = JSONObject ()
164
175
respEntry.put(" key" , keys[i])
@@ -199,6 +210,7 @@ class KeyValueLikeModelServer(
199
210
return sorted
200
211
}
201
212
213
+ @RequiresTransaction
202
214
fun collect (rootKey : String , callContext : CallContext ? ): JSONArray {
203
215
val result = JSONArray ()
204
216
val processed: MutableSet <String > = HashSet ()
@@ -210,7 +222,7 @@ class KeyValueLikeModelServer(
210
222
if (callContext != null ) {
211
223
keys.forEach { callContext.checkKeyPermission(it, EPermissionType .READ ) }
212
224
}
213
- val values = stores.getGlobalStoreClient().getAll(keys)
225
+ val values = stores.getGlobalStoreClient(false ).getAll(keys)
214
226
for (i in keys.indices) {
215
227
val key = keys[i]
216
228
val value = values[i]
@@ -240,7 +252,8 @@ class KeyValueLikeModelServer(
240
252
return result
241
253
}
242
254
243
- private suspend fun CallContext.putEntries (newEntries : Map <String , String ?>) {
255
+ @RequiresTransaction
256
+ private fun CallContext.putEntries (newEntries : Map <String , String ?>) {
244
257
val referencedKeys: MutableSet <String > = HashSet ()
245
258
for ((key, value) in newEntries) {
246
259
checkKeyPermission(key, EPermissionType .WRITE )
@@ -300,17 +313,15 @@ class KeyValueLikeModelServer(
300
313
// We could try to move the objects later, but since this API is deprecated, it's not worth the effort.
301
314
}
302
315
303
- stores.getGlobalStoreClient().runTransactionSuspendable {
304
- stores.genericStore.putAll(hashedObjects.mapKeys { ObjectInRepository .global(it.key) })
305
- stores.genericStore.putAll(userDefinedEntries.mapKeys { ObjectInRepository .global(it.key) })
306
- for ((branch, value) in branchChanges) {
307
- if (value == null ) {
308
- checkPermission(ModelServerPermissionSchema .branch(branch).delete)
309
- repositoriesManager.removeBranchesBlocking(branch.repositoryId, setOf (branch.branchName))
310
- } else {
311
- checkPermission(ModelServerPermissionSchema .branch(branch).push)
312
- repositoriesManager.mergeChangesBlocking(branch, value)
313
- }
316
+ stores.genericStore.putAll(hashedObjects.mapKeys { ObjectInRepository .global(it.key) })
317
+ stores.genericStore.putAll(userDefinedEntries.mapKeys { ObjectInRepository .global(it.key) })
318
+ for ((branch, value) in branchChanges) {
319
+ if (value == null ) {
320
+ checkPermission(ModelServerPermissionSchema .branch(branch).delete)
321
+ repositoriesManager.removeBranches(branch.repositoryId, setOf (branch.branchName))
322
+ } else {
323
+ checkPermission(ModelServerPermissionSchema .branch(branch).push)
324
+ repositoriesManager.mergeChanges(branch, value)
314
325
}
315
326
}
316
327
}
@@ -363,4 +374,12 @@ class KeyValueLikeModelServer(
363
374
else -> unknown()
364
375
}
365
376
}
377
+
378
+ private suspend fun <R > runRead (body : () -> R ): R {
379
+ return repositoriesManager.getTransactionManager().runReadIO(body)
380
+ }
381
+
382
+ private suspend fun <R > runWrite (body : () -> R ): R {
383
+ return repositoriesManager.getTransactionManager().runWriteIO(body)
384
+ }
366
385
}
0 commit comments