Skip to content

Commit 62112cf

Browse files
Sascha Lissonslisson
authored andcommitted
feat(model-client): lazy loading support for IModelClientV2
1 parent c0ef585 commit 62112cf

File tree

9 files changed

+254
-4
lines changed

9 files changed

+254
-4
lines changed

model-api/src/commonMain/kotlin/org/modelix/model/api/INode.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,10 @@ interface IReplaceableNode : INode {
314314
@Deprecated("Use .key(INode), .key(IBranch), .key(ITransaction) or .key(ITree)")
315315
fun IRole.key(): String = RoleAccessContext.getKey(this)
316316
fun IRole.key(node: INode): String = if (node.usesRoleIds()) getUID() else getSimpleName()
317+
fun IChildLink.key(node: INode): String? = when (this) {
318+
is NullChildLink -> null
319+
else -> (this as IRole).key(node)
320+
}
317321
fun INode.usesRoleIds(): Boolean = if (this is INodeEx) this.usesRoleIds() else false
318322
fun INode.getChildren(link: IChildLink): Iterable<INode> = if (this is INodeEx) getChildren(link) else getChildren(link.key(this))
319323
fun INode.moveChild(role: IChildLink, index: Int, child: INode): Unit = if (this is INodeEx) moveChild(role, index, child) else moveChild(role.key(this), index, child)
@@ -433,3 +437,5 @@ fun INode.getContainmentLink() = if (this is INodeEx) {
433437
fun INode.getRoot(): INode = parent?.getRoot() ?: this
434438
fun INode.isInstanceOf(superConcept: IConcept?): Boolean = concept.let { it != null && it.isSubConceptOf(superConcept) }
435439
fun INode.isInstanceOfSafe(superConcept: IConcept): Boolean = tryGetConcept()?.isSubConceptOf(superConcept) ?: false
440+
441+
fun INode.addNewChild(role: IChildLink, index: Int) = addNewChild(role, index, null as IConceptReference?)

model-client/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ val ktorVersion: String by rootProject
2121
val kotlinxSerializationVersion: String by rootProject
2222

2323
kotlin {
24+
jvmToolchain(11)
2425
jvm()
2526
js(IR) {
2627
browser {

model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,8 @@ interface IModelClientV2 {
8686
suspend fun <R> query(branch: BranchReference, body: (IMonoStep<INode>) -> IMonoStep<R>): R
8787

8888
suspend fun <R> query(repositoryId: RepositoryId, versionHash: String, body: (IMonoStep<INode>) -> IMonoStep<R>): R
89+
90+
suspend fun getObjects(repository: RepositoryId, keys: Sequence<String>): Map<String, String>
91+
92+
suspend fun pushObjects(repository: RepositoryId, objects: Sequence<Pair<String, String>>)
8993
}

model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,29 @@ class ModelClientV2(
265265
}
266266
}
267267

268+
override suspend fun getObjects(repository: RepositoryId, keys: Sequence<String>): Map<String, String> {
269+
val response = httpClient.post {
270+
url {
271+
takeFrom(baseUrl)
272+
appendPathSegments("repositories", repository.id, "objects", "getAll")
273+
}
274+
setBody(keys.joinToString("\n"))
275+
}
276+
277+
val content = response.bodyAsChannel()
278+
val objects = HashMap<String, String>()
279+
while (true) {
280+
val key = checkNotNull(content.readUTF8Line()) { "Empty line expected at the end of the stream" }
281+
if (key == "") {
282+
check(content.readUTF8Line() == null) { "Empty line is only allowed at the end of the stream" }
283+
break
284+
}
285+
val value = checkNotNull(content.readUTF8Line()) { "Object missing for hash $key" }
286+
objects[key] = value
287+
}
288+
return objects
289+
}
290+
268291
override suspend fun push(branch: BranchReference, version: IVersion, baseVersion: IVersion?): IVersion {
269292
LOG.debug { "${clientId.toString(16)}.push($branch, $version, $baseVersion)" }
270293
require(version is CLVersion)
@@ -274,7 +297,7 @@ class ModelClientV2(
274297
HashUtil.checkObjectHashes(objects)
275298
val delta = if (objects.size > 1000) {
276299
// large HTTP requests and large Json objects don't scale well
277-
uploadObjects(branch.repositoryId, objects.asSequence().map { it.key to it.value })
300+
pushObjects(branch.repositoryId, objects.asSequence().map { it.key to it.value })
278301
VersionDelta(version.getContentHash(), null)
279302
} else {
280303
VersionDelta(version.getContentHash(), null, objectsMap = objects)
@@ -292,7 +315,7 @@ class ModelClientV2(
292315
}
293316
}
294317

295-
private suspend fun uploadObjects(repository: RepositoryId, objects: Sequence<Pair<String, String>>) {
318+
override suspend fun pushObjects(repository: RepositoryId, objects: Sequence<Pair<String, String>>) {
296319
LOG.debug { "${clientId.toString(16)}.pushObjects($repository)" }
297320
objects.chunked(100_000).forEach { unsortedChunk ->
298321
// Entries are sorted to avoid deadlocks on the server side between transactions.
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright (c) 2024.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.modelix.model.client2
18+
19+
import kotlinx.coroutines.runBlocking
20+
import org.modelix.model.IKeyListener
21+
import org.modelix.model.IKeyValueStore
22+
import org.modelix.model.IVersion
23+
import org.modelix.model.lazy.BranchReference
24+
import org.modelix.model.lazy.CLVersion
25+
import org.modelix.model.lazy.ObjectStoreCache
26+
import org.modelix.model.lazy.RepositoryId
27+
28+
fun IModelClientV2.lazyLoadVersion(repositoryId: RepositoryId, versionHash: String, cacheSize: Int = 100_000): IVersion {
29+
val store = ObjectStoreCache(ModelClientAsStore(this, repositoryId), cacheSize)
30+
return CLVersion.loadFromHash(versionHash, store)
31+
}
32+
33+
suspend fun IModelClientV2.lazyLoadVersion(branchRef: BranchReference, cacheSize: Int = 100_000): IVersion {
34+
return lazyLoadVersion(branchRef.repositoryId, pullHash(branchRef), cacheSize)
35+
}
36+
37+
class ModelClientAsStore(val client: IModelClientV2, val repositoryId: RepositoryId) : IKeyValueStore {
38+
override fun get(key: String): String? {
39+
return getAll(listOf(key))[key]
40+
}
41+
42+
override fun put(key: String, value: String?) {
43+
TODO("Not yet implemented")
44+
}
45+
46+
override fun getAll(keys: Iterable<String>): Map<String, String?> {
47+
return runBlocking {
48+
client.getObjects(repositoryId, keys.asSequence())
49+
}
50+
}
51+
52+
override fun putAll(entries: Map<String, String?>) {
53+
TODO("Not yet implemented")
54+
}
55+
56+
override fun prefetch(key: String) {
57+
TODO("Not yet implemented")
58+
}
59+
60+
override fun listen(key: String, listener: IKeyListener) {
61+
TODO("Not yet implemented")
62+
}
63+
64+
override fun removeListener(key: String, listener: IKeyListener) {
65+
TODO("Not yet implemented")
66+
}
67+
68+
override fun getPendingSize(): Int {
69+
TODO("Not yet implemented")
70+
}
71+
}

model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/ObjectStoreCache.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@ package org.modelix.model.lazy
1717

1818
import org.modelix.model.IKeyValueStore
1919
import org.modelix.model.createLRUMap
20+
import kotlin.jvm.JvmOverloads
2021

21-
class ObjectStoreCache(override val keyValueStore: IKeyValueStore) : IDeserializingKeyValueStore {
22-
private val cache: MutableMap<String?, Any> = createLRUMap(100000)
22+
class ObjectStoreCache @JvmOverloads constructor(override val keyValueStore: IKeyValueStore, cacheSize: Int = 100_000) : IDeserializingKeyValueStore {
23+
private val cache: MutableMap<String?, Any> = createLRUMap(cacheSize)
2324

2425
override fun <T> getAll(hashes_: Iterable<String>, deserializer: (String, String) -> T): Iterable<T> {
2526
val hashes = hashes_.toList()

model-server-openapi/specifications/model-server-v2.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,18 @@ paths:
7575
$ref: '#/components/responses/200json'
7676
default:
7777
$ref: '#/components/responses/GeneralError'
78+
/repositories/{repository}/objects/getAll:
79+
post:
80+
operationId: postRepositoryObjectsGetAll
81+
parameters:
82+
- name: repository
83+
in: "path"
84+
required: true
85+
schema:
86+
type: string
87+
responses:
88+
"200":
89+
$ref: '#/components/responses/200'
7890
/repositories/{repository}/branches:
7991
get:
8092
operationId: getRepositoryBranches

model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import io.ktor.server.request.receiveStream
2525
import io.ktor.server.response.respond
2626
import io.ktor.server.response.respondBytesWriter
2727
import io.ktor.server.response.respondText
28+
import io.ktor.server.response.respondTextWriter
2829
import io.ktor.server.routing.route
2930
import io.ktor.server.routing.routing
3031
import io.ktor.util.cio.use
@@ -200,6 +201,23 @@ class ModelReplicationServer(
200201
}
201202
}
202203

204+
override suspend fun PipelineContext<Unit, ApplicationCall>.postRepositoryObjectsGetAll(repository: String) {
205+
val keys = call.receiveStream().bufferedReader().use { reader ->
206+
reader.lineSequence().toHashSet()
207+
}
208+
val objects = withContext(Dispatchers.IO) { modelClient.store.getAll(keys) }
209+
call.respondTextWriter(contentType = VersionDeltaStream.CONTENT_TYPE) {
210+
objects.forEach {
211+
append(it.key)
212+
append("\n")
213+
append(it.value)
214+
append("\n")
215+
}
216+
// additional empty line indicates end of stream and can be used to verify completeness of data transfer
217+
append("\n")
218+
}
219+
}
220+
203221
override suspend fun PipelineContext<Unit, ApplicationCall>.pollRepositoryBranchHash(
204222
repository: String,
205223
branch: String,
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright (c) 2024.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.modelix.model.server
18+
19+
import io.ktor.server.testing.ApplicationTestBuilder
20+
import io.ktor.server.testing.testApplication
21+
import org.modelix.authorization.installAuthentication
22+
import org.modelix.model.api.INode
23+
import org.modelix.model.api.NullChildLink
24+
import org.modelix.model.api.TreePointer
25+
import org.modelix.model.api.addNewChild
26+
import org.modelix.model.api.getDescendants
27+
import org.modelix.model.api.getRootNode
28+
import org.modelix.model.client2.lazyLoadVersion
29+
import org.modelix.model.client2.runWrite
30+
import org.modelix.model.lazy.RepositoryId
31+
import org.modelix.model.persistent.MapBasedStore
32+
import org.modelix.model.server.api.v2.ObjectHash
33+
import org.modelix.model.server.api.v2.SerializedObject
34+
import org.modelix.model.server.handlers.IdsApiImpl
35+
import org.modelix.model.server.handlers.ModelReplicationServer
36+
import org.modelix.model.server.store.InMemoryStoreClient
37+
import kotlin.test.Test
38+
import kotlin.test.assertTrue
39+
40+
class LazyLoadingTest {
41+
42+
private lateinit var statistics: StoreClientWithStatistics
43+
44+
private fun runTest(block: suspend ApplicationTestBuilder.() -> Unit) = testApplication {
45+
application {
46+
installAuthentication(unitTestMode = true)
47+
installDefaultServerPlugins()
48+
statistics = StoreClientWithStatistics(InMemoryStoreClient())
49+
ModelReplicationServer(statistics).init(this)
50+
IdsApiImpl(statistics).init(this)
51+
}
52+
block()
53+
}
54+
55+
private fun assertRequestCount(atLeast: Long, body: () -> Unit): Long {
56+
val requestCount = measureRequests(body)
57+
assertTrue(requestCount >= atLeast, "At least $atLeast requests expected, but was $requestCount")
58+
return requestCount
59+
}
60+
61+
private fun measureRequests(body: () -> Unit): Long {
62+
val before = statistics.getTotalRequests()
63+
body()
64+
val after = statistics.getTotalRequests()
65+
val requestCount = after - before
66+
println("Requests: $requestCount")
67+
return requestCount
68+
}
69+
70+
@Test
71+
fun `model data is loaded on demand`() = runTest {
72+
// After optimizing the lazy loading to send less (but bigger) requests, this test might fail.
73+
// Just update the model size, cache size and expected request count to fix it.
74+
75+
val client = createModelClient()
76+
val branchRef = RepositoryId("my-repo").getBranchReference()
77+
client.runWrite(branchRef) {
78+
fun createNodes(parentNode: INode, numberOfNodes: Int) {
79+
if (numberOfNodes == 0) return
80+
if (numberOfNodes == 1) {
81+
parentNode.addNewChild(NullChildLink, 0)
82+
return
83+
}
84+
val subtreeSize1 = numberOfNodes / 2
85+
val subtreeSize2 = numberOfNodes - subtreeSize1
86+
createNodes(parentNode.addNewChild(NullChildLink, 0), subtreeSize1 - 1)
87+
createNodes(parentNode.addNewChild(NullChildLink, 1), subtreeSize2 - 1)
88+
}
89+
90+
createNodes(it, 5_000)
91+
}
92+
val version = client.lazyLoadVersion(branchRef, cacheSize = 500)
93+
94+
val rootNode = TreePointer(version.getTree()).getRootNode()
95+
96+
// Traverse to the first leaf node. This should load some data, but not the whole model.
97+
assertRequestCount(1) {
98+
generateSequence(rootNode) { it.allChildren.firstOrNull() }.count()
99+
}
100+
101+
// Traverse the whole model.
102+
val requestCountFirstTraversal = assertRequestCount(10) {
103+
rootNode.getDescendants(true).count()
104+
}
105+
106+
// Traverse the whole model a second time. The model doesn't fit into the cache and some parts are already
107+
// unloaded during the first traversal. The unloaded parts need to be requested again.
108+
// But the navigation to the first leaf is like a warmup of the cache for the whole model traversal.
109+
// The previous traversal can benefit from that, but the next one cannot and is expected to need more requests.
110+
assertRequestCount(requestCountFirstTraversal + 1) {
111+
rootNode.getDescendants(true).count()
112+
}
113+
}
114+
}

0 commit comments

Comments
 (0)