@@ -21,7 +21,7 @@ import io.ktor.server.application.ApplicationCall
21
21
import io.ktor.server.application.call
22
22
import io.ktor.server.request.acceptItems
23
23
import io.ktor.server.request.receive
24
- import io.ktor.server.request.receiveStream
24
+ import io.ktor.server.request.receiveChannel
25
25
import io.ktor.server.response.respond
26
26
import io.ktor.server.response.respondBytesWriter
27
27
import io.ktor.server.response.respondText
@@ -31,6 +31,7 @@ import io.ktor.util.cio.use
31
31
import io.ktor.util.pipeline.PipelineContext
32
32
import io.ktor.utils.io.ByteWriteChannel
33
33
import io.ktor.utils.io.close
34
+ import io.ktor.utils.io.readUTF8Line
34
35
import io.ktor.utils.io.writeStringUtf8
35
36
import kotlinx.coroutines.Dispatchers
36
37
import kotlinx.coroutines.flow.Flow
@@ -153,7 +154,7 @@ class ModelReplicationServer(
153
154
val repositoryId = try {
154
155
RepositoryId (repository)
155
156
} catch (e: IllegalArgumentException ) {
156
- throw BadRequestException ( " Invalid repository name " , " invalid-request " , cause = e)
157
+ throw InvalidRepositoryIdException ( repository, e)
157
158
}
158
159
159
160
checkPermission(ModelServerPermissionSchema .repository(repositoryId).branch(branch).delete)
@@ -239,17 +240,27 @@ class ModelReplicationServer(
239
240
240
241
override suspend fun PipelineContext <Unit , ApplicationCall >.postRepositoryObjectsGetAll (repository : String ) {
241
242
checkPermission(ModelServerPermissionSchema .repository(repository).objects.read)
242
- runWithRepository(repository) {
243
- val keys = call.receiveStream().bufferedReader().use { reader ->
244
- reader.lineSequence().toHashSet()
245
- }
246
- val objects = withContext(Dispatchers .IO ) { modelClient.store.getAll(keys) }.checkValuesNotNull {
247
- " Object not found: $it "
248
- }
249
- call.respondTextWriter(contentType = ImmutableObjectsStream .CONTENT_TYPE ) {
250
- ImmutableObjectsStream .encode(this , objects)
243
+ val channel = call.receiveChannel()
244
+ val keys = hashSetOf<String >()
245
+ while (true ) {
246
+ val line = channel.readUTF8Line() ? : break
247
+ keys.add(line)
248
+ }
249
+
250
+ val objects = runWithRepository(repository) {
251
+ withContext(Dispatchers .IO ) {
252
+ modelClient.store.getAll(keys)
251
253
}
252
254
}
255
+
256
+ for (entry in objects) {
257
+ if (entry.value == null ) { throw ObjectValueNotFoundException (entry.key) }
258
+ }
259
+ @Suppress(" UNCHECKED_CAST" )
260
+ objects as Map <String , String >
261
+ call.respondTextWriter(contentType = ImmutableObjectsStream .CONTENT_TYPE ) {
262
+ ImmutableObjectsStream .encode(this , objects)
263
+ }
253
264
}
254
265
255
266
override suspend fun PipelineContext <Unit , ApplicationCall >.pollRepositoryBranchHash (
@@ -336,30 +347,27 @@ class ModelReplicationServer(
336
347
337
348
override suspend fun PipelineContext <Unit , ApplicationCall >.putRepositoryObjects (repository : String ) {
338
349
checkPermission(ModelServerPermissionSchema .repository(parameter(" repository" )).objects.add)
339
- runWithRepository(repository) {
340
- val writtenEntries = withContext(Dispatchers .IO ) {
341
- val entries = call.receiveStream().bufferedReader().use { reader ->
342
- reader.lineSequence().windowed(2 , 2 ).map {
343
- val key = it[0 ]
344
- val value = it[1 ]
345
-
346
- require(HashUtil .isSha256(key)) {
347
- " This API cannot be used to store other entries than serialized objects." +
348
- " The key is expected to be a SHA256 hash over the value: $key -> $value "
349
- }
350
- val expectedKey = HashUtil .sha256(value)
351
- require(expectedKey == key) { " Hash mismatch. Expected $expectedKey , but $key was provided. Value: $value " }
352
-
353
- key to value
354
- }.toMap()
355
- }
356
350
357
- storeClient.putAll(entries, true )
351
+ val channel = call.receiveChannel()
352
+ // Hash map can be used. Server does not expect entries in any order.
353
+ val entries = hashMapOf<String , String >()
354
+
355
+ while (true ) {
356
+ val key = channel.readUTF8Line() ? : break
358
357
359
- entries.size
358
+ if (! HashUtil .isSha256(key)) {
359
+ throw InvalidObjectKeyException (key)
360
360
}
361
- call.respondText(" $writtenEntries objects received" )
361
+
362
+ val value = channel.readUTF8Line() ? : throw ObjectKeyWithoutObjectValueException (key)
363
+
364
+ val expectedKey = HashUtil .sha256(value)
365
+ if (expectedKey != key) { throw MismatchingObjectKeyAndValueException (key, expectedKey, value) }
366
+ entries[key] = value
362
367
}
368
+
369
+ runWithRepository(repository) { withContext(Dispatchers .IO ) { storeClient.putAll(entries, true ) } }
370
+ call.respondText(" ${entries.size} objects received" )
363
371
}
364
372
365
373
@Deprecated(" deprecated flag is set in the OpenAPI specification" )
0 commit comments