Skip to content

Commit 65ca207

Browse files
committed
perf(model-client): prefetch data to reduce the number of small requests
1 parent ff4b503 commit 65ca207

File tree

26 files changed

+937
-220
lines changed

26 files changed

+937
-220
lines changed

model-client/src/commonMain/kotlin/org/modelix/model/client/GarbageFilteringStore.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ class GarbageFilteringStore(private val store: IKeyValueStore) : IKeyValueStoreW
2929
return if (pendingEntries.containsKey(key)) pendingEntries[key] else store[key]
3030
}
3131

32+
override fun getIfCached(key: String): String? {
33+
return if (pendingEntries.containsKey(key)) pendingEntries[key] else store.getIfCached(key)
34+
}
35+
3236
override fun getPendingSize(): Int = store.getPendingSize() + pendingEntries.size
3337

3438
override fun getWrapped(): IKeyValueStore = store

model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,4 @@ interface IModelClientV2 {
8686
suspend fun <R> query(branch: BranchReference, body: (IMonoStep<INode>) -> IMonoStep<R>): R
8787

8888
suspend fun <R> query(repositoryId: RepositoryId, versionHash: String, body: (IMonoStep<INode>) -> IMonoStep<R>): R
89-
90-
suspend fun getObjects(repository: RepositoryId, keys: Sequence<String>): Map<String, String>
91-
92-
suspend fun pushObjects(repository: RepositoryId, objects: Sequence<Pair<String, String>>)
9389
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (c) 2024.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.modelix.model.client2
18+
19+
import org.modelix.model.lazy.RepositoryId
20+
import org.modelix.model.server.api.v2.ObjectHash
21+
import org.modelix.model.server.api.v2.ObjectHashAndSerializedObject
22+
import org.modelix.model.server.api.v2.SerializedObject
23+
24+
/**
25+
* Should only be used by Modelix components.
26+
*/
27+
interface IModelClientV2Internal : IModelClientV2 {
28+
/**
29+
* Required for lazy loading.
30+
* Use [IModelClientV2.lazyLoadVersion]
31+
*/
32+
suspend fun getObjects(repository: RepositoryId, keys: Sequence<ObjectHash>): Map<ObjectHash, SerializedObject>
33+
34+
/**
35+
* Required for lazy loading.
36+
* Use [IModelClientV2.lazyLoadVersion]
37+
*/
38+
suspend fun pushObjects(repository: RepositoryId, objects: Sequence<ObjectHashAndSerializedObject>)
39+
}

model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ import org.modelix.model.lazy.computeDelta
6868
import org.modelix.model.operations.OTBranch
6969
import org.modelix.model.persistent.HashUtil
7070
import org.modelix.model.persistent.MapBasedStore
71+
import org.modelix.model.server.api.v2.ImmutableObjectsStream
72+
import org.modelix.model.server.api.v2.ObjectHash
73+
import org.modelix.model.server.api.v2.ObjectHashAndSerializedObject
74+
import org.modelix.model.server.api.v2.SerializedObject
7175
import org.modelix.model.server.api.v2.VersionDelta
7276
import org.modelix.model.server.api.v2.VersionDeltaStream
7377
import org.modelix.model.server.api.v2.VersionDeltaStreamV2
@@ -83,7 +87,7 @@ class ModelClientV2(
8387
private val httpClient: HttpClient,
8488
val baseUrl: String,
8589
private var clientProvidedUserId: String?,
86-
) : IModelClientV2, Closable {
90+
) : IModelClientV2, IModelClientV2Internal, Closable {
8791
private var clientId: Int = 0
8892
private var idGenerator: IIdGenerator = IdGeneratorDummy()
8993
private var serverProvidedUserId: String? = null
@@ -265,27 +269,16 @@ class ModelClientV2(
265269
}
266270
}
267271

268-
override suspend fun getObjects(repository: RepositoryId, keys: Sequence<String>): Map<String, String> {
269-
val response = httpClient.post {
272+
override suspend fun getObjects(repository: RepositoryId, keys: Sequence<ObjectHash>): Map<ObjectHash, SerializedObject> {
273+
return httpClient.preparePost {
270274
url {
271275
takeFrom(baseUrl)
272276
appendPathSegments("repositories", repository.id, "objects", "getAll")
273277
}
274278
setBody(keys.joinToString("\n"))
279+
}.execute { response ->
280+
ImmutableObjectsStream.decode(response.bodyAsChannel())
275281
}
276-
277-
val content = response.bodyAsChannel()
278-
val objects = HashMap<String, String>()
279-
while (true) {
280-
val key = checkNotNull(content.readUTF8Line()) { "Empty line expected at the end of the stream" }
281-
if (key == "") {
282-
check(content.readUTF8Line() == null) { "Empty line is only allowed at the end of the stream" }
283-
break
284-
}
285-
val value = checkNotNull(content.readUTF8Line()) { "Object missing for hash $key" }
286-
objects[key] = value
287-
}
288-
return objects
289282
}
290283

291284
override suspend fun push(branch: BranchReference, version: IVersion, baseVersion: IVersion?): IVersion {
@@ -315,7 +308,7 @@ class ModelClientV2(
315308
}
316309
}
317310

318-
override suspend fun pushObjects(repository: RepositoryId, objects: Sequence<Pair<String, String>>) {
311+
override suspend fun pushObjects(repository: RepositoryId, objects: Sequence<ObjectHashAndSerializedObject>) {
319312
LOG.debug { "${clientId.toString(16)}.pushObjects($repository)" }
320313
objects.chunked(100_000).forEach { unsortedChunk ->
321314
// Entries are sorted to avoid deadlocks on the server side between transactions.

model-client/src/jvmMain/kotlin/org/modelix/model/KeyValueStoreCache.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ class KeyValueStoreCache(private val store: IKeyValueStore) : IKeyValueStoreWrap
6161
return getAll(setOf(key))[key]
6262
}
6363

64+
override fun getIfCached(key: String): String? {
65+
return cache[key] ?: store.getIfCached(key)
66+
}
67+
6468
override fun getAll(keys: Iterable<String>): Map<String, String?> {
6569
val remainingKeys = toStream(keys).collect(Collectors.toList())
6670
val result: MutableMap<String, String?> = LinkedHashMap(16, 0.75.toFloat(), false)

model-client/src/jvmMain/kotlin/org/modelix/model/client/AsyncStore.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ class AsyncStore(private val store: IKeyValueStore) : IKeyValueStoreWrapper {
3333
return store[key]
3434
}
3535

36+
override fun getIfCached(key: String): String? {
37+
return synchronized(pendingWrites) {
38+
pendingWrites[key]
39+
} ?: store.getIfCached(key)
40+
}
41+
3642
override fun getWrapped(): IKeyValueStore = store
3743

3844
override fun getPendingSize(): Int = store.getPendingSize() + pendingWrites.size

model-client/src/jvmMain/kotlin/org/modelix/model/client/RestWebModelClient.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,10 @@ class RestWebModelClient @JvmOverloads constructor(
371371
return runBlocking { getA(key) }
372372
}
373373

374+
override fun getIfCached(key: String): String? {
375+
return null // doesn't contain any caches
376+
}
377+
374378
override suspend fun getA(key: String): String? {
375379
val isHash = HashUtil.isSha256(key)
376380
if (isHash) {
Lines changed: 102 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,71 +1,102 @@
1-
/*
2-
* Copyright (c) 2024.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
16-
17-
package org.modelix.model.client2
18-
19-
import kotlinx.coroutines.runBlocking
20-
import org.modelix.model.IKeyListener
21-
import org.modelix.model.IKeyValueStore
22-
import org.modelix.model.IVersion
23-
import org.modelix.model.lazy.BranchReference
24-
import org.modelix.model.lazy.CLVersion
25-
import org.modelix.model.lazy.ObjectStoreCache
26-
import org.modelix.model.lazy.RepositoryId
27-
28-
fun IModelClientV2.lazyLoadVersion(repositoryId: RepositoryId, versionHash: String, cacheSize: Int = 100_000): IVersion {
29-
val store = ObjectStoreCache(ModelClientAsStore(this, repositoryId), cacheSize)
30-
return CLVersion.loadFromHash(versionHash, store)
31-
}
32-
33-
suspend fun IModelClientV2.lazyLoadVersion(branchRef: BranchReference, cacheSize: Int = 100_000): IVersion {
34-
return lazyLoadVersion(branchRef.repositoryId, pullHash(branchRef), cacheSize)
35-
}
36-
37-
class ModelClientAsStore(val client: IModelClientV2, val repositoryId: RepositoryId) : IKeyValueStore {
38-
override fun get(key: String): String? {
39-
return getAll(listOf(key))[key]
40-
}
41-
42-
override fun put(key: String, value: String?) {
43-
TODO("Not yet implemented")
44-
}
45-
46-
override fun getAll(keys: Iterable<String>): Map<String, String?> {
47-
return runBlocking {
48-
client.getObjects(repositoryId, keys.asSequence())
49-
}
50-
}
51-
52-
override fun putAll(entries: Map<String, String?>) {
53-
TODO("Not yet implemented")
54-
}
55-
56-
override fun prefetch(key: String) {
57-
TODO("Not yet implemented")
58-
}
59-
60-
override fun listen(key: String, listener: IKeyListener) {
61-
TODO("Not yet implemented")
62-
}
63-
64-
override fun removeListener(key: String, listener: IKeyListener) {
65-
TODO("Not yet implemented")
66-
}
67-
68-
override fun getPendingSize(): Int {
69-
TODO("Not yet implemented")
70-
}
71-
}
1+
/*
2+
* Copyright (c) 2024.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.modelix.model.client2
18+
19+
import kotlinx.coroutines.runBlocking
20+
import org.modelix.model.IKeyListener
21+
import org.modelix.model.IKeyValueStore
22+
import org.modelix.model.IVersion
23+
import org.modelix.model.lazy.BranchReference
24+
import org.modelix.model.lazy.CLVersion
25+
import org.modelix.model.lazy.CacheConfiguration
26+
import org.modelix.model.lazy.ObjectStoreCache
27+
import org.modelix.model.lazy.RepositoryId
28+
import org.modelix.model.persistent.HashUtil
29+
30+
/**
31+
* This function loads parts of the model lazily while it is iterated and limits the amount of data that is cached on
32+
* the client side.
33+
*
34+
* IModelClientV2#loadVersion eagerly loads the whole model. For large models this can be slow and requires lots of
35+
* memory.
36+
* To reduce the relative overhead of requests to the server, the lazy loading algorithm tries to predict which nodes
37+
* are required next and fill a "prefetch cache" by using "free capacity" of the regular requests. That means,
38+
* the number of requests doesn't change by this prefetching, but small requests are filled to up to their limit with
39+
* additional prefetch requests.
40+
*/
41+
fun IModelClientV2.lazyLoadVersion(repositoryId: RepositoryId, versionHash: String, config: CacheConfiguration = CacheConfiguration()): IVersion {
42+
val store = ObjectStoreCache(ModelClientAsStore(this, repositoryId), config)
43+
return CLVersion.loadFromHash(versionHash, store)
44+
}
45+
46+
/**
47+
* An overload of [IModelClientV2.lazyLoadVersion] that reads the current version hash of the branch from the server and
48+
* then loads that version with lazy loading support.
49+
*/
50+
suspend fun IModelClientV2.lazyLoadVersion(branchRef: BranchReference, config: CacheConfiguration = CacheConfiguration()): IVersion {
51+
return lazyLoadVersion(branchRef.repositoryId, pullHash(branchRef), config)
52+
}
53+
54+
private class ModelClientAsStore(client: IModelClientV2, val repositoryId: RepositoryId) : IKeyValueStore {
55+
private val client: IModelClientV2Internal = client as IModelClientV2Internal
56+
57+
override fun get(key: String): String? {
58+
return getAll(listOf(key))[key]
59+
}
60+
61+
override fun getIfCached(key: String): String? {
62+
return null
63+
}
64+
65+
override fun put(key: String, value: String?) {
66+
putAll(mapOf(key to value))
67+
}
68+
69+
override fun getAll(keys: Iterable<String>): Map<String, String?> {
70+
return runBlocking {
71+
client.getObjects(repositoryId, keys.asSequence())
72+
}
73+
}
74+
75+
override fun putAll(entries: Map<String, String?>) {
76+
runBlocking {
77+
client.pushObjects(
78+
repositoryId,
79+
entries.asSequence().map { (key, value) ->
80+
require(HashUtil.isSha256(key) && value != null) { "Only immutable objects are allowed: $key -> $value" }
81+
key to value
82+
},
83+
)
84+
}
85+
}
86+
87+
override fun prefetch(key: String) {
88+
throw UnsupportedOperationException()
89+
}
90+
91+
override fun listen(key: String, listener: IKeyListener) {
92+
throw UnsupportedOperationException()
93+
}
94+
95+
override fun removeListener(key: String, listener: IKeyListener) {
96+
throw UnsupportedOperationException()
97+
}
98+
99+
override fun getPendingSize(): Int {
100+
return 0
101+
}
102+
}

model-datastructure/src/commonMain/kotlin/org/modelix/model/IKeyValueStore.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
package org.modelix.model
1717

1818
import org.modelix.model.lazy.BulkQuery
19+
import org.modelix.model.lazy.BulkQueryConfiguration
1920
import org.modelix.model.lazy.IBulkQuery
2021
import org.modelix.model.lazy.IDeserializingKeyValueStore
2122

2223
interface IKeyValueStore {
23-
fun newBulkQuery(deserializingCache: IDeserializingKeyValueStore): IBulkQuery = BulkQuery(deserializingCache)
24+
fun newBulkQuery(deserializingCache: IDeserializingKeyValueStore, config: BulkQueryConfiguration): IBulkQuery = BulkQuery(deserializingCache, config)
2425
operator fun get(key: String): String?
26+
fun getIfCached(key: String): String?
2527
suspend fun getA(key: String): String? = get(key)
2628
fun put(key: String, value: String?)
2729
fun getAll(keys: Iterable<String>): Map<String, String?>

0 commit comments

Comments
 (0)