Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,17 @@ class AsyncNode(

override fun getDescendants(includeSelf: Boolean): IStream.Many<IAsyncNode> {
return if (includeSelf) {
getStreamExecutor()
IStream.of(IStream.of(this), getDescendants(false)).flatten()
} else {
getAllChildren().flatMap { it.getDescendants(true) }
getAllChildren().flatMapOrdered { it.getDescendants(true) }
}
}

override fun getDescendantsUnordered(includeSelf: Boolean): IStream.Many<IAsyncNode> {
return if (includeSelf) {
IStream.of(IStream.of(this), getDescendantsUnordered(false)).flatten()
} else {
getAllChildren().flatMapUnordered { it.getDescendantsUnordered(true) }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ interface IAsyncNode : IStreamExecutorProvider {
fun getAllReferenceTargets(): IStream.Many<Pair<IReferenceLinkReference, IAsyncNode>>

fun getDescendants(includeSelf: Boolean): IStream.Many<IAsyncNode>
fun getDescendantsUnordered(includeSelf: Boolean): IStream.Many<IAsyncNode> = getDescendants(includeSelf)
}

interface INodeWithAsyncSupport : INode {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.modelix.model.client2

import io.ktor.http.Url
import org.modelix.datastructures.objects.ObjectHash
import org.modelix.kotlin.utils.DeprecationInfo
import org.modelix.model.IVersion
import org.modelix.model.ObjectDeltaFilter
Expand All @@ -11,6 +12,7 @@ import org.modelix.model.lazy.BranchReference
import org.modelix.model.lazy.RepositoryId
import org.modelix.model.server.api.RepositoryConfig
import org.modelix.modelql.core.IMonoStep
import kotlin.time.Duration

/**
* This interface is meant exclusively for model client usage.
Expand Down Expand Up @@ -105,4 +107,41 @@ interface IModelClientV2 {
suspend fun <R> query(repositoryId: RepositoryId, versionHash: String, body: (IMonoStep<INode>) -> IMonoStep<R>): R

fun getFrontendUrl(branch: BranchReference): Url

/**
* @param headVersion starting point for history computations. For a paginated view this value should be the same and
* the value for [skip] should be incremented instead. Only then its guaranteed that the returned list is
* complete.
* @param skip for a paginated view of the history
* @param limit maximum size of the returned list
* @param interval splits the timeline into equally sized intervals and returns only the last version of each interval
*/
suspend fun getHistory(
repositoryId: RepositoryId,
headVersion: ObjectHash,
skip: Int = 0,
limit: Int = 1000,
interval: Duration?,
): HistoryResponse

suspend fun getHistoryRange(
repositoryId: RepositoryId,
headVersion: ObjectHash,
skip: Long = 0L,
limit: Long = 1000L,
): List<IVersion>
}

data class HistoryResponse(
val entries: List<HistoryEntry>,
val nextVersions: List<ObjectHash>,
)

data class HistoryEntry(
val firstVersionHash: ObjectHash,
val lastVersionHash: ObjectHash,
val minTime: Long?,
val maxTime: Long?,
val authors: Set<String>,
// val headOfBranch: Set<BranchReference>,
)
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,12 @@ class ModelClientGraph(
hash: ObjectHash,
data: T,
): ObjectReference<T> {
// Should never be called, because all deserialization happens internally.
throw UnsupportedOperationException()
if (config.lazyLoadingEnabled) {
return ObjectReferenceImpl(this, hash, data)
} else {
// Should never be called, because all deserialization happens internally.
throw UnsupportedOperationException()
}
}

@Synchronized
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import mu.KotlinLogging
import org.modelix.datastructures.model.HistoryIndexNode
import org.modelix.datastructures.objects.IObjectGraph
import org.modelix.datastructures.objects.Object
import org.modelix.datastructures.objects.ObjectHash
import org.modelix.kotlin.utils.DeprecationInfo
import org.modelix.kotlin.utils.WeakValueMap
Expand Down Expand Up @@ -89,6 +91,8 @@ import org.modelix.model.server.api.v2.toMap
import org.modelix.modelql.client.ModelQLClient
import org.modelix.modelql.core.IMonoStep
import org.modelix.streams.IExecutableStream
import org.modelix.streams.getSuspending
import org.modelix.streams.iterateSuspending
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

Expand Down Expand Up @@ -328,6 +332,106 @@ class ModelClientV2(
}
}

override suspend fun getHistory(
repositoryId: RepositoryId,
headVersion: ObjectHash,
skip: Int,
limit: Int,
interval: Duration?,
): HistoryResponse {
val index: Object<HistoryIndexNode> = getHistoryIndex(repositoryId, headVersion)
val entries = if (interval != null) {
val mergedEntries = ArrayList<HistoryEntry>()
var previousIntervalId: Long = Long.MAX_VALUE
try {
index.data.splitAtInterval(interval).iterateSuspending(index.graph) {
val intervalId = it.maxTime.toEpochMilliseconds() / interval.inWholeMilliseconds
require(intervalId <= previousIntervalId)
if (intervalId == previousIntervalId) {
val entry = mergedEntries[mergedEntries.lastIndex]
mergedEntries[mergedEntries.lastIndex] = HistoryEntry(
firstVersionHash = it.firstVersion.getHash(),
lastVersionHash = entry.lastVersionHash,
minTime = it.minTime.epochSeconds,
maxTime = entry.maxTime,
authors = entry.authors + it.authors,
)
} else {
if (mergedEntries.size >= limit) throw LimitedReached()
previousIntervalId = intervalId
mergedEntries += HistoryEntry(
firstVersionHash = it.firstVersion.getHash(),
lastVersionHash = it.lastVersion.getHash(),
minTime = it.minTime.epochSeconds,
maxTime = it.maxTime.epochSeconds,
authors = it.authors,
)
}
}
} catch (ex: LimitedReached) {
// Expected exception used for exiting the iterateSuspending call
}
mergedEntries
} else {
index.data.getAllVersionsReversed().flatMapOrdered { it.resolve() }.map { CLVersion(it) }
.map {
val hash = it.getObjectHash()
val time = it.getTimestamp()?.epochSeconds
HistoryEntry(
firstVersionHash = hash,
lastVersionHash = hash,
minTime = time,
maxTime = time,
authors = setOfNotNull(it.author),
)
}
.take(limit)
.toList()
.getSuspending(index.graph)
}
return HistoryResponse(entries = entries, nextVersions = emptyList())
}

override suspend fun getHistoryRange(
repositoryId: RepositoryId,
headVersion: ObjectHash,
skip: Long,
limit: Long,
): List<IVersion> {
val index: Object<HistoryIndexNode> = getHistoryIndex(repositoryId, headVersion)
return index.data.getRange(skip until (limit + skip))
.flatMapOrdered { it.getAllVersionsReversed() }
.flatMapOrdered { it.resolve() }
.map { CLVersion(it) }
.toList()
.getSuspending(index.graph)
}

suspend fun getHistoryIndex(
repositoryId: RepositoryId?,
versionHash: ObjectHash,
): Object<HistoryIndexNode> {
return httpClient.prepareGet {
url {
takeFrom(baseUrl)
if (repositoryId == null) {
appendPathSegments("versions", versionHash.toString())
} else {
appendPathSegments("repositories", repositoryId.id, "versions", versionHash.toString())
}
appendPathSegments("history-index")
}
}.execute { response ->
val graph = getObjectGraph(repositoryId).also { it.config = it.config.copy(lazyLoadingEnabled = true) }
val text = response.bodyAsText()
val hashString = text.substringBefore('\n')
val serialized = text.substringAfter('\n')
val deserialized = HistoryIndexNode.deserialize(serialized, graph)
val ref = graph.fromDeserialized(ObjectHash(hashString), deserialized)
Object(deserialized, ref)
}
}

override suspend fun loadVersion(
repositoryId: RepositoryId,
versionHash: String,
Expand Down Expand Up @@ -968,3 +1072,5 @@ fun IVersion.runWrite(idGenerator: IIdGenerator, author: String?, body: (IBranch
}

private fun String.ensureSuffix(suffix: String) = if (endsWith(suffix)) this else this + suffix

private class LimitedReached : RuntimeException("limited reached")
Loading
Loading