Skip to content

Commit 8fb905e

Browse files
author
Oleksandr Dzhychko
authored
Merge pull request #451 from modelix/MODELIX-719
MODELIX-719 Performance of IModelClient2.pull
2 parents 97f7017 + b698990 commit 8fb905e

File tree

4 files changed

+187
-10
lines changed

4 files changed

+187
-10
lines changed

model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import kotlinx.coroutines.flow.flow
4545
import kotlinx.coroutines.launch
4646
import org.modelix.kotlin.utils.DeprecationInfo
4747
import org.modelix.model.IVersion
48+
import org.modelix.model.api.IBranch
4849
import org.modelix.model.api.IIdGenerator
4950
import org.modelix.model.api.INode
5051
import org.modelix.model.api.IdGeneratorDummy
@@ -508,8 +509,21 @@ fun VersionDelta.getAllObjects(): Map<String, String> = objectsMap + objects.ass
508509

509510
/**
510511
* Performs a write transaction on the root node of the given branch.
512+
*
513+
* [IModelClientV2.runWriteOnBranch] can be used access to the underlying branch is needed.
511514
*/
512515
suspend fun <T> IModelClientV2.runWrite(branchRef: BranchReference, body: (INode) -> T): T {
516+
return runWriteOnBranch(branchRef) {
517+
body(it.getRootNode())
518+
}
519+
}
520+
521+
/**
522+
* Performs a write transaction on the root node of the given branch.
523+
*
524+
* [IModelClientV2.runWrite] can be used if access to the underlying branch is not needed.
525+
*/
526+
suspend fun <T> IModelClientV2.runWriteOnBranch(branchRef: BranchReference, body: (IBranch) -> T): T {
513527
val client = this
514528
val baseVersion = client.pullIfExists(branchRef)
515529
?: branchRef.repositoryId.getBranchReference()
@@ -518,7 +532,7 @@ suspend fun <T> IModelClientV2.runWrite(branchRef: BranchReference, body: (INode
518532
?: client.initRepository(branchRef.repositoryId)
519533
val branch = OTBranch(TreePointer(baseVersion.getTree(), client.getIdGenerator()), client.getIdGenerator(), (client as ModelClientV2).store)
520534
val result = branch.computeWrite {
521-
body(branch.getRootNode())
535+
body(branch)
522536
}
523537
val (ops, newTree) = branch.getPendingChanges()
524538
val newVersion = CLVersion.createRegularVersion(

model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@ package org.modelix.model.server.handlers
1515

1616
import kotlinx.coroutines.flow.Flow
1717
import kotlinx.coroutines.flow.asFlow
18+
import kotlinx.coroutines.flow.channelFlow
1819
import kotlinx.coroutines.flow.emptyFlow
19-
import kotlinx.coroutines.flow.flow
2020
import kotlinx.coroutines.flow.map
2121
import kotlinx.datetime.Clock
2222
import org.apache.commons.collections4.map.LRUMap
23+
import org.modelix.model.IKeyValueStore
2324
import org.modelix.model.VersionMerger
2425
import org.modelix.model.api.IBranch
2526
import org.modelix.model.api.IReadTransaction
@@ -28,8 +29,10 @@ import org.modelix.model.api.IdGeneratorDummy
2829
import org.modelix.model.api.PBranch
2930
import org.modelix.model.api.runSynchronized
3031
import org.modelix.model.lazy.BranchReference
32+
import org.modelix.model.lazy.BulkQuery
3133
import org.modelix.model.lazy.CLTree
3234
import org.modelix.model.lazy.CLVersion
35+
import org.modelix.model.lazy.IDeserializingKeyValueStore
3336
import org.modelix.model.lazy.KVEntryReference
3437
import org.modelix.model.lazy.RepositoryId
3538
import org.modelix.model.lazy.computeDelta
@@ -46,6 +49,8 @@ class RepositoriesManager(val client: LocalModelClient) {
4649
}
4750

4851
private val store: IStoreClient get() = client.store
52+
private val kvStore: IKeyValueStore get() = client.asyncStore
53+
private val objectStore: IDeserializingKeyValueStore get() = client.storeCache
4954

5055
fun generateClientId(repositoryId: RepositoryId): Long {
5156
return client.store.generateId("$KEY_PREFIX:${repositoryId.id}:clientId")
@@ -226,17 +231,22 @@ class RepositoriesManager(val client: LocalModelClient) {
226231
if (versionHash == baseVersionHash) return emptyFlow()
227232
if (baseVersionHash == null) {
228233
// no need to cache anything if there is no delta computation happening
229-
return flow {
230-
suspend fun emitObjects(entry: KVEntryReference<*>) {
231-
emit(entry.getHash() to client.get(entry.getHash())!!)
232-
for (referencedEntry in entry.getValue(client.storeCache).getReferencedEntries()) {
233-
emitObjects(referencedEntry)
234+
235+
return channelFlow {
236+
val version = CLVersion(versionHash, objectStore)
237+
// Use a bulk query to make as few request to the underlying store as possible.
238+
val bulkQuery = objectStore.newBulkQuery()
239+
fun emitObjects(entry: KVEntryReference<*>) {
240+
bulkQuery.get(entry).onSuccess {
241+
channel.trySend(entry.getHash() to it!!.serialize())
242+
for (referencedEntry in it!!.getReferencedEntries()) {
243+
emitObjects(referencedEntry)
244+
}
234245
}
235246
}
236-
237-
emit(versionHash to client.get(versionHash)!!)
238-
val version = CLVersion(versionHash, client.storeCache)
247+
channel.send(versionHash to kvStore.get(versionHash)!!)
239248
emitObjects(version.treeHash!!)
249+
(bulkQuery as? BulkQuery)?.process()
240250
}
241251
}
242252

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright (c) 2024.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.modelix.model.server
18+
19+
import io.ktor.serialization.kotlinx.json.json
20+
import io.ktor.server.application.install
21+
import io.ktor.server.plugins.contentnegotiation.ContentNegotiation
22+
import io.ktor.server.resources.Resources
23+
import io.ktor.server.routing.IgnoreTrailingSlash
24+
import io.ktor.server.testing.ApplicationTestBuilder
25+
import io.ktor.server.testing.testApplication
26+
import io.ktor.server.websocket.WebSockets
27+
import kotlinx.coroutines.coroutineScope
28+
import org.modelix.authorization.installAuthentication
29+
import org.modelix.model.api.IChildLink
30+
import org.modelix.model.api.IConceptReference
31+
import org.modelix.model.api.INode
32+
import org.modelix.model.api.getRootNode
33+
import org.modelix.model.client2.ModelClientV2
34+
import org.modelix.model.client2.runWriteOnBranch
35+
import org.modelix.model.lazy.RepositoryId
36+
import org.modelix.model.server.handlers.KeyValueLikeModelServer
37+
import org.modelix.model.server.handlers.ModelReplicationServer
38+
import org.modelix.model.server.handlers.RepositoriesManager
39+
import org.modelix.model.server.store.InMemoryStoreClient
40+
import org.modelix.model.server.store.LocalModelClient
41+
import kotlin.random.Random
42+
import kotlin.test.Test
43+
import kotlin.test.assertTrue
44+
45+
class PullPerformanceTest {
46+
private fun runTest(block: suspend ApplicationTestBuilder.(storeClientWithStatistics: StoreClientWithStatistics) -> Unit) = testApplication {
47+
val storeClientWithStatistics = StoreClientWithStatistics(InMemoryStoreClient())
48+
val repositoriesManager = RepositoriesManager(LocalModelClient(storeClientWithStatistics))
49+
application {
50+
installAuthentication(unitTestMode = true)
51+
install(ContentNegotiation) {
52+
json()
53+
}
54+
install(WebSockets)
55+
install(Resources)
56+
install(IgnoreTrailingSlash)
57+
ModelReplicationServer(repositoriesManager).init(this)
58+
KeyValueLikeModelServer(repositoriesManager).init(this)
59+
}
60+
61+
coroutineScope {
62+
block(storeClientWithStatistics)
63+
}
64+
}
65+
66+
/**
67+
* Tests the performance of the `GET /v2/repositories/{repository}/branches/{branch}` endpoint.
68+
* Many small request to an IgniteStoreClient lead to a poor performance. This test ensure that bulk requests are
69+
* used for loading a model.
70+
*/
71+
@Test
72+
fun `bulk requests are used`() = runTest { storeClientWithStatistics ->
73+
val rand = Random(1056343)
74+
val url = "http://localhost/v2"
75+
val preparingModelClient = ModelClientV2.builder().url(url).client(client).build().also { it.init() }
76+
val repositoryId = RepositoryId("repo1")
77+
preparingModelClient.initRepository(repositoryId)
78+
preparingModelClient.runWriteOnBranch(repositoryId.getBranchReference()) { branch ->
79+
val rootNode = branch.getRootNode()
80+
repeat(10_000) {
81+
val randomNode = rootNode.getRandomNode(rand)
82+
randomNode.addNewChild(IChildLink.fromName("roleA"), -1, null as IConceptReference?)
83+
}
84+
}
85+
86+
val requestingModelClient = ModelClientV2.builder().url(url).client(client).build().also { it.init() }
87+
val totalRequestsBeforePull = storeClientWithStatistics.getTotalRequests()
88+
requestingModelClient.pull(repositoryId.getBranchReference(), null)
89+
val totalRequestsAfterPull = storeClientWithStatistics.getTotalRequests()
90+
val actualRequestCount = totalRequestsAfterPull - totalRequestsBeforePull
91+
92+
// The request count when not using a bulk query is a couple ten thousand.
93+
// Using a bulk query reduces the number of separate requests to the underling store to much fewer.
94+
assertTrue(actualRequestCount < 20, "Too many request: $actualRequestCount")
95+
}
96+
}
97+
98+
private fun INode.getRandomNode(rand: Random): INode {
99+
val node = this
100+
val children = node.allChildren.toList()
101+
val index = rand.nextInt(children.size + 1)
102+
return if (index < children.size) {
103+
children.get(index).getRandomNode(rand)
104+
} else {
105+
node
106+
}
107+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright (c) 2024.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.modelix.model.server
18+
19+
import org.modelix.model.server.store.IStoreClient
20+
import java.util.concurrent.atomic.AtomicLong
21+
22+
class StoreClientWithStatistics(val store: IStoreClient) : IStoreClient by store {
23+
private val totalRequests = AtomicLong()
24+
25+
fun getTotalRequests() = totalRequests.get()
26+
27+
override fun get(key: String): String? {
28+
totalRequests.incrementAndGet()
29+
return store.get(key)
30+
}
31+
32+
override fun getAll(keys: List<String>): List<String?> {
33+
totalRequests.incrementAndGet()
34+
return store.getAll(keys)
35+
}
36+
37+
override fun getAll(keys: Set<String>): Map<String, String?> {
38+
totalRequests.incrementAndGet()
39+
return store.getAll(keys)
40+
}
41+
42+
override fun getAll(): Map<String, String?> {
43+
totalRequests.incrementAndGet()
44+
return store.getAll()
45+
}
46+
}

0 commit comments

Comments
 (0)