Skip to content

Commit a0c11c2

Browse files
slissonmhuster23
authored andcommitted
perf(model-client): send large changes as multiple HTTP requests
1 parent 078cf6b commit a0c11c2

File tree

2 files changed

+55
-3
lines changed

2 files changed

+55
-3
lines changed

model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
2323
import io.ktor.client.plugins.expectSuccess
2424
import io.ktor.client.request.get
2525
import io.ktor.client.request.post
26+
import io.ktor.client.request.put
2627
import io.ktor.client.request.setBody
2728
import io.ktor.client.statement.bodyAsText
2829
import io.ktor.http.ContentType
@@ -178,20 +179,40 @@ class ModelClientV2(
178179
require(baseVersion is CLVersion?)
179180
version.write()
180181
val objects = version.computeDelta(baseVersion)
182+
HashUtil.checkObjectHashes(objects)
183+
val delta = if (objects.size > 1000) {
184+
// large HTTP requests and large Json objects don't scale well
185+
uploadObjects(branch.repositoryId, objects.asSequence().map { it.key to it.value })
186+
VersionDelta(version.getContentHash(), null)
187+
} else {
188+
VersionDelta(version.getContentHash(), null, objectsMap = objects)
189+
}
181190
val response = httpClient.post {
182191
url {
183192
takeFrom(baseUrl)
184193
appendPathSegmentsEncodingSlash("repositories", branch.repositoryId.id, "branches", branch.branchName)
185194
}
186195
contentType(ContentType.Application.Json)
187-
val body = VersionDelta(version.getContentHash(), null, objectsMap = objects)
188-
body.checkObjectHashes()
189-
setBody(body)
196+
setBody(delta)
190197
}
191198
val mergedVersionDelta = response.body<VersionDelta>()
192199
return createVersion(version, mergedVersionDelta)
193200
}
194201

202+
private suspend fun uploadObjects(repository: RepositoryId, objects: Sequence<Pair<String, String>>) {
203+
LOG.debug { "${clientId.toString(16)}.pushObjects($repository)" }
204+
objects.chunked(100_000).forEach { chunk ->
205+
httpClient.put {
206+
url {
207+
takeFrom(baseUrl)
208+
appendPathSegmentsEncodingSlash("repositories", repository.id, "objects")
209+
}
210+
contentType(ContentType.Text.Plain)
211+
setBody(chunk.flatMap { it.toList() }.joinToString("\n"))
212+
}
213+
}
214+
}
215+
195216
override suspend fun pull(branch: BranchReference, lastKnownVersion: IVersion?): IVersion {
196217
require(lastKnownVersion is CLVersion?)
197218
val response = httpClient.get {

model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,21 @@ import io.ktor.server.application.ApplicationCall
2020
import io.ktor.server.application.call
2121
import io.ktor.server.plugins.origin
2222
import io.ktor.server.request.receive
23+
import io.ktor.server.request.receiveStream
2324
import io.ktor.server.response.respond
2425
import io.ktor.server.response.respondText
2526
import io.ktor.server.routing.Route
2627
import io.ktor.server.routing.get
2728
import io.ktor.server.routing.post
29+
import io.ktor.server.routing.put
2830
import io.ktor.server.routing.route
2931
import io.ktor.server.routing.routing
3032
import io.ktor.server.websocket.webSocket
3133
import io.ktor.util.pipeline.PipelineContext
3234
import io.ktor.websocket.send
35+
import kotlinx.coroutines.Dispatchers
3336
import kotlinx.coroutines.Job
37+
import kotlinx.coroutines.withContext
3438
import kotlinx.serialization.encodeToString
3539
import kotlinx.serialization.json.Json
3640
import org.modelix.authorization.getUserName
@@ -44,6 +48,7 @@ import org.modelix.model.lazy.CLTree
4448
import org.modelix.model.lazy.CLVersion
4549
import org.modelix.model.lazy.RepositoryId
4650
import org.modelix.model.operations.OTBranch
51+
import org.modelix.model.persistent.HashUtil
4752
import org.modelix.model.server.api.v2.VersionDelta
4853
import org.modelix.model.server.store.IStoreClient
4954
import org.modelix.model.server.store.LocalModelClient
@@ -211,6 +216,32 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
211216
}
212217
}
213218
}
219+
route("objects") {
220+
put {
221+
var writtenEntries = 0
222+
withContext(Dispatchers.IO) {
223+
var isKey = true
224+
var key = ""
225+
call.receiveStream().bufferedReader().lineSequence().forEach { line ->
226+
if (isKey) {
227+
key = line
228+
} else {
229+
val value = line
230+
require(HashUtil.isSha256(key)) {
231+
"This API cannot be used to store other entries than serialized objects." +
232+
" The key is expected to be a SHA256 hash over the value: $key -> $value"
233+
}
234+
val expectedKey = HashUtil.sha256(value)
235+
require(expectedKey == key) { "Hash mismatch. Expected $expectedKey, but $key was provided. Value: $value" }
236+
storeClient.put(key, value, true)
237+
writtenEntries++
238+
}
239+
isKey = !isKey
240+
}
241+
}
242+
call.respondText("$writtenEntries objects received")
243+
}
244+
}
214245
}
215246
}
216247
route("versions") {

0 commit comments

Comments
 (0)