Skip to content

Commit 8f02751

Browse files
author
Oleksandr Dzhychko
committed
chore(model-server): use non-blocking method to send version delta
Use `respondBytesWriter` because it has a non-blocking `writeStringUtf8` method. `respondTextWriter` has only blocking methods to write data. Using a `java.io.Writer` is just an indirection without purpose. Using `respondBytesWriter` needed to take a bug with Ktor in consideration. For the expected behavior, a test was added.
1 parent 472bfa7 commit 8f02751

File tree

4 files changed

+151
-15
lines changed

4 files changed

+151
-15
lines changed

model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,17 @@ import io.ktor.server.resources.get
2727
import io.ktor.server.resources.post
2828
import io.ktor.server.resources.put
2929
import io.ktor.server.response.respond
30+
import io.ktor.server.response.respondBytesWriter
3031
import io.ktor.server.response.respondText
31-
import io.ktor.server.response.respondTextWriter
3232
import io.ktor.server.routing.Route
3333
import io.ktor.server.routing.route
3434
import io.ktor.server.routing.routing
3535
import io.ktor.server.websocket.webSocket
36+
import io.ktor.util.cio.use
3637
import io.ktor.util.pipeline.PipelineContext
38+
import io.ktor.utils.io.ByteWriteChannel
39+
import io.ktor.utils.io.close
40+
import io.ktor.utils.io.writeStringUtf8
3741
import io.ktor.websocket.send
3842
import kotlinx.coroutines.Dispatchers
3943
import kotlinx.coroutines.Job
@@ -84,6 +88,7 @@ class ModelReplicationServer(
8488
companion object {
8589
private val LOG = LoggerFactory.getLogger(ModelReplicationServer::class.java)
8690
}
91+
8792
private val storeClient: IStoreClient get() = modelClient.store
8893

8994
fun init(application: Application) {
@@ -378,24 +383,47 @@ class ModelReplicationServer(
378383
respond(delta)
379384
}
380385

381-
private suspend fun ApplicationCall.respondDeltaAsObjectStream(versionHash: String, baseVersionHash: String?, plainText: Boolean) {
386+
private suspend fun ApplicationCall.respondDeltaAsObjectStream(
387+
versionHash: String,
388+
baseVersionHash: String?,
389+
plainText: Boolean,
390+
) {
382391
// Call `computeDelta` before starting to respond.
383392
// It could already throw an exception, and in that case we do not want a successful response status.
384393
val objectData = repositoriesManager.computeDelta(versionHash, baseVersionHash)
385-
respondTextWriter(contentType = if (plainText) ContentType.Text.Plain else VersionDeltaStream.CONTENT_TYPE) {
386-
objectData.asFlow()
387-
.flatten()
388-
.withSeparator("\n")
389-
.onEmpty { emit(versionHash) }
390-
.withIndex()
391-
.collect {
392-
if (it.index == 0) check(it.value == versionHash) { "First object should be the version" }
393-
append(it.value)
394-
}
394+
val contentType = if (plainText) ContentType.Text.Plain else VersionDeltaStream.CONTENT_TYPE
395+
respondBytesWriter(contentType) {
396+
this.useClosingWithoutCause {
397+
objectData.asFlow()
398+
.flatten()
399+
.withSeparator("\n")
400+
.onEmpty { emit(versionHash) }
401+
.withIndex()
402+
.collect {
403+
if (it.index == 0) check(it.value == versionHash) { "First object should be the version" }
404+
writeStringUtf8(it.value)
405+
}
406+
}
395407
}
396408
}
397409
}
398410

411+
/**
412+
* Same as [[ByteWriteChannel.use]] but closing without a cause in case of an exception.
413+
*
414+
* Calling [[ByteWriteChannel.close]] with a cause results in not closing the connection properly.
415+
* See ModelReplicationServerTest.`server closes connection when failing to compute delta after starting to respond`
416+
* This will only be fixed in Ktor 3.
417+
* See https://youtrack.jetbrains.com/issue/KTOR-4862/Ktor-hangs-if-exception-occurs-during-write-response-body
418+
*/
419+
private inline fun ByteWriteChannel.useClosingWithoutCause(block: ByteWriteChannel.() -> Unit) {
420+
try {
421+
block()
422+
} finally {
423+
close()
424+
}
425+
}
426+
399427
private fun <T> Flow<Pair<T, T>>.flatten() = flow<T> {
400428
collect {
401429
emit(it.first)

model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ import org.slf4j.LoggerFactory
5050
import java.lang.ref.SoftReference
5151
import java.util.UUID
5252

53+
// The methods in this class are almost cohesive, so the number of functions is fine.
54+
@Suppress("complexity.TooManyFunctions")
5355
class RepositoriesManager(val client: LocalModelClient) : IRepositoriesManager {
5456

5557
init {

model-server/src/test/kotlin/org/modelix/model/server/ModelServerTestUtil.kt

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@ package org.modelix.model.server
1919
import io.ktor.serialization.kotlinx.json.json
2020
import io.ktor.server.application.Application
2121
import io.ktor.server.application.install
22+
import io.ktor.server.netty.Netty
23+
import io.ktor.server.netty.NettyApplicationEngine
2224
import io.ktor.server.plugins.contentnegotiation.ContentNegotiation
2325
import io.ktor.server.resources.Resources
2426
import io.ktor.server.routing.IgnoreTrailingSlash
2527
import io.ktor.server.testing.ApplicationTestBuilder
2628
import io.ktor.server.websocket.WebSockets
29+
import kotlinx.coroutines.runBlocking
30+
import org.modelix.authorization.installAuthentication
2731
import org.modelix.model.client2.ModelClientV2
2832

2933
suspend fun ApplicationTestBuilder.createModelClient(): ModelClientV2 {
@@ -37,3 +41,33 @@ fun Application.installDefaultServerPlugins() {
3741
install(Resources)
3842
install(IgnoreTrailingSlash)
3943
}
44+
45+
/**
46+
* Allow running a model server for test with Netty.
47+
*
48+
* This allows testing properties that would not be testable with [io.ktor.server.testing.testApplication]
49+
* because the requests run on proper request threads.
50+
*
51+
* Examples for such properties are
52+
* (1) errors while writing responses and
53+
* (2) effects of blocking code on request threads.
54+
*/
55+
fun runWithNettyServer(
56+
setupBlock: (application: Application) -> Unit,
57+
testBlock: suspend (server: NettyApplicationEngine) -> Unit,
58+
) {
59+
val nettyServer: NettyApplicationEngine = io.ktor.server.engine.embeddedServer(Netty, port = 0) {
60+
installAuthentication(unitTestMode = true)
61+
installDefaultServerPlugins()
62+
setupBlock(this)
63+
}
64+
65+
try {
66+
nettyServer.start(wait = false)
67+
runBlocking {
68+
testBlock(nettyServer)
69+
}
70+
} finally {
71+
nettyServer.stop()
72+
}
73+
}

model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,25 @@
1515

1616
package org.modelix.model.server.handlers
1717

18+
import io.ktor.client.HttpClient
19+
import io.ktor.client.engine.cio.CIO
20+
import io.ktor.client.plugins.HttpTimeout
21+
import io.ktor.client.plugins.defaultRequest
1822
import io.ktor.client.request.get
1923
import io.ktor.http.HttpStatusCode
2024
import io.ktor.http.appendPathSegments
2125
import io.ktor.http.takeFrom
26+
import io.ktor.server.application.Application
27+
import io.ktor.server.netty.NettyApplicationEngine
2228
import io.ktor.server.testing.ApplicationTestBuilder
2329
import io.ktor.server.testing.testApplication
2430
import kotlinx.coroutines.CoroutineScope
2531
import kotlinx.coroutines.coroutineScope
32+
import kotlinx.coroutines.flow.channelFlow
33+
import kotlinx.coroutines.flow.emitAll
34+
import kotlinx.coroutines.flow.flow
35+
import kotlinx.coroutines.test.runTest
36+
import kotlinx.coroutines.withTimeout
2637
import org.modelix.authorization.installAuthentication
2738
import org.modelix.model.InMemoryModels
2839
import org.modelix.model.api.IConceptReference
@@ -32,11 +43,13 @@ import org.modelix.model.client2.runWrite
3243
import org.modelix.model.client2.useVersionStreamFormat
3344
import org.modelix.model.lazy.RepositoryId
3445
import org.modelix.model.server.installDefaultServerPlugins
46+
import org.modelix.model.server.runWithNettyServer
3547
import org.modelix.model.server.store.InMemoryStoreClient
3648
import org.modelix.model.server.store.LocalModelClient
3749
import kotlin.test.Test
3850
import kotlin.test.assertEquals
3951
import kotlin.test.fail
52+
import kotlin.time.Duration.Companion.seconds
4053

4154
class ModelReplicationServerTest {
4255

@@ -47,7 +60,7 @@ class ModelReplicationServerTest {
4760
return ModelReplicationServer(repositoriesManager, modelClient, InMemoryModels())
4861
}
4962

50-
private fun runTest(
63+
private fun runWithTestModelServer(
5164
modelReplicationServer: ModelReplicationServer = getDefaultModelReplicationServer(),
5265
block: suspend ApplicationTestBuilder.(scope: CoroutineScope) -> Unit,
5366
) = testApplication {
@@ -63,7 +76,7 @@ class ModelReplicationServerTest {
6376
}
6477

6578
@Test
66-
fun `pulling delta does not return objects twice`() = runTest {
79+
fun `pulling delta does not return objects twice`() = runWithTestModelServer {
6780
// Arrange
6881
val url = "http://localhost/v2"
6982
val modelClient = ModelClientV2.builder().url(url).client(client).build().also { it.init() }
@@ -117,7 +130,7 @@ class ModelReplicationServerTest {
117130
val repositoryId = RepositoryId("repo1")
118131
val branchRef = repositoryId.getBranchReference()
119132

120-
runTest(modelReplicationServer) {
133+
runWithTestModelServer(modelReplicationServer) {
121134
repositoriesManager.createRepository(repositoryId, null)
122135

123136
// Act
@@ -133,4 +146,63 @@ class ModelReplicationServerTest {
133146
assertEquals(HttpStatusCode.InternalServerError, response.status)
134147
}
135148
}
149+
150+
@Test
151+
fun `server closes connection when failing to compute delta after starting to respond`() = runTest {
152+
// Arrange
153+
val repositoryId = RepositoryId("repo1")
154+
val branchRef = repositoryId.getBranchReference()
155+
val storeClient = InMemoryStoreClient()
156+
val modelClient = LocalModelClient(storeClient)
157+
val repositoriesManager = RepositoriesManager(modelClient)
158+
val faultyRepositoriesManager = object :
159+
IRepositoriesManager by repositoriesManager {
160+
override suspend fun computeDelta(versionHash: String, baseVersionHash: String?): ObjectData {
161+
val originalFlow = repositoriesManager.computeDelta(versionHash, baseVersionHash).asFlow()
162+
val brokenFlow = channelFlow<Pair<String, String>> {
163+
error("Unexpected error.")
164+
}
165+
return ObjectDataFlow(
166+
flow {
167+
emitAll(originalFlow)
168+
emitAll(brokenFlow)
169+
},
170+
)
171+
}
172+
}
173+
repositoriesManager.createRepository(repositoryId, null)
174+
175+
suspend fun createClient(server: NettyApplicationEngine): HttpClient {
176+
val port = server.resolvedConnectors().first().port
177+
return HttpClient(CIO) {
178+
defaultRequest {
179+
url("http://localhost:$port")
180+
}
181+
install(HttpTimeout) {
182+
requestTimeoutMillis = 5_000
183+
}
184+
}
185+
}
186+
187+
val modelReplicationServer = ModelReplicationServer(faultyRepositoriesManager, modelClient, InMemoryModels())
188+
val setupBlock = { application: Application -> modelReplicationServer.init(application) }
189+
val testBlock: suspend (server: NettyApplicationEngine) -> Unit = { server ->
190+
withTimeout(10.seconds) {
191+
val client = createClient(server)
192+
// Act
193+
val response = client.get {
194+
url {
195+
takeFrom(url)
196+
appendPathSegments("v2", "repositories", repositoryId.id, "branches", branchRef.branchName)
197+
}
198+
useVersionStreamFormat()
199+
}
200+
201+
// Assert
202+
// The response should be delivered even if an exception is thrown inside the flow.
203+
assertEquals(HttpStatusCode.OK, response.status)
204+
}
205+
}
206+
runWithNettyServer(setupBlock, testBlock)
207+
}
136208
}

0 commit comments

Comments
 (0)