Skip to content

Commit 71dc86b

Browse files
committed
feat(model-client): lazy loading support for JS
1 parent 1839f55 commit 71dc86b

File tree

9 files changed

+147
-108
lines changed

9 files changed

+147
-108
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package org.modelix.kotlin.utils
2+
3+
expect fun <R> runBlockingIfJvm(body: suspend () -> R): R
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.modelix.kotlin.utils
2+
3+
actual fun <R> runBlockingIfJvm(body: suspend () -> R): R {
4+
throw UnsupportedOperationException("runBlocking not support by JS")
5+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package org.modelix.kotlin.utils
2+
3+
import kotlinx.coroutines.runBlocking
4+
5+
actual fun <R> runBlockingIfJvm(body: suspend () -> R): R {
6+
return runBlocking {
7+
body()
8+
}
9+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.modelix.model.client2
2+
3+
import org.modelix.model.IVersion
4+
import org.modelix.model.async.BulkAsyncStore
5+
import org.modelix.model.async.CachingAsyncStore
6+
import org.modelix.model.lazy.BranchReference
7+
import org.modelix.model.lazy.CLVersion
8+
import org.modelix.model.lazy.CacheConfiguration
9+
import org.modelix.model.lazy.RepositoryId
10+
11+
/**
12+
* This function loads parts of the model lazily while it is iterated and limits the amount of data that is cached on
13+
* the client side.
14+
*
15+
* IModelClientV2#loadVersion eagerly loads the whole model. For large models this can be slow and requires lots of
16+
* memory.
17+
*/
18+
fun IModelClientV2.lazyLoadVersion(repositoryId: RepositoryId, versionHash: String, config: CacheConfiguration = CacheConfiguration()): IVersion {
19+
val store = BulkAsyncStore(
20+
CachingAsyncStore(
21+
ModelClientAsStore(this, repositoryId),
22+
cacheSize = config.cacheSize,
23+
),
24+
batchSize = config.requestBatchSize,
25+
)
26+
return store.getStreamExecutor().query {
27+
CLVersion.Companion.tryLoadFromHash(versionHash, store).assertNotEmpty { "Version not found: $versionHash" }
28+
}
29+
}
30+
31+
/**
32+
* An overload of [IModelClientV2.lazyLoadVersion] that reads the current version hash of the branch from the server and
33+
* then loads that version with lazy loading support.
34+
*/
35+
suspend fun IModelClientV2.lazyLoadVersion(branchRef: BranchReference, config: CacheConfiguration = CacheConfiguration()): IVersion {
36+
return lazyLoadVersion(branchRef.repositoryId, pullHash(branchRef), config)
37+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package org.modelix.model.client2
2+
3+
import kotlinx.coroutines.flow.flow
4+
import org.modelix.model.IKeyValueStore
5+
import org.modelix.model.async.AsyncStoreAsLegacyDeserializingStore
6+
import org.modelix.model.async.IAsyncObjectStore
7+
import org.modelix.model.async.ObjectHash
8+
import org.modelix.model.lazy.IDeserializingKeyValueStore
9+
import org.modelix.model.lazy.RepositoryId
10+
import org.modelix.model.persistent.HashUtil
11+
import org.modelix.model.persistent.IKVValue
12+
import org.modelix.streams.IStream
13+
import org.modelix.streams.IStreamExecutor
14+
import org.modelix.streams.SimpleStreamExecutor
15+
import org.modelix.streams.withFlows
16+
17+
class ModelClientAsStore(client: IModelClientV2, val repositoryId: RepositoryId) : IAsyncObjectStore {
18+
private val client: IModelClientV2Internal = client as IModelClientV2Internal
19+
20+
override fun getLegacyKeyValueStore(): IKeyValueStore {
21+
throw UnsupportedOperationException()
22+
}
23+
24+
override fun getLegacyObjectStore(): IDeserializingKeyValueStore {
25+
return AsyncStoreAsLegacyDeserializingStore(this)
26+
}
27+
28+
override fun <T : Any> getIfCached(key: ObjectHash<T>): T? {
29+
return null
30+
}
31+
32+
override fun <T : Any> get(key: ObjectHash<T>): IStream.ZeroOrOne<T> {
33+
return getAllAsStream(IStream.of(key)).map {
34+
checkNotNull(it.second) { "Entry not found: ${key.hash}" } as T
35+
}.exactlyOne()
36+
}
37+
38+
override fun getAllAsStream(keys: IStream.Many<ObjectHash<*>>): IStream.Many<Pair<ObjectHash<*>, Any?>> {
39+
return keys.toList().flatMap { keysAsList ->
40+
getAllAsMap(keysAsList).flatMapIterable { it.entries }.map { it.key to it.value }
41+
}
42+
}
43+
44+
override fun getAllAsMap(keys: List<ObjectHash<*>>): IStream.One<Map<ObjectHash<*>, Any?>> {
45+
return IStream.fromFlow(
46+
flow {
47+
val serializedObjects = client.getObjects(repositoryId, keys.asSequence().map { it.hash })
48+
val deserializedObjects = keys.associateWith {
49+
serializedObjects[it.hash]?.let { p1 -> it.deserializer(p1) }
50+
}
51+
emit(deserializedObjects)
52+
},
53+
).exactlyOne()
54+
}
55+
56+
override fun putAll(entries: Map<ObjectHash<*>, IKVValue>): IStream.Zero {
57+
return IStream.fromFlow<Nothing>(
58+
flow {
59+
client.pushObjects(
60+
repositoryId,
61+
entries.asSequence().map { (key, value) ->
62+
require(HashUtil.isSha256(key.hash)) { "Only immutable objects are allowed: $key -> $value" }
63+
key.hash to value.serialize()
64+
},
65+
)
66+
},
67+
).drainAll()
68+
}
69+
70+
override fun getStreamExecutor(): IStreamExecutor {
71+
return SimpleStreamExecutor().withFlows()
72+
}
73+
}

model-client/src/jvmMain/kotlin/org/modelix/model/client2/LazyLoading.kt

Lines changed: 0 additions & 97 deletions
This file was deleted.

model-datastructure/src/commonMain/kotlin/org/modelix/model/async/BulkAsyncStore.kt

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import org.modelix.streams.BulkRequestStreamExecutor
77
import org.modelix.streams.IBulkExecutor
88
import org.modelix.streams.IStream
99
import org.modelix.streams.IStreamExecutor
10-
import org.modelix.streams.SimpleStreamExecutor
11-
import org.modelix.streams.withSequences
1210

1311
class BulkAsyncStore(
1412
val store: IAsyncObjectStore,
@@ -18,13 +16,11 @@ class BulkAsyncStore(
1816
private val bulkExecutor = BulkRequestStreamExecutor<ObjectHash<*>, Any?>(
1917
object : IBulkExecutor<ObjectHash<*>, Any?> {
2018
override fun execute(keys: List<ObjectHash<*>>): Map<ObjectHash<*>, Any?> {
21-
@Suppress("DEPRECATION")
22-
return SimpleStreamExecutor().withSequences().query { store.getAllAsMap(keys) }
19+
return store.getStreamExecutor().query { store.getAllAsMap(keys) }
2320
}
2421

2522
override suspend fun executeSuspending(keys: List<ObjectHash<*>>): Map<ObjectHash<*>, Any?> {
26-
@Suppress("DEPRECATION")
27-
return SimpleStreamExecutor().withSequences().querySuspending { store.getAllAsMap(keys) }
23+
return store.getStreamExecutor().querySuspending { store.getAllAsMap(keys) }
2824
}
2925
},
3026
batchSize = batchSize,

model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class LazyLoadingTest {
8181
@Ignore @Test fun compare_cache_size_3200() = compare_cache_size(100_000, 22, 48, 0, 212, 1859, 0)
8282
private fun compare_cache_size(cacheSize: Int, vararg expected: Int) = runLazyLoadingTest(DepthFirstSearchPattern, 1_000, cacheSize, 50, 50, *expected)
8383

84-
@Ignore @Test fun compare_prefetch_size_0() = compare_prefetch_size(0, 22, 2055, 2073, 22, 2055, 2073)
84+
@Test fun compare_prefetch_size_0() = compare_prefetch_size(0, 22, 2055, 2073, 22, 2055, 2073)
8585
@Ignore @Test fun compare_prefetch_size_2() = compare_prefetch_size(2, 22, 1028, 1046, 38, 2056, 2092)
8686
@Ignore @Test fun compare_prefetch_size_4() = compare_prefetch_size(3, 22, 707, 717, 53, 2121, 2151)
8787
@Ignore @Test fun compare_prefetch_size_10() = compare_prefetch_size(10, 22, 379, 406, 115, 3773, 4013)

streams/src/commonMain/kotlin/org/modelix/streams/FlowStreamBuilder.kt

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import kotlinx.coroutines.flow.take
2727
import kotlinx.coroutines.flow.toList
2828
import kotlinx.coroutines.flow.withIndex
2929
import kotlinx.coroutines.flow.zip
30+
import org.modelix.kotlin.utils.runBlockingIfJvm
3031
import kotlin.coroutines.coroutineContext
3132

3233
class FlowStreamBuilder(executor: IStreamExecutorProvider) : IStreamBuilder, IStreamExecutorProvider by executor {
@@ -91,7 +92,11 @@ class FlowStreamBuilder(executor: IStreamExecutorProvider) : IStreamBuilder, ISt
9192
override fun toList(): IStream.One<List<E>> = Wrapper(flow { emit(wrapped.toList()) })
9293
override fun asSequence(): Sequence<E> = throw UnsupportedOperationException()
9394

94-
override fun iterateSynchronous(visitor: (E) -> Unit) = throw UnsupportedOperationException()
95+
override fun iterateSynchronous(visitor: (E) -> Unit) {
96+
runBlockingIfJvm {
97+
wrapped.collect { visitor(it) }
98+
}
99+
}
95100
override suspend fun iterateSuspending(visitor: suspend (E) -> Unit) {
96101
wrapped.collect(visitor)
97102
}
@@ -107,7 +112,7 @@ class FlowStreamBuilder(executor: IStreamExecutorProvider) : IStreamBuilder, ISt
107112
)
108113
}
109114
override fun executeSynchronous() {
110-
throw UnsupportedOperationException()
115+
runBlockingIfJvm { wrapped.collect() }
111116
}
112117

113118
override fun andThen(other: IStream.Zero): IStream.Zero {
@@ -147,7 +152,15 @@ class FlowStreamBuilder(executor: IStreamExecutorProvider) : IStreamBuilder, ISt
147152

148153
inner class Wrapper<E>(wrapped: Flow<E>) : WrapperBase<E>(wrapped), IStream.One<E> {
149154
override fun getAsync(onError: ((Throwable) -> Unit)?, onSuccess: ((E) -> Unit)?) {
150-
throw UnsupportedOperationException()
155+
runBlockingIfJvm {
156+
try {
157+
wrapped.collect {
158+
onSuccess?.invoke(it)
159+
}
160+
} catch (ex: Throwable) {
161+
onError?.invoke(ex)
162+
}
163+
}
151164
}
152165

153166
override fun <R> flatMapOne(mapper: (E) -> IStream.One<R>): IStream.One<R> {
@@ -191,7 +204,7 @@ class FlowStreamBuilder(executor: IStreamExecutorProvider) : IStreamBuilder, ISt
191204
}
192205

193206
override fun getSynchronous(): E {
194-
return throw UnsupportedOperationException()
207+
return runBlockingIfJvm { wrapped.toList().single() }
195208
}
196209

197210
override suspend fun getSuspending(): E {

0 commit comments

Comments
 (0)