Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,17 @@ class AsyncNode(

override fun getDescendants(includeSelf: Boolean): IStream.Many<IAsyncNode> {
return if (includeSelf) {
getStreamExecutor()
IStream.of(IStream.of(this), getDescendants(false)).flatten()
} else {
getAllChildren().flatMap { it.getDescendants(true) }
getAllChildren().flatMapOrdered { it.getDescendants(true) }
}
}

override fun getDescendantsUnordered(includeSelf: Boolean): IStream.Many<IAsyncNode> {
return if (includeSelf) {
IStream.of(IStream.of(this), getDescendantsUnordered(false)).flatten()
} else {
getAllChildren().flatMapUnordered { it.getDescendantsUnordered(true) }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ interface IAsyncNode : IStreamExecutorProvider {
fun getAllReferenceTargets(): IStream.Many<Pair<IReferenceLinkReference, IAsyncNode>>

fun getDescendants(includeSelf: Boolean): IStream.Many<IAsyncNode>
fun getDescendantsUnordered(includeSelf: Boolean): IStream.Many<IAsyncNode> = getDescendants(includeSelf)
}

interface INodeWithAsyncSupport : INode {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.modelix.model.client2

import io.ktor.http.Url
import org.modelix.datastructures.objects.ObjectHash
import org.modelix.kotlin.utils.DeprecationInfo
import org.modelix.model.IVersion
import org.modelix.model.ObjectDeltaFilter
Expand All @@ -11,6 +12,7 @@ import org.modelix.model.lazy.BranchReference
import org.modelix.model.lazy.RepositoryId
import org.modelix.model.server.api.RepositoryConfig
import org.modelix.modelql.core.IMonoStep
import kotlin.time.Duration

/**
* This interface is meant exclusively for model client usage.
Expand Down Expand Up @@ -105,4 +107,41 @@ interface IModelClientV2 {
suspend fun <R> query(repositoryId: RepositoryId, versionHash: String, body: (IMonoStep<INode>) -> IMonoStep<R>): R

fun getFrontendUrl(branch: BranchReference): Url

/**
* @param headVersion starting point for history computations. For a paginated view this value should be the same and
* the value for [skip] should be incremented instead. Only then its guaranteed that the returned list is
* complete.
* @param skip for a paginated view of the history
* @param limit maximum size of the returned list
* @param interval splits the timeline into equally sized intervals and returns only the last version of each interval
*/
suspend fun getHistory(
repositoryId: RepositoryId,
headVersion: ObjectHash,
skip: Int = 0,
limit: Int = 1000,
interval: Duration?,
): HistoryResponse

suspend fun getHistoryRange(
repositoryId: RepositoryId,
headVersion: ObjectHash,
skip: Long = 0L,
limit: Long = 1000L,
): List<IVersion>
}

data class HistoryResponse(
val entries: List<HistoryEntry>,
val nextVersions: List<ObjectHash>,
)

data class HistoryEntry(
val firstVersionHash: ObjectHash,
val lastVersionHash: ObjectHash,
val minTime: Long?,
val maxTime: Long?,
val authors: Set<String>,
// val headOfBranch: Set<BranchReference>,
)
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,12 @@ class ModelClientGraph(
hash: ObjectHash,
data: T,
): ObjectReference<T> {
// Should never be called, because all deserialization happens internally.
throw UnsupportedOperationException()
if (config.lazyLoadingEnabled) {
return ObjectReferenceImpl(this, hash, data)
} else {
// Should never be called, because all deserialization happens internally.
throw UnsupportedOperationException()
}
}

@Synchronized
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import mu.KotlinLogging
import org.modelix.datastructures.model.HistoryIndexNode
import org.modelix.datastructures.objects.IObjectGraph
import org.modelix.datastructures.objects.Object
import org.modelix.datastructures.objects.ObjectHash
import org.modelix.kotlin.utils.DeprecationInfo
import org.modelix.kotlin.utils.WeakValueMap
Expand Down Expand Up @@ -89,6 +91,8 @@ import org.modelix.model.server.api.v2.toMap
import org.modelix.modelql.client.ModelQLClient
import org.modelix.modelql.core.IMonoStep
import org.modelix.streams.IExecutableStream
import org.modelix.streams.getSuspending
import org.modelix.streams.iterateSuspending
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

Expand Down Expand Up @@ -328,6 +332,106 @@ class ModelClientV2(
}
}

override suspend fun getHistory(
repositoryId: RepositoryId,
headVersion: ObjectHash,
skip: Int,
limit: Int,
interval: Duration?,
): HistoryResponse {
val index: Object<HistoryIndexNode> = getHistoryIndex(repositoryId, headVersion)
val entries = if (interval != null) {
val mergedEntries = ArrayList<HistoryEntry>()
var previousIntervalId: Long = Long.MAX_VALUE
try {
index.data.splitAtInterval(interval).iterateSuspending(index.graph) {
val intervalId = it.maxTime.toEpochMilliseconds() / interval.inWholeMilliseconds
require(intervalId <= previousIntervalId)
if (intervalId == previousIntervalId) {
val entry = mergedEntries[mergedEntries.lastIndex]
mergedEntries[mergedEntries.lastIndex] = HistoryEntry(
firstVersionHash = it.firstVersion.getHash(),
lastVersionHash = entry.lastVersionHash,
minTime = it.minTime.epochSeconds,
maxTime = entry.maxTime,
authors = entry.authors + it.authors,
)
} else {
if (mergedEntries.size >= limit) throw LimitedReached()
previousIntervalId = intervalId
mergedEntries += HistoryEntry(
firstVersionHash = it.firstVersion.getHash(),
lastVersionHash = it.lastVersion.getHash(),
minTime = it.minTime.epochSeconds,
maxTime = it.maxTime.epochSeconds,
authors = it.authors,
)
}
}
} catch (ex: LimitedReached) {
// Expected exception used for exiting the iterateSuspending call
}
mergedEntries
} else {
index.data.getAllVersionsReversed().flatMapOrdered { it.resolve() }.map { CLVersion(it) }
.map {
val hash = it.getObjectHash()
val time = it.getTimestamp()?.epochSeconds
HistoryEntry(
firstVersionHash = hash,
lastVersionHash = hash,
minTime = time,
maxTime = time,
authors = setOfNotNull(it.author),
)
}
.take(limit)
.toList()
.getSuspending(index.graph)
}
return HistoryResponse(entries = entries, nextVersions = emptyList())
}

override suspend fun getHistoryRange(
repositoryId: RepositoryId,
headVersion: ObjectHash,
skip: Long,
limit: Long,
): List<IVersion> {
val index: Object<HistoryIndexNode> = getHistoryIndex(repositoryId, headVersion)
return index.data.getRange(skip until (limit + skip))
.flatMapOrdered { it.getAllVersionsReversed() }
.flatMapOrdered { it.resolve() }
.map { CLVersion(it) }
.toList()
.getSuspending(index.graph)
}

suspend fun getHistoryIndex(
repositoryId: RepositoryId?,
versionHash: ObjectHash,
): Object<HistoryIndexNode> {
return httpClient.prepareGet {
url {
takeFrom(baseUrl)
if (repositoryId == null) {
appendPathSegments("versions", versionHash.toString())
} else {
appendPathSegments("repositories", repositoryId.id, "versions", versionHash.toString())
}
appendPathSegments("history-index")
}
}.execute { response ->
val graph = getObjectGraph(repositoryId).also { it.config = it.config.copy(lazyLoadingEnabled = true) }
val text = response.bodyAsText()
val hashString = text.substringBefore('\n')
val serialized = text.substringAfter('\n')
val deserialized = HistoryIndexNode.deserialize(serialized, graph)
val ref = graph.fromDeserialized(ObjectHash(hashString), deserialized)
Object(deserialized, ref)
}
}

override suspend fun loadVersion(
repositoryId: RepositoryId,
versionHash: String,
Expand Down Expand Up @@ -968,3 +1072,5 @@ fun IVersion.runWrite(idGenerator: IIdGenerator, author: String?, body: (IBranch
}

private fun String.ensureSuffix(suffix: String) = if (endsWith(suffix)) this else this + suffix

private class LimitedReached : RuntimeException("limited reached")
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.await
import kotlinx.coroutines.promise
import kotlinx.datetime.toJSDate
import org.modelix.datastructures.model.IGenericModelTree
import org.modelix.datastructures.objects.ObjectHash
import org.modelix.model.TreeId
import org.modelix.model.api.INode
import org.modelix.model.api.INodeReference
import org.modelix.model.api.JSNodeConverter
import org.modelix.model.client.IdGenerator
import org.modelix.model.data.ModelData
import org.modelix.model.lazy.CLVersion
import org.modelix.model.lazy.RepositoryId
import org.modelix.model.lazy.createObjectStoreCache
import org.modelix.model.mutable.DummyIdGenerator
Expand Down Expand Up @@ -111,6 +114,9 @@ interface ClientJS {
*/
fun initRepository(repositoryId: String, useRoleIds: Boolean = true): Promise<Unit>

fun getHistoryRangeForBranch(repositoryId: String, branchId: String, skip: Int, limit: Int): Promise<Array<VersionInformationJS>>
fun getHistoryRange(repositoryId: String, headVersion: String, skip: Int, limit: Int): Promise<Array<VersionInformationJS>>

/**
* Fetch existing branches for a given repository from the model server.
*
Expand Down Expand Up @@ -192,6 +198,30 @@ internal class ClientJSImpl(private val modelClient: ModelClientV2) : ClientJS {
}
}

override fun getHistoryRangeForBranch(repositoryId: String, branchId: String, skip: Int, limit: Int) =
GlobalScope.promise { modelClient.pullHash(RepositoryId(repositoryId).getBranchReference(branchId)) }
.then { getHistoryRange(repositoryId, it, skip, limit) }
.then { it }

override fun getHistoryRange(repositoryId: String, headVersion: String, skip: Int, limit: Int) =
GlobalScope.promise {
modelClient.getHistoryRange(
RepositoryId(repositoryId),
ObjectHash(headVersion),
skip.toLong(),
limit.toLong(),
)
.filterIsInstance<CLVersion>()
.map {
VersionInformationJS(
it.author,
it.getTimestamp()?.toJSDate(),
it.getObjectHash().toString(),
)
}
.toTypedArray()
}

override fun dispose() {
modelClient.close()
}
Expand Down
Loading
Loading