Skip to content

Commit 78c95b5

Browse files
author
Oleksandr Dzhychko
authored
Merge pull request #646 from modelix/use-respondBytesWriter
chore(model-server): use non-blocking method to send version delta
2 parents 3e38727 + 8f02751 commit 78c95b5

File tree

12 files changed

+322
-63
lines changed

12 files changed

+322
-63
lines changed

model-server/src/main/kotlin/org/modelix/model/server/Main.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import org.apache.commons.io.FileUtils
4848
import org.apache.ignite.Ignition
4949
import org.modelix.authorization.KeycloakUtils
5050
import org.modelix.authorization.installAuthentication
51+
import org.modelix.model.InMemoryModels
5152
import org.modelix.model.server.handlers.ContentExplorer
5253
import org.modelix.model.server.handlers.DeprecatedLightModelServer
5354
import org.modelix.model.server.handlers.HistoryHandler
@@ -150,8 +151,9 @@ object Main {
150151
i += 2
151152
}
152153
val localModelClient = LocalModelClient(storeClient)
154+
val inMemoryModels = InMemoryModels()
153155
val repositoriesManager = RepositoriesManager(localModelClient)
154-
val modelServer = KeyValueLikeModelServer(repositoriesManager)
156+
val modelServer = KeyValueLikeModelServer(repositoriesManager, storeClient, inMemoryModels)
155157
val sharedSecretFile = cmdLineArgs.secretFile
156158
if (sharedSecretFile.exists()) {
157159
modelServer.setSharedSecret(
@@ -162,7 +164,7 @@ object Main {
162164
val repositoryOverview = RepositoryOverview(repositoriesManager)
163165
val historyHandler = HistoryHandler(localModelClient, repositoriesManager)
164166
val contentExplorer = ContentExplorer(localModelClient, repositoriesManager)
165-
val modelReplicationServer = ModelReplicationServer(repositoriesManager)
167+
val modelReplicationServer = ModelReplicationServer(repositoriesManager, localModelClient, inMemoryModels)
166168
val metricsHandler = MetricsHandler()
167169

168170
val configureNetty: NettyApplicationEngine.Configuration.() -> Unit = {

model-server/src/main/kotlin/org/modelix/model/server/handlers/ContentExplorer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ import org.modelix.model.lazy.RepositoryId
5050
import org.modelix.model.server.templates.PageWithMenuBar
5151
import kotlin.collections.set
5252

53-
class ContentExplorer(private val client: IModelClient, private val repoManager: RepositoriesManager) {
53+
class ContentExplorer(private val client: IModelClient, private val repoManager: IRepositoriesManager) {
5454

5555
fun init(application: Application) {
5656
application.routing {

model-server/src/main/kotlin/org/modelix/model/server/handlers/HistoryHandler.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ import org.modelix.model.server.templates.PageWithMenuBar
5555
import java.time.LocalDateTime
5656
import java.time.format.DateTimeFormatter
5757

58-
class HistoryHandler(val client: IModelClient, private val repositoriesManager: RepositoriesManager) {
58+
class HistoryHandler(val client: IModelClient, private val repositoriesManager: IRepositoriesManager) {
5959

6060
fun init(application: Application) {
6161
application.routing {
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) 2024.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.modelix.model.server.handlers
18+
19+
import org.modelix.model.lazy.BranchReference
20+
import org.modelix.model.lazy.CLVersion
21+
import org.modelix.model.lazy.RepositoryId
22+
23+
interface IRepositoriesManager {
24+
/**
25+
* Used to retrieve the server ID. If needed, the server ID is created and stored.
26+
*
27+
* If a server ID was not created yet, it is generated and saved in the database.
28+
* It gets stored under the current and all legacy database keys.
29+
*
30+
* If the server ID was created previously but is only stored under a legacy database key,
31+
* it also gets stored under the current and all legacy database keys.
32+
*/
33+
suspend fun maybeInitAndGetSeverId(): String
34+
fun getRepositories(): Set<RepositoryId>
35+
suspend fun createRepository(repositoryId: RepositoryId, userName: String?, useRoleIds: Boolean = true): CLVersion
36+
suspend fun removeRepository(repository: RepositoryId): Boolean
37+
38+
fun getBranches(repositoryId: RepositoryId): Set<BranchReference>
39+
40+
suspend fun removeBranches(repository: RepositoryId, branchNames: Set<String>)
41+
42+
/**
43+
* Same as [removeBranches] but blocking.
44+
* Caller is expected to execute it outside the request thread.
45+
*/
46+
fun removeBranchesBlocking(repository: RepositoryId, branchNames: Set<String>)
47+
suspend fun getVersion(branch: BranchReference): CLVersion?
48+
suspend fun getVersionHash(branch: BranchReference): String?
49+
suspend fun pollVersionHash(branch: BranchReference, lastKnown: String?): String
50+
suspend fun mergeChanges(branch: BranchReference, newVersionHash: String): String
51+
52+
/**
53+
* Same as [mergeChanges] but blocking.
54+
* Caller is expected to execute it outside the request thread.
55+
*/
56+
fun mergeChangesBlocking(branch: BranchReference, newVersionHash: String): String
57+
suspend fun computeDelta(versionHash: String, baseVersionHash: String?): ObjectData
58+
}
59+
60+
fun IRepositoriesManager.getBranchNames(repositoryId: RepositoryId): Set<String> {
61+
return getBranches(repositoryId).map { it.branchName }.toSet()
62+
}

model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import org.modelix.authorization.checkPermission
4444
import org.modelix.authorization.getUserName
4545
import org.modelix.authorization.requiresPermission
4646
import org.modelix.authorization.toKeycloakScope
47+
import org.modelix.model.InMemoryModels
4748
import org.modelix.model.lazy.BranchReference
4849
import org.modelix.model.lazy.RepositoryId
4950
import org.modelix.model.persistent.HashUtil
@@ -67,16 +68,22 @@ private class NotFoundException(description: String?) : RuntimeException(descrip
6768

6869
typealias CallContext = PipelineContext<Unit, ApplicationCall>
6970

70-
class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
71+
class KeyValueLikeModelServer(
72+
private val repositoriesManager: IRepositoriesManager,
73+
private val storeClient: IStoreClient,
74+
private val inMemoryModels: InMemoryModels,
75+
) {
76+
77+
constructor(repositoriesManager: RepositoriesManager) :
78+
this(repositoriesManager, repositoriesManager.client.store, InMemoryModels())
79+
7180
companion object {
7281
private val LOG = LoggerFactory.getLogger(KeyValueLikeModelServer::class.java)
7382
val HASH_PATTERN = Pattern.compile("[a-zA-Z0-9\\-_]{5}\\*[a-zA-Z0-9\\-_]{38}")
7483
const val PROTECTED_PREFIX = "$$$"
7584
val HEALTH_KEY = PROTECTED_PREFIX + "health2"
7685
}
7786

78-
val storeClient: IStoreClient get() = repositoriesManager.client.store
79-
8087
fun init(application: Application) {
8188
// Functionally, it does not matter if the server ID
8289
// is created eagerly on startup or lazily on the first request,
@@ -100,7 +107,7 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
100107
?.getBranchReference(System.getenv("MODELIX_SERVER_MODELQL_WARMUP_BRANCH"))
101108
if (branchRef != null) {
102109
val version = repositoriesManager.getVersion(branchRef)
103-
if (repositoriesManager.inMemoryModels.getModel(version!!.getTree()).isActive) {
110+
if (inMemoryModels.getModel(version!!.getTree()).isActive) {
104111
call.respondText(
105112
status = HttpStatusCode.ServiceUnavailable,
106113
text = "Waiting for version $version to be loaded into memory",
@@ -346,7 +353,7 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
346353

347354
HashUtil.checkObjectHashes(hashedObjects)
348355

349-
repositoriesManager.client.store.runTransactionSuspendable {
356+
storeClient.runTransactionSuspendable {
350357
storeClient.putAll(hashedObjects)
351358
storeClient.putAll(userDefinedEntries)
352359
for ((branch, value) in branchChanges) {

model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,17 @@ import io.ktor.server.resources.get
2727
import io.ktor.server.resources.post
2828
import io.ktor.server.resources.put
2929
import io.ktor.server.response.respond
30+
import io.ktor.server.response.respondBytesWriter
3031
import io.ktor.server.response.respondText
31-
import io.ktor.server.response.respondTextWriter
3232
import io.ktor.server.routing.Route
3333
import io.ktor.server.routing.route
3434
import io.ktor.server.routing.routing
3535
import io.ktor.server.websocket.webSocket
36+
import io.ktor.util.cio.use
3637
import io.ktor.util.pipeline.PipelineContext
38+
import io.ktor.utils.io.ByteWriteChannel
39+
import io.ktor.utils.io.close
40+
import io.ktor.utils.io.writeStringUtf8
3741
import io.ktor.websocket.send
3842
import kotlinx.coroutines.Dispatchers
3943
import kotlinx.coroutines.Job
@@ -46,6 +50,7 @@ import kotlinx.serialization.encodeToString
4650
import kotlinx.serialization.json.Json
4751
import org.modelix.api.public.Paths
4852
import org.modelix.authorization.getUserName
53+
import org.modelix.model.InMemoryModels
4954
import org.modelix.model.api.ITree
5055
import org.modelix.model.api.PBranch
5156
import org.modelix.model.api.TreePointer
@@ -69,15 +74,21 @@ import org.slf4j.LoggerFactory
6974
* Implements the endpoints used by the 'model-client', but compared to KeyValueLikeModelServer also understands what
7075
* client sends. This allows more validations and more responsibilities on the server side.
7176
*/
72-
class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
73-
constructor(modelClient: LocalModelClient) : this(RepositoriesManager(modelClient))
77+
class ModelReplicationServer(
78+
private val repositoriesManager: IRepositoriesManager,
79+
private val modelClient: LocalModelClient,
80+
private val inMemoryModels: InMemoryModels,
81+
) {
82+
constructor(repositoriesManager: RepositoriesManager) :
83+
this(repositoriesManager, repositoriesManager.client, InMemoryModels())
84+
85+
constructor(modelClient: LocalModelClient) : this(RepositoriesManager(modelClient), modelClient, InMemoryModels())
7486
constructor(storeClient: IStoreClient) : this(LocalModelClient(storeClient))
7587

7688
companion object {
7789
private val LOG = LoggerFactory.getLogger(ModelReplicationServer::class.java)
7890
}
7991

80-
private val modelClient: LocalModelClient get() = repositoriesManager.client
8192
private val storeClient: IStoreClient get() = modelClient.store
8293

8394
fun init(application: Application) {
@@ -268,16 +279,16 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
268279
LOG.trace("Running query on {} @ {}", branchRef, version)
269280
val initialTree = version!!.getTree()
270281
val branch = OTBranch(
271-
PBranch(initialTree, repositoriesManager.client.idGenerator),
272-
repositoriesManager.client.idGenerator,
273-
repositoriesManager.client.storeCache,
282+
PBranch(initialTree, modelClient.idGenerator),
283+
modelClient.idGenerator,
284+
modelClient.storeCache,
274285
)
275286

276287
ModelQLServer.handleCall(call, { writeAccess ->
277288
if (writeAccess) {
278289
branch.getRootNode() to branch.getArea()
279290
} else {
280-
val model = repositoriesManager.inMemoryModels.getModel(initialTree).await()
291+
val model = inMemoryModels.getModel(initialTree).await()
281292
model.getNode(ITree.ROOT_ID) to model.getArea()
282293
}
283294
}, {
@@ -286,7 +297,7 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
286297
val (ops, newTree) = branch.getPendingChanges()
287298
if (newTree != initialTree) {
288299
val newVersion = CLVersion.createRegularVersion(
289-
id = repositoriesManager.client.idGenerator.generate(),
300+
id = modelClient.idGenerator.generate(),
290301
author = getUserName(),
291302
tree = newTree as CLTree,
292303
baseVersion = version,
@@ -299,7 +310,7 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
299310

300311
post<Paths.postRepositoryVersionHashQuery> {
301312
val versionHash = call.parameters["versionHash"]!!
302-
val version = CLVersion.loadFromHash(versionHash, repositoriesManager.client.storeCache)
313+
val version = CLVersion.loadFromHash(versionHash, modelClient.storeCache)
303314
val initialTree = version.getTree()
304315
val branch = TreePointer(initialTree)
305316
ModelQLServer.handleCall(call, branch.getRootNode(), branch.getArea())
@@ -372,21 +383,47 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
372383
respond(delta)
373384
}
374385

375-
private suspend fun ApplicationCall.respondDeltaAsObjectStream(versionHash: String, baseVersionHash: String?, plainText: Boolean) {
376-
respondTextWriter(contentType = if (plainText) ContentType.Text.Plain else VersionDeltaStream.CONTENT_TYPE) {
377-
repositoriesManager.computeDelta(versionHash, baseVersionHash).asFlow()
378-
.flatten()
379-
.withSeparator("\n")
380-
.onEmpty { emit(versionHash) }
381-
.withIndex()
382-
.collect {
383-
if (it.index == 0) check(it.value == versionHash) { "First object should be the version" }
384-
append(it.value)
385-
}
386+
private suspend fun ApplicationCall.respondDeltaAsObjectStream(
387+
versionHash: String,
388+
baseVersionHash: String?,
389+
plainText: Boolean,
390+
) {
391+
// Call `computeDelta` before starting to respond.
392+
// It could already throw an exception, and in that case we do not want a successful response status.
393+
val objectData = repositoriesManager.computeDelta(versionHash, baseVersionHash)
394+
val contentType = if (plainText) ContentType.Text.Plain else VersionDeltaStream.CONTENT_TYPE
395+
respondBytesWriter(contentType) {
396+
this.useClosingWithoutCause {
397+
objectData.asFlow()
398+
.flatten()
399+
.withSeparator("\n")
400+
.onEmpty { emit(versionHash) }
401+
.withIndex()
402+
.collect {
403+
if (it.index == 0) check(it.value == versionHash) { "First object should be the version" }
404+
writeStringUtf8(it.value)
405+
}
406+
}
386407
}
387408
}
388409
}
389410

411+
/**
412+
* Same as [[ByteWriteChannel.use]] but closing without a cause in case of an exception.
413+
*
414+
* Calling [[ByteWriteChannel.close]] with a cause results in not closing the connection properly.
415+
* See ModelReplicationServerTest.`server closes connection when failing to compute delta after starting to respond`
416+
* This will only be fixed in Ktor 3.
417+
* See https://youtrack.jetbrains.com/issue/KTOR-4862/Ktor-hangs-if-exception-occurs-during-write-response-body
418+
*/
419+
private inline fun ByteWriteChannel.useClosingWithoutCause(block: ByteWriteChannel.() -> Unit) {
420+
try {
421+
block()
422+
} finally {
423+
close()
424+
}
425+
}
426+
390427
private fun <T> Flow<Pair<T, T>>.flatten() = flow<T> {
391428
collect {
392429
emit(it.first)

0 commit comments

Comments
 (0)