From 4eafd32383febd19934053789492bf99b0c36758 Mon Sep 17 00:00:00 2001 From: Sascha Lisson Date: Sat, 13 Sep 2025 12:17:41 +0200 Subject: [PATCH 1/7] fix(streams): IStream.flatMap replaced by flatMapOrdered/flatMapUnordered The Reaktive based implementation had the semantics of flatMapUnordered while all other implementations had the one of flatMapOrdered. --- .../org/modelix/model/api/async/AsyncNode.kt | 11 +++++++++-- .../org/modelix/model/api/async/IAsyncNode.kt | 1 + .../org/modelix/model/server/LazyLoadingTest.kt | 2 +- .../org/modelix/streams/CollectionAsStream.kt | 8 ++++++-- .../modelix/streams/DeferredStreamBuilder.kt | 10 ++++++++-- .../kotlin/org/modelix/streams/EmptyStream.kt | 2 +- .../org/modelix/streams/FlowStreamBuilder.kt | 7 ++++++- .../kotlin/org/modelix/streams/IStream.kt | 17 ++++++++++++++++- .../modelix/streams/ReaktiveStreamBuilder.kt | 14 +++++++++----- .../org/modelix/streams/SequenceAsStream.kt | 4 ++-- .../modelix/streams/SequenceStreamBuilder.kt | 2 +- .../org/modelix/streams/SingleValueStream.kt | 2 +- 12 files changed, 61 insertions(+), 19 deletions(-) diff --git a/model-api/src/commonMain/kotlin/org/modelix/model/api/async/AsyncNode.kt b/model-api/src/commonMain/kotlin/org/modelix/model/api/async/AsyncNode.kt index 9a482d7b9b..f72d5bf37e 100644 --- a/model-api/src/commonMain/kotlin/org/modelix/model/api/async/AsyncNode.kt +++ b/model-api/src/commonMain/kotlin/org/modelix/model/api/async/AsyncNode.kt @@ -85,10 +85,17 @@ class AsyncNode( override fun getDescendants(includeSelf: Boolean): IStream.Many { return if (includeSelf) { - getStreamExecutor() IStream.of(IStream.of(this), getDescendants(false)).flatten() } else { - getAllChildren().flatMap { it.getDescendants(true) } + getAllChildren().flatMapOrdered { it.getDescendants(true) } + } + } + + override fun getDescendantsUnordered(includeSelf: Boolean): IStream.Many { + return if (includeSelf) { + IStream.of(IStream.of(this), getDescendantsUnordered(false)).flatten() + } else { + getAllChildren().flatMapUnordered { it.getDescendantsUnordered(true) } } } } diff --git a/model-api/src/commonMain/kotlin/org/modelix/model/api/async/IAsyncNode.kt b/model-api/src/commonMain/kotlin/org/modelix/model/api/async/IAsyncNode.kt index 9fe729767c..a1e88a4a14 100644 --- a/model-api/src/commonMain/kotlin/org/modelix/model/api/async/IAsyncNode.kt +++ b/model-api/src/commonMain/kotlin/org/modelix/model/api/async/IAsyncNode.kt @@ -33,6 +33,7 @@ interface IAsyncNode : IStreamExecutorProvider { fun getAllReferenceTargets(): IStream.Many> fun getDescendants(includeSelf: Boolean): IStream.Many + fun getDescendantsUnordered(includeSelf: Boolean): IStream.Many = getDescendants(includeSelf) } interface INodeWithAsyncSupport : INode { diff --git a/model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt index 70657b962e..2cede1f3d4 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt @@ -148,7 +148,7 @@ private object DepthFirstSearchPattern : AccessPattern { private object StreamBasedApi : AccessPattern { override suspend fun runPattern(rootNode: INode) { val asyncNode = rootNode.asAsyncNode() - asyncNode.querySuspending { asyncNode.getDescendants(true).count() } + asyncNode.querySuspending { asyncNode.getDescendantsUnordered(true).count() } } } diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/CollectionAsStream.kt b/streams/src/commonMain/kotlin/org/modelix/streams/CollectionAsStream.kt index 72bc521148..96d58cd2a4 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/CollectionAsStream.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/CollectionAsStream.kt @@ -27,8 +27,12 @@ open class CollectionAsStream(val collection: Collection) : IStream.Many flatMap(mapper: (E) -> IStream.Many): IStream.Many { - return convertLater().flatMap(mapper) + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { + return convertLater().flatMapOrdered(mapper) + } + + override fun flatMapUnordered(mapper: (E) -> IStream.Many): IStream.Many { + return convertLater().flatMapUnordered(mapper) } override fun flatMapIterable(mapper: (E) -> Iterable): IStream.Many { diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/DeferredStreamBuilder.kt b/streams/src/commonMain/kotlin/org/modelix/streams/DeferredStreamBuilder.kt index 76dcb812f6..fe6f1efbde 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/DeferredStreamBuilder.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/DeferredStreamBuilder.kt @@ -162,9 +162,15 @@ class DeferredStreamBuilder : IStreamBuilder { return ConvertibleMany { convert(it).map(mapper) } } - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { + override fun flatMapUnordered(mapper: (E) -> IStream.Many): IStream.Many { return ConvertibleMany { converter -> - convert(converter).flatMap { mapper(it).convert(converter) } + convert(converter).flatMapUnordered { mapper(it).convert(converter) } + } + } + + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { + return ConvertibleMany { converter -> + convert(converter).flatMapOrdered { mapper(it).convert(converter) } } } diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/EmptyStream.kt b/streams/src/commonMain/kotlin/org/modelix/streams/EmptyStream.kt index ba0b0258fb..39bd09e4d8 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/EmptyStream.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/EmptyStream.kt @@ -69,7 +69,7 @@ class EmptyStream : IStreamInternal.ZeroOrOne { @DelicateModelixApi override fun iterateBlocking(visitor: (E) -> Unit) {} - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { return EmptyStream() } diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/FlowStreamBuilder.kt b/streams/src/commonMain/kotlin/org/modelix/streams/FlowStreamBuilder.kt index caef35f20c..b63ed1bb77 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/FlowStreamBuilder.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/FlowStreamBuilder.kt @@ -13,6 +13,7 @@ import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.firstOrNull import kotlinx.coroutines.flow.flatMapConcat +import kotlinx.coroutines.flow.flatMapMerge import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.fold @@ -187,7 +188,11 @@ class FlowStreamBuilder() : IStreamBuilder { return Wrapper(wrapped.flatMapConcat { convert(mapper(it)) }) } - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { + override fun flatMapUnordered(mapper: (E) -> IStream.Many): IStream.Many { + return Wrapper(wrapped.flatMapMerge { convert(mapper(it)) }) + } + + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { return Wrapper(wrapped.flatMapConcat { convert(mapper(it)) }) } diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/IStream.kt b/streams/src/commonMain/kotlin/org/modelix/streams/IStream.kt index a1e5e42301..315f46cfe1 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/IStream.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/IStream.kt @@ -27,7 +27,22 @@ interface IStream { fun filter(predicate: (E) -> Boolean): Many fun map(mapper: (E) -> R): Many fun mapNotNull(mapper: (E) -> R?): Many = map(mapper).filterNotNull() - fun flatMap(mapper: (E) -> Many): Many + + @Deprecated("Use flatMapOrdered or flatMapUnordered") + fun flatMap(mapper: (E) -> Many): Many = flatMapOrdered(mapper) + + /** + * Output elements are only emitted after all output elements of the previous input are emitted. + * Can have a lower performance than [flatMapUnordered]. + */ + fun flatMapOrdered(mapper: (E) -> Many): Many + + /** + * Output elements are emitted as soon as possible. + * Can have a higher performance than [flatMapOrdered]. + */ + fun flatMapUnordered(mapper: (E) -> Many): Many = flatMapOrdered(mapper) + fun flatMapIterable(mapper: (E) -> Iterable): Many = flatMap { IStream.many(mapper(it)) } fun concat(other: Many<@UnsafeVariance E>): Many fun concat(other: OneOrMany<@UnsafeVariance E>): OneOrMany diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/ReaktiveStreamBuilder.kt b/streams/src/commonMain/kotlin/org/modelix/streams/ReaktiveStreamBuilder.kt index 2cb20fd3ee..e577f63fa8 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/ReaktiveStreamBuilder.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/ReaktiveStreamBuilder.kt @@ -261,8 +261,12 @@ class ReaktiveStreamBuilder() : IStreamBuilder { throw UnsupportedOperationException("Use IStreamExecutor.iterate") } - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { - return WrapperMany(wrapped.flatMap { mapper(it).toReaktive() }) + override fun flatMapUnordered(mapper: (E) -> IStream.Many): IStream.Many { + return WrapperMany(wrapped.flatMap(maxConcurrency = Int.MAX_VALUE) { mapper(it).toReaktive() }) + } + + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { + return WrapperMany(wrapped.flatMap(maxConcurrency = 1) { mapper(it).toReaktive() }) } override fun concat(other: IStream.Many): IStream.Many { @@ -356,7 +360,7 @@ class ReaktiveStreamBuilder() : IStreamBuilder { } override fun flatMapOne(mapper: (E) -> IStream.One): OneOrMany { - return WrapperOneOrMany(wrapped.flatMapSingle { mapper(it).toReaktive() }) + return WrapperOneOrMany(wrapped.flatMapSingle(maxConcurrency = 1) { mapper(it).toReaktive() }) } override fun onErrorReturn(valueSupplier: (Throwable) -> E): OneOrMany { @@ -439,7 +443,7 @@ class ReaktiveStreamBuilder() : IStreamBuilder { return WrapperMaybe(wrapped.flatMapMaybe { mapper(it).toReaktive() }) } - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { return WrapperMany(wrapped.flatMapObservable { mapper(it).toReaktive() }) } @@ -565,7 +569,7 @@ class ReaktiveStreamBuilder() : IStreamBuilder { throw UnsupportedOperationException("Use IStreamExecutor.iterate") } - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { return WrapperMany(wrapped.flatMapObservable { mapper(it).toReaktive() }) } diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/SequenceAsStream.kt b/streams/src/commonMain/kotlin/org/modelix/streams/SequenceAsStream.kt index 7ddde6cd93..32359db3a3 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/SequenceAsStream.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/SequenceAsStream.kt @@ -49,8 +49,8 @@ open class SequenceAsStream(val wrapped: Sequence) : IStream.Many, IStr return SequenceAsStreamOneOrMany(wrapped.ifEmpty { sequenceOf(alternative()) }) } - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { - return convertLater().flatMap(mapper) + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { + return convertLater().flatMapOrdered(mapper) } override fun concat(other: IStream.Many): IStream.Many { diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/SequenceStreamBuilder.kt b/streams/src/commonMain/kotlin/org/modelix/streams/SequenceStreamBuilder.kt index 3b261f5aef..a1118911e8 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/SequenceStreamBuilder.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/SequenceStreamBuilder.kt @@ -188,7 +188,7 @@ class SequenceStreamBuilder() : IStreamBuilder { return Wrapper(wrapped.flatMap { convert(mapper(it)) }) } - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { return Wrapper(wrapped.flatMap { convert(mapper(it)) }) } diff --git a/streams/src/commonMain/kotlin/org/modelix/streams/SingleValueStream.kt b/streams/src/commonMain/kotlin/org/modelix/streams/SingleValueStream.kt index 326cbdcd47..7503d75b2f 100644 --- a/streams/src/commonMain/kotlin/org/modelix/streams/SingleValueStream.kt +++ b/streams/src/commonMain/kotlin/org/modelix/streams/SingleValueStream.kt @@ -87,7 +87,7 @@ class SingleValueStream(val value: E) : IStreamInternal.One { return this } - override fun flatMap(mapper: (E) -> IStream.Many): IStream.Many { + override fun flatMapOrdered(mapper: (E) -> IStream.Many): IStream.Many { return mapper(value) } From 8a304903777be28f9c3cfb59562f02f4ed90aaf5 Mon Sep 17 00:00:00 2001 From: Sascha Lisson Date: Thu, 11 Sep 2025 12:34:34 +0200 Subject: [PATCH 2/7] feat: index for efficient history queries --- .../modelix/model/client2/IModelClientV2.kt | 39 ++ .../modelix/model/client2/ModelClientGraph.kt | 8 +- .../modelix/model/client2/ModelClientV2.kt | 106 +++++ .../datastructures/model/HistoryIndexNode.kt | 408 ++++++++++++++++++ .../src/commonTest/kotlin/HistoryIndexTest.kt | 224 ++++++++++ .../specifications/model-server-v2.yaml | 25 ++ .../server/handlers/ModelReplicationServer.kt | 17 + .../server/handlers/RepositoriesManager.kt | 39 ++ .../modelix/model/server/HistoryIndexTest.kt | 118 +++++ 9 files changed, 982 insertions(+), 2 deletions(-) create mode 100644 model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt create mode 100644 model-datastructure/src/commonTest/kotlin/HistoryIndexTest.kt create mode 100644 model-server/src/test/kotlin/org/modelix/model/server/HistoryIndexTest.kt diff --git a/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt b/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt index b74ed741b7..a151ca9a78 100644 --- a/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt +++ b/model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt @@ -1,6 +1,7 @@ package org.modelix.model.client2 import io.ktor.http.Url +import org.modelix.datastructures.objects.ObjectHash import org.modelix.kotlin.utils.DeprecationInfo import org.modelix.model.IVersion import org.modelix.model.ObjectDeltaFilter @@ -11,6 +12,7 @@ import org.modelix.model.lazy.BranchReference import org.modelix.model.lazy.RepositoryId import org.modelix.model.server.api.RepositoryConfig import org.modelix.modelql.core.IMonoStep +import kotlin.time.Duration /** * This interface is meant exclusively for model client usage. @@ -105,4 +107,41 @@ interface IModelClientV2 { suspend fun query(repositoryId: RepositoryId, versionHash: String, body: (IMonoStep) -> IMonoStep): R fun getFrontendUrl(branch: BranchReference): Url + + /** + * @param headVersion starting point for history computations. For a paginated view this value should be the same and + * the value for [skip] should be incremented instead. Only then its guaranteed that the returned list is + * complete. + * @param skip for a paginated view of the history + * @param limit maximum size of the returned list + * @param interval splits the timeline into equally sized intervals and returns only the last version of each interval + */ + suspend fun getHistory( + repositoryId: RepositoryId, + headVersion: ObjectHash, + skip: Int = 0, + limit: Int = 1000, + interval: Duration?, + ): HistoryResponse + + suspend fun getHistoryRange( + repositoryId: RepositoryId, + headVersion: ObjectHash, + skip: Long = 0L, + limit: Long = 1000L, + ): List } + +data class HistoryResponse( + val entries: List, + val nextVersions: List, +) + +data class HistoryEntry( + val firstVersionHash: ObjectHash, + val lastVersionHash: ObjectHash, + val minTime: Long?, + val maxTime: Long?, + val authors: Set, + // val headOfBranch: Set, +) diff --git a/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientGraph.kt b/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientGraph.kt index f7ea88b78d..65c5af097e 100644 --- a/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientGraph.kt +++ b/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientGraph.kt @@ -128,8 +128,12 @@ class ModelClientGraph( hash: ObjectHash, data: T, ): ObjectReference { - // Should never be called, because all deserialization happens internally. - throw UnsupportedOperationException() + if (config.lazyLoadingEnabled) { + return ObjectReferenceImpl(this, hash, data) + } else { + // Should never be called, because all deserialization happens internally. + throw UnsupportedOperationException() + } } @Synchronized diff --git a/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt b/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt index ab6ca56303..fd8121373e 100644 --- a/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt +++ b/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt @@ -37,7 +37,9 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch import mu.KotlinLogging +import org.modelix.datastructures.model.HistoryIndexNode import org.modelix.datastructures.objects.IObjectGraph +import org.modelix.datastructures.objects.Object import org.modelix.datastructures.objects.ObjectHash import org.modelix.kotlin.utils.DeprecationInfo import org.modelix.kotlin.utils.WeakValueMap @@ -89,6 +91,8 @@ import org.modelix.model.server.api.v2.toMap import org.modelix.modelql.client.ModelQLClient import org.modelix.modelql.core.IMonoStep import org.modelix.streams.IExecutableStream +import org.modelix.streams.getSuspending +import org.modelix.streams.iterateSuspending import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds @@ -328,6 +332,106 @@ class ModelClientV2( } } + override suspend fun getHistory( + repositoryId: RepositoryId, + headVersion: ObjectHash, + skip: Int, + limit: Int, + interval: Duration?, + ): HistoryResponse { + val index: Object = getHistoryIndex(repositoryId, headVersion) + val entries = if (interval != null) { + val mergedEntries = ArrayList() + var previousIntervalId: Long = Long.MAX_VALUE + try { + index.data.splitAtInterval(interval).iterateSuspending(index.graph) { + val intervalId = it.maxTime.toEpochMilliseconds() / interval.inWholeMilliseconds + require(intervalId <= previousIntervalId) + if (intervalId == previousIntervalId) { + val entry = mergedEntries[mergedEntries.lastIndex] + mergedEntries[mergedEntries.lastIndex] = HistoryEntry( + firstVersionHash = it.firstVersion.getHash(), + lastVersionHash = entry.lastVersionHash, + minTime = it.minTime.epochSeconds, + maxTime = entry.maxTime, + authors = entry.authors + it.authors, + ) + } else { + if (mergedEntries.size >= limit) throw LimitedReached() + previousIntervalId = intervalId + mergedEntries += HistoryEntry( + firstVersionHash = it.firstVersion.getHash(), + lastVersionHash = it.lastVersion.getHash(), + minTime = it.minTime.epochSeconds, + maxTime = it.maxTime.epochSeconds, + authors = it.authors, + ) + } + } + } catch (ex: LimitedReached) { + // Expected exception used for exiting the iterateSuspending call + } + mergedEntries + } else { + index.data.getAllVersionsReversed().flatMapOrdered { it.resolve() }.map { CLVersion(it) } + .map { + val hash = it.getObjectHash() + val time = it.getTimestamp()?.epochSeconds + HistoryEntry( + firstVersionHash = hash, + lastVersionHash = hash, + minTime = time, + maxTime = time, + authors = setOfNotNull(it.author), + ) + } + .take(limit) + .toList() + .getSuspending(index.graph) + } + return HistoryResponse(entries = entries, nextVersions = emptyList()) + } + + override suspend fun getHistoryRange( + repositoryId: RepositoryId, + headVersion: ObjectHash, + skip: Long, + limit: Long, + ): List { + val index: Object = getHistoryIndex(repositoryId, headVersion) + return index.data.getRange(skip until (limit + skip)) + .flatMapOrdered { it.getAllVersionsReversed() } + .flatMapOrdered { it.resolve() } + .map { CLVersion(it) } + .toList() + .getSuspending(index.graph) + } + + suspend fun getHistoryIndex( + repositoryId: RepositoryId?, + versionHash: ObjectHash, + ): Object { + return httpClient.prepareGet { + url { + takeFrom(baseUrl) + if (repositoryId == null) { + appendPathSegments("versions", versionHash.toString()) + } else { + appendPathSegments("repositories", repositoryId.id, "versions", versionHash.toString()) + } + appendPathSegments("history-index") + } + }.execute { response -> + val graph = getObjectGraph(repositoryId).also { it.config = it.config.copy(lazyLoadingEnabled = true) } + val text = response.bodyAsText() + val hashString = text.substringBefore('\n') + val serialized = text.substringAfter('\n') + val deserialized = HistoryIndexNode.deserialize(serialized, graph) + val ref = graph.fromDeserialized(ObjectHash(hashString), deserialized) + Object(deserialized, ref) + } + } + override suspend fun loadVersion( repositoryId: RepositoryId, versionHash: String, @@ -968,3 +1072,5 @@ fun IVersion.runWrite(idGenerator: IIdGenerator, author: String?, body: (IBranch } private fun String.ensureSuffix(suffix: String) = if (endsWith(suffix)) this else this + suffix + +private class LimitedReached : RuntimeException("limited reached") diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt new file mode 100644 index 0000000000..f16039d62b --- /dev/null +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt @@ -0,0 +1,408 @@ +package org.modelix.datastructures.model + +import kotlinx.datetime.Instant +import org.modelix.datastructures.model.HistoryIndexNode.Companion.merge +import org.modelix.datastructures.model.HistoryIndexNode.Companion.of +import org.modelix.datastructures.objects.IObjectData +import org.modelix.datastructures.objects.IObjectDeserializer +import org.modelix.datastructures.objects.IObjectGraph +import org.modelix.datastructures.objects.IObjectReferenceFactory +import org.modelix.datastructures.objects.Object +import org.modelix.datastructures.objects.ObjectReference +import org.modelix.datastructures.objects.asObject +import org.modelix.datastructures.objects.getHashString +import org.modelix.kotlin.utils.DelicateModelixApi +import org.modelix.kotlin.utils.urlDecode +import org.modelix.kotlin.utils.urlEncode +import org.modelix.model.lazy.CLVersion +import org.modelix.model.persistent.CPVersion +import org.modelix.model.persistent.Separators +import org.modelix.streams.IStream +import org.modelix.streams.plus +import kotlin.math.max +import kotlin.math.min +import kotlin.time.Duration + +/** + * The subranges can overlap. + */ +data class HistoryIndexNode( + /** + * if subrange1 is set, then it's the same as subrange1.firstVersion + */ + val firstVersion: ObjectReference, + /** + * if subrange2 is set, then it's the same as subrange2.lastVersion + */ + val lastVersion: ObjectReference, + + /** + * All authors in this subtree. + */ + val authors: Set, + + /** + * Number if versions in this subtree. + */ + val size: Long, + + val height: Long, + + val minTime: Instant, + + val maxTime: Instant, + + val subranges: Pair, ObjectReference>?, +) : IObjectData { + + init { + when (size) { + 0L -> error("empty node not expected") + 1L -> { + require(subranges == null) + require(firstVersion.getHash() == lastVersion.getHash()) + require(minTime == maxTime) + require(height == 1L) + } + 2L -> { + require(subranges == null) + require(firstVersion.getHash() != lastVersion.getHash()) + require(minTime <= maxTime) + require(height == 1L) + } + else -> { + require(firstVersion.getHash() != lastVersion.getHash()) + require(subranges != null) + require(subranges.first.getHash() != subranges.second.getHash()) + require(minTime <= maxTime) + require(height > 1L) + } + } + } + + private val graph: IObjectGraph get() = firstVersion.graph + val time: ClosedRange get() = minTime..maxTime + + override fun getDeserializer(): IObjectDeserializer<*> { + return HistoryIndexNode + } + + override fun getContainmentReferences(): List> { + return subranges?.toList() ?: emptyList() + } + + override fun objectDiff( + self: Object<*>, + oldObject: Object<*>?, + ): IStream.Many> { + TODO("Not yet implemented") + } + + /** + * Oldest version first + */ + fun getAllVersions(): IStream.Many> { + return when (size) { + 1L -> IStream.of(firstVersion) + 2L -> IStream.many(arrayOf(firstVersion, lastVersion)) + else -> IStream.many(subranges!!.toList()) + .flatMapOrdered { it.resolve() } + .flatMapOrdered { it.data.getAllVersions() } + } + } + + /** + * Latest version first + */ + fun getAllVersionsReversed(): IStream.Many> { + return when (size) { + 1L -> IStream.of(firstVersion) + 2L -> IStream.many(arrayOf(lastVersion, firstVersion)) + else -> IStream.many(subranges!!.toList().asReversed()) + .flatMapOrdered { it.resolve() } + .flatMapOrdered { it.data.getAllVersionsReversed() } + } + } + + /** + * Each returned node spans at most the duration specified in [interval]. + * For the same interval multiple nodes may be returned. + * Latest entry is returned first. + */ + fun splitAtInterval(interval: Duration): IStream.Many { + if (size == 1L) return IStream.of(this) + val intervalId1 = minTime.toEpochMilliseconds() / interval.inWholeMilliseconds + val intervalId2 = maxTime.toEpochMilliseconds() / interval.inWholeMilliseconds + if (intervalId1 == intervalId2) return IStream.of(this) + return splitReversed().flatMapOrdered { it.splitAtInterval(interval) } + } + + fun split(): IStream.Many { + return when (size) { + 1L -> IStream.of(this) + 2L -> IStream.many(listOf(firstVersion, lastVersion)) + .flatMapOrdered { it.resolve() } + .map { of(it) } + else -> IStream.many(subranges!!.toList()).flatMapOrdered { it.resolve() }.map { it.data } + } + } + + fun splitReversed(): IStream.Many { + return when (size) { + 1L -> IStream.of(this) + 2L -> IStream.many(listOf(lastVersion, firstVersion)) + .flatMapOrdered { it.resolve() } + .map { of(it) } + else -> IStream.many(subranges!!.let { listOf(it.second, it.first) }) + .flatMapOrdered { it.resolve() } + .map { it.data } + } + } + + /** + * Latest element has index 0 + */ + fun getRange(indexRange: LongRange): IStream.Many { + if (indexRange.isEmpty()) return IStream.empty() + if (!indexRange.intersects(0L until size)) return IStream.empty() + if (indexRange.contains(0L) && indexRange.contains(size - 1L)) return IStream.of(this) + return splitReversed().toList().flatMapOrdered { list -> + when (list.size) { + 1 -> getRange(indexRange) + 2 -> { + val validRange1 = 0L.rangeOfSize(list[0].size) + val validRange2 = (validRange1.last + 1L).rangeOfSize(list[1].size) + val range1 = indexRange.intersect(validRange1) + val range2 = indexRange.intersect(validRange2).shift(-list[0].size) + list[0].getRange(range1) + list[1].getRange(range2) + } + else -> error("impossible") + } + } + } + fun getRange(indexRange: IntRange) = getRange(indexRange.first.toLong()..indexRange.last.toLong()) + + override fun serialize(): String { + val firstAndLast = if (firstVersion.getHash() == lastVersion.getHash()) { + firstVersion.getHashString() + } else { + firstVersion.getHashString() + Separators.LEVEL2 + lastVersion.getHashString() + } + val times = if (minTime == maxTime) { + minTime.epochSeconds.toString() + } else { + "${minTime.epochSeconds}${Separators.LEVEL2}${maxTime.epochSeconds}" + } + return firstAndLast + + Separators.LEVEL1 + authors.joinToString(Separators.LEVEL2) { it.urlEncode() } + + Separators.LEVEL1 + size + + Separators.LEVEL1 + height + + Separators.LEVEL1 + times + + (subranges?.let { Separators.LEVEL1 + it.first.getHashString() + Separators.LEVEL2 + it.second.getHashString() } ?: "") + } + + companion object : IObjectDeserializer { + + override fun deserialize( + serialized: String, + referenceFactory: IObjectReferenceFactory, + ): HistoryIndexNode { + val parts = serialized.split(Separators.LEVEL1) + val versionRefs = parts[0].split(Separators.LEVEL2) + .map { referenceFactory.fromHashString(it, CPVersion) } + val times = parts[4].split(Separators.LEVEL2).map { Instant.fromEpochSeconds(it.toLong()) } + return HistoryIndexNode( + firstVersion = versionRefs[0], + lastVersion = versionRefs.getOrElse(1) { versionRefs[0] }, + authors = parts[1].split(Separators.LEVEL2).mapNotNull { it.urlDecode() }.toSet(), + size = parts[2].toLong(), + height = parts[3].toLong(), + minTime = times[0], + maxTime = times.getOrElse(1) { times[0] }, + subranges = parts.getOrNull(5)?.split(Separators.LEVEL2) + ?.map { referenceFactory.fromHashString(it, HistoryIndexNode) } + ?.let { it[0] to it[1] }, + ) + } + + fun of(version: Object): HistoryIndexNode { + val time = CLVersion(version).getTimestamp() ?: Instant.Companion.fromEpochMilliseconds(0L) + return HistoryIndexNode( + firstVersion = version.ref, + lastVersion = version.ref, + authors = setOfNotNull(version.data.author), + size = 1, + height = 1L, + minTime = time, + maxTime = time, + subranges = null, + ) + } + + fun of(version1: Object, version2: Object): HistoryIndexNode { + val time1 = CLVersion(version1).getTimestamp() ?: Instant.Companion.fromEpochMilliseconds(0L) + val time2 = CLVersion(version2).getTimestamp() ?: Instant.Companion.fromEpochMilliseconds(0L) + return if (time1 <= time2) { + HistoryIndexNode( + firstVersion = version1.ref, + lastVersion = version2.ref, + authors = setOfNotNull(version1.data.author, version2.data.author), + size = 2, + height = 1L, + minTime = time1, + maxTime = time2, + subranges = null, + ) + } else { + of(version2, version1) + } + } + + fun merge(range1: Object?, range2: Object?): Object { + if (range1 == null) return requireNotNull(range2) + if (range2 == null) return range1 + if (range1.getHash() == range2.getHash()) return range1 + if (range2.data.maxTime < range1.data.minTime) return merge(range2, range1) + + val totalSize = range1.data.size + range2.data.size + + if (totalSize <= 2) return concat(range1, range2) + + if (range1.data.time.intersects(range2.data.time)) { + val split1 = range1.splitNow() + val split2 = range2.splitNow() + + if (split1.size == 1) { + if (split2.size == 1) { + TODO() + } else { + return merge(merge(split1[0], split2[0]), split2[1]) + } + } else { + if (split2.size == 1) { + return merge(split1[0], merge(split1[1], split2[0])) + } else { + return merge(split1[0], merge(merge(split1[1], split2[0]), split2[1])) + } + } + } + + // In a balanced tree, one subtree shouldn't be more than twice as big as the other. + val minSubtreeSize = totalSize / 3L + val maxSubtreeSize = totalSize - minSubtreeSize + val allowedSubtreeSizeRange = minSubtreeSize..maxSubtreeSize + + if (range1.data.size > maxSubtreeSize) { + val (range1A, range1B) = range1.splitLeft(allowedSubtreeSizeRange) + return merge(range1A, merge(range1B, range2)) + } + + if (range2.data.size > maxSubtreeSize) { + val (range2A, range2B) = range2.splitRight(allowedSubtreeSizeRange) + return merge(merge(range1, range2A), range2B) + } + + if (range1.data.size < minSubtreeSize) { + val (range2A, range2B) = range2.splitLeft(allowedSubtreeSizeRange.shift(-range1.data.size)) + return merge(merge(range1, range2A), range2B) + } + + if (range2.data.size < minSubtreeSize) { + val (range1A, range1B) = range1.splitRight(allowedSubtreeSizeRange.shift(-range2.data.size)) + return merge(range1A, merge(range1B, range2)) + } + + return concat(range1, range2) + } + + /** + * Just merges the two subtrees without any guarantees about the shape of the resulting tree. + */ + private fun concat(range1: Object, range2: Object): Object { + val subranges = if (range1.data.size == 1L && range2.data.size == 1L) { + null + } else { + range1.ref to range2.ref + } + require(range1.data.maxTime <= range2.data.minTime) { + "${range1.data.time} overlaps with ${range2.data.time}" + } + return HistoryIndexNode( + firstVersion = range1.data.firstVersion, + lastVersion = range2.data.lastVersion, + authors = range1.data.authors + range2.data.authors, + size = range1.data.size + range2.data.size, + height = if (subranges == null) 1L else max(range1.data.height, range2.data.height) + 1, + minTime = range1.data.minTime, + maxTime = range2.data.maxTime, + subranges = subranges, + ).let { + @OptIn(DelicateModelixApi::class) + it.asObject(range1.graph) + } + } + } +} + +private fun Object.splitNow(): List> { + return when (data.size) { + 1L -> listOf(this) + 2L -> listOf(of(this.firstVersion.resolveNow()).asObject(graph), of(this.lastVersion.resolveNow()).asObject(graph)) + else -> data.subranges!!.toList().map { it.resolveNow() } + } +} + +private fun LongRange.coerceAtLeast(limit: Long) = first.coerceAtLeast(limit)..last.coerceAtLeast(limit) +private fun LongRange.coerceAtMost(limit: Long) = first.coerceAtMost(limit)..last.coerceAtMost(limit) +private fun LongRange.shift(amount: Long) = first.plus(amount)..last.plus(amount) +private fun > ClosedRange.intersects(other: ClosedRange): Boolean { + return this.contains(other.start) || this.contains(other.endInclusive) || + other.contains(this.start) || other.contains(this.endInclusive) +} + +val Object.size get() = data.size +val Object.height get() = data.height +val Object.firstVersion get() = data.firstVersion +val Object.lastVersion get() = data.lastVersion +val Object.subranges get() = data.subranges +private fun Object.split(leftSize: LongRange, rightSize: LongRange): Pair?, Object?> { + require(leftSize.first + rightSize.last == size) + require(leftSize.last + rightSize.first == size) + if (size == 1L) { + if (leftSize.contains(1L)) return this to null + if (rightSize.contains(1L)) return null to this + error("Invalid constraints for size 1: $leftSize and $rightSize") + } + if (size == 2L) { + if (leftSize.contains(2L) && rightSize.contains(0L)) return this to null + if (leftSize.contains(0L) && rightSize.contains(2L)) return null to this + if (leftSize.contains(1L) && rightSize.contains(1L)) { + return of(firstVersion.resolveNow()).asObject(graph) to of(lastVersion.resolveNow()).asObject(graph) + } + error("Invalid constraints for size 2: $leftSize and $rightSize") + } + val subranges = this.subranges + requireNotNull(subranges) + val range1 = subranges.first.resolveNow() + val range2 = subranges.second.resolveNow() + if (leftSize.contains(range1.data.size) && rightSize.contains(range2.data.size)) { + return range1 to range2 + } + if (!leftSize.contains(range1.size)) { + val (range1A, range1B) = range1.splitRight(rightSize.shift(-range2.size).coerceAtLeast(0)) + return range1A to merge(range1B, range2) + } else { + val (range2A, range2B) = range2.splitLeft(leftSize.shift(-range1.size).coerceAtLeast(0)) + return merge(range1, range2A) to range2B + } +} +fun Object.splitRight(rightSize: LongRange) = split((size - rightSize.last)..(size - rightSize.first), rightSize) +fun Object.splitLeft(leftSize: LongRange) = split(leftSize, (size - leftSize.last)..(size - leftSize.first)) +fun Object?.merge(other: Object?) = merge(this, other) +fun LongRange.size() = (last - first + 1).coerceAtLeast(0) +fun LongRange.withSize(newSize: Long) = first..(last.coerceAtMost(newSize - first - 1)) +fun Long.rangeOfSize(size: Long) = this until (this + size) +fun LongRange.intersect(other: LongRange): LongRange { + return if (this.first > other.first) other.intersect(this) else other.first..min(this.last, other.last) +} +fun LongRange.shiftFirstTo(newFirst: Long) = newFirst..(last + newFirst - first) diff --git a/model-datastructure/src/commonTest/kotlin/HistoryIndexTest.kt b/model-datastructure/src/commonTest/kotlin/HistoryIndexTest.kt new file mode 100644 index 0000000000..d8e33f7714 --- /dev/null +++ b/model-datastructure/src/commonTest/kotlin/HistoryIndexTest.kt @@ -0,0 +1,224 @@ +import kotlinx.datetime.Instant +import org.modelix.datastructures.model.HistoryIndexNode +import org.modelix.datastructures.model.IGenericModelTree +import org.modelix.datastructures.model.height +import org.modelix.datastructures.model.merge +import org.modelix.datastructures.model.size +import org.modelix.datastructures.objects.FullyLoadedObjectGraph +import org.modelix.datastructures.objects.Object +import org.modelix.datastructures.objects.asObject +import org.modelix.model.IVersion +import org.modelix.model.TreeId +import org.modelix.model.lazy.CLVersion +import org.modelix.streams.getBlocking +import kotlin.random.Random +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.time.Duration.Companion.seconds + +class HistoryIndexTest { + + private fun createInitialVersion(): CLVersion { + val emptyTree = IGenericModelTree.builder() + .withInt64Ids() + .treeId(TreeId.fromUUID("69dcb381-3dba-4251-b3ae-7aafe587c28e")) + .graph(FullyLoadedObjectGraph()) + .build() + + return IVersion.builder() + .tree(emptyTree) + .time(Instant.fromEpochMilliseconds(1757582404557L)) + .build() as CLVersion + } + + private fun newVersion( + base: CLVersion, + author: String? = "unit-test@modelix.org", + timeDeltaSeconds: Long = 1L, + ): CLVersion { + return IVersion.builder() + .tree(base.getModelTree()) + .author(author) + .time(base.getTimestamp()!!.plus(timeDeltaSeconds.seconds)) + .build() as CLVersion + } + + @Test + fun `1 version`() { + val history = HistoryIndexNode.of(createInitialVersion().obj) + assertEquals(1, history.size) + assertEquals(1, history.height) + } + + @Test + fun `2 versions`() { + val version1 = createInitialVersion() + val version2 = newVersion(version1) + val history = HistoryIndexNode.of(version1.obj, version2.obj) + assertEquals(2, history.size) + assertEquals(1, history.height) + } + + @Test + fun `3 versions`() { + val version1 = createInitialVersion() + val version2 = newVersion(version1) + val version3 = newVersion(version2) + val graph = version1.graph + val history1 = HistoryIndexNode.of(version1.obj, version2.obj).asObject(graph) + val history2 = HistoryIndexNode.of(version3.obj).asObject(graph) + val history = history1.merge(history2) + assertEquals(3, history.size) + assertEquals(2, history.height) + } + + @Test + fun `4 versions`() { + val version1 = createInitialVersion() + val version2 = newVersion(version1) + val version3 = newVersion(version2) + val version4 = newVersion(version3) + val graph = version1.graph + val history = HistoryIndexNode.of(version1.obj, version2.obj).asObject(graph) + .merge(HistoryIndexNode.of(version3.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version4.obj).asObject(graph)) + assertEquals(4, history.size) + assertEquals(3, history.height) + } + + @Test + fun `5 versions`() { + val version1 = createInitialVersion() + val version2 = newVersion(version1) + val version3 = newVersion(version2) + val version4 = newVersion(version3) + val version5 = newVersion(version4) + val graph = version1.graph + val history = HistoryIndexNode.of(version1.obj, version2.obj).asObject(graph) + .merge(HistoryIndexNode.of(version3.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version4.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version5.obj).asObject(graph)) + assertEquals(5, history.size) + assertEquals(4, history.height) + } + + @Test + fun `branch and merge`() { + val version1 = createInitialVersion() + val version2 = newVersion(version1, author = "user0") + + val version3a = newVersion(version2, author = "user1") + val version4a = newVersion(version3a, author = "user1") + val version5a = newVersion(version4a, author = "user1") + + val version3b = newVersion(version2, author = "user2") + val version4b = newVersion(version3b, author = "user2") + val version5b = newVersion(version4b, author = "user2") + val version6b = newVersion(version5b, author = "user2") + + val version7 = IVersion.builder() + .tree(version1.getModelTree()) + .time(version6b.getTimestamp()!!.plus(1.seconds)) + .autoMerge(version2, version5a, version6b) + .build() as CLVersion + val version8 = newVersion(version7, author = "user3") + + val graph = version1.graph + val historyA = HistoryIndexNode.of(version1.obj, version2.obj).asObject(graph) + .merge(HistoryIndexNode.of(version3a.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version4a.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version5a.obj).asObject(graph)) + val historyB = HistoryIndexNode.of(version1.obj, version2.obj).asObject(graph) + .merge(HistoryIndexNode.of(version3b.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version4b.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version5b.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version6b.obj).asObject(graph)) + val history = historyA + .merge(historyB) + .merge(HistoryIndexNode.of(version7.obj).asObject(graph)) + .merge(HistoryIndexNode.of(version8.obj).asObject(graph)) + + assertEquals( + listOf(version1, version2, version3a, version3b, version4a, version4b, version5a, version5b, version6b, version7, version8).map { it.getObjectHash() }, + history.data.getAllVersions().map { it.getHash() }.toList().getBlocking(graph), + ) + assertEquals(11, history.size) + assertEquals(5, history.height) + } + + @Test + fun `large history linear`() { + val versions = mutableListOf(createInitialVersion()) + repeat(1000) { + versions.add(newVersion(versions.last())) + } + val graph = versions.first().graph + val history = versions.drop(1).fold(HistoryIndexNode.of(versions.first().asObject()).asObject(graph)) { acc, it -> + acc.merge(HistoryIndexNode.of(it.asObject()).asObject(graph)) + } + assertEquals(versions.size.toLong(), history.size) + assertEquals( + versions.map { it.getObjectHash() }, + history.data.getAllVersions().map { it.getHash() }.toList().getBlocking(graph), + ) + assertEquals(12, history.height) + } + + @Test + fun `large history shuffled`() { + val versions = mutableListOf(createInitialVersion()) + repeat(1000) { + versions.add(newVersion(versions.last())) + } + val graph = versions.first().graph + val history = versions.drop(1).shuffled(Random(78234554)).fold(HistoryIndexNode.of(versions.first().asObject()).asObject(graph)) { acc, it -> + acc.merge(HistoryIndexNode.of(it.asObject()).asObject(graph)) + } + assertEquals(versions.size.toLong(), history.size) + assertEquals( + versions.map { it.getObjectHash() }, + history.data.getAllVersions().map { it.getHash() }.toList().getBlocking(graph), + ) + assertEquals(13, history.height) + } + + @Test + fun `large history linear recursive`() { + val versions = mutableListOf(createInitialVersion()) + repeat(1000) { + versions.add(newVersion(versions.last())) + } + val graph = versions.first().graph + + val history = buildHistory(versions) + assertEquals(versions.size.toLong(), history.size) + assertEquals( + versions.map { it.getObjectHash() }, + history.data.getAllVersions().map { it.getHash() }.toList().getBlocking(graph), + ) + assertEquals(10, history.height) + } + + @Test + fun `large history shuffled recursive`() { + val versions = mutableListOf(createInitialVersion()) + repeat(1000) { + versions.add(newVersion(versions.last())) + } + val graph = versions.first().graph + + val history = buildHistory(versions.shuffled(Random(78234554))) + assertEquals(versions.size.toLong(), history.size) + assertEquals( + versions.map { it.getObjectHash() }, + history.data.getAllVersions().map { it.getHash() }.toList().getBlocking(graph), + ) + assertEquals(14, history.height) + } + + private fun buildHistory(versions: List): Object { + if (versions.size == 1) return HistoryIndexNode.of(versions.single().obj).asObject(versions.single().graph) + val centerIndex = versions.size / 2 + return buildHistory(versions.subList(0, centerIndex)).merge(buildHistory(versions.subList(centerIndex, versions.size))) + } +} diff --git a/model-server-openapi/specifications/model-server-v2.yaml b/model-server-openapi/specifications/model-server-v2.yaml index 6e973684b1..fa535aa13f 100644 --- a/model-server-openapi/specifications/model-server-v2.yaml +++ b/model-server-openapi/specifications/model-server-v2.yaml @@ -113,6 +113,31 @@ paths: $ref: '#/components/responses/200json' default: $ref: '#/components/responses/GeneralError' + /repositories/{repository}/versions/{versionHash}/history-index: + get: + tags: + - v2 + operationId: getHistoryIndex + parameters: + - name: repository + in: "path" + required: true + schema: + type: string + - name: versionHash + in: "path" + required: true + schema: + type: string + responses: + "403": + $ref: '#/components/responses/403' + "401": + $ref: '#/components/responses/401' + "200": + $ref: '#/components/responses/200' + default: + $ref: '#/components/responses/GeneralError' /repositories/{repository}/objects/getAll: post: tags: diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt index 5de2f15c7f..1aa0dd3b25 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt @@ -188,6 +188,23 @@ class ModelReplicationServer( call.respondText(versionHash) } + override suspend fun RoutingContext.getHistoryIndex( + repository: String, + versionHash: String, + ) { + checkPermission(ModelServerPermissionSchema.repository(repository).objects.read) + val version = repositoriesManager.getVersion(repositoryId(repository), versionHash) + if (version == null) { + throw VersionNotFoundException(versionHash) + } + + @OptIn(RequiresTransaction::class) + val index = runWrite { + (repositoriesManager as RepositoriesManager).getOrCreateHistoryIndex(repositoryId(repository), version) + } + call.respondText(index.getHashString() + "\n" + index.data.serialize()) + } + override suspend fun RoutingContext.initializeRepository( repositoryName: String, useRoleIds: Boolean?, diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt index 487b93d25f..a1cd05bb70 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt @@ -3,9 +3,14 @@ package org.modelix.model.server.handlers import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.onEach import kotlinx.datetime.Clock +import org.modelix.datastructures.model.HistoryIndexNode import org.modelix.datastructures.model.IGenericModelTree import org.modelix.datastructures.model.getHash +import org.modelix.datastructures.model.merge import org.modelix.datastructures.model.withIdTranslation +import org.modelix.datastructures.objects.Object +import org.modelix.datastructures.objects.ObjectHash +import org.modelix.datastructures.objects.asObject import org.modelix.model.IVersion import org.modelix.model.ModelMigrations import org.modelix.model.ObjectDeltaFilter @@ -393,6 +398,32 @@ class RepositoriesManager(val stores: StoreManager) : IRepositoriesManager { if (!isolated) { stores.genericStore.put(legacyBranchKey(branch), hash, false) } + if (hash != null) { + // eager update of the index + getVersion(branch.repositoryId, hash)?.let { getOrCreateHistoryIndex(branch.repositoryId, it) } + } + } + + @RequiresTransaction + fun getOrCreateHistoryIndex(repositoryId: RepositoryId, version: CLVersion): Object { + val key = versionHistoryKey(repositoryId, version.getObjectHash()) + val indexHash = stores.genericStore.get(key)?.let { ObjectHash(it) } + val graph = stores.getAsyncStore(repositoryId.takeIf { isIsolated(it) ?: false }).asObjectGraph() + if (indexHash != null) { + return graph.fromHash(indexHash, HistoryIndexNode).resolveNow() + } else { + val parentIndices = version.getParentVersions().map { getOrCreateHistoryIndex(repositoryId, it as CLVersion) } + val newElement = HistoryIndexNode.of(version.obj).asObject(graph) + val newIndex = when (parentIndices.size) { + 0 -> newElement + 1 -> parentIndices.single().merge(newElement) + 2 -> parentIndices[0].merge(parentIndices[1]).merge(newElement) + else -> error("impossible") + } + newIndex.write() + stores.genericStore.put(key, newIndex.getHashString()) + return newIndex + } } override suspend fun pollVersionHash(branch: BranchReference, lastKnown: String?): String { @@ -485,6 +516,14 @@ class RepositoriesManager(val stores: StoreManager) : IRepositoriesManager { } } + private fun versionHistoryKey(repositoryId: RepositoryId, versionHash: ObjectHash, isolated: Boolean = isIsolated(repositoryId) ?: true): ObjectInRepository { + return if (isolated) { + ObjectInRepository(repositoryId.id, "$KEY_PREFIX:historyIndex:$versionHash") + } else { + ObjectInRepository.global("$KEY_PREFIX:repositories:${repositoryId.id}:historyIndex:$versionHash") + } + } + private fun legacyBranchKey(branchReference: BranchReference): ObjectInRepository { check(isIsolated(branchReference.repositoryId) != true) { "Not a legacy repository: " + branchReference.repositoryId } return ObjectInRepository.global(branchReference.getKey()) diff --git a/model-server/src/test/kotlin/org/modelix/model/server/HistoryIndexTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/HistoryIndexTest.kt new file mode 100644 index 0000000000..8c8f255742 --- /dev/null +++ b/model-server/src/test/kotlin/org/modelix/model/server/HistoryIndexTest.kt @@ -0,0 +1,118 @@ +package org.modelix.model.server + +import io.ktor.server.testing.ApplicationTestBuilder +import io.ktor.server.testing.testApplication +import mu.KotlinLogging +import org.modelix.model.IVersion +import org.modelix.model.client2.IModelClientV2 +import org.modelix.model.historyAsSequence +import org.modelix.model.lazy.RepositoryId +import org.modelix.model.server.api.RepositoryConfig +import org.modelix.model.server.handlers.IdsApiImpl +import org.modelix.model.server.handlers.ModelReplicationServer +import org.modelix.model.server.handlers.RepositoriesManager +import org.modelix.model.server.store.InMemoryStoreClient +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.time.Duration.Companion.seconds + +private val LOG = KotlinLogging.logger { } + +class HistoryIndexTest { + + private lateinit var statistics: StoreClientWithStatistics + private fun runTest(block: suspend ApplicationTestBuilder.() -> Unit) = testApplication { + application { + try { + installDefaultServerPlugins() + statistics = StoreClientWithStatistics(InMemoryStoreClient()) + val repoManager = RepositoriesManager(statistics) + ModelReplicationServer(repoManager).init(this) + IdsApiImpl(repoManager).init(this) + } catch (ex: Throwable) { + LOG.error("", ex) + } + } + block() + } + + @Test fun pagination_0_0() = runPaginationTest(0, 0) + + @Test fun pagination_0_1() = runPaginationTest(0, 1) + + @Test fun pagination_0_2() = runPaginationTest(0, 2) + + @Test fun pagination_0_3() = runPaginationTest(0, 3) + + @Test fun pagination_0_4() = runPaginationTest(0, 4) + + @Test fun pagination_0_5() = runPaginationTest(0, 5) + + @Test fun pagination_0_6() = runPaginationTest(0, 6) + + @Test fun pagination_0_7() = runPaginationTest(0, 7) + + @Test fun pagination_0_8() = runPaginationTest(0, 8) + + @Test fun pagination_0_9() = runPaginationTest(0, 9) + + @Test fun pagination_0_10() = runPaginationTest(0, 10) + + @Test fun pagination_10_10() = runPaginationTest(10, 10) + + @Test fun pagination_137_47() = runPaginationTest(137, 47) + + @Test fun pagination_138_47() = runPaginationTest(138, 47) + + @Test fun pagination_139_47() = runPaginationTest(139, 47) + + @Test fun pagination_140_47() = runPaginationTest(140, 47) + + @Test fun pagination_200_10() = runPaginationTest(200, 10) + + @Test fun pagination_201_10() = runPaginationTest(201, 10) + + @Test fun pagination_202_10() = runPaginationTest(202, 10) + + @Test fun pagination_201_201() = runPaginationTest(201, 201) + + private fun runPaginationTest(skip: Int, limit: Int) = runTest { + val modelClient: IModelClientV2 = createModelClient() + val repositoryId = RepositoryId("repo1") + val branchRef = repositoryId.getBranchReference() + val initialVersion = modelClient.initRepository(RepositoryConfig(repositoryId = repositoryId.id, repositoryName = repositoryId.id, modelId = "61bd6cb0-33ff-45d8-9d1b-2149fdb01d16")) + var currentVersion = initialVersion + + repeat(100) { + run { + val newVersion = IVersion.builder() + .baseVersion(currentVersion) + .tree(currentVersion.getModelTree()) + .author("user1") + .time(currentVersion.getTimestamp()!! + 3.seconds) + .build() + currentVersion = modelClient.push(branchRef, newVersion, currentVersion) + } + run { + val newVersion = IVersion.builder() + .baseVersion(currentVersion) + .tree(currentVersion.getModelTree()) + .author("user2") + .time(currentVersion.getTimestamp()!! + 2.seconds) + .build() + currentVersion = modelClient.push(branchRef, newVersion, currentVersion) + } + } + + val expectedOrder = currentVersion.historyAsSequence().map { it.getContentHash() }.toList() + + val history = modelClient.getHistoryRange( + repositoryId = repositoryId, + headVersion = currentVersion.getObjectHash(), + skip = skip.toLong(), + limit = limit.toLong(), + ) + assertEquals(expectedOrder.drop(skip).take(limit).toSet(), history.map { it.getContentHash() }.toSet()) + assertEquals(expectedOrder.drop(skip).take(limit), history.map { it.getContentHash() }) + } +} From 66fbf72dfd0a91d917ec68eebb2dde7fcea49c18 Mon Sep 17 00:00:00 2001 From: Sascha Lisson Date: Sun, 14 Sep 2025 13:53:04 +0200 Subject: [PATCH 3/7] feat: index for efficient history queries --- .../datastructures/model/HistoryIndexNode.kt | 600 +++++++++--------- 1 file changed, 294 insertions(+), 306 deletions(-) diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt index f16039d62b..9c54abaa04 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt @@ -1,8 +1,6 @@ package org.modelix.datastructures.model import kotlinx.datetime.Instant -import org.modelix.datastructures.model.HistoryIndexNode.Companion.merge -import org.modelix.datastructures.model.HistoryIndexNode.Companion.of import org.modelix.datastructures.objects.IObjectData import org.modelix.datastructures.objects.IObjectDeserializer import org.modelix.datastructures.objects.IObjectGraph @@ -11,6 +9,7 @@ import org.modelix.datastructures.objects.Object import org.modelix.datastructures.objects.ObjectReference import org.modelix.datastructures.objects.asObject import org.modelix.datastructures.objects.getHashString +import org.modelix.datastructures.objects.requestBoth import org.modelix.kotlin.utils.DelicateModelixApi import org.modelix.kotlin.utils.urlDecode import org.modelix.kotlin.utils.urlEncode @@ -18,77 +17,253 @@ import org.modelix.model.lazy.CLVersion import org.modelix.model.persistent.CPVersion import org.modelix.model.persistent.Separators import org.modelix.streams.IStream +import org.modelix.streams.flatten import org.modelix.streams.plus import kotlin.math.max import kotlin.math.min import kotlin.time.Duration +sealed class HistoryIndexNode : IObjectData { + abstract val firstVersion: ObjectReference + abstract val lastVersion: ObjectReference + abstract val authors: Set + abstract val size: Long + abstract val height: Long + abstract val minTime: Instant + abstract val maxTime: Instant + abstract val timeRange: ClosedRange + + abstract fun getAllVersions(): IStream.Many> + abstract fun getAllVersionsReversed(): IStream.Many> + abstract fun getRange(indexRange: LongRange): IStream.Many + fun getRange(indexRange: IntRange) = getRange(indexRange.first.toLong()..indexRange.last.toLong()) + abstract fun merge(self: Object, otherObj: Object): Object + + /** + * Each returned node spans at most the duration specified in [interval]. + * For the same interval multiple nodes may be returned. + * Latest entry is returned first. + */ + abstract fun splitAtInterval(interval: Duration): IStream.Many + + fun concat( + self: Object, + otherObj: Object, + ): Object { + require(self.data === this) + val other = otherObj.data + require(maxTime < other.minTime) + return HistoryIndexRangeNode( + firstVersion = firstVersion, + lastVersion = other.lastVersion, + authors = authors + other.authors, + size = size + other.size, + height = max(height, other.height) + 1, + minTime = minTime, + maxTime = other.maxTime, + child1 = self.ref, + child2 = otherObj.ref, + ).let { + @OptIn(DelicateModelixApi::class) + it.asObject(self.graph) + } + } + + override fun getDeserializer(): IObjectDeserializer<*> { + return HistoryIndexNode + } + + companion object : IObjectDeserializer { + override fun deserialize( + serialized: String, + referenceFactory: IObjectReferenceFactory, + ): HistoryIndexNode { + val parts = serialized.split(Separators.LEVEL1) + return when (parts[0]) { + "R" -> { + val versionRefs = parts[1].split(Separators.LEVEL2) + .map { referenceFactory.fromHashString(it, CPVersion) } + val times = parts[5].split(Separators.LEVEL2).map { Instant.fromEpochSeconds(it.toLong()) } + return HistoryIndexRangeNode( + firstVersion = versionRefs[0], + lastVersion = versionRefs.getOrElse(1) { versionRefs[0] }, + authors = parts[2].split(Separators.LEVEL2).mapNotNull { it.urlDecode() }.toSet(), + size = parts[3].toLong(), + height = parts[4].toLong(), + minTime = times[0], + maxTime = times.getOrElse(1) { times[0] }, + child1 = referenceFactory.fromHashString(parts[6], HistoryIndexNode), + child2 = referenceFactory.fromHashString(parts[7], HistoryIndexNode), + ) + } + "L" -> { + HistoryIndexLeafNode( + versions = parts[1].split(Separators.LEVEL2) + .map { referenceFactory.fromHashString(it, CPVersion) }, + authors = parts[2].split(Separators.LEVEL2).mapNotNull { it.urlDecode() }.toSet(), + time = Instant.fromEpochSeconds(parts[3].toLong()), + ) + } + else -> error("Unknown type: " + parts[0]) + } + } + + fun of(version: Object): HistoryIndexLeafNode { + val time = CLVersion(version).getTimestamp() ?: Instant.Companion.fromEpochMilliseconds(0L) + return HistoryIndexLeafNode( + versions = listOf(version.ref), + authors = setOfNotNull(version.data.author), + time = time, + ) + } + + fun of(version1: Object, version2: Object): HistoryIndexNode { + return of(version1).asObject(version1.graph).merge(of(version2).asObject(version2.graph)).data + } + } +} + +fun Object.merge(otherObj: Object): Object = data.merge(this, otherObj) +fun Object.concat(otherObj: Object): Object = data.concat(this, otherObj) +val Object.time get() = data.time + +data class HistoryIndexLeafNode( + val versions: List>, + override val authors: Set, + val time: Instant, +) : HistoryIndexNode() { + override val size: Long get() = versions.size.toLong() + override val height: Long get() = 1 + override val minTime: Instant get() = time + override val maxTime: Instant get() = time + override val timeRange: ClosedRange get() = minTime..maxTime + override val firstVersion: ObjectReference get() = versions.first() + override val lastVersion: ObjectReference get() = versions.last() + + override fun getAllVersions(): IStream.Many> { + return IStream.many(versions) + } + + override fun getAllVersionsReversed(): IStream.Many> { + return IStream.many(versions.asReversed()) + } + + override fun getRange(indexRange: LongRange): IStream.Many { + if (indexRange.first == 0L && indexRange.size() == versions.size.toLong()) { + return IStream.of(this) + } else { + return IStream.many( + versions.asReversed() + .drop(indexRange.first.toInt()) + .take(indexRange.size().toInt()), + ) + .flatMapOrdered { it.resolve() } + .map { + HistoryIndexLeafNode( + versions = listOf(it.ref), + authors = setOfNotNull(it.data.author), + time = time, + ) + } + } + } + + override fun getContainmentReferences(): List> { + return versions + } + + override fun objectDiff( + self: Object<*>, + oldObject: Object<*>?, + ): IStream.Many> { + TODO("Not yet implemented") + } + + override fun serialize(): String { + return "L" + + Separators.LEVEL1 + versions.joinToString(Separators.LEVEL2) { it.getHashString() } + + Separators.LEVEL1 + authors.joinToString(Separators.LEVEL2) { it.urlEncode() } + + Separators.LEVEL1 + time.epochSeconds.toString() + } + + override fun merge( + self: Object, + otherObj: Object, + ): Object { + val other = otherObj.data + return when (other) { + is HistoryIndexLeafNode -> { + when { + other.time < time -> otherObj.concat(self) + other.time > time -> self.concat(otherObj) + else -> HistoryIndexLeafNode( + versions = (versions.associateBy { it.getHash() } + other.versions.associateBy { it.getHash() }).values.toList(), + authors = authors + other.authors, + time = time, + ).asObject(self.graph) + } + } + is HistoryIndexRangeNode -> { + otherObj.merge(self) + } + } + } + + override fun splitAtInterval(interval: Duration): IStream.Many { + TODO("Not yet implemented") + } +} + /** * The subranges can overlap. */ -data class HistoryIndexNode( +data class HistoryIndexRangeNode( /** * if subrange1 is set, then it's the same as subrange1.firstVersion */ - val firstVersion: ObjectReference, + override val firstVersion: ObjectReference, /** * if subrange2 is set, then it's the same as subrange2.lastVersion */ - val lastVersion: ObjectReference, + override val lastVersion: ObjectReference, /** * All authors in this subtree. */ - val authors: Set, + override val authors: Set, /** - * Number if versions in this subtree. + * Number of versions in this subtree. */ - val size: Long, + override val size: Long, - val height: Long, + override val height: Long, - val minTime: Instant, + override val minTime: Instant, - val maxTime: Instant, + override val maxTime: Instant, - val subranges: Pair, ObjectReference>?, -) : IObjectData { + val child1: ObjectReference, + val child2: ObjectReference, +) : HistoryIndexNode() { init { - when (size) { - 0L -> error("empty node not expected") - 1L -> { - require(subranges == null) - require(firstVersion.getHash() == lastVersion.getHash()) - require(minTime == maxTime) - require(height == 1L) - } - 2L -> { - require(subranges == null) - require(firstVersion.getHash() != lastVersion.getHash()) - require(minTime <= maxTime) - require(height == 1L) - } - else -> { - require(firstVersion.getHash() != lastVersion.getHash()) - require(subranges != null) - require(subranges.first.getHash() != subranges.second.getHash()) - require(minTime <= maxTime) - require(height > 1L) - } - } + require(firstVersion.getHash() != lastVersion.getHash()) + require(minTime < maxTime) + require(height > 1L) + require(size > 1L) + require(child1.getHash() != child2.getHash()) } private val graph: IObjectGraph get() = firstVersion.graph - val time: ClosedRange get() = minTime..maxTime + override val timeRange: ClosedRange get() = minTime..maxTime override fun getDeserializer(): IObjectDeserializer<*> { return HistoryIndexNode } override fun getContainmentReferences(): List> { - return subranges?.toList() ?: emptyList() + return listOf(child1, child2) } override fun objectDiff( @@ -98,262 +273,114 @@ data class HistoryIndexNode( TODO("Not yet implemented") } - /** - * Oldest version first - */ - fun getAllVersions(): IStream.Many> { - return when (size) { - 1L -> IStream.of(firstVersion) - 2L -> IStream.many(arrayOf(firstVersion, lastVersion)) - else -> IStream.many(subranges!!.toList()) - .flatMapOrdered { it.resolve() } - .flatMapOrdered { it.data.getAllVersions() } + override fun merge(self: Object, otherObj: Object): Object { + val self = self as Object + val other = otherObj.data + val resolvedChild1 = child1.resolveNow() + val resolvedChild2 = child2.resolveNow() + when (other) { + is HistoryIndexLeafNode -> { + val range1 = resolvedChild1.data.timeRange + val range2 = resolvedChild2.data.timeRange + return when { + other.time < range1.start -> otherObj.concat(resolvedChild1).concat(resolvedChild2) + other.time <= range1.endInclusive -> resolvedChild1.merge(otherObj).concat(resolvedChild2) + other.time < range2.start -> if (resolvedChild1.size <= resolvedChild2.size) { + resolvedChild1.concat(otherObj).concat(resolvedChild2) + } else { + resolvedChild1.concat(otherObj.concat(resolvedChild2)) + } + other.time <= range2.endInclusive -> resolvedChild1.concat(resolvedChild2.merge(otherObj)) + else -> resolvedChild1.concat(resolvedChild2.concat(otherObj)) + } + } + is HistoryIndexRangeNode -> { + val range1 = resolvedChild1.data.timeRange + val range2 = resolvedChild2.data.timeRange + val intersects1 = other.timeRange.intersects(range1) + val intersects2 = other.timeRange.intersects(range2) + return when { + intersects1 && intersects2 -> { + resolvedChild1.merge(otherObj).merge(resolvedChild2) + } + intersects1 -> resolvedChild1.merge(otherObj).concat(resolvedChild2) + intersects2 -> resolvedChild1.concat(resolvedChild2.merge(otherObj)) + other.maxTime < range1.start -> { + if (other.size < resolvedChild2.size) { + otherObj.concat(resolvedChild1).concat(resolvedChild2) + } else { + otherObj.concat(self) + } + } + other.maxTime < range2.start -> { + if (resolvedChild2.size < resolvedChild1.size) { + resolvedChild1.concat(otherObj.concat(resolvedChild2)) + } else { + resolvedChild1.concat(otherObj).concat(resolvedChild2) + } + } + else -> { + if (other.size < resolvedChild1.size) { + resolvedChild1.concat(resolvedChild2.concat(otherObj)) + } else { + self.concat(otherObj) + } + } + } + } } } - /** - * Latest version first - */ - fun getAllVersionsReversed(): IStream.Many> { - return when (size) { - 1L -> IStream.of(firstVersion) - 2L -> IStream.many(arrayOf(lastVersion, firstVersion)) - else -> IStream.many(subranges!!.toList().asReversed()) - .flatMapOrdered { it.resolve() } - .flatMapOrdered { it.data.getAllVersionsReversed() } - } + override fun splitAtInterval(interval: Duration): IStream.Many { + TODO("Not yet implemented") } /** - * Each returned node spans at most the duration specified in [interval]. - * For the same interval multiple nodes may be returned. - * Latest entry is returned first. + * Oldest version first */ - fun splitAtInterval(interval: Duration): IStream.Many { - if (size == 1L) return IStream.of(this) - val intervalId1 = minTime.toEpochMilliseconds() / interval.inWholeMilliseconds - val intervalId2 = maxTime.toEpochMilliseconds() / interval.inWholeMilliseconds - if (intervalId1 == intervalId2) return IStream.of(this) - return splitReversed().flatMapOrdered { it.splitAtInterval(interval) } - } - - fun split(): IStream.Many { - return when (size) { - 1L -> IStream.of(this) - 2L -> IStream.many(listOf(firstVersion, lastVersion)) - .flatMapOrdered { it.resolve() } - .map { of(it) } - else -> IStream.many(subranges!!.toList()).flatMapOrdered { it.resolve() }.map { it.data } - } + override fun getAllVersions(): IStream.Many> { + return IStream.of(child1, child2) + .flatMapOrdered { it.resolve() } + .flatMapOrdered { it.data.getAllVersions() } } - fun splitReversed(): IStream.Many { - return when (size) { - 1L -> IStream.of(this) - 2L -> IStream.many(listOf(lastVersion, firstVersion)) - .flatMapOrdered { it.resolve() } - .map { of(it) } - else -> IStream.many(subranges!!.let { listOf(it.second, it.first) }) - .flatMapOrdered { it.resolve() } - .map { it.data } - } + /** + * Latest version first + */ + override fun getAllVersionsReversed(): IStream.Many> { + return IStream.of(child2, child1) + .flatMapOrdered { it.resolve() } + .flatMapOrdered { it.data.getAllVersionsReversed() } } /** * Latest element has index 0 */ - fun getRange(indexRange: LongRange): IStream.Many { + override fun getRange(indexRange: LongRange): IStream.Many { if (indexRange.isEmpty()) return IStream.empty() if (!indexRange.intersects(0L until size)) return IStream.empty() if (indexRange.contains(0L) && indexRange.contains(size - 1L)) return IStream.of(this) - return splitReversed().toList().flatMapOrdered { list -> - when (list.size) { - 1 -> getRange(indexRange) - 2 -> { - val validRange1 = 0L.rangeOfSize(list[0].size) - val validRange2 = (validRange1.last + 1L).rangeOfSize(list[1].size) - val range1 = indexRange.intersect(validRange1) - val range2 = indexRange.intersect(validRange2).shift(-list[0].size) - list[0].getRange(range1) + list[1].getRange(range2) - } - else -> error("impossible") - } - } + return child1.requestBoth(child2) { resolvedChild1, resolvedChild2 -> + val validRange2 = 0L.rangeOfSize(resolvedChild2.size) + val validRange1 = (validRange2.last + 1L).rangeOfSize(resolvedChild1.size) + val range2 = indexRange.intersect(validRange2) + val range1 = indexRange.intersect(validRange1).shift(-resolvedChild2.size) + resolvedChild2.data.getRange(range2) + resolvedChild1.data.getRange(range1) + }.flatten() } - fun getRange(indexRange: IntRange) = getRange(indexRange.first.toLong()..indexRange.last.toLong()) override fun serialize(): String { - val firstAndLast = if (firstVersion.getHash() == lastVersion.getHash()) { - firstVersion.getHashString() - } else { - firstVersion.getHashString() + Separators.LEVEL2 + lastVersion.getHashString() - } - val times = if (minTime == maxTime) { - minTime.epochSeconds.toString() - } else { - "${minTime.epochSeconds}${Separators.LEVEL2}${maxTime.epochSeconds}" - } - return firstAndLast + + return "R" + + Separators.LEVEL1 + firstVersion.getHashString() + Separators.LEVEL2 + lastVersion.getHashString() + Separators.LEVEL1 + authors.joinToString(Separators.LEVEL2) { it.urlEncode() } + Separators.LEVEL1 + size + Separators.LEVEL1 + height + - Separators.LEVEL1 + times + - (subranges?.let { Separators.LEVEL1 + it.first.getHashString() + Separators.LEVEL2 + it.second.getHashString() } ?: "") - } - - companion object : IObjectDeserializer { - - override fun deserialize( - serialized: String, - referenceFactory: IObjectReferenceFactory, - ): HistoryIndexNode { - val parts = serialized.split(Separators.LEVEL1) - val versionRefs = parts[0].split(Separators.LEVEL2) - .map { referenceFactory.fromHashString(it, CPVersion) } - val times = parts[4].split(Separators.LEVEL2).map { Instant.fromEpochSeconds(it.toLong()) } - return HistoryIndexNode( - firstVersion = versionRefs[0], - lastVersion = versionRefs.getOrElse(1) { versionRefs[0] }, - authors = parts[1].split(Separators.LEVEL2).mapNotNull { it.urlDecode() }.toSet(), - size = parts[2].toLong(), - height = parts[3].toLong(), - minTime = times[0], - maxTime = times.getOrElse(1) { times[0] }, - subranges = parts.getOrNull(5)?.split(Separators.LEVEL2) - ?.map { referenceFactory.fromHashString(it, HistoryIndexNode) } - ?.let { it[0] to it[1] }, - ) - } - - fun of(version: Object): HistoryIndexNode { - val time = CLVersion(version).getTimestamp() ?: Instant.Companion.fromEpochMilliseconds(0L) - return HistoryIndexNode( - firstVersion = version.ref, - lastVersion = version.ref, - authors = setOfNotNull(version.data.author), - size = 1, - height = 1L, - minTime = time, - maxTime = time, - subranges = null, - ) - } - - fun of(version1: Object, version2: Object): HistoryIndexNode { - val time1 = CLVersion(version1).getTimestamp() ?: Instant.Companion.fromEpochMilliseconds(0L) - val time2 = CLVersion(version2).getTimestamp() ?: Instant.Companion.fromEpochMilliseconds(0L) - return if (time1 <= time2) { - HistoryIndexNode( - firstVersion = version1.ref, - lastVersion = version2.ref, - authors = setOfNotNull(version1.data.author, version2.data.author), - size = 2, - height = 1L, - minTime = time1, - maxTime = time2, - subranges = null, - ) - } else { - of(version2, version1) - } - } - - fun merge(range1: Object?, range2: Object?): Object { - if (range1 == null) return requireNotNull(range2) - if (range2 == null) return range1 - if (range1.getHash() == range2.getHash()) return range1 - if (range2.data.maxTime < range1.data.minTime) return merge(range2, range1) - - val totalSize = range1.data.size + range2.data.size - - if (totalSize <= 2) return concat(range1, range2) - - if (range1.data.time.intersects(range2.data.time)) { - val split1 = range1.splitNow() - val split2 = range2.splitNow() - - if (split1.size == 1) { - if (split2.size == 1) { - TODO() - } else { - return merge(merge(split1[0], split2[0]), split2[1]) - } - } else { - if (split2.size == 1) { - return merge(split1[0], merge(split1[1], split2[0])) - } else { - return merge(split1[0], merge(merge(split1[1], split2[0]), split2[1])) - } - } - } - - // In a balanced tree, one subtree shouldn't be more than twice as big as the other. - val minSubtreeSize = totalSize / 3L - val maxSubtreeSize = totalSize - minSubtreeSize - val allowedSubtreeSizeRange = minSubtreeSize..maxSubtreeSize - - if (range1.data.size > maxSubtreeSize) { - val (range1A, range1B) = range1.splitLeft(allowedSubtreeSizeRange) - return merge(range1A, merge(range1B, range2)) - } - - if (range2.data.size > maxSubtreeSize) { - val (range2A, range2B) = range2.splitRight(allowedSubtreeSizeRange) - return merge(merge(range1, range2A), range2B) - } - - if (range1.data.size < minSubtreeSize) { - val (range2A, range2B) = range2.splitLeft(allowedSubtreeSizeRange.shift(-range1.data.size)) - return merge(merge(range1, range2A), range2B) - } - - if (range2.data.size < minSubtreeSize) { - val (range1A, range1B) = range1.splitRight(allowedSubtreeSizeRange.shift(-range2.data.size)) - return merge(range1A, merge(range1B, range2)) - } - - return concat(range1, range2) - } - - /** - * Just merges the two subtrees without any guarantees about the shape of the resulting tree. - */ - private fun concat(range1: Object, range2: Object): Object { - val subranges = if (range1.data.size == 1L && range2.data.size == 1L) { - null - } else { - range1.ref to range2.ref - } - require(range1.data.maxTime <= range2.data.minTime) { - "${range1.data.time} overlaps with ${range2.data.time}" - } - return HistoryIndexNode( - firstVersion = range1.data.firstVersion, - lastVersion = range2.data.lastVersion, - authors = range1.data.authors + range2.data.authors, - size = range1.data.size + range2.data.size, - height = if (subranges == null) 1L else max(range1.data.height, range2.data.height) + 1, - minTime = range1.data.minTime, - maxTime = range2.data.maxTime, - subranges = subranges, - ).let { - @OptIn(DelicateModelixApi::class) - it.asObject(range1.graph) - } - } + Separators.LEVEL1 + minTime.epochSeconds.toString() + Separators.LEVEL2 + maxTime.epochSeconds + + Separators.LEVEL1 + child1.getHashString() + + Separators.LEVEL1 + child2.getHashString() } } -private fun Object.splitNow(): List> { - return when (data.size) { - 1L -> listOf(this) - 2L -> listOf(of(this.firstVersion.resolveNow()).asObject(graph), of(this.lastVersion.resolveNow()).asObject(graph)) - else -> data.subranges!!.toList().map { it.resolveNow() } - } -} - -private fun LongRange.coerceAtLeast(limit: Long) = first.coerceAtLeast(limit)..last.coerceAtLeast(limit) -private fun LongRange.coerceAtMost(limit: Long) = first.coerceAtMost(limit)..last.coerceAtMost(limit) private fun LongRange.shift(amount: Long) = first.plus(amount)..last.plus(amount) private fun > ClosedRange.intersects(other: ClosedRange): Boolean { return this.contains(other.start) || this.contains(other.endInclusive) || @@ -362,47 +389,8 @@ private fun > ClosedRange.intersects(other: ClosedRange) val Object.size get() = data.size val Object.height get() = data.height -val Object.firstVersion get() = data.firstVersion -val Object.lastVersion get() = data.lastVersion -val Object.subranges get() = data.subranges -private fun Object.split(leftSize: LongRange, rightSize: LongRange): Pair?, Object?> { - require(leftSize.first + rightSize.last == size) - require(leftSize.last + rightSize.first == size) - if (size == 1L) { - if (leftSize.contains(1L)) return this to null - if (rightSize.contains(1L)) return null to this - error("Invalid constraints for size 1: $leftSize and $rightSize") - } - if (size == 2L) { - if (leftSize.contains(2L) && rightSize.contains(0L)) return this to null - if (leftSize.contains(0L) && rightSize.contains(2L)) return null to this - if (leftSize.contains(1L) && rightSize.contains(1L)) { - return of(firstVersion.resolveNow()).asObject(graph) to of(lastVersion.resolveNow()).asObject(graph) - } - error("Invalid constraints for size 2: $leftSize and $rightSize") - } - val subranges = this.subranges - requireNotNull(subranges) - val range1 = subranges.first.resolveNow() - val range2 = subranges.second.resolveNow() - if (leftSize.contains(range1.data.size) && rightSize.contains(range2.data.size)) { - return range1 to range2 - } - if (!leftSize.contains(range1.size)) { - val (range1A, range1B) = range1.splitRight(rightSize.shift(-range2.size).coerceAtLeast(0)) - return range1A to merge(range1B, range2) - } else { - val (range2A, range2B) = range2.splitLeft(leftSize.shift(-range1.size).coerceAtLeast(0)) - return merge(range1, range2A) to range2B - } -} -fun Object.splitRight(rightSize: LongRange) = split((size - rightSize.last)..(size - rightSize.first), rightSize) -fun Object.splitLeft(leftSize: LongRange) = split(leftSize, (size - leftSize.last)..(size - leftSize.first)) -fun Object?.merge(other: Object?) = merge(this, other) fun LongRange.size() = (last - first + 1).coerceAtLeast(0) -fun LongRange.withSize(newSize: Long) = first..(last.coerceAtMost(newSize - first - 1)) fun Long.rangeOfSize(size: Long) = this until (this + size) fun LongRange.intersect(other: LongRange): LongRange { return if (this.first > other.first) other.intersect(this) else other.first..min(this.last, other.last) } -fun LongRange.shiftFirstTo(newFirst: Long) = newFirst..(last + newFirst - first) From eafba2147dda5bb0869766c0219a88356e551f2a Mon Sep 17 00:00:00 2001 From: Sascha Lisson Date: Mon, 15 Sep 2025 11:09:29 +0200 Subject: [PATCH 4/7] feat: index for efficient history queries --- .../datastructures/model/HistoryIndexNode.kt | 67 ++++++++++++++----- .../src/commonTest/kotlin/HistoryIndexTest.kt | 10 +-- .../modelix/model/server/HistoryIndexTest.kt | 6 +- 3 files changed, 58 insertions(+), 25 deletions(-) diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt index 9c54abaa04..26fc630c5a 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt @@ -19,6 +19,7 @@ import org.modelix.model.persistent.Separators import org.modelix.streams.IStream import org.modelix.streams.flatten import org.modelix.streams.plus +import kotlin.math.abs import kotlin.math.max import kotlin.math.min import kotlin.time.Duration @@ -53,6 +54,7 @@ sealed class HistoryIndexNode : IObjectData { require(self.data === this) val other = otherObj.data require(maxTime < other.minTime) + require(abs(height - other.height) <= 2) return HistoryIndexRangeNode( firstVersion = firstVersion, lastVersion = other.lastVersion, @@ -108,7 +110,7 @@ sealed class HistoryIndexNode : IObjectData { } } - fun of(version: Object): HistoryIndexLeafNode { + fun of(version: Object): HistoryIndexNode { val time = CLVersion(version).getTimestamp() ?: Instant.Companion.fromEpochMilliseconds(0L) return HistoryIndexLeafNode( versions = listOf(version.ref), @@ -124,7 +126,7 @@ sealed class HistoryIndexNode : IObjectData { } fun Object.merge(otherObj: Object): Object = data.merge(this, otherObj) -fun Object.concat(otherObj: Object): Object = data.concat(this, otherObj) +fun Object.concatUnbalanced(otherObj: Object): Object = data.concat(this, otherObj) val Object.time get() = data.time data class HistoryIndexLeafNode( @@ -194,8 +196,8 @@ data class HistoryIndexLeafNode( return when (other) { is HistoryIndexLeafNode -> { when { - other.time < time -> otherObj.concat(self) - other.time > time -> self.concat(otherObj) + other.time < time -> otherObj.concatBalanced(self) + other.time > time -> self.concatBalanced(otherObj) else -> HistoryIndexLeafNode( versions = (versions.associateBy { it.getHash() } + other.versions.associateBy { it.getHash() }).values.toList(), authors = authors + other.authors, @@ -283,15 +285,15 @@ data class HistoryIndexRangeNode( val range1 = resolvedChild1.data.timeRange val range2 = resolvedChild2.data.timeRange return when { - other.time < range1.start -> otherObj.concat(resolvedChild1).concat(resolvedChild2) - other.time <= range1.endInclusive -> resolvedChild1.merge(otherObj).concat(resolvedChild2) + other.time < range1.start -> otherObj.concatBalanced(resolvedChild1).concatBalanced(resolvedChild2) + other.time <= range1.endInclusive -> resolvedChild1.merge(otherObj).concatBalanced(resolvedChild2) other.time < range2.start -> if (resolvedChild1.size <= resolvedChild2.size) { - resolvedChild1.concat(otherObj).concat(resolvedChild2) + resolvedChild1.concatBalanced(otherObj).concatBalanced(resolvedChild2) } else { - resolvedChild1.concat(otherObj.concat(resolvedChild2)) + resolvedChild1.concatBalanced(otherObj.concatBalanced(resolvedChild2)) } - other.time <= range2.endInclusive -> resolvedChild1.concat(resolvedChild2.merge(otherObj)) - else -> resolvedChild1.concat(resolvedChild2.concat(otherObj)) + other.time <= range2.endInclusive -> resolvedChild1.concatBalanced(resolvedChild2.merge(otherObj)) + else -> resolvedChild1.concatBalanced(resolvedChild2.concatBalanced(otherObj)) } } is HistoryIndexRangeNode -> { @@ -303,27 +305,27 @@ data class HistoryIndexRangeNode( intersects1 && intersects2 -> { resolvedChild1.merge(otherObj).merge(resolvedChild2) } - intersects1 -> resolvedChild1.merge(otherObj).concat(resolvedChild2) - intersects2 -> resolvedChild1.concat(resolvedChild2.merge(otherObj)) + intersects1 -> resolvedChild1.merge(otherObj).concatBalanced(resolvedChild2) + intersects2 -> resolvedChild1.concatBalanced(resolvedChild2.merge(otherObj)) other.maxTime < range1.start -> { if (other.size < resolvedChild2.size) { - otherObj.concat(resolvedChild1).concat(resolvedChild2) + otherObj.concatBalanced(resolvedChild1).concatBalanced(resolvedChild2) } else { - otherObj.concat(self) + otherObj.concatBalanced(self) } } other.maxTime < range2.start -> { if (resolvedChild2.size < resolvedChild1.size) { - resolvedChild1.concat(otherObj.concat(resolvedChild2)) + resolvedChild1.concatBalanced(otherObj.concatBalanced(resolvedChild2)) } else { - resolvedChild1.concat(otherObj).concat(resolvedChild2) + resolvedChild1.concatBalanced(otherObj).concatBalanced(resolvedChild2) } } else -> { if (other.size < resolvedChild1.size) { - resolvedChild1.concat(resolvedChild2.concat(otherObj)) + resolvedChild1.concatBalanced(resolvedChild2.concatBalanced(otherObj)) } else { - self.concat(otherObj) + self.concatBalanced(otherObj) } } } @@ -394,3 +396,32 @@ fun Long.rangeOfSize(size: Long) = this until (this + size) fun LongRange.intersect(other: LongRange): LongRange { return if (this.first > other.first) other.intersect(this) else other.first..min(this.last, other.last) } + +fun Object.rebalance(otherObj: Object): Pair, Object> { + if (otherObj.height > height + 1) { + val split1 = (otherObj.data as HistoryIndexRangeNode).child1.resolveNow() + val split2 = (otherObj.data as HistoryIndexRangeNode).child2.resolveNow() + val rebalanced = this.rebalance(split1) + if (rebalanced.first.height <= split2.height) { + return rebalanced.first.concatUnbalanced(rebalanced.second) to split2 + } else { + return rebalanced.first to rebalanced.second.concatUnbalanced(split2) + } + } else if (height > otherObj.height + 1) { + val split1 = (this.data as HistoryIndexRangeNode).child1.resolveNow() + val split2 = (this.data as HistoryIndexRangeNode).child2.resolveNow() + val rebalanced = split2.rebalance(otherObj) + if (rebalanced.second.height > split1.height) { + return split1.concatUnbalanced(rebalanced.first) to rebalanced.second + } else { + return split1 to rebalanced.first.concatUnbalanced(rebalanced.second) + } + } else { + return this to otherObj + } +} + +fun Object.concatBalanced(otherObj: Object): Object { + val rebalanced = this.rebalance(otherObj) + return rebalanced.first.concatUnbalanced(rebalanced.second) +} diff --git a/model-datastructure/src/commonTest/kotlin/HistoryIndexTest.kt b/model-datastructure/src/commonTest/kotlin/HistoryIndexTest.kt index d8e33f7714..3f58ceb3fb 100644 --- a/model-datastructure/src/commonTest/kotlin/HistoryIndexTest.kt +++ b/model-datastructure/src/commonTest/kotlin/HistoryIndexTest.kt @@ -56,7 +56,7 @@ class HistoryIndexTest { val version2 = newVersion(version1) val history = HistoryIndexNode.of(version1.obj, version2.obj) assertEquals(2, history.size) - assertEquals(1, history.height) + assertEquals(2, history.height) } @Test @@ -69,7 +69,7 @@ class HistoryIndexTest { val history2 = HistoryIndexNode.of(version3.obj).asObject(graph) val history = history1.merge(history2) assertEquals(3, history.size) - assertEquals(2, history.height) + assertEquals(3, history.height) } @Test @@ -83,7 +83,7 @@ class HistoryIndexTest { .merge(HistoryIndexNode.of(version3.obj).asObject(graph)) .merge(HistoryIndexNode.of(version4.obj).asObject(graph)) assertEquals(4, history.size) - assertEquals(3, history.height) + assertEquals(4, history.height) } @Test @@ -161,7 +161,7 @@ class HistoryIndexTest { versions.map { it.getObjectHash() }, history.data.getAllVersions().map { it.getHash() }.toList().getBlocking(graph), ) - assertEquals(12, history.height) + assertEquals(11, history.height) } @Test @@ -196,7 +196,7 @@ class HistoryIndexTest { versions.map { it.getObjectHash() }, history.data.getAllVersions().map { it.getHash() }.toList().getBlocking(graph), ) - assertEquals(10, history.height) + assertEquals(11, history.height) } @Test diff --git a/model-server/src/test/kotlin/org/modelix/model/server/HistoryIndexTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/HistoryIndexTest.kt index 8c8f255742..cd1b767c96 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/HistoryIndexTest.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/HistoryIndexTest.kt @@ -12,6 +12,7 @@ import org.modelix.model.server.handlers.IdsApiImpl import org.modelix.model.server.handlers.ModelReplicationServer import org.modelix.model.server.handlers.RepositoriesManager import org.modelix.model.server.store.InMemoryStoreClient +import kotlin.random.Random import kotlin.test.Test import kotlin.test.assertEquals import kotlin.time.Duration.Companion.seconds @@ -77,6 +78,7 @@ class HistoryIndexTest { @Test fun pagination_201_201() = runPaginationTest(201, 201) private fun runPaginationTest(skip: Int, limit: Int) = runTest { + val rand = Random(8923345) val modelClient: IModelClientV2 = createModelClient() val repositoryId = RepositoryId("repo1") val branchRef = repositoryId.getBranchReference() @@ -89,7 +91,7 @@ class HistoryIndexTest { .baseVersion(currentVersion) .tree(currentVersion.getModelTree()) .author("user1") - .time(currentVersion.getTimestamp()!! + 3.seconds) + .time(currentVersion.getTimestamp()!! + rand.nextInt(0, 3).seconds) .build() currentVersion = modelClient.push(branchRef, newVersion, currentVersion) } @@ -98,7 +100,7 @@ class HistoryIndexTest { .baseVersion(currentVersion) .tree(currentVersion.getModelTree()) .author("user2") - .time(currentVersion.getTimestamp()!! + 2.seconds) + .time(currentVersion.getTimestamp()!! + rand.nextInt(0, 3).seconds) .build() currentVersion = modelClient.push(branchRef, newVersion, currentVersion) } From eb61acaa840b24238756fe7b9d12dcfa5755afce Mon Sep 17 00:00:00 2001 From: Sascha Lisson Date: Mon, 15 Sep 2025 11:43:13 +0200 Subject: [PATCH 5/7] feat: index for efficient history queries --- .../datastructures/model/HistoryIndexNode.kt | 159 ++++++++++-------- .../src/commonTest/kotlin/HistoryIndexTest.kt | 19 ++- .../server/handlers/RepositoriesManager.kt | 4 +- 3 files changed, 102 insertions(+), 80 deletions(-) diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt index 26fc630c5a..000e0c35c9 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt @@ -18,6 +18,7 @@ import org.modelix.model.persistent.CPVersion import org.modelix.model.persistent.Separators import org.modelix.streams.IStream import org.modelix.streams.flatten +import org.modelix.streams.getBlocking import org.modelix.streams.plus import kotlin.math.abs import kotlin.math.max @@ -38,7 +39,7 @@ sealed class HistoryIndexNode : IObjectData { abstract fun getAllVersionsReversed(): IStream.Many> abstract fun getRange(indexRange: LongRange): IStream.Many fun getRange(indexRange: IntRange) = getRange(indexRange.first.toLong()..indexRange.last.toLong()) - abstract fun merge(self: Object, otherObj: Object): Object + abstract fun merge(self: Object, otherObj: Object): IStream.One> /** * Each returned node spans at most the duration specified in [interval]. @@ -120,12 +121,13 @@ sealed class HistoryIndexNode : IObjectData { } fun of(version1: Object, version2: Object): HistoryIndexNode { - return of(version1).asObject(version1.graph).merge(of(version2).asObject(version2.graph)).data + return of(version1).asObject(version1.graph).merge(of(version2).asObject(version2.graph)).getBlocking(version1.graph).data } } } -fun Object.merge(otherObj: Object): Object = data.merge(this, otherObj) +fun Object.merge(otherObj: Object) = data.merge(this, otherObj) +fun IStream.One>.merge(otherObj: Object) = flatMapOne { it.merge(otherObj) } fun Object.concatUnbalanced(otherObj: Object): Object = data.concat(this, otherObj) val Object.time get() = data.time @@ -191,7 +193,7 @@ data class HistoryIndexLeafNode( override fun merge( self: Object, otherObj: Object, - ): Object { + ): IStream.One> { val other = otherObj.data return when (other) { is HistoryIndexLeafNode -> { @@ -202,7 +204,7 @@ data class HistoryIndexLeafNode( versions = (versions.associateBy { it.getHash() } + other.versions.associateBy { it.getHash() }).values.toList(), authors = authors + other.authors, time = time, - ).asObject(self.graph) + ).asObject(self.graph).let { IStream.of(it) } } } is HistoryIndexRangeNode -> { @@ -275,62 +277,62 @@ data class HistoryIndexRangeNode( TODO("Not yet implemented") } - override fun merge(self: Object, otherObj: Object): Object { + override fun merge(self: Object, otherObj: Object): IStream.One> { val self = self as Object val other = otherObj.data - val resolvedChild1 = child1.resolveNow() - val resolvedChild2 = child2.resolveNow() - when (other) { - is HistoryIndexLeafNode -> { - val range1 = resolvedChild1.data.timeRange - val range2 = resolvedChild2.data.timeRange - return when { - other.time < range1.start -> otherObj.concatBalanced(resolvedChild1).concatBalanced(resolvedChild2) - other.time <= range1.endInclusive -> resolvedChild1.merge(otherObj).concatBalanced(resolvedChild2) - other.time < range2.start -> if (resolvedChild1.size <= resolvedChild2.size) { - resolvedChild1.concatBalanced(otherObj).concatBalanced(resolvedChild2) - } else { - resolvedChild1.concatBalanced(otherObj.concatBalanced(resolvedChild2)) - } - other.time <= range2.endInclusive -> resolvedChild1.concatBalanced(resolvedChild2.merge(otherObj)) - else -> resolvedChild1.concatBalanced(resolvedChild2.concatBalanced(otherObj)) - } - } - is HistoryIndexRangeNode -> { - val range1 = resolvedChild1.data.timeRange - val range2 = resolvedChild2.data.timeRange - val intersects1 = other.timeRange.intersects(range1) - val intersects2 = other.timeRange.intersects(range2) - return when { - intersects1 && intersects2 -> { - resolvedChild1.merge(otherObj).merge(resolvedChild2) - } - intersects1 -> resolvedChild1.merge(otherObj).concatBalanced(resolvedChild2) - intersects2 -> resolvedChild1.concatBalanced(resolvedChild2.merge(otherObj)) - other.maxTime < range1.start -> { - if (other.size < resolvedChild2.size) { - otherObj.concatBalanced(resolvedChild1).concatBalanced(resolvedChild2) + return child1.requestBoth(child2) { resolvedChild1, resolvedChild2 -> + when (other) { + is HistoryIndexLeafNode -> { + val range1 = resolvedChild1.data.timeRange + val range2 = resolvedChild2.data.timeRange + when { + other.time < range1.start -> otherObj.concatBalanced(resolvedChild1).concatBalanced(resolvedChild2) + other.time <= range1.endInclusive -> resolvedChild1.merge(otherObj).concatBalanced(resolvedChild2) + other.time < range2.start -> if (resolvedChild1.size <= resolvedChild2.size) { + resolvedChild1.concatBalanced(otherObj).concatBalanced(resolvedChild2) } else { - otherObj.concatBalanced(self) - } - } - other.maxTime < range2.start -> { - if (resolvedChild2.size < resolvedChild1.size) { resolvedChild1.concatBalanced(otherObj.concatBalanced(resolvedChild2)) - } else { - resolvedChild1.concatBalanced(otherObj).concatBalanced(resolvedChild2) } + other.time <= range2.endInclusive -> resolvedChild1.concatBalanced(resolvedChild2.merge(otherObj)) + else -> resolvedChild1.concatBalanced(resolvedChild2.concatBalanced(otherObj)) } - else -> { - if (other.size < resolvedChild1.size) { - resolvedChild1.concatBalanced(resolvedChild2.concatBalanced(otherObj)) - } else { - self.concatBalanced(otherObj) + } + is HistoryIndexRangeNode -> { + val range1 = resolvedChild1.data.timeRange + val range2 = resolvedChild2.data.timeRange + val intersects1 = other.timeRange.intersects(range1) + val intersects2 = other.timeRange.intersects(range2) + when { + intersects1 && intersects2 -> { + resolvedChild1.merge(otherObj).merge(resolvedChild2) + } + intersects1 -> resolvedChild1.merge(otherObj).concatBalanced(resolvedChild2) + intersects2 -> resolvedChild1.concatBalanced(resolvedChild2.merge(otherObj)) + other.maxTime < range1.start -> { + if (other.size < resolvedChild2.size) { + otherObj.concatBalanced(resolvedChild1).concatBalanced(resolvedChild2) + } else { + otherObj.concatBalanced(self) + } + } + other.maxTime < range2.start -> { + if (resolvedChild2.size < resolvedChild1.size) { + resolvedChild1.concatBalanced(otherObj.concatBalanced(resolvedChild2)) + } else { + resolvedChild1.concatBalanced(otherObj).concatBalanced(resolvedChild2) + } + } + else -> { + if (other.size < resolvedChild1.size) { + resolvedChild1.concatBalanced(resolvedChild2.concatBalanced(otherObj)) + } else { + self.concatBalanced(otherObj) + } } } } } - } + }.flatten() } override fun splitAtInterval(interval: Duration): IStream.Many { @@ -397,31 +399,42 @@ fun LongRange.intersect(other: LongRange): LongRange { return if (this.first > other.first) other.intersect(this) else other.first..min(this.last, other.last) } -fun Object.rebalance(otherObj: Object): Pair, Object> { - if (otherObj.height > height + 1) { - val split1 = (otherObj.data as HistoryIndexRangeNode).child1.resolveNow() - val split2 = (otherObj.data as HistoryIndexRangeNode).child2.resolveNow() - val rebalanced = this.rebalance(split1) - if (rebalanced.first.height <= split2.height) { - return rebalanced.first.concatUnbalanced(rebalanced.second) to split2 - } else { - return rebalanced.first to rebalanced.second.concatUnbalanced(split2) - } +private fun Object.rebalance(otherObj: Object): IStream.One, Object>> { + return if (otherObj.height > height + 1) { + (otherObj.data as HistoryIndexRangeNode).child1.requestBoth((otherObj.data as HistoryIndexRangeNode).child2) { split1, split2 -> + this.rebalance(split1).map { rebalanced -> + if (rebalanced.first.height <= split2.height) { + rebalanced.first.concatUnbalanced(rebalanced.second) to split2 + } else { + rebalanced.first to rebalanced.second.concatUnbalanced(split2) + } + } + }.flatten() } else if (height > otherObj.height + 1) { - val split1 = (this.data as HistoryIndexRangeNode).child1.resolveNow() - val split2 = (this.data as HistoryIndexRangeNode).child2.resolveNow() - val rebalanced = split2.rebalance(otherObj) - if (rebalanced.second.height > split1.height) { - return split1.concatUnbalanced(rebalanced.first) to rebalanced.second - } else { - return split1 to rebalanced.first.concatUnbalanced(rebalanced.second) - } + (this.data as HistoryIndexRangeNode).child1.requestBoth((this.data as HistoryIndexRangeNode).child2) { split1, split2 -> + split2.rebalance(otherObj).map { rebalanced -> + if (rebalanced.second.height > split1.height) { + split1.concatUnbalanced(rebalanced.first) to rebalanced.second + } else { + split1 to rebalanced.first.concatUnbalanced(rebalanced.second) + } + } + }.flatten() } else { - return this to otherObj + IStream.of(this to otherObj) + } +} + +private fun Object.concatBalanced(otherObj: Object): IStream.One> { + return this.rebalance(otherObj).map { rebalanced -> + rebalanced.first.concatUnbalanced(rebalanced.second) } } -fun Object.concatBalanced(otherObj: Object): Object { - val rebalanced = this.rebalance(otherObj) - return rebalanced.first.concatUnbalanced(rebalanced.second) +private fun IStream.One>.concatBalanced(otherObj: Object): IStream.One> { + return flatMapOne { it.concatBalanced(otherObj) } +} + +private fun Object.concatBalanced(otherObj: IStream.One>): IStream.One> { + return otherObj.flatMapOne { this.concatBalanced(it) } } diff --git a/model-datastructure/src/commonTest/kotlin/HistoryIndexTest.kt b/model-datastructure/src/commonTest/kotlin/HistoryIndexTest.kt index 3f58ceb3fb..556018769c 100644 --- a/model-datastructure/src/commonTest/kotlin/HistoryIndexTest.kt +++ b/model-datastructure/src/commonTest/kotlin/HistoryIndexTest.kt @@ -67,7 +67,7 @@ class HistoryIndexTest { val graph = version1.graph val history1 = HistoryIndexNode.of(version1.obj, version2.obj).asObject(graph) val history2 = HistoryIndexNode.of(version3.obj).asObject(graph) - val history = history1.merge(history2) + val history = history1.merge(history2).getBlocking(graph) assertEquals(3, history.size) assertEquals(3, history.height) } @@ -82,6 +82,7 @@ class HistoryIndexTest { val history = HistoryIndexNode.of(version1.obj, version2.obj).asObject(graph) .merge(HistoryIndexNode.of(version3.obj).asObject(graph)) .merge(HistoryIndexNode.of(version4.obj).asObject(graph)) + .getBlocking(graph) assertEquals(4, history.size) assertEquals(4, history.height) } @@ -98,6 +99,7 @@ class HistoryIndexTest { .merge(HistoryIndexNode.of(version3.obj).asObject(graph)) .merge(HistoryIndexNode.of(version4.obj).asObject(graph)) .merge(HistoryIndexNode.of(version5.obj).asObject(graph)) + .getBlocking(graph) assertEquals(5, history.size) assertEquals(4, history.height) } @@ -133,10 +135,12 @@ class HistoryIndexTest { .merge(HistoryIndexNode.of(version4b.obj).asObject(graph)) .merge(HistoryIndexNode.of(version5b.obj).asObject(graph)) .merge(HistoryIndexNode.of(version6b.obj).asObject(graph)) + .getBlocking(graph) val history = historyA .merge(historyB) .merge(HistoryIndexNode.of(version7.obj).asObject(graph)) .merge(HistoryIndexNode.of(version8.obj).asObject(graph)) + .getBlocking(graph) assertEquals( listOf(version1, version2, version3a, version3b, version4a, version4b, version5a, version5b, version6b, version7, version8).map { it.getObjectHash() }, @@ -154,7 +158,7 @@ class HistoryIndexTest { } val graph = versions.first().graph val history = versions.drop(1).fold(HistoryIndexNode.of(versions.first().asObject()).asObject(graph)) { acc, it -> - acc.merge(HistoryIndexNode.of(it.asObject()).asObject(graph)) + acc.merge(HistoryIndexNode.of(it.asObject()).asObject(graph)).getBlocking(graph) } assertEquals(versions.size.toLong(), history.size) assertEquals( @@ -172,7 +176,7 @@ class HistoryIndexTest { } val graph = versions.first().graph val history = versions.drop(1).shuffled(Random(78234554)).fold(HistoryIndexNode.of(versions.first().asObject()).asObject(graph)) { acc, it -> - acc.merge(HistoryIndexNode.of(it.asObject()).asObject(graph)) + acc.merge(HistoryIndexNode.of(it.asObject()).asObject(graph)).getBlocking(graph) } assertEquals(versions.size.toLong(), history.size) assertEquals( @@ -217,8 +221,13 @@ class HistoryIndexTest { } private fun buildHistory(versions: List): Object { - if (versions.size == 1) return HistoryIndexNode.of(versions.single().obj).asObject(versions.single().graph) + val graph = versions.first().graph + if (versions.size == 1) { + return HistoryIndexNode.of(versions.single().obj).asObject(graph) + } val centerIndex = versions.size / 2 - return buildHistory(versions.subList(0, centerIndex)).merge(buildHistory(versions.subList(centerIndex, versions.size))) + return buildHistory(versions.subList(0, centerIndex)) + .merge(buildHistory(versions.subList(centerIndex, versions.size))) + .getBlocking(graph) } } diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt index a1cd05bb70..b6f23b5b54 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt @@ -416,8 +416,8 @@ class RepositoriesManager(val stores: StoreManager) : IRepositoriesManager { val newElement = HistoryIndexNode.of(version.obj).asObject(graph) val newIndex = when (parentIndices.size) { 0 -> newElement - 1 -> parentIndices.single().merge(newElement) - 2 -> parentIndices[0].merge(parentIndices[1]).merge(newElement) + 1 -> parentIndices.single().merge(newElement).getBlocking(graph) + 2 -> parentIndices[0].merge(parentIndices[1]).merge(newElement).getBlocking(graph) else -> error("impossible") } newIndex.write() From b4ee5d1625a18649abf78129da41f41ee756ce5b Mon Sep 17 00:00:00 2001 From: Sascha Lisson Date: Mon, 15 Sep 2025 11:46:24 +0200 Subject: [PATCH 6/7] feat: index for efficient history queries --- .../kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt b/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt index 000e0c35c9..5f50e85196 100644 --- a/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt +++ b/model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt @@ -201,7 +201,7 @@ data class HistoryIndexLeafNode( other.time < time -> otherObj.concatBalanced(self) other.time > time -> self.concatBalanced(otherObj) else -> HistoryIndexLeafNode( - versions = (versions.associateBy { it.getHash() } + other.versions.associateBy { it.getHash() }).values.toList(), + versions = (versions + other.versions).distinctBy { it.getHash() }.toList(), authors = authors + other.authors, time = time, ).asObject(self.graph).let { IStream.of(it) } From 0ab35546be818206231426a71b7555200eb1a5f0 Mon Sep 17 00:00:00 2001 From: Bastian Kruck Date: Tue, 16 Sep 2025 16:02:09 +0200 Subject: [PATCH 7/7] feat(model-client): add getHistoryRangeForBranch to ClientJS --- .../org/modelix/model/client2/ClientJS.kt | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/model-client/src/jsMain/kotlin/org/modelix/model/client2/ClientJS.kt b/model-client/src/jsMain/kotlin/org/modelix/model/client2/ClientJS.kt index ed8e35236d..074b73ddfd 100644 --- a/model-client/src/jsMain/kotlin/org/modelix/model/client2/ClientJS.kt +++ b/model-client/src/jsMain/kotlin/org/modelix/model/client2/ClientJS.kt @@ -8,13 +8,16 @@ import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.await import kotlinx.coroutines.promise +import kotlinx.datetime.toJSDate import org.modelix.datastructures.model.IGenericModelTree +import org.modelix.datastructures.objects.ObjectHash import org.modelix.model.TreeId import org.modelix.model.api.INode import org.modelix.model.api.INodeReference import org.modelix.model.api.JSNodeConverter import org.modelix.model.client.IdGenerator import org.modelix.model.data.ModelData +import org.modelix.model.lazy.CLVersion import org.modelix.model.lazy.RepositoryId import org.modelix.model.lazy.createObjectStoreCache import org.modelix.model.mutable.DummyIdGenerator @@ -111,6 +114,9 @@ interface ClientJS { */ fun initRepository(repositoryId: String, useRoleIds: Boolean = true): Promise + fun getHistoryRangeForBranch(repositoryId: String, branchId: String, skip: Int, limit: Int): Promise> + fun getHistoryRange(repositoryId: String, headVersion: String, skip: Int, limit: Int): Promise> + /** * Fetch existing branches for a given repository from the model server. * @@ -192,6 +198,30 @@ internal class ClientJSImpl(private val modelClient: ModelClientV2) : ClientJS { } } + override fun getHistoryRangeForBranch(repositoryId: String, branchId: String, skip: Int, limit: Int) = + GlobalScope.promise { modelClient.pullHash(RepositoryId(repositoryId).getBranchReference(branchId)) } + .then { getHistoryRange(repositoryId, it, skip, limit) } + .then { it } + + override fun getHistoryRange(repositoryId: String, headVersion: String, skip: Int, limit: Int) = + GlobalScope.promise { + modelClient.getHistoryRange( + RepositoryId(repositoryId), + ObjectHash(headVersion), + skip.toLong(), + limit.toLong(), + ) + .filterIsInstance() + .map { + VersionInformationJS( + it.author, + it.getTimestamp()?.toJSDate(), + it.getObjectHash().toString(), + ) + } + .toTypedArray() + } + override fun dispose() { modelClient.close() }