Skip to content

Commit 49de28f

Browse files
committed
feat(model-client): method for efficiently querying coarse grained history intervals
1 parent 412ad33 commit 49de28f

File tree

7 files changed

+202
-75
lines changed

7 files changed

+202
-75
lines changed

model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.modelix.model.client2
22

33
import io.ktor.http.Url
4+
import kotlinx.datetime.Instant
45
import org.modelix.datastructures.objects.ObjectHash
56
import org.modelix.kotlin.utils.DeprecationInfo
67
import org.modelix.model.IVersion
@@ -109,20 +110,18 @@ interface IModelClientV2 {
109110
fun getFrontendUrl(branch: BranchReference): Url
110111

111112
/**
112-
* @param headVersion starting point for history computations. For a paginated view this value should be the same and
113-
* the value for [skip] should be incremented instead. Only then its guaranteed that the returned list is
114-
* complete.
115-
* @param skip for a paginated view of the history
116-
* @param limit maximum size of the returned list
117-
* @param interval splits the timeline into equally sized intervals and returns only the last version of each interval
113+
* @param headVersion starting point for history computations. For a paginated view this value should be the same
114+
* and the value for [skip] should be incremented instead. Only then it's guaranteed that the returned list
115+
* is complete.
116+
* @param timeRange return only intervals in this time range
117+
* @param interval splits the timeline into equally sized intervals and returns a summary of the contained versions
118118
*/
119-
suspend fun getHistory(
119+
suspend fun getHistoryIntervals(
120120
repositoryId: RepositoryId,
121121
headVersion: ObjectHash,
122-
skip: Int = 0,
123-
limit: Int = 1000,
124-
interval: Duration?,
125-
): HistoryResponse
122+
timeRange: ClosedRange<Instant>?,
123+
interval: Duration,
124+
): List<HistoryInterval>
126125

127126
suspend fun getHistoryRange(
128127
repositoryId: RepositoryId,
@@ -133,15 +132,16 @@ interface IModelClientV2 {
133132
}
134133

135134
data class HistoryResponse(
136-
val entries: List<HistoryEntry>,
135+
val entries: List<HistoryInterval>,
137136
val nextVersions: List<ObjectHash>,
138137
)
139138

140-
data class HistoryEntry(
139+
data class HistoryInterval(
141140
val firstVersionHash: ObjectHash,
142141
val lastVersionHash: ObjectHash,
143-
val minTime: Long?,
144-
val maxTime: Long?,
142+
val size: Long,
143+
val minTime: Instant,
144+
val maxTime: Instant,
145145
val authors: Set<String>,
146146
// val headOfBranch: Set<BranchReference>,
147147
)

model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt

Lines changed: 34 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import kotlinx.coroutines.flow.emptyFlow
3636
import kotlinx.coroutines.flow.flow
3737
import kotlinx.coroutines.flow.map
3838
import kotlinx.coroutines.launch
39+
import kotlinx.datetime.Instant
3940
import mu.KotlinLogging
4041
import org.modelix.datastructures.model.HistoryIndexNode
4142
import org.modelix.datastructures.objects.IObjectGraph
@@ -332,64 +333,43 @@ class ModelClientV2(
332333
}
333334
}
334335

335-
override suspend fun getHistory(
336+
override suspend fun getHistoryIntervals(
336337
repositoryId: RepositoryId,
337338
headVersion: ObjectHash,
338-
skip: Int,
339-
limit: Int,
340-
interval: Duration?,
341-
): HistoryResponse {
339+
timeRange: ClosedRange<Instant>?,
340+
interval: Duration,
341+
): List<HistoryInterval> {
342342
val index: Object<HistoryIndexNode> = getHistoryIndex(repositoryId, headVersion)
343-
val entries = if (interval != null) {
344-
val mergedEntries = ArrayList<HistoryEntry>()
345-
var previousIntervalId: Long = Long.MAX_VALUE
346-
try {
347-
index.data.splitAtInterval(interval).iterateSuspending(index.graph) {
348-
val intervalId = it.maxTime.toEpochMilliseconds() / interval.inWholeMilliseconds
349-
require(intervalId <= previousIntervalId)
350-
if (intervalId == previousIntervalId) {
351-
val entry = mergedEntries[mergedEntries.lastIndex]
352-
mergedEntries[mergedEntries.lastIndex] = HistoryEntry(
353-
firstVersionHash = it.firstVersion.getHash(),
354-
lastVersionHash = entry.lastVersionHash,
355-
minTime = it.minTime.epochSeconds,
356-
maxTime = entry.maxTime,
357-
authors = entry.authors + it.authors,
358-
)
359-
} else {
360-
if (mergedEntries.size >= limit) throw LimitedReached()
361-
previousIntervalId = intervalId
362-
mergedEntries += HistoryEntry(
363-
firstVersionHash = it.firstVersion.getHash(),
364-
lastVersionHash = it.lastVersion.getHash(),
365-
minTime = it.minTime.epochSeconds,
366-
maxTime = it.maxTime.epochSeconds,
367-
authors = it.authors,
368-
)
369-
}
370-
}
371-
} catch (ex: LimitedReached) {
372-
// Expected exception used for exiting the iterateSuspending call
343+
val mergedEntries = ArrayList<HistoryInterval>()
344+
var previousIntervalId: Long = Long.MAX_VALUE
345+
346+
index.data.splitAtInterval(interval, timeRange).iterateSuspending(index.graph) {
347+
val intervalId = it.maxTime.epochSeconds / interval.inWholeSeconds
348+
check(intervalId <= previousIntervalId)
349+
if (intervalId == previousIntervalId) {
350+
val entry = mergedEntries[mergedEntries.lastIndex]
351+
mergedEntries[mergedEntries.lastIndex] = HistoryInterval(
352+
firstVersionHash = it.firstVersion.getHash(),
353+
lastVersionHash = entry.lastVersionHash,
354+
size = entry.size + it.size,
355+
minTime = minOf(entry.minTime, it.minTime),
356+
maxTime = maxOf(entry.maxTime, it.maxTime),
357+
authors = entry.authors + it.authors,
358+
)
359+
} else {
360+
previousIntervalId = intervalId
361+
mergedEntries += HistoryInterval(
362+
firstVersionHash = it.firstVersion.getHash(),
363+
lastVersionHash = it.lastVersion.getHash(),
364+
size = it.size,
365+
minTime = it.minTime,
366+
maxTime = it.maxTime,
367+
authors = it.authors,
368+
)
373369
}
374-
mergedEntries
375-
} else {
376-
index.data.getAllVersionsReversed().flatMapOrdered { it.resolve() }.map { CLVersion(it) }
377-
.map {
378-
val hash = it.getObjectHash()
379-
val time = it.getTimestamp()?.epochSeconds
380-
HistoryEntry(
381-
firstVersionHash = hash,
382-
lastVersionHash = hash,
383-
minTime = time,
384-
maxTime = time,
385-
authors = setOfNotNull(it.author),
386-
)
387-
}
388-
.take(limit)
389-
.toList()
390-
.getSuspending(index.graph)
391370
}
392-
return HistoryResponse(entries = entries, nextVersions = emptyList())
371+
372+
return mergedEntries
393373
}
394374

395375
override suspend fun getHistoryRange(
@@ -1073,4 +1053,4 @@ fun IVersion.runWrite(idGenerator: IIdGenerator, author: String?, body: (IBranch
10731053

10741054
private fun String.ensureSuffix(suffix: String) = if (endsWith(suffix)) this else this + suffix
10751055

1076-
private class LimitedReached : RuntimeException("limited reached")
1056+
private class LimitReached : RuntimeException("limit reached")

model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/HistoryIndexNode.kt

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ sealed class HistoryIndexNode : IObjectData {
4646
* For the same interval multiple nodes may be returned.
4747
* Latest entry is returned first.
4848
*/
49-
abstract fun splitAtInterval(interval: Duration): IStream.Many<HistoryIndexNode>
49+
abstract fun splitAtInterval(interval: Duration, timeRangeFilter: ClosedRange<Instant>?): IStream.Many<HistoryIndexNode>
5050

5151
fun concat(
5252
self: Object<HistoryIndexNode>,
@@ -213,8 +213,8 @@ data class HistoryIndexLeafNode(
213213
}
214214
}
215215

216-
override fun splitAtInterval(interval: Duration): IStream.Many<HistoryIndexNode> {
217-
TODO("Not yet implemented")
216+
override fun splitAtInterval(interval: Duration, timeRangeFilter: ClosedRange<Instant>?): IStream.Many<HistoryIndexNode> {
217+
return if (timeRangeFilter == null || timeRangeFilter.contains(time)) IStream.of(this) else IStream.empty()
218218
}
219219
}
220220

@@ -335,8 +335,18 @@ data class HistoryIndexRangeNode(
335335
}.flatten()
336336
}
337337

338-
override fun splitAtInterval(interval: Duration): IStream.Many<HistoryIndexNode> {
339-
TODO("Not yet implemented")
338+
override fun splitAtInterval(interval: Duration, timeRangeFilter: ClosedRange<Instant>?): IStream.Many<HistoryIndexNode> {
339+
if (timeRangeFilter != null && !timeRangeFilter.intersects(timeRange)) return IStream.empty()
340+
341+
val intervalIndex1 = timeRange.start.epochSeconds / interval.inWholeSeconds
342+
val intervalIndex2 = timeRange.endInclusive.epochSeconds / interval.inWholeSeconds
343+
if (intervalIndex1 == intervalIndex2) {
344+
return IStream.of(this)
345+
} else {
346+
return IStream.of(child2, child1)
347+
.flatMapOrdered { it.resolve() }
348+
.flatMapOrdered { it.data.splitAtInterval(interval, timeRangeFilter) }
349+
}
340350
}
341351

342352
/**

model-datastructure/src/commonMain/kotlin/org/modelix/model/IVersion.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ interface IVersion {
3131
fun tryGetParentVersions(): List<VersionAndHash>
3232

3333
fun getTimestamp(): Instant?
34+
fun getAuthor(): String?
3435

3536
companion object {
3637
fun builder(): VersionBuilder = VersionBuilder()

model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLVersion.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ class CLVersion(val obj: Object<CPVersion>) : IVersion {
6161

6262
val data: CPVersion get() = resolvedData.data
6363

64+
@get:JvmName("get_author")
65+
@Deprecated("Use getAuthor()", ReplaceWith("getAuthor()"))
6466
val author: String?
6567
get() = data.author
6668

@@ -83,6 +85,8 @@ class CLVersion(val obj: Object<CPVersion>) : IVersion {
8385
return null
8486
}
8587

88+
override fun getAuthor(): String? = data.author
89+
8690
@Deprecated("Use getObjectHash()", ReplaceWith("getObjectHash()"))
8791
val hash: String
8892
get() = getContentHash()
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package org.modelix.model.server
2+
3+
import io.ktor.server.testing.ApplicationTestBuilder
4+
import io.ktor.server.testing.testApplication
5+
import mu.KotlinLogging
6+
import org.modelix.model.IVersion
7+
import org.modelix.model.client2.HistoryInterval
8+
import org.modelix.model.client2.IModelClientV2
9+
import org.modelix.model.historyAsSequence
10+
import org.modelix.model.lazy.RepositoryId
11+
import org.modelix.model.server.api.RepositoryConfig
12+
import org.modelix.model.server.handlers.IdsApiImpl
13+
import org.modelix.model.server.handlers.ModelReplicationServer
14+
import org.modelix.model.server.handlers.RepositoriesManager
15+
import org.modelix.model.server.store.InMemoryStoreClient
16+
import kotlin.random.Random
17+
import kotlin.test.Test
18+
import kotlin.test.assertEquals
19+
import kotlin.time.Duration.Companion.seconds
20+
21+
private val LOG = KotlinLogging.logger { }
22+
23+
class HistoryIndexIntervalTest {
24+
25+
private lateinit var statistics: StoreClientWithStatistics
26+
private fun runTest(block: suspend ApplicationTestBuilder.() -> Unit) = testApplication {
27+
application {
28+
try {
29+
installDefaultServerPlugins()
30+
statistics = StoreClientWithStatistics(InMemoryStoreClient())
31+
val repoManager = RepositoriesManager(statistics)
32+
ModelReplicationServer(repoManager).init(this)
33+
IdsApiImpl(repoManager).init(this)
34+
} catch (ex: Throwable) {
35+
LOG.error("", ex)
36+
}
37+
}
38+
block()
39+
}
40+
41+
@Test fun interval_0_201_1() = runIntervalTest(0, 201, 1)
42+
43+
@Test fun interval_0_201_10() = runIntervalTest(0, 201, 10)
44+
45+
@Test fun interval_0_201_20() = runIntervalTest(0, 201, 20)
46+
47+
@Test fun interval_0_201_50() = runIntervalTest(0, 201, 50)
48+
49+
@Test fun interval_0_201_100() = runIntervalTest(0, 201, 100)
50+
51+
@Test fun interval_0_201_150() = runIntervalTest(0, 201, 150)
52+
53+
@Test fun interval_0_201_200() = runIntervalTest(0, 201, 200)
54+
55+
@Test fun interval_0_201_500() = runIntervalTest(0, 201, 500)
56+
57+
@Test fun interval_0_1_20() = runIntervalTest(0, 1, 20)
58+
59+
@Test fun interval_0_2_20() = runIntervalTest(0, 2, 20)
60+
61+
@Test fun interval_0_3_20() = runIntervalTest(0, 3, 20)
62+
63+
@Test fun interval_0_4_20() = runIntervalTest(0, 4, 20)
64+
65+
@Test fun interval_1_4_20() = runIntervalTest(1, 4, 20)
66+
67+
@Test fun interval_2_4_20() = runIntervalTest(2, 4, 20)
68+
69+
@Test fun interval_3_4_20() = runIntervalTest(3, 4, 20)
70+
71+
@Test fun interval_4_4_20() = runIntervalTest(4, 4, 20)
72+
73+
@Test fun interval_5_4_20() = runIntervalTest(5, 4, 20)
74+
75+
private fun runIntervalTest(skip: Int, limit: Int, intervalSeconds: Int) = runTest {
76+
val rand = Random(8923345)
77+
val modelClient: IModelClientV2 = createModelClient()
78+
val repositoryId = RepositoryId("repo1")
79+
val branchRef = repositoryId.getBranchReference()
80+
val initialVersion = modelClient.initRepository(RepositoryConfig(repositoryId = repositoryId.id, repositoryName = repositoryId.id, modelId = "61bd6cb0-33ff-45d8-9d1b-2149fdb01d16"))
81+
var currentVersion = initialVersion
82+
83+
repeat(100) {
84+
run {
85+
val newVersion = IVersion.builder()
86+
.baseVersion(currentVersion)
87+
.tree(currentVersion.getModelTree())
88+
.author("user1")
89+
.time(currentVersion.getTimestamp()!! + rand.nextInt(0, 3).seconds)
90+
.build()
91+
currentVersion = modelClient.push(branchRef, newVersion, currentVersion)
92+
}
93+
run {
94+
val newVersion = IVersion.builder()
95+
.baseVersion(currentVersion)
96+
.tree(currentVersion.getModelTree())
97+
.author("user2")
98+
.time(currentVersion.getTimestamp()!! + rand.nextInt(0, 3).seconds)
99+
.build()
100+
currentVersion = modelClient.push(branchRef, newVersion, currentVersion)
101+
}
102+
}
103+
104+
val expectedHistory = currentVersion.historyAsSequence().toList().reversed()
105+
106+
val expectedIntervals = expectedHistory
107+
.groupBy { it.getTimestamp()!!.epochSeconds / intervalSeconds }
108+
.map { entry ->
109+
val versions = entry.value
110+
HistoryInterval(
111+
firstVersionHash = versions.first().getObjectHash(),
112+
lastVersionHash = versions.last().getObjectHash(),
113+
size = versions.size.toLong(),
114+
minTime = versions.minOf { it.getTimestamp()!! },
115+
maxTime = versions.maxOf { it.getTimestamp()!! },
116+
authors = versions.mapNotNull { it.getAuthor() }.toSet(),
117+
)
118+
}
119+
.reversed()
120+
.drop(skip)
121+
.take(limit)
122+
123+
val timeRange = (expectedIntervals.minOf { it.minTime })..(expectedIntervals.maxOf { it.maxTime })
124+
val history = modelClient.getHistoryIntervals(
125+
repositoryId = repositoryId,
126+
headVersion = currentVersion.getObjectHash(),
127+
timeRange = timeRange,
128+
interval = intervalSeconds.seconds,
129+
)
130+
assertEquals(expectedIntervals, history)
131+
}
132+
}

model-server/src/test/kotlin/org/modelix/model/server/HistoryIndexTest.kt renamed to model-server/src/test/kotlin/org/modelix/model/server/HistoryIndexPaginationTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import kotlin.time.Duration.Companion.seconds
1919

2020
private val LOG = KotlinLogging.logger { }
2121

22-
class HistoryIndexTest {
22+
class HistoryIndexPaginationTest {
2323

2424
private lateinit var statistics: StoreClientWithStatistics
2525
private fun runTest(block: suspend ApplicationTestBuilder.() -> Unit) = testApplication {

0 commit comments

Comments
 (0)