Skip to content

Commit 88bd1a8

Browse files
committed
perf(model-datastructure): split large lists of operations into smaller key-value entries
Running the model synchronizer can create millions of changes. Storing all of the resulting operations in a single key-value entry causes performance and memory issues everywhere that entry is handled. Large lists of operations are now split into a tree structure with limited node size.
1 parent ef6a16e commit 88bd1a8

File tree

5 files changed

+110
-51
lines changed

5 files changed

+110
-51
lines changed

model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLVersion.kt

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package org.modelix.model.lazy
33
import com.badoo.reaktive.maybe.Maybe
44
import com.badoo.reaktive.maybe.map
55
import com.badoo.reaktive.observable.flatMapSingle
6+
import com.badoo.reaktive.observable.toList
67
import com.badoo.reaktive.single.map
78
import com.badoo.reaktive.single.notNull
89
import com.badoo.reaktive.single.singleOf
@@ -29,12 +30,13 @@ import org.modelix.model.operations.IOperation
2930
import org.modelix.model.operations.OTBranch
3031
import org.modelix.model.operations.SetReferenceOp
3132
import org.modelix.model.persistent.CPNode
32-
import org.modelix.model.persistent.CPOperationsList
3333
import org.modelix.model.persistent.CPTree
3434
import org.modelix.model.persistent.CPVersion
3535
import org.modelix.model.persistent.EntryAddedEvent
3636
import org.modelix.model.persistent.EntryChangedEvent
3737
import org.modelix.model.persistent.EntryRemovedEvent
38+
import org.modelix.model.persistent.OperationsList
39+
import org.modelix.streams.getSynchronous
3840
import org.modelix.streams.iterateSynchronous
3941
import kotlin.jvm.JvmName
4042

@@ -61,7 +63,7 @@ class CLVersion : IVersion {
6163
this.asyncStore = store
6264
this.store = store.getLegacyObjectStore()
6365
this.treeHash = KVEntryReference(treeData)
64-
val localizedOps = localizeOps(operations.asList()).toTypedArray()
66+
val localizedOps = localizeOps(operations.asList())
6567
if (localizedOps.size <= INLINED_OPS_LIMIT) {
6668
data = CPVersion(
6769
id = id,
@@ -73,12 +75,12 @@ class CLVersion : IVersion {
7375
baseVersion = baseVersion?.let { KVEntryReference(it.data!!) },
7476
mergedVersion1 = mergedVersion1?.let { KVEntryReference(it.data!!) },
7577
mergedVersion2 = mergedVersion2?.let { KVEntryReference(it.data!!) },
76-
operations = localizedOps,
78+
operations = localizedOps.toTypedArray(),
7779
operationsHash = null,
7880
numberOfOperations = localizedOps.size,
7981
)
8082
} else {
81-
val opsList = CPOperationsList(localizedOps)
83+
val opsList = OperationsList.of(localizedOps.toList())
8284
data = CPVersion(
8385
id = id,
8486
time = time,
@@ -157,8 +159,10 @@ class CLVersion : IVersion {
157159
val operations: Iterable<IOperation>
158160
get() {
159161
val operationsHash = data!!.operationsHash
160-
val ops = operationsHash?.getValue(store)?.operations ?: data!!.operations
161-
return globalizeOps((ops ?: arrayOf()).toList())
162+
val ops = operationsHash?.getValue(store)?.getOperations(asyncStore)?.toList()?.getSynchronous()
163+
?: data!!.operations?.toList()
164+
?: emptyList()
165+
return globalizeOps(ops)
162166
}
163167

164168
val numberOfOperations: Int

model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPOperationsList.kt

Lines changed: 0 additions & 40 deletions
This file was deleted.

model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/CPVersion.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class CPVersion(
1919
mergedVersion1: KVEntryReference<CPVersion>?, // null if this is not a merge
2020
mergedVersion2: KVEntryReference<CPVersion>?, // null if this is not a merge
2121
operations: Array<IOperation>?,
22-
operationsHash: KVEntryReference<CPOperationsList>?,
22+
operationsHash: KVEntryReference<OperationsList>?,
2323
numberOfOperations: Int,
2424
) : IKVValue {
2525
private val logger = mu.KotlinLogging.logger {}
@@ -42,7 +42,7 @@ class CPVersion(
4242
val mergedVersion2: KVEntryReference<CPVersion>?
4343

4444
val operations: Array<IOperation>?
45-
val operationsHash: KVEntryReference<CPOperationsList>?
45+
val operationsHash: KVEntryReference<OperationsList>?
4646
val numberOfOperations: Int
4747
override fun serialize(): String {
4848
val opsPart: String = operationsHash?.getHash()
@@ -107,7 +107,7 @@ class CPVersion(
107107
mergedVersion1 = emptyStringAsNull(parts[5])?.let { KVEntryReference(it, DESERIALIZER) },
108108
mergedVersion2 = emptyStringAsNull(parts[6])?.let { KVEntryReference(it, DESERIALIZER) },
109109
operations = ops,
110-
operationsHash = opsHash?.let { KVEntryReference(it, CPOperationsList.DESERIALIZER) },
110+
operationsHash = opsHash?.let { KVEntryReference(it, OperationsList.DESERIALIZER) },
111111
numberOfOperations = parts[7].toInt(),
112112
)
113113
data.isWritten = true
@@ -135,7 +135,7 @@ class CPVersion(
135135
mergedVersion1 = null,
136136
mergedVersion2 = null,
137137
ops,
138-
opsHash?.let { KVEntryReference(it, CPOperationsList.DESERIALIZER) },
138+
opsHash?.let { KVEntryReference(it, OperationsList.DESERIALIZER) },
139139
numOps,
140140
)
141141
data.isWritten = true
model-datastructure/src/commonMain/kotlin/org/modelix/model/persistent/OperationsList.kt

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package org.modelix.model.persistent
2+
3+
import com.badoo.reaktive.observable.Observable
4+
import com.badoo.reaktive.observable.flatMap
5+
import com.badoo.reaktive.single.flatMap
6+
import com.badoo.reaktive.single.flatMapObservable
7+
import org.modelix.model.async.IAsyncObjectStore
8+
import org.modelix.model.lazy.KVEntryReference
9+
import org.modelix.model.operations.IOperation
10+
import org.modelix.streams.asObservable
11+
12+
/**
 * A persisted list of [IOperation]s.
 *
 * A list is represented either by a [SmallOperationsList], which holds the operations directly,
 * or by a [LargeOperationsList], which only holds references to further [OperationsList] entries.
 * Use [of] to build the appropriate representation and [deserialize] to read one back.
 */
abstract class OperationsList() : IKVValue {
    companion object {
        /** Deserializer handed to [KVEntryReference]s that point to an [OperationsList]. */
        val DESERIALIZER: (String) -> OperationsList = { serialized -> deserialize(serialized) }

        /** Upper bound for the operations in a small list and for the sub lists of a large list created by [of]. */
        private const val MAX_LIST_SIZE = 20

        /** Marker at the start of the serialized form of a [LargeOperationsList]. */
        private const val LARGE_LIST_PREFIX = "OL" + Separators.LEVEL1

        /**
         * Parses the serialized form of an [OperationsList].
         *
         * Input starting with [LARGE_LIST_PREFIX] is read as a list of sub list hashes, anything else
         * as a list of serialized operations (empty parts are ignored, so an empty string yields an
         * empty list). The returned instance has `isWritten` set to `true`.
         */
        fun deserialize(input: String): OperationsList {
            val parsed: OperationsList = if (input.startsWith(LARGE_LIST_PREFIX)) {
                val subListHashes = input.removePrefix(LARGE_LIST_PREFIX).split(Separators.LEVEL2)
                val subListRefs = subListHashes.map { hash -> KVEntryReference(hash, DESERIALIZER) }
                LargeOperationsList(subListRefs.toTypedArray())
            } else {
                val serializedOps = input.split(Separators.LEVEL2).filter { part -> part.isNotEmpty() }
                val ops = serializedOps.map { part -> OperationSerializer.INSTANCE.deserialize(part) }
                SmallOperationsList(ops.toTypedArray())
            }
            parsed.isWritten = true
            return parsed
        }

        /**
         * Builds an [OperationsList] for the given operations.
         *
         * Up to [MAX_LIST_SIZE] operations are stored in a single [SmallOperationsList]. Longer lists
         * are split into at most [MAX_LIST_SIZE] chunks, each of which is converted recursively and
         * referenced from a [LargeOperationsList].
         */
        fun of(operations: List<IOperation>): OperationsList {
            if (operations.size <= MAX_LIST_SIZE) {
                return SmallOperationsList(operations.toTypedArray())
            }

            // ceil(size / MAX_LIST_SIZE) keeps the number of chunks at or below MAX_LIST_SIZE;
            // the lower bound avoids creating chunks smaller than a full small list.
            val chunkSize = maxOf(MAX_LIST_SIZE, (operations.size + MAX_LIST_SIZE - 1) / MAX_LIST_SIZE)
            val subListRefs = operations.chunked(chunkSize) { chunk -> KVEntryReference(of(chunk)) }
            return LargeOperationsList(subListRefs.toTypedArray())
        }
    }

    /** Emits all operations of this list, loading referenced entries from [store] where necessary. */
    abstract fun getOperations(store: IAsyncObjectStore): Observable<IOperation>
}
49+
50+
/**
 * An [OperationsList] that does not contain operations itself but references other
 * [OperationsList] entries via [subLists].
 */
class LargeOperationsList(val subLists: Array<out KVEntryReference<OperationsList>>) : OperationsList() {
    override var isWritten: Boolean = false

    /** SHA-256 hash of the serialized form, computed on first access. */
    override val hash: String by lazy(LazyThreadSafetyMode.PUBLICATION) { HashUtil.sha256(serialize()) }

    /** Serializes this list as the large-list prefix followed by the hashes of all sub lists. */
    override fun serialize(): String {
        val subListHashes = subLists.joinToString(Separators.LEVEL2) { subListRef -> subListRef.getHash() }
        return "OL" + Separators.LEVEL1 + subListHashes
    }

    override fun getDeserializer(): (String) -> IKVValue = DESERIALIZER

    /** The referenced entries are exactly the sub lists. */
    override fun getReferencedEntries(): List<KVEntryReference<IKVValue>> = subLists.toList()

    /** Resolves every sub list from [store] and emits the operations each of them provides. */
    override fun getOperations(store: IAsyncObjectStore): Observable<IOperation> {
        return subLists.asObservable().flatMap { subListRef ->
            subListRef.getValue(store).flatMapObservable { subList -> subList.getOperations(store) }
        }
    }
}
71+
72+
/**
 * An [OperationsList] that holds its [operations] directly.
 */
class SmallOperationsList(val operations: Array<out IOperation>) : OperationsList() {
    override var isWritten: Boolean = false

    /**
     * Serializes all operations, joined by [Separators.LEVEL2].
     *
     * An empty list serializes to an empty string; `joinToString` already yields `""` for an
     * empty array, so no special case is required.
     */
    override fun serialize(): String {
        return operations.joinToString(Separators.LEVEL2) { OperationSerializer.INSTANCE.serialize(it) }
    }

    /** SHA-256 hash of the serialized form, computed on first access. */
    override val hash: String by lazy(LazyThreadSafetyMode.PUBLICATION) { HashUtil.sha256(serialize()) }

    override fun getDeserializer(): (String) -> IKVValue = DESERIALIZER

    /** Collects the entries referenced by the individual operations into a single list. */
    override fun getReferencedEntries(): List<KVEntryReference<IKVValue>> {
        // flatMap avoids building the intermediate list of lists that map + flatten creates.
        return operations.flatMap { it.getReferencedEntries() }
    }

    /** Emits the contained operations; [store] is not needed because nothing has to be loaded. */
    override fun getOperations(store: IAsyncObjectStore): Observable<IOperation> {
        return operations.asObservable()
    }
}

model-datastructure/src/commonTest/kotlin/TreeSerializationTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class TreeSerializationTest {
5757
fun serializeAndDeserialize_AddNewChildSubtreeOp() {
5858
// the hash only ensures that JVM and JS produce the same serialized data
5959
// it can just be updated if the test fails
60-
serializeAndDeserialize(true, "I93pF*lmKS7ZoTHfi2dOugdeIKLGqyjmzIPc1mVUTkJ0")
60+
serializeAndDeserialize(true, "vjaVe*F7LtoakPasO0rAC3O3N47kLqq9c6iW3A8Ghv4s")
6161
}
6262

6363
fun serializeAndDeserialize(moreThan10ops: Boolean, expectedVersionHash: String) {

0 commit comments

Comments
 (0)