diff --git a/model-api/src/commonMain/kotlin/org/modelix/model/api/async/AsyncNode.kt b/model-api/src/commonMain/kotlin/org/modelix/model/api/async/AsyncNode.kt index 9a482d7b9b..f72d5bf37e 100644 --- a/model-api/src/commonMain/kotlin/org/modelix/model/api/async/AsyncNode.kt +++ b/model-api/src/commonMain/kotlin/org/modelix/model/api/async/AsyncNode.kt @@ -85,10 +85,17 @@ class AsyncNode( override fun getDescendants(includeSelf: Boolean): IStream.Many { return if (includeSelf) { - getStreamExecutor() IStream.of(IStream.of(this), getDescendants(false)).flatten() } else { - getAllChildren().flatMap { it.getDescendants(true) } + getAllChildren().flatMapOrdered { it.getDescendants(true) } + } + } + + override fun getDescendantsUnordered(includeSelf: Boolean): IStream.Many { + return if (includeSelf) { + IStream.of(IStream.of(this), getDescendantsUnordered(false)).flatten() + } else { + getAllChildren().flatMapUnordered { it.getDescendantsUnordered(true) } } } } diff --git a/model-api/src/commonMain/kotlin/org/modelix/model/api/async/IAsyncNode.kt b/model-api/src/commonMain/kotlin/org/modelix/model/api/async/IAsyncNode.kt index 9fe729767c..a1e88a4a14 100644 --- a/model-api/src/commonMain/kotlin/org/modelix/model/api/async/IAsyncNode.kt +++ b/model-api/src/commonMain/kotlin/org/modelix/model/api/async/IAsyncNode.kt @@ -33,6 +33,7 @@ interface IAsyncNode : IStreamExecutorProvider { fun getAllReferenceTargets(): IStream.Many> fun getDescendants(includeSelf: Boolean): IStream.Many + fun getDescendantsUnordered(includeSelf: Boolean): IStream.Many = getDescendants(includeSelf) } interface INodeWithAsyncSupport : INode { diff --git a/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt b/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt index b74ed741b7..a151ca9a78 100644 --- a/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt +++ b/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt @@ -1,6 +1,7 @@ package org.modelix.model.client2 import io.ktor.http.Url +import org.modelix.datastructures.objects.ObjectHash import org.modelix.kotlin.utils.DeprecationInfo import org.modelix.model.IVersion import org.modelix.model.ObjectDeltaFilter @@ -11,6 +12,7 @@ import org.modelix.model.lazy.BranchReference import org.modelix.model.lazy.RepositoryId import org.modelix.model.server.api.RepositoryConfig import org.modelix.modelql.core.IMonoStep +import kotlin.time.Duration /** * This interface is meant exclusively for model client usage. @@ -105,4 +107,41 @@ interface IModelClientV2 { suspend fun query(repositoryId: RepositoryId, versionHash: String, body: (IMonoStep) -> IMonoStep): R fun getFrontendUrl(branch: BranchReference): Url + + /** + * @param headVersion starting point for history computations. For a paginated view this value should be the same and + * the value for [skip] should be incremented instead. Only then its guaranteed that the returned list is + * complete. + * @param skip for a paginated view of the history + * @param limit maximum size of the returned list + * @param interval splits the timeline into equally sized intervals and returns only the last version of each interval + */ + suspend fun getHistory( + repositoryId: RepositoryId, + headVersion: ObjectHash, + skip: Int = 0, + limit: Int = 1000, + interval: Duration?, + ): HistoryResponse + + suspend fun getHistoryRange( + repositoryId: RepositoryId, + headVersion: ObjectHash, + skip: Long = 0L, + limit: Long = 1000L, + ): List } + +data class HistoryResponse( + val entries: List, + val nextVersions: List, +) + +data class HistoryEntry( + val firstVersionHash: ObjectHash, + val lastVersionHash: ObjectHash, + val minTime: Long?, + val maxTime: Long?, + val authors: Set, + // val headOfBranch: Set, +) diff --git a/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientGraph.kt b/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientGraph.kt index f7ea88b78d..65c5af097e 100644 --- a/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientGraph.kt +++ b/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientGraph.kt @@ -128,8 +128,12 @@ class ModelClientGraph( hash: ObjectHash, data: T, ): ObjectReference { - // Should never be called, because all deserialization happens internally. - throw UnsupportedOperationException() + if (config.lazyLoadingEnabled) { + return ObjectReferenceImpl(this, hash, data) + } else { + // Should never be called, because all deserialization happens internally. + throw UnsupportedOperationException() + } } @Synchronized diff --git a/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt b/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt index ab6ca56303..fd8121373e 100644 --- a/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt +++ b/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt @@ -37,7 +37,9 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch import mu.KotlinLogging +import org.modelix.datastructures.model.HistoryIndexNode import org.modelix.datastructures.objects.IObjectGraph +import org.modelix.datastructures.objects.Object import org.modelix.datastructures.objects.ObjectHash import org.modelix.kotlin.utils.DeprecationInfo import org.modelix.kotlin.utils.WeakValueMap @@ -89,6 +91,8 @@ import org.modelix.model.server.api.v2.toMap import org.modelix.modelql.client.ModelQLClient import org.modelix.modelql.core.IMonoStep import org.modelix.streams.IExecutableStream +import org.modelix.streams.getSuspending +import org.modelix.streams.iterateSuspending import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds @@ -328,6 +332,106 @@ class ModelClientV2( } } + override suspend fun getHistory( + repositoryId: RepositoryId, + headVersion: ObjectHash, + skip: Int, + limit: Int, + interval: Duration?, + ): HistoryResponse { + val index: Object = getHistoryIndex(repositoryId, headVersion) + val entries = if (interval != null) { + val mergedEntries = ArrayList() + var previousIntervalId: Long = Long.MAX_VALUE + try { + index.data.splitAtInterval(interval).iterateSuspending(index.graph) { + val intervalId = it.maxTime.toEpochMilliseconds() / interval.inWholeMilliseconds + require(intervalId <= previousIntervalId) + if (intervalId == previousIntervalId) { + val entry = mergedEntries[mergedEntries.lastIndex] + mergedEntries[mergedEntries.lastIndex] = HistoryEntry( + firstVersionHash = it.firstVersion.getHash(), + lastVersionHash = entry.lastVersionHash, + minTime = it.minTime.epochSeconds, + maxTime = entry.maxTime, + authors = entry.authors + it.authors, + ) + } else { + if (mergedEntries.size >= limit) throw LimitedReached() + previousIntervalId = intervalId + mergedEntries += HistoryEntry( + firstVersionHash = it.firstVersion.getHash(), + lastVersionHash = it.lastVersion.getHash(), + minTime = it.minTime.epochSeconds, + maxTime = it.maxTime.epochSeconds, + authors = it.authors, + ) + } + } + } catch (ex: LimitedReached) { + // Expected exception used for exiting the iterateSuspending call + } + mergedEntries + } else { + index.data.getAllVersionsReversed().flatMapOrdered { it.resolve() }.map { CLVersion(it) } + .map { + val hash = it.getObjectHash() + val time = it.getTimestamp()?.epochSeconds + HistoryEntry( + firstVersionHash = hash, + lastVersionHash = hash, + minTime = time, + maxTime = time, + authors = setOfNotNull(it.author), + ) + } + .take(limit) + .toList() + .getSuspending(index.graph) + } + return HistoryResponse(entries = entries, nextVersions = emptyList()) + } + + override suspend fun getHistoryRange( + repositoryId: RepositoryId, + headVersion: ObjectHash, + skip: Long, + limit: Long, + ): List { + val index: Object = getHistoryIndex(repositoryId, headVersion) + return index.data.getRange(skip until (limit + skip)) + .flatMapOrdered { it.getAllVersionsReversed() } + .flatMapOrdered { it.resolve() } + .map { CLVersion(it) } + .toList() + .getSuspending(index.graph) + } + + suspend fun getHistoryIndex( + repositoryId: RepositoryId?, + versionHash: ObjectHash, + ): Object { + return httpClient.prepareGet { + url { + takeFrom(baseUrl) + if (repositoryId == null) { + appendPathSegments("versions", versionHash.toString()) + } else { + appendPathSegments("repositories", repositoryId.id, "versions", versionHash.toString()) + } + appendPathSegments("history-index") + } + }.execute { response -> + val graph = getObjectGraph(repositoryId).also { it.config = it.config.copy(lazyLoadingEnabled = true) } + val text = response.bodyAsText() + val hashString = text.substringBefore('\n') + val serialized = text.substringAfter('\n') + val deserialized = HistoryIndexNode.deserialize(serialized, graph) + val ref = graph.fromDeserialized(ObjectHash(hashString), deserialized) + Object(deserialized, ref) + } + } + override suspend fun loadVersion( repositoryId: RepositoryId, versionHash: String, @@ -968,3 +1072,5 @@ fun IVersion.runWrite(idGenerator: IIdGenerator, author: String?, body: (IBranch } private fun String.ensureSuffix(suffix: String) = if (endsWith(suffix)) this else this + suffix + +private class LimitedReached : RuntimeException("limited reached") diff --git a/model-client/src/jsMain/kotlin/org/modelix/model/client2/ClientJS.kt b/model-client/src/jsMain/kotlin/org/modelix/model/client2/ClientJS.kt index ed8e35236d..074b73ddfd 100644 --- a/model-client/src/jsMain/kotlin/org/modelix/model/client2/ClientJS.kt +++ b/model-client/src/jsMain/kotlin/org/modelix/model/client2/ClientJS.kt @@ -8,13 +8,16 @@ import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.await import kotlinx.coroutines.promise +import kotlinx.datetime.toJSDate import org.modelix.datastructures.model.IGenericModelTree +import org.modelix.datastructures.objects.ObjectHash import org.modelix.model.TreeId import org.modelix.model.api.INode import org.modelix.model.api.INodeReference import org.modelix.model.api.JSNodeConverter import org.modelix.model.client.IdGenerator import org.modelix.model.data.ModelData +import org.modelix.model.lazy.CLVersion import org.modelix.model.lazy.RepositoryId import org.modelix.model.lazy.createObjectStoreCache import org.modelix.model.mutable.DummyIdGenerator @@ -111,6 +114,9 @@ interface ClientJS { */ fun initRepository(repositoryId: String, useRoleIds: Boolean = true): Promise + fun getHistoryRangeForBranch(repositoryId: String, branchId: String, skip: Int, limit: Int): Promise> + fun getHistoryRange(repositoryId: String, headVersion: String, skip: Int, limit: Int): Promise> + /** * Fetch existing branches for a given repository from the model server. * @@ -192,6 +198,30 @@ internal class ClientJSImpl(private val modelClient: ModelClientV2) : ClientJS { } } + override fun getHistoryRangeForBranch(repositoryId: String, branchId: String, skip: Int, limit: Int) = + GlobalScope.promise { modelClient.pullHash(RepositoryId(repositoryId).getBranchReference(branchId)) } + .then { getHistoryRange(repositoryId, it, skip, limit) } + .then { it } + + override fun getHistoryRange(repositoryId: String, headVersion: String, skip: Int, limit: Int) = + GlobalScope.promise { + modelClient.getHistoryRange( + RepositoryId(repositoryId), + ObjectHash(headVersion), + skip.toLong(), + limit.toLong(), + ) + .filterIsInstance() + .map { + VersionInformationJS( + it.author, + it.getTimestamp()?.toJSDate(), + it.getObjectHash().toString(), + ) + } + .toTypedArray() + } + override fun dispose() { modelClient.close() } diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt new file mode 100644 index 0000000000..5f50e85196 --- /dev/null +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt @@ -0,0 +1,440 @@ +package org.modelix.datastructures.model + +import kotlinx.datetime.Instant +import org.modelix.datastructures.objects.IObjectData +import org.modelix.datastructures.objects.IObjectDeserializer +import org.modelix.datastructures.objects.IObjectGraph +import org.modelix.datastructures.objects.IObjectReferenceFactory +import org.modelix.datastructures.objects.Object +import org.modelix.datastructures.objects.ObjectReference +import org.modelix.datastructures.objects.asObject +import org.modelix.datastructures.objects.getHashString +import org.modelix.datastructures.objects.requestBoth +import org.modelix.kotlin.utils.DelicateModelixApi +import org.modelix.kotlin.utils.urlDecode +import org.modelix.kotlin.utils.urlEncode +import org.modelix.model.lazy.CLVersion +import org.modelix.model.persistent.CPVersion +import org.modelix.model.persistent.Separators +import org.modelix.streams.IStream +import org.modelix.streams.flatten +import org.modelix.streams.getBlocking +import org.modelix.streams.plus +import kotlin.math.abs +import kotlin.math.max +import kotlin.math.min +import kotlin.time.Duration + +sealed class HistoryIndexNode : IObjectData { + abstract val firstVersion: ObjectReference + abstract val lastVersion: ObjectReference + abstract val authors: Set + abstract val size: Long + abstract val height: Long + abstract val minTime: Instant + abstract val maxTime: Instant + abstract val timeRange: ClosedRange + + abstract fun getAllVersions(): IStream.Many> + abstract fun getAllVersionsReversed(): IStream.Many> + abstract fun getRange(indexRange: LongRange): IStream.Many + fun getRange(indexRange: IntRange) = getRange(indexRange.first.toLong()..indexRange.last.toLong()) + abstract fun merge(self: Object, otherObj: Object): IStream.One> + + /** + * Each returned node spans at most the duration specified in [interval]. + * For the same interval multiple nodes may be returned. + * Latest entry is returned first. + */ + abstract fun splitAtInterval(interval: Duration): IStream.Many + + fun concat( + self: Object, + otherObj: Object, + ): Object { + require(self.data === this) + val other = otherObj.data + require(maxTime < other.minTime) + require(abs(height - other.height) <= 2) + return HistoryIndexRangeNode( + firstVersion = firstVersion, + lastVersion = other.lastVersion, + authors = authors + other.authors, + size = size + other.size, + height = max(height, other.height) + 1, + minTime = minTime, + maxTime = other.maxTime, + child1 = self.ref, + child2 = otherObj.ref, + ).let { + @OptIn(DelicateModelixApi::class) + it.asObject(self.graph) + } + } + + override fun getDeserializer(): IObjectDeserializer<*> { + return HistoryIndexNode + } + + companion object : IObjectDeserializer { + override fun deserialize( + serialized: String, + referenceFactory: IObjectReferenceFactory, + ): HistoryIndexNode { + val parts = serialized.split(Separators.LEVEL1) + return when (parts[0]) { + "R" -> { + val versionRefs = parts[1].split(Separators.LEVEL2) + .map { referenceFactory.fromHashString(it, CPVersion) } + val times = parts[5].split(Separators.LEVEL2).map { Instant.fromEpochSeconds(it.toLong()) } + return HistoryIndexRangeNode( + firstVersion = versionRefs[0], + lastVersion = versionRefs.getOrElse(1) { versionRefs[0] }, + authors = parts[2].split(Separators.LEVEL2).mapNotNull { it.urlDecode() }.toSet(), + size = parts[3].toLong(), + height = parts[4].toLong(), + minTime = times[0], + maxTime = times.getOrElse(1) { times[0] }, + child1 = referenceFactory.fromHashString(parts[6], HistoryIndexNode), + child2 = referenceFactory.fromHashString(parts[7], HistoryIndexNode), + ) + } + "L" -> { + HistoryIndexLeafNode( + versions = parts[1].split(Separators.LEVEL2) + .map { referenceFactory.fromHashString(it, CPVersion) }, + authors = parts[2].split(Separators.LEVEL2).mapNotNull { it.urlDecode() }.toSet(), + time = Instant.fromEpochSeconds(parts[3].toLong()), + ) + } + else -> error("Unknown type: " + parts[0]) + } + } + + fun of(version: Object): HistoryIndexNode { + val time = CLVersion(version).getTimestamp() ?: Instant.Companion.fromEpochMilliseconds(0L) + return HistoryIndexLeafNode( + versions = listOf(version.ref), + authors = setOfNotNull(version.data.author), + time = time, + ) + } + + fun of(version1: Object, version2: Object): HistoryIndexNode { + return of(version1).asObject(version1.graph).merge(of(version2).asObject(version2.graph)).getBlocking(version1.graph).data + } + } +} + +fun Object.merge(otherObj: Object) = data.merge(this, otherObj) +fun IStream.One>.merge(otherObj: Object) = flatMapOne { it.merge(otherObj) } +fun Object.concatUnbalanced(otherObj: Object): Object = data.concat(this, otherObj) +val Object.time get() = data.time + +data class HistoryIndexLeafNode( + val versions: List>, + override val authors: Set, + val time: Instant, +) : HistoryIndexNode() { + override val size: Long get() = versions.size.toLong() + override val height: Long get() = 1 + override val minTime: Instant get() = time + override val maxTime: Instant get() = time + override val timeRange: ClosedRange get() = minTime..maxTime + override val firstVersion: ObjectReference get() = versions.first() + override val lastVersion: ObjectReference get() = versions.last() + + override fun getAllVersions(): IStream.Many> { + return IStream.many(versions) + } + + override fun getAllVersionsReversed(): IStream.Many> { + return IStream.many(versions.asReversed()) + } + + override fun getRange(indexRange: LongRange): IStream.Many { + if (indexRange.first == 0L && indexRange.size() == versions.size.toLong()) { + return IStream.of(this) + } else { + return IStream.many( + versions.asReversed() + .drop(indexRange.first.toInt()) + .take(indexRange.size().toInt()), + ) + .flatMapOrdered { it.resolve() } + .map { + HistoryIndexLeafNode( + versions = listOf(it.ref), + authors = setOfNotNull(it.data.author), + time = time, + ) + } + } + } + + override fun getContainmentReferences(): List> { + return versions + } + + override fun objectDiff( + self: Object<*>, + oldObject: Object<*>?, + ): IStream.Many> { + TODO("Not yet implemented") + } + + override fun serialize(): String { + return "L" + + Separators.LEVEL1 + versions.joinToString(Separators.LEVEL2) { it.getHashString() } + + Separators.LEVEL1 + authors.joinToString(Separators.LEVEL2) { it.urlEncode() } + + Separators.LEVEL1 + time.epochSeconds.toString() + } + + override fun merge( + self: Object, + otherObj: Object, + ): IStream.One> { + val other = otherObj.data + return when (other) { + is HistoryIndexLeafNode -> { + when { + other.time < time -> otherObj.concatBalanced(self) + other.time > time -> self.concatBalanced(otherObj) + else -> HistoryIndexLeafNode( + versions = (versions + other.versions).distinctBy { it.getHash() }.toList(), + authors = authors + other.authors, + time = time, + ).asObject(self.graph).let { IStream.of(it) } + } + } + is HistoryIndexRangeNode -> { + otherObj.merge(self) + } + } + } + + override fun splitAtInterval(interval: Duration): IStream.Many { + TODO("Not yet implemented") + } +} + +/** + * The subranges can overlap. + */ +data class HistoryIndexRangeNode( + /** + * if subrange1 is set, then it's the same as subrange1.firstVersion + */ + override val firstVersion: ObjectReference, + /** + * if subrange2 is set, then it's the same as subrange2.lastVersion + */ + override val lastVersion: ObjectReference, + + /** + * All authors in this subtree. + */ + override val authors: Set, + + /** + * Number of versions in this subtree. + */ + override val size: Long, + + override val height: Long, + + override val minTime: Instant, + + override val maxTime: Instant, + + val child1: ObjectReference, + val child2: ObjectReference, +) : HistoryIndexNode() { + + init { + require(firstVersion.getHash() != lastVersion.getHash()) + require(minTime < maxTime) + require(height > 1L) + require(size > 1L) + require(child1.getHash() != child2.getHash()) + } + + private val graph: IObjectGraph get() = firstVersion.graph + override val timeRange: ClosedRange get() = minTime..maxTime + + override fun getDeserializer(): IObjectDeserializer<*> { + return HistoryIndexNode + } + + override fun getContainmentReferences(): List> { + return listOf(child1, child2) + } + + override fun objectDiff( + self: Object<*>, + oldObject: Object<*>?, + ): IStream.Many> { + TODO("Not yet implemented") + } + + override fun merge(self: Object, otherObj: Object): IStream.One> { + val self = self as Object + val other = otherObj.data + return child1.requestBoth(child2) { resolvedChild1, resolvedChild2 -> + when (other) { + is HistoryIndexLeafNode -> { + val range1 = resolvedChild1.data.timeRange + val range2 = resolvedChild2.data.timeRange + when { + other.time < range1.start -> otherObj.concatBalanced(resolvedChild1).concatBalanced(resolvedChild2) + other.time <= range1.endInclusive -> resolvedChild1.merge(otherObj).concatBalanced(resolvedChild2) + other.time < range2.start -> if (resolvedChild1.size <= resolvedChild2.size) { + resolvedChild1.concatBalanced(otherObj).concatBalanced(resolvedChild2) + } else { + resolvedChild1.concatBalanced(otherObj.concatBalanced(resolvedChild2)) + } + other.time <= range2.endInclusive -> resolvedChild1.concatBalanced(resolvedChild2.merge(otherObj)) + else -> resolvedChild1.concatBalanced(resolvedChild2.concatBalanced(otherObj)) + } + } + is HistoryIndexRangeNode -> { + val range1 = resolvedChild1.data.timeRange + val range2 = resolvedChild2.data.timeRange + val intersects1 = other.timeRange.intersects(range1) + val intersects2 = other.timeRange.intersects(range2) + when { + intersects1 && intersects2 -> { + resolvedChild1.merge(otherObj).merge(resolvedChild2) + } + intersects1 -> resolvedChild1.merge(otherObj).concatBalanced(resolvedChild2) + intersects2 -> resolvedChild1.concatBalanced(resolvedChild2.merge(otherObj)) + other.maxTime < range1.start -> { + if (other.size < resolvedChild2.size) { + otherObj.concatBalanced(resolvedChild1).concatBalanced(resolvedChild2) + } else { + otherObj.concatBalanced(self) + } + } + other.maxTime < range2.start -> { + if (resolvedChild2.size < resolvedChild1.size) { + resolvedChild1.concatBalanced(otherObj.concatBalanced(resolvedChild2)) + } else { + resolvedChild1.concatBalanced(otherObj).concatBalanced(resolvedChild2) + } + } + else -> { + if (other.size < resolvedChild1.size) { + resolvedChild1.concatBalanced(resolvedChild2.concatBalanced(otherObj)) + } else { + self.concatBalanced(otherObj) + } + } + } + } + } + }.flatten() + } + + override fun splitAtInterval(interval: Duration): IStream.Many { + TODO("Not yet implemented") + } + + /** + * Oldest version first + */ + override fun getAllVersions(): IStream.Many> { + return IStream.of(child1, child2) + .flatMapOrdered { it.resolve() } + .flatMapOrdered { it.data.getAllVersions() } + } + + /** + * Latest version first + */ + override fun getAllVersionsReversed(): IStream.Many> { + return IStream.of(child2, child1) + .flatMapOrdered { it.resolve() } + .flatMapOrdered { it.data.getAllVersionsReversed() } + } + + /** + * Latest element has index 0 + */ + override fun getRange(indexRange: LongRange): IStream.Many { + if (indexRange.isEmpty()) return IStream.empty() + if (!indexRange.intersects(0L until size)) return IStream.empty() + if (indexRange.contains(0L) && indexRange.contains(size - 1L)) return IStream.of(this) + return child1.requestBoth(child2) { resolvedChild1, resolvedChild2 -> + val validRange2 = 0L.rangeOfSize(resolvedChild2.size) + val validRange1 = (validRange2.last + 1L).rangeOfSize(resolvedChild1.size) + val range2 = indexRange.intersect(validRange2) + val range1 = indexRange.intersect(validRange1).shift(-resolvedChild2.size) + resolvedChild2.data.getRange(range2) + resolvedChild1.data.getRange(range1) + }.flatten() + } + + override fun serialize(): String { + return "R" + + Separators.LEVEL1 + firstVersion.getHashString() + Separators.LEVEL2 + lastVersion.getHashString() + + Separators.LEVEL1 + authors.joinToString(Separators.LEVEL2) { it.urlEncode() } + + Separators.LEVEL1 + size + + Separators.LEVEL1 + height + + Separators.LEVEL1 + minTime.epochSeconds.toString() + Separators.LEVEL2 + maxTime.epochSeconds + + Separators.LEVEL1 + child1.getHashString() + + Separators.LEVEL1 + child2.getHashString() + } +} + +private fun LongRange.shift(amount: Long) = first.plus(amount)..last.plus(amount) +private fun > ClosedRange.intersects(other: ClosedRange): Boolean { + return this.contains(other.start) || this.contains(other.endInclusive) || + other.contains(this.start) || other.contains(this.endInclusive) +} + +val Object.size get() = data.size +val Object.height get() = data.height +fun LongRange.size() = (last - first + 1).coerceAtLeast(0) +fun Long.rangeOfSize(size: Long) = this until (this + size) +fun LongRange.intersect(other: LongRange): LongRange { + return if (this.first > other.first) other.intersect(this) else other.first..min(this.last, other.last) +} + +private fun Object.rebalance(otherObj: Object): IStream.One, Object>> { + return if (otherObj.height > height + 1) { + (otherObj.data as HistoryIndexRangeNode).child1.requestBoth((otherObj.data as HistoryIndexRangeNode).child2) { split1, split2 -> + this.rebalance(split1).map { rebalanced -> + if (rebalanced.first.height <= split2.height) { + rebalanced.first.concatUnbalanced(rebalanced.second) to split2 + } else { + rebalanced.first to rebalanced.second.concatUnbalanced(split2) + } + } + }.flatten() + } else if (height > otherObj.height + 1) { + (this.data as HistoryIndexRangeNode).child1.requestBoth((this.data as HistoryIndexRangeNode).child2) { split1, split2 -> + split2.rebalance(otherObj).map { rebalanced -> + if (rebalanced.second.height > split1.height) { + split1.concatUnbalanced(rebalanced.first) to rebalanced.second + } else { + split1 to rebalanced.first.concatUnbalanced(rebalanced.second) + } + } + }.flatten() + } else { + IStream.of(this to otherObj) + } +} + +private fun Object.concatBalanced(otherObj: Object): IStream.One> { + return this.rebalance(otherObj).map { rebalanced -> + rebalanced.first.concatUnbalanced(rebalanced.second) + } +} + +private fun IStream.One>.concatBalanced(otherObj: Object): IStream.One> { + return flatMapOne { it.concatBalanced(otherObj) } +} + +private fun Object.concatBalanced(otherObj: IStream.One>): IStream.One> { + return otherObj.flatMapOne { this.concatBalanced(it) } +} diff --git a/model-datastructure/src/commonTest/kotlin/HistoryIndexTest.kt b/model-datastructure/src/commonTest/kotlin/HistoryIndexTest.kt new file mode 100644 index 0000000000..556018769c --- /dev/null +++ b/model-datastructure/src/commonTest/kotlin/HistoryIndexTest.kt @@ -0,0 +1,233 @@ +import kotlinx.datetime.Instant +import org.modelix.datastructures.model.HistoryIndexNode +import org.modelix.datastructures.model.IGenericModelTree +import org.modelix.datastructures.model.height +import org.modelix.datastructures.model.merge +import org.modelix.datastructures.model.size +import org.modelix.datastructures.objects.FullyLoadedObjectGraph +import org.modelix.datastructures.objects.Object +import org.modelix.datastructures.objects.asObject +import org.modelix.model.IVersion +import org.modelix.model.TreeId +import org.modelix.model.lazy.CLVersion +import org.modelix.streams.getBlocking +import kotlin.random.Random +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.time.Duration.Companion.seconds + +class HistoryIndexTest { + + private fun createInitialVersion(): CLVersion { + val emptyTree = IGenericModelTree.builder() + .withInt64Ids() + .treeId(TreeId.fromUUID("69dcb381-3dba-4251-b3ae-7aafe587c28e")) + .graph(FullyLoadedObjectGraph()) + .build() + + return IVersion.builder() + .tree(emptyTree) + .time(Instant.fromEpochMilliseconds(1757582404557L)) + .build() as CLVersion + } + + private fun newVersion( + base: CLVersion, + author: String? = "unit-test@modelix.org", + timeDeltaSeconds: Long = 1L, + ): CLVersion { + return IVersion.builder() + .tree(base.getModelTree()) + .author(author) + .time(base.getTimestamp()!!.plus(timeDeltaSeconds.seconds)) + .build() as CLVersion + } + + @Test + fun `1 version`() { + val history = HistoryIndexNode.of(createInitialVersion().obj) + assertEquals(1, history.size) + assertEquals(1, history.height) + } + + @Test + fun `2 versions`() { + val version1 = createInitialVersion() + val version2 = newVersion(version1) + val history = HistoryIndexNode.of(version1.obj, version2.obj) + assertEquals(2, history.size) + assertEquals(2, history.height) + } + + @Test + fun `3 versions`() { + val version1 = createInitialVersion() + val version2 = newVersion(version1) + val version3 = newVersion(version2) + val graph = version1.graph + val history1 = HistoryIndexNode.of(version1.obj, version2.obj).asObject(graph) + val history2 = HistoryIndexNode.of(version3.obj).asObject(graph) + val history = history1.merge(history2).getBlocking(graph) + assertEquals(3, history.size) + assertEquals(3, history.height) + } + + @Test + fun `4 versions`() { + val version1 = createInitialVersion() + val version2 = newVersion(version1) + val version3 = newVersion(version2) + val version4 = newVersion(version3) + val graph = version1.graph + val history = HistoryIndexNode.of(version1.obj, version2.obj).asObject(graph) + .merge(HistoryIndexNode.of(version3.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version4.obj).asObject(graph)) + .getBlocking(graph) + assertEquals(4, history.size) + assertEquals(4, history.height) + } + + @Test + fun `5 versions`() { + val version1 = createInitialVersion() + val version2 = newVersion(version1) + val version3 = newVersion(version2) + val version4 = newVersion(version3) + val version5 = newVersion(version4) + val graph = version1.graph + val history = HistoryIndexNode.of(version1.obj, version2.obj).asObject(graph) + .merge(HistoryIndexNode.of(version3.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version4.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version5.obj).asObject(graph)) + .getBlocking(graph) + assertEquals(5, history.size) + assertEquals(4, history.height) + } + + @Test + fun `branch and merge`() { + val version1 = createInitialVersion() + val version2 = newVersion(version1, author = "user0") + + val version3a = newVersion(version2, author = "user1") + val version4a = newVersion(version3a, author = "user1") + val version5a = newVersion(version4a, author = "user1") + + val version3b = newVersion(version2, author = "user2") + val version4b = newVersion(version3b, author = "user2") + val version5b = newVersion(version4b, author = "user2") + val version6b = newVersion(version5b, author = "user2") + + val version7 = IVersion.builder() + .tree(version1.getModelTree()) + .time(version6b.getTimestamp()!!.plus(1.seconds)) + .autoMerge(version2, version5a, version6b) + .build() as CLVersion + val version8 = newVersion(version7, author = "user3") + + val graph = version1.graph + val historyA = HistoryIndexNode.of(version1.obj, version2.obj).asObject(graph) + .merge(HistoryIndexNode.of(version3a.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version4a.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version5a.obj).asObject(graph)) + val historyB = HistoryIndexNode.of(version1.obj, version2.obj).asObject(graph) + .merge(HistoryIndexNode.of(version3b.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version4b.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version5b.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version6b.obj).asObject(graph)) + .getBlocking(graph) + val history = historyA + .merge(historyB) + .merge(HistoryIndexNode.of(version7.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version8.obj).asObject(graph)) + .getBlocking(graph) + + assertEquals( + listOf(version1, version2, version3a, version3b, version4a, version4b, version5a, version5b, version6b, version7, version8).map { it.getObjectHash() }, + history.data.getAllVersions().map { it.getHash() }.toList().getBlocking(graph), + ) + assertEquals(11, history.size) + assertEquals(5, history.height) + } + + @Test + fun `large history linear`() { + val versions = mutableListOf(createInitialVersion()) + repeat(1000) { + versions.add(newVersion(versions.last())) + } + val graph = versions.first().graph + val history = versions.drop(1).fold(HistoryIndexNode.of(versions.first().asObject()).asObject(graph)) { acc, it -> + acc.merge(HistoryIndexNode.of(it.asObject()).asObject(graph)).getBlocking(graph) + } + assertEquals(versions.size.toLong(), history.size) + assertEquals( + versions.map { it.getObjectHash() }, + history.data.getAllVersions().map { it.getHash() }.toList().getBlocking(graph), + ) + assertEquals(11, history.height) + } + + @Test + fun `large history shuffled`() { + val versions = mutableListOf(createInitialVersion()) + repeat(1000) { + versions.add(newVersion(versions.last())) + } + val graph = versions.first().graph + val history = versions.drop(1).shuffled(Random(78234554)).fold(HistoryIndexNode.of(versions.first().asObject()).asObject(graph)) { acc, it -> + acc.merge(HistoryIndexNode.of(it.asObject()).asObject(graph)).getBlocking(graph) + } + assertEquals(versions.size.toLong(), history.size) + assertEquals( + versions.map { it.getObjectHash() }, + history.data.getAllVersions().map { it.getHash() }.toList().getBlocking(graph), + ) + assertEquals(13, history.height) + } + + @Test + fun `large history linear recursive`() { + val versions = mutableListOf(createInitialVersion()) + repeat(1000) { + versions.add(newVersion(versions.last())) + } + val graph = versions.first().graph + + val history = buildHistory(versions) + assertEquals(versions.size.toLong(), history.size) + assertEquals( + versions.map { it.getObjectHash() }, + history.data.getAllVersions().map { it.getHash() }.toList().getBlocking(graph), + ) + assertEquals(11, history.height) + } + + @Test + fun `large history shuffled recursive`() { + val versions = mutableListOf(createInitialVersion()) + repeat(1000) { + versions.add(newVersion(versions.last())) + } + val graph = versions.first().graph + + val history = buildHistory(versions.shuffled(Random(78234554))) + assertEquals(versions.size.toLong(), history.size) + assertEquals( + versions.map { it.getObjectHash() }, + history.data.getAllVersions().map { it.getHash() }.toList().getBlocking(graph), + ) + assertEquals(14, history.height) + } + + private fun buildHistory(versions: List): Object { + val graph = versions.first().graph + if (versions.size == 1) { + return HistoryIndexNode.of(versions.single().obj).asObject(graph) + } + val centerIndex = versions.size / 2 + return buildHistory(versions.subList(0, centerIndex)) + .merge(buildHistory(versions.subList(centerIndex, versions.size))) + .getBlocking(graph) + } +} diff --git a/model-server-openapi/specifications/model-server-v2.yaml b/model-server-openapi/specifications/model-server-v2.yaml index 6e973684b1..fa535aa13f 100644 --- a/model-server-openapi/specifications/model-server-v2.yaml +++ b/model-server-openapi/specifications/model-server-v2.yaml @@ -113,6 +113,31 @@ paths: $ref: '#/components/responses/200json' default: $ref: '#/components/responses/GeneralError' + /repositories/{repository}/versions/{versionHash}/history-index: + get: + tags: + - v2 + operationId: getHistoryIndex + parameters: + - name: repository + in: "path" + required: true + schema: + type: string + - name: versionHash + in: "path" + required: true + schema: + type: string + responses: + "403": + $ref: '#/components/responses/403' + "401": + $ref: '#/components/responses/401' + "200": + $ref: '#/components/responses/200' + default: + $ref: '#/components/responses/GeneralError' /repositories/{repository}/objects/getAll: post: tags: diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt index 5de2f15c7f..1aa0dd3b25 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt @@ -188,6 +188,23 @@ class ModelReplicationServer( call.respondText(versionHash) } + override suspend fun RoutingContext.getHistoryIndex( + repository: String, + versionHash: String, + ) { + checkPermission(ModelServerPermissionSchema.repository(repository).objects.read) + val version = repositoriesManager.getVersion(repositoryId(repository), versionHash) + if (version == null) { + throw VersionNotFoundException(versionHash) + } + + @OptIn(RequiresTransaction::class) + val index = runWrite { + (repositoriesManager as RepositoriesManager).getOrCreateHistoryIndex(repositoryId(repository), version) + } + call.respondText(index.getHashString() + "\n" + index.data.serialize()) + } + override suspend fun RoutingContext.initializeRepository( repositoryName: String, useRoleIds: Boolean?, diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt index 487b93d25f..b6f23b5b54 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt @@ -3,9 +3,14 @@ package org.modelix.model.server.handlers import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.onEach import kotlinx.datetime.Clock +import org.modelix.datastructures.model.HistoryIndexNode import org.modelix.datastructures.model.IGenericModelTree import org.modelix.datastructures.model.getHash +import org.modelix.datastructures.model.merge import org.modelix.datastructures.model.withIdTranslation +import org.modelix.datastructures.objects.Object +import org.modelix.datastructures.objects.ObjectHash +import org.modelix.datastructures.objects.asObject import org.modelix.model.IVersion import org.modelix.model.ModelMigrations import org.modelix.model.ObjectDeltaFilter @@ -393,6 +398,32 @@ class RepositoriesManager(val stores: StoreManager) : IRepositoriesManager { if (!isolated) { stores.genericStore.put(legacyBranchKey(branch), hash, false) } + if (hash != null) { + // eager update of the index + getVersion(branch.repositoryId, hash)?.let { getOrCreateHistoryIndex(branch.repositoryId, it) } + } + } + + @RequiresTransaction + fun getOrCreateHistoryIndex(repositoryId: RepositoryId, version: CLVersion): Object { + val key = versionHistoryKey(repositoryId, version.getObjectHash()) + val indexHash = stores.genericStore.get(key)?.let { ObjectHash(it) } + val graph = stores.getAsyncStore(repositoryId.takeIf { isIsolated(it) ?: false }).asObjectGraph() + if (indexHash != null) { + return graph.fromHash(indexHash, HistoryIndexNode).resolveNow() + } else { + val parentIndices = version.getParentVersions().map { getOrCreateHistoryIndex(repositoryId, it as CLVersion) } + val newElement = HistoryIndexNode.of(version.obj).asObject(graph) + val newIndex = when (parentIndices.size) { + 0 -> newElement + 1 -> parentIndices.single().merge(newElement).getBlocking(graph) + 2 -> parentIndices[0].merge(parentIndices[1]).merge(newElement).getBlocking(graph) + else -> error("impossible") + } + newIndex.write() + stores.genericStore.put(key, newIndex.getHashString()) + return newIndex + } } override suspend fun pollVersionHash(branch: BranchReference, lastKnown: String?): String { @@ -485,6 +516,14 @@ class RepositoriesManager(val stores: StoreManager) : IRepositoriesManager { } } + private fun versionHistoryKey(repositoryId: RepositoryId, versionHash: ObjectHash, isolated: Boolean = isIsolated(repositoryId) ?: true): ObjectInRepository { + return if (isolated) { + ObjectInRepository(repositoryId.id, "$KEY_PREFIX:historyIndex:$versionHash") + } else { + ObjectInRepository.global("$KEY_PREFIX:repositories:${repositoryId.id}:historyIndex:$versionHash") + } + } + private fun legacyBranchKey(branchReference: BranchReference): ObjectInRepository { check(isIsolated(branchReference.repositoryId) != true) { "Not a legacy repository: " + branchReference.repositoryId } return ObjectInRepository.global(branchReference.getKey()) diff --git a/model-server/src/test/kotlin/org/modelix/model/server/HistoryIndexTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/HistoryIndexTest.kt new file mode 100644 index 0000000000..cd1b767c96 --- /dev/null +++ b/model-server/src/test/kotlin/org/modelix/model/server/HistoryIndexTest.kt @@ -0,0 +1,120 @@ +package org.modelix.model.server + +import io.ktor.server.testing.ApplicationTestBuilder +import io.ktor.server.testing.testApplication +import mu.KotlinLogging +import org.modelix.model.IVersion +import org.modelix.model.client2.IModelClientV2 +import org.modelix.model.historyAsSequence +import org.modelix.model.lazy.RepositoryId +import org.modelix.model.server.api.RepositoryConfig +import org.modelix.model.server.handlers.IdsApiImpl +import org.modelix.model.server.handlers.ModelReplicationServer +import org.modelix.model.server.handlers.RepositoriesManager +import org.modelix.model.server.store.InMemoryStoreClient +import kotlin.random.Random +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.time.Duration.Companion.seconds + +private val LOG = KotlinLogging.logger { } + +class HistoryIndexTest { + + private lateinit var statistics: StoreClientWithStatistics + private fun runTest(block: suspend ApplicationTestBuilder.() -> Unit) = testApplication { + application { + try { + installDefaultServerPlugins() + statistics = StoreClientWithStatistics(InMemoryStoreClient()) + val repoManager = RepositoriesManager(statistics) + ModelReplicationServer(repoManager).init(this) + IdsApiImpl(repoManager).init(this) + } catch (ex: Throwable) { + LOG.error("", ex) + } + } + block() + } + + @Test fun pagination_0_0() = runPaginationTest(0, 0) + + @Test fun pagination_0_1() = runPaginationTest(0, 1) + + @Test fun pagination_0_2() = runPaginationTest(0, 2) + + @Test fun pagination_0_3() = runPaginationTest(0, 3) + + @Test fun pagination_0_4() = runPaginationTest(0, 4) + + @Test fun pagination_0_5() = runPaginationTest(0, 5) + + @Test fun pagination_0_6() = runPaginationTest(0, 6) + + @Test fun pagination_0_7() = runPaginationTest(0, 7) + + @Test fun pagination_0_8() = runPaginationTest(0, 8) + + @Test fun pagination_0_9() = runPaginationTest(0, 9) + + @Test fun pagination_0_10() = runPaginationTest(0, 10) + + @Test fun pagination_10_10() = runPaginationTest(10, 10) + + @Test fun pagination_137_47() = runPaginationTest(137, 47) + + @Test fun pagination_138_47() = runPaginationTest(138, 47) + + @Test fun pagination_139_47() = runPaginationTest(139, 47) + + @Test fun pagination_140_47() = runPaginationTest(140, 47) + + @Test fun pagination_200_10() = runPaginationTest(200, 10) + + @Test fun pagination_201_10() = runPaginationTest(201, 10) + + @Test fun pagination_202_10() = runPaginationTest(202, 10) + + @Test fun pagination_201_201() = runPaginationTest(201, 201) + + private fun runPaginationTest(skip: Int, limit: Int) = runTest { + val rand = Random(8923345) + val modelClient: IModelClientV2 = createModelClient() + val repositoryId = RepositoryId("repo1") + val branchRef = repositoryId.getBranchReference() + val initialVersion = modelClient.initRepository(RepositoryConfig(repositoryId = repositoryId.id, repositoryName = repositoryId.id, modelId = "61bd6cb0-33ff-45d8-9d1b-2149fdb01d16")) + var currentVersion = initialVersion + + repeat(100) { + run { + val newVersion = IVersion.builder() + .baseVersion(currentVersion) + .tree(currentVersion.getModelTree()) + .author("user1") + .time(currentVersion.getTimestamp()!! + rand.nextInt(0, 3).seconds) + .build() + currentVersion = modelClient.push(branchRef, newVersion, currentVersion) + } + run { + val newVersion = IVersion.builder() + .baseVersion(currentVersion) + .tree(currentVersion.getModelTree()) + .author("user2") + .time(currentVersion.getTimestamp()!! + rand.nextInt(0, 3).seconds) + .build() + currentVersion = modelClient.push(branchRef, newVersion, currentVersion) + } + } + + val expectedOrder = currentVersion.historyAsSequence().map { it.getContentHash() }.toList() + + val history = modelClient.getHistoryRange( + repositoryId = repositoryId, + headVersion = currentVersion.getObjectHash(), + skip = skip.toLong(), + limit = limit.toLong(), + ) + assertEquals(expectedOrder.drop(skip).take(limit).toSet(), history.map { it.getContentHash() }.toSet()) + assertEquals(expectedOrder.drop(skip).take(limit), history.map { it.getContentHash() }) + } +} diff --git a/model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt index 70657b962e..2cede1f3d4 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt @@ -148,7 +148,7 @@ private object DepthFirstSearchPattern : AccessPattern { private object StreamBasedApi : AccessPattern { override suspend fun runPattern(rootNode: INode) { val asyncNode = rootNode.asAsyncNode() - asyncNode.querySuspending { asyncNode.getDescendants(true).count() } + asyncNode.querySuspending { asyncNode.getDescendantsUnordered(true).count() } } } diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/CollectionAsStream.kt b/streams/src/commonMain/kotlin/org/modelix/streams/CollectionAsStream.kt index 72bc521148..96d58cd2a4 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/CollectionAsStream.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/CollectionAsStream.kt @@ -27,8 +27,12 @@ open class CollectionAsStream(val collection: Collection) : IStream.Many flatMap(mapper: (E) -> IStream.Many): IStream.Many { - return convertLater().flatMap(mapper) + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { + return convertLater().flatMapOrdered(mapper) + } + + override fun flatMapUnordered(mapper: (E) -> IStream.Many): IStream.Many { + return convertLater().flatMapUnordered(mapper) } override fun flatMapIterable(mapper: (E) -> Iterable): IStream.Many { diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/DeferredStreamBuilder.kt b/streams/src/commonMain/kotlin/org/modelix/streams/DeferredStreamBuilder.kt index 76dcb812f6..fe6f1efbde 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/DeferredStreamBuilder.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/DeferredStreamBuilder.kt @@ -162,9 +162,15 @@ class DeferredStreamBuilder : IStreamBuilder { return ConvertibleMany { convert(it).map(mapper) } } - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { + override fun flatMapUnordered(mapper: (E) -> IStream.Many): IStream.Many { return ConvertibleMany { converter -> - convert(converter).flatMap { mapper(it).convert(converter) } + convert(converter).flatMapUnordered { mapper(it).convert(converter) } + } + } + + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { + return ConvertibleMany { converter -> + convert(converter).flatMapOrdered { mapper(it).convert(converter) } } } diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/EmptyStream.kt b/streams/src/commonMain/kotlin/org/modelix/streams/EmptyStream.kt index ba0b0258fb..39bd09e4d8 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/EmptyStream.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/EmptyStream.kt @@ -69,7 +69,7 @@ class EmptyStream : IStreamInternal.ZeroOrOne { @DelicateModelixApi override fun iterateBlocking(visitor: (E) -> Unit) {} - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { return EmptyStream() } diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/FlowStreamBuilder.kt b/streams/src/commonMain/kotlin/org/modelix/streams/FlowStreamBuilder.kt index caef35f20c..b63ed1bb77 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/FlowStreamBuilder.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/FlowStreamBuilder.kt @@ -13,6 +13,7 @@ import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.firstOrNull import kotlinx.coroutines.flow.flatMapConcat +import kotlinx.coroutines.flow.flatMapMerge import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.fold @@ -187,7 +188,11 @@ class FlowStreamBuilder() : IStreamBuilder { return Wrapper(wrapped.flatMapConcat { convert(mapper(it)) }) } - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { + override fun flatMapUnordered(mapper: (E) -> IStream.Many): IStream.Many { + return Wrapper(wrapped.flatMapMerge { convert(mapper(it)) }) + } + + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { return Wrapper(wrapped.flatMapConcat { convert(mapper(it)) }) } diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/IStream.kt b/streams/src/commonMain/kotlin/org/modelix/streams/IStream.kt index a1e5e42301..315f46cfe1 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/IStream.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/IStream.kt @@ -27,7 +27,22 @@ interface IStream { fun filter(predicate: (E) -> Boolean): Many fun map(mapper: (E) -> R): Many fun mapNotNull(mapper: (E) -> R?): Many = map(mapper).filterNotNull() - fun flatMap(mapper: (E) -> Many): Many + + @Deprecated("Use flatMapOrdered or flatMapUnordered") + fun flatMap(mapper: (E) -> Many): Many = flatMapOrdered(mapper) + + /** + * Output elements are only emitted after all output elements of the previous input are emitted. + * Can have a lower performance than [flatMapUnordered]. + */ + fun flatMapOrdered(mapper: (E) -> Many): Many + + /** + * Output elements are emitted as soon as possible. + * Can have a higher performance than [flatMapOrdered]. + */ + fun flatMapUnordered(mapper: (E) -> Many): Many = flatMapOrdered(mapper) + fun flatMapIterable(mapper: (E) -> Iterable): Many = flatMap { IStream.many(mapper(it)) } fun concat(other: Many<@UnsafeVariance E>): Many fun concat(other: OneOrMany<@UnsafeVariance E>): OneOrMany diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/ReaktiveStreamBuilder.kt b/streams/src/commonMain/kotlin/org/modelix/streams/ReaktiveStreamBuilder.kt index 2cb20fd3ee..e577f63fa8 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/ReaktiveStreamBuilder.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/ReaktiveStreamBuilder.kt @@ -261,8 +261,12 @@ class ReaktiveStreamBuilder() : IStreamBuilder { throw UnsupportedOperationException("Use IStreamExecutor.iterate") } - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { - return WrapperMany(wrapped.flatMap { mapper(it).toReaktive() }) + override fun flatMapUnordered(mapper: (E) -> IStream.Many): IStream.Many { + return WrapperMany(wrapped.flatMap(maxConcurrency = Int.MAX_VALUE) { mapper(it).toReaktive() }) + } + + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { + return WrapperMany(wrapped.flatMap(maxConcurrency = 1) { mapper(it).toReaktive() }) } override fun concat(other: IStream.Many): IStream.Many { @@ -356,7 +360,7 @@ class ReaktiveStreamBuilder() : IStreamBuilder { } override fun flatMapOne(mapper: (E) -> IStream.One): OneOrMany { - return WrapperOneOrMany(wrapped.flatMapSingle { mapper(it).toReaktive() }) + return WrapperOneOrMany(wrapped.flatMapSingle(maxConcurrency = 1) { mapper(it).toReaktive() }) } override fun onErrorReturn(valueSupplier: (Throwable) -> E): OneOrMany { @@ -439,7 +443,7 @@ class ReaktiveStreamBuilder() : IStreamBuilder { return WrapperMaybe(wrapped.flatMapMaybe { mapper(it).toReaktive() }) } - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { return WrapperMany(wrapped.flatMapObservable { mapper(it).toReaktive() }) } @@ -565,7 +569,7 @@ class ReaktiveStreamBuilder() : IStreamBuilder { throw UnsupportedOperationException("Use IStreamExecutor.iterate") } - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { return WrapperMany(wrapped.flatMapObservable { mapper(it).toReaktive() }) } diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/SequenceAsStream.kt b/streams/src/commonMain/kotlin/org/modelix/streams/SequenceAsStream.kt index 7ddde6cd93..32359db3a3 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/SequenceAsStream.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/SequenceAsStream.kt @@ -49,8 +49,8 @@ open class SequenceAsStream(val wrapped: Sequence) : IStream.Many, IStr return SequenceAsStreamOneOrMany(wrapped.ifEmpty { sequenceOf(alternative()) }) } - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { - return convertLater().flatMap(mapper) + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { + return convertLater().flatMapOrdered(mapper) } override fun concat(other: IStream.Many): IStream.Many { diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/SequenceStreamBuilder.kt b/streams/src/commonMain/kotlin/org/modelix/streams/SequenceStreamBuilder.kt index 3b261f5aef..a1118911e8 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/SequenceStreamBuilder.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/SequenceStreamBuilder.kt @@ -188,7 +188,7 @@ class SequenceStreamBuilder() : IStreamBuilder { return Wrapper(wrapped.flatMap { convert(mapper(it)) }) } - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { return Wrapper(wrapped.flatMap { convert(mapper(it)) }) } diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/SingleValueStream.kt b/streams/src/commonMain/kotlin/org/modelix/streams/SingleValueStream.kt index 326cbdcd47..7503d75b2f 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/SingleValueStream.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/SingleValueStream.kt @@ -87,7 +87,7 @@ class SingleValueStream(val value: E) : IStreamInternal.One { return this } - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { return mapper(value) }