@@ -18,6 +18,7 @@ import org.modelix.model.persistent.CPVersion
18
18
import org.modelix.model.persistent.Separators
19
19
import org.modelix.streams.IStream
20
20
import org.modelix.streams.flatten
21
+ import org.modelix.streams.getBlocking
21
22
import org.modelix.streams.plus
22
23
import kotlin.math.abs
23
24
import kotlin.math.max
@@ -38,7 +39,7 @@ sealed class HistoryIndexNode : IObjectData {
38
39
abstract fun getAllVersionsReversed (): IStream .Many <ObjectReference <CPVersion >>
39
40
abstract fun getRange (indexRange : LongRange ): IStream .Many <HistoryIndexNode >
40
41
fun getRange (indexRange : IntRange ) = getRange(indexRange.first.toLong().. indexRange.last.toLong())
41
- abstract fun merge (self : Object <HistoryIndexNode >, otherObj : Object <HistoryIndexNode >): Object <HistoryIndexNode >
42
+ abstract fun merge (self : Object <HistoryIndexNode >, otherObj : Object <HistoryIndexNode >): IStream . One < Object <HistoryIndexNode > >
42
43
43
44
/* *
44
45
* Each returned node spans at most the duration specified in [interval].
@@ -120,12 +121,13 @@ sealed class HistoryIndexNode : IObjectData {
120
121
}
121
122
122
123
fun of (version1 : Object <CPVersion >, version2 : Object <CPVersion >): HistoryIndexNode {
123
- return of(version1).asObject(version1.graph).merge(of(version2).asObject(version2.graph)).data
124
+ return of(version1).asObject(version1.graph).merge(of(version2).asObject(version2.graph)).getBlocking(version1.graph). data
124
125
}
125
126
}
126
127
}
127
128
128
- fun Object<HistoryIndexNode>.merge (otherObj : Object <HistoryIndexNode >): Object <HistoryIndexNode > = data.merge(this , otherObj)
129
+ fun Object<HistoryIndexNode>.merge (otherObj : Object <HistoryIndexNode >) = data.merge(this , otherObj)
130
+ fun IStream.One<Object<HistoryIndexNode>>.merge (otherObj : Object <HistoryIndexNode >) = flatMapOne { it.merge(otherObj) }
129
131
fun Object<HistoryIndexNode>.concatUnbalanced (otherObj : Object <HistoryIndexNode >): Object <HistoryIndexNode > = data.concat(this , otherObj)
130
132
val Object <HistoryIndexLeafNode >.time get() = data.time
131
133
@@ -191,7 +193,7 @@ data class HistoryIndexLeafNode(
191
193
override fun merge (
192
194
self : Object <HistoryIndexNode >,
193
195
otherObj : Object <HistoryIndexNode >,
194
- ): Object <HistoryIndexNode > {
196
+ ): IStream . One < Object <HistoryIndexNode > > {
195
197
val other = otherObj.data
196
198
return when (other) {
197
199
is HistoryIndexLeafNode -> {
@@ -202,7 +204,7 @@ data class HistoryIndexLeafNode(
202
204
versions = (versions.associateBy { it.getHash() } + other.versions.associateBy { it.getHash() }).values.toList(),
203
205
authors = authors + other.authors,
204
206
time = time,
205
- ).asObject(self.graph)
207
+ ).asObject(self.graph). let { IStream .of(it) }
206
208
}
207
209
}
208
210
is HistoryIndexRangeNode -> {
@@ -275,62 +277,62 @@ data class HistoryIndexRangeNode(
275
277
TODO (" Not yet implemented" )
276
278
}
277
279
278
- override fun merge (self : Object <HistoryIndexNode >, otherObj : Object <HistoryIndexNode >): Object <HistoryIndexNode > {
280
+ override fun merge (self : Object <HistoryIndexNode >, otherObj : Object <HistoryIndexNode >): IStream . One < Object <HistoryIndexNode > > {
279
281
val self = self as Object <HistoryIndexRangeNode >
280
282
val other = otherObj.data
281
- val resolvedChild1 = child1.resolveNow()
282
- val resolvedChild2 = child2.resolveNow()
283
- when (other) {
284
- is HistoryIndexLeafNode -> {
285
- val range1 = resolvedChild1.data.timeRange
286
- val range2 = resolvedChild2.data.timeRange
287
- return when {
288
- other.time < range1.start -> otherObj.concatBalanced(resolvedChild1).concatBalanced(resolvedChild2)
289
- other.time <= range1.endInclusive -> resolvedChild1.merge(otherObj).concatBalanced(resolvedChild2)
290
- other.time < range2.start -> if (resolvedChild1.size <= resolvedChild2.size) {
291
- resolvedChild1.concatBalanced(otherObj).concatBalanced(resolvedChild2)
292
- } else {
293
- resolvedChild1.concatBalanced(otherObj.concatBalanced(resolvedChild2))
294
- }
295
- other.time <= range2.endInclusive -> resolvedChild1.concatBalanced(resolvedChild2.merge(otherObj))
296
- else -> resolvedChild1.concatBalanced(resolvedChild2.concatBalanced(otherObj))
297
- }
298
- }
299
- is HistoryIndexRangeNode -> {
300
- val range1 = resolvedChild1.data.timeRange
301
- val range2 = resolvedChild2.data.timeRange
302
- val intersects1 = other.timeRange.intersects(range1)
303
- val intersects2 = other.timeRange.intersects(range2)
304
- return when {
305
- intersects1 && intersects2 -> {
306
- resolvedChild1.merge(otherObj).merge(resolvedChild2)
307
- }
308
- intersects1 -> resolvedChild1.merge(otherObj).concatBalanced(resolvedChild2)
309
- intersects2 -> resolvedChild1.concatBalanced(resolvedChild2.merge(otherObj))
310
- other.maxTime < range1.start -> {
311
- if (other.size < resolvedChild2.size) {
312
- otherObj.concatBalanced(resolvedChild1).concatBalanced(resolvedChild2)
283
+ return child1.requestBoth(child2) { resolvedChild1, resolvedChild2 ->
284
+ when (other) {
285
+ is HistoryIndexLeafNode -> {
286
+ val range1 = resolvedChild1.data.timeRange
287
+ val range2 = resolvedChild2.data.timeRange
288
+ when {
289
+ other.time < range1.start -> otherObj.concatBalanced(resolvedChild1).concatBalanced(resolvedChild2)
290
+ other.time <= range1.endInclusive -> resolvedChild1.merge(otherObj).concatBalanced(resolvedChild2)
291
+ other.time < range2.start -> if (resolvedChild1.size <= resolvedChild2.size) {
292
+ resolvedChild1.concatBalanced(otherObj).concatBalanced(resolvedChild2)
313
293
} else {
314
- otherObj.concatBalanced(self)
315
- }
316
- }
317
- other.maxTime < range2.start -> {
318
- if (resolvedChild2.size < resolvedChild1.size) {
319
294
resolvedChild1.concatBalanced(otherObj.concatBalanced(resolvedChild2))
320
- } else {
321
- resolvedChild1.concatBalanced(otherObj).concatBalanced(resolvedChild2)
322
295
}
296
+ other.time <= range2.endInclusive -> resolvedChild1.concatBalanced(resolvedChild2.merge(otherObj))
297
+ else -> resolvedChild1.concatBalanced(resolvedChild2.concatBalanced(otherObj))
323
298
}
324
- else -> {
325
- if (other.size < resolvedChild1.size) {
326
- resolvedChild1.concatBalanced(resolvedChild2.concatBalanced(otherObj))
327
- } else {
328
- self.concatBalanced(otherObj)
299
+ }
300
+ is HistoryIndexRangeNode -> {
301
+ val range1 = resolvedChild1.data.timeRange
302
+ val range2 = resolvedChild2.data.timeRange
303
+ val intersects1 = other.timeRange.intersects(range1)
304
+ val intersects2 = other.timeRange.intersects(range2)
305
+ when {
306
+ intersects1 && intersects2 -> {
307
+ resolvedChild1.merge(otherObj).merge(resolvedChild2)
308
+ }
309
+ intersects1 -> resolvedChild1.merge(otherObj).concatBalanced(resolvedChild2)
310
+ intersects2 -> resolvedChild1.concatBalanced(resolvedChild2.merge(otherObj))
311
+ other.maxTime < range1.start -> {
312
+ if (other.size < resolvedChild2.size) {
313
+ otherObj.concatBalanced(resolvedChild1).concatBalanced(resolvedChild2)
314
+ } else {
315
+ otherObj.concatBalanced(self)
316
+ }
317
+ }
318
+ other.maxTime < range2.start -> {
319
+ if (resolvedChild2.size < resolvedChild1.size) {
320
+ resolvedChild1.concatBalanced(otherObj.concatBalanced(resolvedChild2))
321
+ } else {
322
+ resolvedChild1.concatBalanced(otherObj).concatBalanced(resolvedChild2)
323
+ }
324
+ }
325
+ else -> {
326
+ if (other.size < resolvedChild1.size) {
327
+ resolvedChild1.concatBalanced(resolvedChild2.concatBalanced(otherObj))
328
+ } else {
329
+ self.concatBalanced(otherObj)
330
+ }
329
331
}
330
332
}
331
333
}
332
334
}
333
- }
335
+ }.flatten()
334
336
}
335
337
336
338
override fun splitAtInterval (interval : Duration ): IStream .Many <HistoryIndexNode > {
@@ -397,31 +399,42 @@ fun LongRange.intersect(other: LongRange): LongRange {
397
399
return if (this .first > other.first) other.intersect(this ) else other.first.. min(this .last, other.last)
398
400
}
399
401
400
- fun Object<HistoryIndexNode>.rebalance (otherObj : Object <HistoryIndexNode >): Pair <Object <HistoryIndexNode >, Object<HistoryIndexNode>> {
401
- if (otherObj.height > height + 1 ) {
402
- val split1 = (otherObj.data as HistoryIndexRangeNode ).child1.resolveNow()
403
- val split2 = (otherObj.data as HistoryIndexRangeNode ).child2.resolveNow()
404
- val rebalanced = this .rebalance(split1)
405
- if (rebalanced.first.height <= split2.height) {
406
- return rebalanced.first.concatUnbalanced(rebalanced.second) to split2
407
- } else {
408
- return rebalanced.first to rebalanced.second.concatUnbalanced(split2)
409
- }
402
+ private fun Object<HistoryIndexNode>.rebalance (otherObj : Object <HistoryIndexNode >): IStream .One <Pair <Object <HistoryIndexNode >, Object<HistoryIndexNode>>> {
403
+ return if (otherObj.height > height + 1 ) {
404
+ (otherObj.data as HistoryIndexRangeNode ).child1.requestBoth((otherObj.data as HistoryIndexRangeNode ).child2) { split1, split2 ->
405
+ this .rebalance(split1).map { rebalanced ->
406
+ if (rebalanced.first.height <= split2.height) {
407
+ rebalanced.first.concatUnbalanced(rebalanced.second) to split2
408
+ } else {
409
+ rebalanced.first to rebalanced.second.concatUnbalanced(split2)
410
+ }
411
+ }
412
+ }.flatten()
410
413
} else if (height > otherObj.height + 1 ) {
411
- val split1 = (this .data as HistoryIndexRangeNode ).child1.resolveNow()
412
- val split2 = (this .data as HistoryIndexRangeNode ).child2.resolveNow()
413
- val rebalanced = split2.rebalance(otherObj)
414
- if (rebalanced.second.height > split1.height) {
415
- return split1.concatUnbalanced(rebalanced.first) to rebalanced.second
416
- } else {
417
- return split1 to rebalanced.first.concatUnbalanced(rebalanced.second)
418
- }
414
+ (this .data as HistoryIndexRangeNode ).child1.requestBoth((this .data as HistoryIndexRangeNode ).child2) { split1, split2 ->
415
+ split2.rebalance(otherObj).map { rebalanced ->
416
+ if (rebalanced.second.height > split1.height) {
417
+ split1.concatUnbalanced(rebalanced.first) to rebalanced.second
418
+ } else {
419
+ split1 to rebalanced.first.concatUnbalanced(rebalanced.second)
420
+ }
421
+ }
422
+ }.flatten()
419
423
} else {
420
- return this to otherObj
424
+ IStream .of(this to otherObj)
425
+ }
426
+ }
427
+
428
+ private fun Object<HistoryIndexNode>.concatBalanced (otherObj : Object <HistoryIndexNode >): IStream .One <Object <HistoryIndexNode >> {
429
+ return this .rebalance(otherObj).map { rebalanced ->
430
+ rebalanced.first.concatUnbalanced(rebalanced.second)
421
431
}
422
432
}
423
433
424
- fun Object<HistoryIndexNode>.concatBalanced (otherObj : Object <HistoryIndexNode >): Object <HistoryIndexNode > {
425
- val rebalanced = this .rebalance(otherObj)
426
- return rebalanced.first.concatUnbalanced(rebalanced.second)
434
+ private fun IStream.One<Object<HistoryIndexNode>>.concatBalanced (otherObj : Object <HistoryIndexNode >): IStream .One <Object <HistoryIndexNode >> {
435
+ return flatMapOne { it.concatBalanced(otherObj) }
436
+ }
437
+
438
+ private fun Object<HistoryIndexNode>.concatBalanced (otherObj : IStream .One <Object <HistoryIndexNode >>): IStream .One <Object <HistoryIndexNode >> {
439
+ return otherObj.flatMapOne { this .concatBalanced(it) }
427
440
}
0 commit comments