@@ -37,7 +37,9 @@ import kotlinx.coroutines.flow.flow
37
37
import kotlinx.coroutines.flow.map
38
38
import kotlinx.coroutines.launch
39
39
import mu.KotlinLogging
40
+ import org.modelix.datastructures.model.HistoryIndexNode
40
41
import org.modelix.datastructures.objects.IObjectGraph
42
+ import org.modelix.datastructures.objects.Object
41
43
import org.modelix.datastructures.objects.ObjectHash
42
44
import org.modelix.kotlin.utils.DeprecationInfo
43
45
import org.modelix.kotlin.utils.WeakValueMap
@@ -89,6 +91,8 @@ import org.modelix.model.server.api.v2.toMap
89
91
import org.modelix.modelql.client.ModelQLClient
90
92
import org.modelix.modelql.core.IMonoStep
91
93
import org.modelix.streams.IExecutableStream
94
+ import org.modelix.streams.getSuspending
95
+ import org.modelix.streams.iterateSuspending
92
96
import kotlin.time.Duration
93
97
import kotlin.time.Duration.Companion.seconds
94
98
@@ -328,6 +332,106 @@ class ModelClientV2(
328
332
}
329
333
}
330
334
335
+ override suspend fun getHistory (
336
+ repositoryId : RepositoryId ,
337
+ headVersion : ObjectHash ,
338
+ skip : Int ,
339
+ limit : Int ,
340
+ interval : Duration ? ,
341
+ ): HistoryResponse {
342
+ val index: Object <HistoryIndexNode > = getHistoryIndex(repositoryId, headVersion)
343
+ val entries = if (interval != null ) {
344
+ val mergedEntries = ArrayList <HistoryEntry >()
345
+ var previousIntervalId: Long = Long .MAX_VALUE
346
+ try {
347
+ index.data.splitAtInterval(interval).iterateSuspending(index.graph) {
348
+ val intervalId = it.maxTime.toEpochMilliseconds() / interval.inWholeMilliseconds
349
+ require(intervalId <= previousIntervalId)
350
+ if (intervalId == previousIntervalId) {
351
+ val entry = mergedEntries[mergedEntries.lastIndex]
352
+ mergedEntries[mergedEntries.lastIndex] = HistoryEntry (
353
+ firstVersionHash = it.firstVersion.getHash(),
354
+ lastVersionHash = entry.lastVersionHash,
355
+ minTime = it.minTime.epochSeconds,
356
+ maxTime = entry.maxTime,
357
+ authors = entry.authors + it.authors,
358
+ )
359
+ } else {
360
+ if (mergedEntries.size >= limit) throw LimitedReached ()
361
+ previousIntervalId = intervalId
362
+ mergedEntries + = HistoryEntry (
363
+ firstVersionHash = it.firstVersion.getHash(),
364
+ lastVersionHash = it.lastVersion.getHash(),
365
+ minTime = it.minTime.epochSeconds,
366
+ maxTime = it.maxTime.epochSeconds,
367
+ authors = it.authors,
368
+ )
369
+ }
370
+ }
371
+ } catch (ex: LimitedReached ) {
372
+ // Expected exception used for exiting the iterateSuspending call
373
+ }
374
+ mergedEntries
375
+ } else {
376
+ index.data.getAllVersionsReversed().flatMap { it.resolve() }.map { CLVersion (it) }
377
+ .map {
378
+ val hash = it.getObjectHash()
379
+ val time = it.getTimestamp()?.epochSeconds
380
+ HistoryEntry (
381
+ firstVersionHash = hash,
382
+ lastVersionHash = hash,
383
+ minTime = time,
384
+ maxTime = time,
385
+ authors = setOfNotNull(it.author),
386
+ )
387
+ }
388
+ .take(limit)
389
+ .toList()
390
+ .getSuspending(index.graph)
391
+ }
392
+ return HistoryResponse (entries = entries, nextVersions = emptyList())
393
+ }
394
+
395
+ override suspend fun getHistoryRange (
396
+ repositoryId : RepositoryId ,
397
+ headVersion : ObjectHash ,
398
+ skip : Long ,
399
+ limit : Long ,
400
+ ): List <IVersion > {
401
+ val index: Object <HistoryIndexNode > = getHistoryIndex(repositoryId, headVersion)
402
+ return index.data.getRange(skip until (limit + skip))
403
+ .flatMap { it.getAllVersionsReversed() }
404
+ .flatMap { it.resolve() }
405
+ .map { CLVersion (it) }
406
+ .toList()
407
+ .getSuspending(index.graph)
408
+ }
409
+
410
+ suspend fun getHistoryIndex (
411
+ repositoryId : RepositoryId ? ,
412
+ versionHash : ObjectHash ,
413
+ ): Object <HistoryIndexNode > {
414
+ return httpClient.prepareGet {
415
+ url {
416
+ takeFrom(baseUrl)
417
+ if (repositoryId == null ) {
418
+ appendPathSegments(" versions" , versionHash.toString())
419
+ } else {
420
+ appendPathSegments(" repositories" , repositoryId.id, " versions" , versionHash.toString())
421
+ }
422
+ appendPathSegments(" history-index" )
423
+ }
424
+ }.execute { response ->
425
+ val graph = getObjectGraph(repositoryId).also { it.config = it.config.copy(lazyLoadingEnabled = true ) }
426
+ val text = response.bodyAsText()
427
+ val hashString = text.substringBefore(' \n ' )
428
+ val serialized = text.substringAfter(' \n ' )
429
+ val deserialized = HistoryIndexNode .deserialize(serialized, graph)
430
+ val ref = graph.fromDeserialized(ObjectHash (hashString), deserialized)
431
+ Object (deserialized, ref)
432
+ }
433
+ }
434
+
331
435
override suspend fun loadVersion (
332
436
repositoryId : RepositoryId ,
333
437
versionHash : String ,
@@ -968,3 +1072,5 @@ fun IVersion.runWrite(idGenerator: IIdGenerator, author: String?, body: (IBranch
968
1072
}
969
1073
970
1074
private fun String.ensureSuffix (suffix : String ) = if (endsWith(suffix)) this else this + suffix
1075
+
1076
+ private class LimitedReached : RuntimeException (" limited reached" )
0 commit comments