Skip to content

Commit 81e099c

Browse files
committed
feat(streams): introduction of IStreamExecutor
1 parent 899f7a9 commit 81e099c

File tree

79 files changed

+1064
-793
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+1064
-793
lines changed

commitlint.config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ module.exports = {
2222
"mps-model-server",
2323
"mps-sync-plugin",
2424
"openapi",
25+
"streams",
2526
"ts-model-api",
2627
"vue-model-api",
2728
],

model-api/src/commonMain/kotlin/org/modelix/model/api/async/AsyncNode.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import org.modelix.model.api.IReferenceLinkReference
1111
import org.modelix.model.api.resolve
1212
import org.modelix.model.api.resolveInCurrentContext
1313
import org.modelix.streams.IStream
14+
import org.modelix.streams.IStreamExecutor
1415
import org.modelix.streams.flatten
1516

1617
class AsyncNode(
@@ -20,6 +21,10 @@ class AsyncNode(
2021
private val createNodeAdapter: (Long) -> IAsyncNode,
2122
) : IAsyncNode {
2223

24+
override fun getStreamExecutor(): IStreamExecutor {
25+
return tree().getStreamExecutor()
26+
}
27+
2328
override fun asRegularNode(): INode = regularNode
2429

2530
private fun Long.asNode(): IAsyncNode = createNodeAdapter(this)

model-api/src/commonMain/kotlin/org/modelix/model/api/async/IAsyncNode.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ import org.modelix.model.api.INodeReference
88
import org.modelix.model.api.IPropertyReference
99
import org.modelix.model.api.IReferenceLinkReference
1010
import org.modelix.streams.IStream
11+
import org.modelix.streams.IStreamExecutor
1112

1213
interface IAsyncNode {
1314
fun asRegularNode(): INode
15+
fun getStreamExecutor(): IStreamExecutor
1416

1517
fun getConcept(): IStream.One<IConcept>
1618
fun getConceptRef(): IStream.One<ConceptReference>

model-api/src/commonMain/kotlin/org/modelix/model/api/async/IAsyncTree.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@ import org.modelix.model.api.IPropertyReference
77
import org.modelix.model.api.IReferenceLinkReference
88
import org.modelix.model.api.ITree
99
import org.modelix.streams.IStream
10+
import org.modelix.streams.IStreamExecutor
1011
import org.modelix.streams.plus
1112

1213
interface IAsyncTree {
1314
fun asSynchronousTree(): ITree
15+
fun getStreamExecutor(): IStreamExecutor
16+
1417
fun getChanges(oldVersion: IAsyncTree, changesOnly: Boolean): IStream.Many<TreeChangeEvent>
1518

1619
fun getConceptReference(nodeId: Long): IStream.One<ConceptReference>

model-api/src/commonMain/kotlin/org/modelix/model/api/async/NodeAsAsyncNode.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,14 @@ import org.modelix.model.api.getDescendants
1717
import org.modelix.model.api.meta.NullConcept
1818
import org.modelix.model.api.toReference
1919
import org.modelix.streams.IStream
20+
import org.modelix.streams.IStreamExecutor
21+
import org.modelix.streams.SimpleStreamExecutor
22+
import org.modelix.streams.withSequences
2023

21-
class NodeAsAsyncNode(val node: INode) : IAsyncNode {
24+
open class NodeAsAsyncNode(val node: INode) : IAsyncNode {
25+
override fun getStreamExecutor(): IStreamExecutor {
26+
return SimpleStreamExecutor().withSequences()
27+
}
2228

2329
private fun <T : Any> T?.asOptionalMono(): Maybe<T> = if (this != null) maybeOf(this) else maybeOfEmpty()
2430
private fun <T> T.asMono(): Single<T> = singleOf(this)

model-client/src/commonMain/kotlin/org/modelix/model/client/GarbageFilteringStore.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ import org.modelix.model.IKeyValueStore
55
import org.modelix.model.IKeyValueStoreWrapper
66
import org.modelix.model.api.runSynchronized
77
import org.modelix.model.persistent.HashUtil
8+
import org.modelix.streams.IStreamExecutorProvider
89

910
@Deprecated(message = "Replaced by NonWrittenEntry")
10-
class GarbageFilteringStore(private val store: IKeyValueStore) : IKeyValueStoreWrapper {
11+
class GarbageFilteringStore(private val store: IKeyValueStore) : IKeyValueStoreWrapper, IStreamExecutorProvider by store {
1112
private val pendingEntries: MutableMap<String?, String?> = HashMap()
1213

1314
override fun get(key: String): String? {

model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,10 @@ import org.modelix.model.server.api.v2.VersionDeltaStreamV2
7373
import org.modelix.model.server.api.v2.asStream
7474
import org.modelix.modelql.client.ModelQLClient
7575
import org.modelix.modelql.core.IMonoStep
76+
import org.modelix.streams.IExecutableStream
7677
import org.modelix.streams.IStream
78+
import org.modelix.streams.SimpleStreamExecutor
79+
import org.modelix.streams.withSequences
7780
import kotlin.time.Duration
7881
import kotlin.time.Duration.Companion.seconds
7982

@@ -298,35 +301,43 @@ class ModelClientV2(
298301
LOG.debug { "${clientId.toString(16)}.push($branch, $version, $baseVersion)" }
299302
require(version is CLVersion)
300303
require(baseVersion is CLVersion?)
301-
return IStream.useSequencesSuspending {
302-
version.write()
303-
val objects = version.fullDiff(baseVersion)
304-
// large HTTP requests and large Json objects don't scale well
305-
val lastChunk = pushObjects(branch.repositoryId, objects.map { it.hash to it.serialize() }, returnLastChunk = true)
306-
val delta = VersionDelta(version.getContentHash(), null, objectsMap = lastChunk.toMap())
307-
httpClient.preparePost {
308-
url {
309-
takeFrom(baseUrl)
310-
appendPathSegmentsEncodingSlash("repositories", branch.repositoryId.id, "branches", branch.branchName)
311-
}
312-
useVersionStreamFormat()
313-
contentType(ContentType.Application.Json)
314-
setBody(delta)
315-
}.execute { response ->
316-
createVersion(getStore(branch.repositoryId), version, response.readVersionDelta())
304+
version.write()
305+
val objects = version.asyncStore.getStreamExecutor().queryManyLater {
306+
version.fullDiff(baseVersion).map { it.hash to it.serialize() }
307+
}
308+
// large HTTP requests and large Json objects don't scale well
309+
val lastChunk = pushObjects(branch.repositoryId, objects, returnLastChunk = true)
310+
val delta = VersionDelta(version.getContentHash(), null, objectsMap = lastChunk.toMap())
311+
return httpClient.preparePost {
312+
url {
313+
takeFrom(baseUrl)
314+
appendPathSegmentsEncodingSlash("repositories", branch.repositoryId.id, "branches", branch.branchName)
317315
}
316+
useVersionStreamFormat()
317+
contentType(ContentType.Application.Json)
318+
setBody(delta)
319+
}.execute { response ->
320+
createVersion(getStore(branch.repositoryId), version, response.readVersionDelta())
318321
}
319322
}
320323

321324
override suspend fun pushObjects(repository: RepositoryId, objects: Sequence<ObjectHashAndSerializedObject>) {
322-
pushObjects(repository, IStream.many(objects), false)
325+
pushObjects(
326+
repository,
327+
SimpleStreamExecutor().withSequences().queryManyLater { IStream.many(objects) },
328+
false,
329+
)
323330
}
324331

325332
/**
326333
* If the last chunk is smaller than #minBodySize, then the remaining objects are returned for inlining in the
327334
* main request.
328335
*/
329-
private suspend fun pushObjects(repository: RepositoryId, objects: IStream.Many<ObjectHashAndSerializedObject>, returnLastChunk: Boolean): List<ObjectHashAndSerializedObject> {
336+
private suspend fun pushObjects(
337+
repository: RepositoryId,
338+
objects: IExecutableStream.Many<ObjectHashAndSerializedObject>,
339+
returnLastChunk: Boolean,
340+
): List<ObjectHashAndSerializedObject> {
330341
LOG.debug { "${clientId.toString(16)}.pushObjects($repository)" }
331342
val maxBodySize = 2 * 1024 * 1024
332343
val chunkContent = StringBuilder()
@@ -345,7 +356,7 @@ class ModelClientV2(
345356
chunkEntries.clear()
346357
}
347358

348-
objects.asSequence().forEach { entry ->
359+
objects.iterateSuspending { entry ->
349360
val entrySize = (if (chunkContent.isEmpty()) 0 else 1) + entry.first.length + 1 + entry.second.length
350361
if (chunkContent.length + entrySize > maxBodySize) {
351362
sendChunk()

model-client/src/commonTest/kotlin/org/modelix/model/api/TreePointerTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,6 @@ class TreePointerTest {
2323
val rootNode = branch.getRootNode()
2424
val role = IReferenceLinkReference.fromName("refA")
2525
rootNode.setReferenceTarget(role.toLegacy(), rootNode)
26-
assertEquals(rootNode, rootNode.asAsyncNode().getReferenceTarget(role).getSynchronous()?.asRegularNode())
26+
assertEquals(rootNode, rootNode.asAsyncNode().let { n -> n.getStreamExecutor().query { n.getReferenceTarget(role).orNull() } }?.asRegularNode())
2727
}
2828
}

model-client/src/jvmMain/kotlin/org/modelix/model/KeyValueStoreCache.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ package org.modelix.model
33
import org.apache.commons.collections4.map.LRUMap
44
import org.modelix.model.persistent.HashUtil
55
import org.modelix.model.util.StreamUtils.toStream
6+
import org.modelix.streams.IStreamExecutorProvider
67
import java.util.Collections
78
import java.util.concurrent.CompletableFuture
89
import java.util.stream.Collectors
910

10-
class KeyValueStoreCache(private val store: IKeyValueStore) : IKeyValueStoreWrapper {
11+
class KeyValueStoreCache(private val store: IKeyValueStore) : IKeyValueStoreWrapper, IStreamExecutorProvider by store {
1112
private val cache = Collections.synchronizedMap(LRUMap<String, String?>(300000))
1213
private val pendingPrefetches: MutableSet<String> = HashSet()
1314
private val activeRequests: MutableList<GetRequest> = ArrayList()

model-client/src/jvmMain/kotlin/org/modelix/model/client/AsyncStore.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ package org.modelix.model.client
33
import org.modelix.model.IKeyListener
44
import org.modelix.model.IKeyValueStore
55
import org.modelix.model.IKeyValueStoreWrapper
6+
import org.modelix.streams.IStreamExecutorProvider
67
import java.util.Objects
78
import java.util.concurrent.atomic.AtomicBoolean
89

9-
class AsyncStore(private val store: IKeyValueStore) : IKeyValueStoreWrapper {
10+
class AsyncStore(private val store: IKeyValueStore) : IKeyValueStoreWrapper, IStreamExecutorProvider by store {
1011
private val consumerActive = AtomicBoolean()
1112
private val pendingWrites: MutableMap<String, String?> = LinkedHashMap()
1213
override fun get(key: String): String? {

0 commit comments

Comments
 (0)