Skip to content

Commit eb38620

Browse files
author
Oleksandr Dzhychko
committed
fix(model-server): use non-blocking network calls to receive request data
Fixes: MODELIX-998
1 parent 706f620 commit eb38620

File tree

1 file changed

+45
-29
lines changed

1 file changed

+45
-29
lines changed

model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt

Lines changed: 45 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import io.ktor.server.application.ApplicationCall
2121
import io.ktor.server.application.call
2222
import io.ktor.server.request.acceptItems
2323
import io.ktor.server.request.receive
24-
import io.ktor.server.request.receiveStream
24+
import io.ktor.server.request.receiveChannel
2525
import io.ktor.server.response.respond
2626
import io.ktor.server.response.respondBytesWriter
2727
import io.ktor.server.response.respondText
@@ -31,6 +31,7 @@ import io.ktor.util.cio.use
3131
import io.ktor.util.pipeline.PipelineContext
3232
import io.ktor.utils.io.ByteWriteChannel
3333
import io.ktor.utils.io.close
34+
import io.ktor.utils.io.readUTF8Line
3435
import io.ktor.utils.io.writeStringUtf8
3536
import kotlinx.coroutines.Dispatchers
3637
import kotlinx.coroutines.flow.Flow
@@ -239,17 +240,29 @@ class ModelReplicationServer(
239240

240241
override suspend fun PipelineContext<Unit, ApplicationCall>.postRepositoryObjectsGetAll(repository: String) {
241242
checkPermission(ModelServerPermissionSchema.repository(repository).objects.read)
242-
runWithRepository(repository) {
243-
val keys = call.receiveStream().bufferedReader().use { reader ->
244-
reader.lineSequence().toHashSet()
245-
}
246-
val objects = withContext(Dispatchers.IO) { modelClient.store.getAll(keys) }.checkValuesNotNull {
247-
"Object not found: $it"
243+
val channel = call.receiveChannel()
244+
val keys = hashSetOf<String>()
245+
while (true) {
246+
val line = channel.readUTF8Line() ?: break
247+
keys.add(line)
248+
}
249+
250+
val objects = runWithRepository(repository) {
251+
withContext(Dispatchers.IO) {
252+
modelClient.store.getAll(keys)
248253
}
249-
call.respondTextWriter(contentType = ImmutableObjectsStream.CONTENT_TYPE) {
250-
ImmutableObjectsStream.encode(this, objects)
254+
}
255+
256+
for (entry in objects) {
257+
if (entry.value == null) {
258+
throw IllegalStateException("Object not found: ${entry.value}")
251259
}
252260
}
261+
@Suppress("UNCHECKED_CAST")
262+
objects as Map<String, String>
263+
call.respondTextWriter(contentType = ImmutableObjectsStream.CONTENT_TYPE) {
264+
ImmutableObjectsStream.encode(this, objects)
265+
}
253266
}
254267

255268
override suspend fun PipelineContext<Unit, ApplicationCall>.pollRepositoryBranchHash(
@@ -336,30 +349,33 @@ class ModelReplicationServer(
336349

337350
override suspend fun PipelineContext<Unit, ApplicationCall>.putRepositoryObjects(repository: String) {
338351
checkPermission(ModelServerPermissionSchema.repository(parameter("repository")).objects.add)
339-
runWithRepository(repository) {
340-
val writtenEntries = withContext(Dispatchers.IO) {
341-
val entries = call.receiveStream().bufferedReader().use { reader ->
342-
reader.lineSequence().windowed(2, 2).map {
343-
val key = it[0]
344-
val value = it[1]
345-
346-
require(HashUtil.isSha256(key)) {
347-
"This API cannot be used to store other entries than serialized objects." +
348-
" The key is expected to be a SHA256 hash over the value: $key -> $value"
349-
}
350-
val expectedKey = HashUtil.sha256(value)
351-
require(expectedKey == key) { "Hash mismatch. Expected $expectedKey, but $key was provided. Value: $value" }
352-
353-
key to value
354-
}.toMap()
355-
}
356352

357-
storeClient.putAll(entries, true)
353+
val channel = call.receiveChannel()
354+
// Hash map can be used. Server does not expect entries in any order.
355+
val entries = hashMapOf<String, String>()
356+
357+
while (true) {
358+
val key = channel.readUTF8Line() ?: break
359+
val value = channel.readUTF8Line()!!
358360

359-
entries.size
361+
if (!HashUtil.isSha256(key)) {
362+
throw IllegalStateException(
363+
"This API cannot be used to store other entries than serialized objects." +
364+
" The key is expected to be a SHA256 hash over the value: $key -> $value",
365+
)
360366
}
361-
call.respondText("$writtenEntries objects received")
367+
368+
val expectedKey = HashUtil.sha256(value)
369+
if (expectedKey != key) {
370+
throw IllegalStateException(
371+
"Hash mismatch. Expected $expectedKey, but $key was provided. Value: $value",
372+
)
373+
}
374+
entries[key] = value
362375
}
376+
377+
runWithRepository(repository) { withContext(Dispatchers.IO) { storeClient.putAll(entries, true) } }
378+
call.respondText("${entries.size} objects received")
363379
}
364380

365381
@Deprecated("deprecated flag is set in the OpenAPI specification")

0 commit comments

Comments
 (0)