Skip to content

Commit 96173f8

Browse files
committed
perf: efficient computation of delta between versions
1 parent 0a558b7 commit 96173f8

File tree

12 files changed

+417
-125
lines changed

12 files changed

+417
-125
lines changed

model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package org.modelix.model.client2
22

3+
import com.badoo.reaktive.coroutinesinterop.asFlow
4+
import com.badoo.reaktive.observable.Observable
5+
import com.badoo.reaktive.observable.map
6+
import com.badoo.reaktive.observable.toMap
37
import io.ktor.client.HttpClient
48
import io.ktor.client.HttpClientConfig
59
import io.ktor.client.call.body
@@ -52,7 +56,7 @@ import org.modelix.model.lazy.CLVersion
5256
import org.modelix.model.lazy.IDeserializingKeyValueStore
5357
import org.modelix.model.lazy.ObjectStoreCache
5458
import org.modelix.model.lazy.RepositoryId
55-
import org.modelix.model.lazy.computeDelta
59+
import org.modelix.model.lazy.fullDiff
5660
import org.modelix.model.oauth.IAuthConfig
5761
import org.modelix.model.oauth.IAuthRequestHandler
5862
import org.modelix.model.oauth.ModelixAuthClient
@@ -73,6 +77,8 @@ import org.modelix.model.server.api.v2.VersionDeltaStreamV2
7377
import org.modelix.model.server.api.v2.asStream
7478
import org.modelix.modelql.client.ModelQLClient
7579
import org.modelix.modelql.core.IMonoStep
80+
import org.modelix.streams.asObservable
81+
import org.modelix.streams.getSuspending
7682
import kotlin.time.Duration
7783
import kotlin.time.Duration.Companion.seconds
7884

@@ -298,14 +304,13 @@ class ModelClientV2(
298304
require(version is CLVersion)
299305
require(baseVersion is CLVersion?)
300306
version.write()
301-
val objects = version.computeDelta(baseVersion)
302-
HashUtil.checkObjectHashes(objects)
303-
val delta = if (objects.size > 1000) {
307+
val objects = version.fullDiff(baseVersion)
308+
val delta = if (true /* objects.size > 1000 */) {
304309
// large HTTP requests and large Json objects don't scale well
305-
pushObjects(branch.repositoryId, objects.asSequence().map { it.key to it.value })
310+
pushObjects(branch.repositoryId, objects.map { it.hash to it.serialize() })
306311
VersionDelta(version.getContentHash(), null)
307312
} else {
308-
VersionDelta(version.getContentHash(), null, objectsMap = objects)
313+
VersionDelta(version.getContentHash(), null, objectsMap = objects.toMap({ it.hash }, { it.serialize() }).getSuspending())
309314
}
310315
return httpClient.preparePost {
311316
url {
@@ -321,9 +326,13 @@ class ModelClientV2(
321326
}
322327

323328
override suspend fun pushObjects(repository: RepositoryId, objects: Sequence<ObjectHashAndSerializedObject>) {
329+
pushObjects(repository, objects.asObservable())
330+
}
331+
332+
private suspend fun pushObjects(repository: RepositoryId, objects: Observable<ObjectHashAndSerializedObject>) {
324333
LOG.debug { "${clientId.toString(16)}.pushObjects($repository)" }
325-
val maxBodySize = 16 * 1024 * 1024
326-
val chunkContent = StringBuilder(1024 * 1024)
334+
val maxBodySize = 2 * 1024 * 1024
335+
val chunkContent = StringBuilder()
327336

328337
suspend fun sendChunk() {
329338
httpClient.put {
@@ -337,9 +346,7 @@ class ModelClientV2(
337346
chunkContent.clear()
338347
}
339348

340-
val itr = objects.iterator()
341-
while (itr.hasNext()) {
342-
val entry = itr.next()
349+
objects.asFlow().collect { entry ->
343350
val entrySize = (if (chunkContent.isEmpty()) 0 else 1) + entry.first.length + 1 + entry.second.length
344351
if (chunkContent.length + entrySize > maxBodySize) {
345352
sendChunk()

model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLVersion.kt

Lines changed: 87 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,26 @@ package org.modelix.model.lazy
22

33
import com.badoo.reaktive.maybe.Maybe
44
import com.badoo.reaktive.maybe.map
5+
import com.badoo.reaktive.observable.Observable
6+
import com.badoo.reaktive.observable.asObservable
7+
import com.badoo.reaktive.observable.concatWith
8+
import com.badoo.reaktive.observable.flatMap
59
import com.badoo.reaktive.observable.flatMapSingle
610
import com.badoo.reaktive.observable.toList
11+
import com.badoo.reaktive.single.flatMapObservable
12+
import com.badoo.reaktive.single.flatten
713
import com.badoo.reaktive.single.map
814
import com.badoo.reaktive.single.notNull
915
import com.badoo.reaktive.single.singleOf
16+
import com.badoo.reaktive.single.zipWith
1017
import kotlinx.datetime.Clock
1118
import kotlinx.datetime.Instant
1219
import kotlinx.datetime.TimeZone
1320
import kotlinx.datetime.toInstant
1421
import org.modelix.model.IKeyValueStore
1522
import org.modelix.model.IVersion
1623
import org.modelix.model.LinearHistory
24+
import org.modelix.model.VersionMerger
1725
import org.modelix.model.api.IIdGenerator
1826
import org.modelix.model.api.INodeReference
1927
import org.modelix.model.api.ITree
@@ -35,16 +43,17 @@ import org.modelix.model.persistent.CPVersion
3543
import org.modelix.model.persistent.EntryAddedEvent
3644
import org.modelix.model.persistent.EntryChangedEvent
3745
import org.modelix.model.persistent.EntryRemovedEvent
46+
import org.modelix.model.persistent.IKVValue
3847
import org.modelix.model.persistent.OperationsList
48+
import org.modelix.model.persistent.getAllObjects
3949
import org.modelix.streams.getSynchronous
4050
import org.modelix.streams.iterateSynchronous
4151
import kotlin.jvm.JvmName
4252

4353
class CLVersion : IVersion {
4454
val asyncStore: IAsyncObjectStore
4555
var store: IDeserializingKeyValueStore
46-
var data: CPVersion? = null
47-
private set
56+
val data: CPVersion
4857
val treeHash: KVEntryReference<CPTree>
4958

5059
private constructor(
@@ -70,11 +79,11 @@ class CLVersion : IVersion {
7079
time = time,
7180
author = author,
7281
treeHash = this.treeHash,
73-
previousVersion = previousVersion?.let { KVEntryReference(it.data!!) },
74-
originalVersion = originalVersion?.let { KVEntryReference(it.data!!) },
75-
baseVersion = baseVersion?.let { KVEntryReference(it.data!!) },
76-
mergedVersion1 = mergedVersion1?.let { KVEntryReference(it.data!!) },
77-
mergedVersion2 = mergedVersion2?.let { KVEntryReference(it.data!!) },
82+
previousVersion = previousVersion?.let { KVEntryReference(it.data) },
83+
originalVersion = originalVersion?.let { KVEntryReference(it.data) },
84+
baseVersion = baseVersion?.let { KVEntryReference(it.data) },
85+
mergedVersion1 = mergedVersion1?.let { KVEntryReference(it.data) },
86+
mergedVersion2 = mergedVersion2?.let { KVEntryReference(it.data) },
7887
operations = localizedOps.toTypedArray(),
7988
operationsHash = null,
8089
numberOfOperations = localizedOps.size,
@@ -86,11 +95,11 @@ class CLVersion : IVersion {
8695
time = time,
8796
author = author,
8897
treeHash = this.treeHash,
89-
previousVersion = previousVersion?.let { KVEntryReference(it.data!!) },
90-
originalVersion = originalVersion?.let { KVEntryReference(it.data!!) },
91-
baseVersion = baseVersion?.let { KVEntryReference(it.data!!) },
92-
mergedVersion1 = mergedVersion1?.let { KVEntryReference(it.data!!) },
93-
mergedVersion2 = mergedVersion2?.let { KVEntryReference(it.data!!) },
98+
previousVersion = previousVersion?.let { KVEntryReference(it.data) },
99+
originalVersion = originalVersion?.let { KVEntryReference(it.data) },
100+
baseVersion = baseVersion?.let { KVEntryReference(it.data) },
101+
mergedVersion1 = mergedVersion1?.let { KVEntryReference(it.data) },
102+
mergedVersion2 = mergedVersion2?.let { KVEntryReference(it.data) },
94103
operations = null,
95104
operationsHash = KVEntryReference(opsList),
96105
numberOfOperations = localizedOps.size,
@@ -116,17 +125,17 @@ class CLVersion : IVersion {
116125
}
117126

118127
val author: String?
119-
get() = data!!.author
128+
get() = data.author
120129

121130
val id: Long
122-
get() = data!!.id
131+
get() = data.id
123132

124133
@Deprecated("Use getTimestamp()")
125134
val time: String?
126-
get() = data!!.time
135+
get() = data.time
127136

128137
fun getTimestamp(): Instant? {
129-
val dateTimeStr = data!!.time ?: return null
138+
val dateTimeStr = data.time ?: return null
130139
try {
131140
return Instant.fromEpochSeconds(dateTimeStr.toLong())
132141
} catch (ex: Exception) {}
@@ -138,47 +147,47 @@ class CLVersion : IVersion {
138147

139148
@Deprecated("Use getContentHash()", ReplaceWith("getContentHash()"))
140149
val hash: String
141-
get() = data!!.hash
150+
get() = data.hash
142151

143-
override fun getContentHash(): String = data!!.hash
152+
override fun getContentHash(): String = data.hash
144153

145154
@Deprecated("Use getTree()", ReplaceWith("getTree()"))
146155
@get:JvmName("getTree_()")
147156
val tree: CLTree
148-
get() = CLTree(treeHash!!.getValue(store), store)
157+
get() = CLTree(treeHash.getValue(store), store)
149158

150159
override fun getTree(): CLTree = tree
151160

152161
val baseVersion: CLVersion?
153162
get() {
154-
val previousVersionHash = data!!.baseVersion ?: data!!.previousVersion ?: return null
163+
val previousVersionHash = data.baseVersion ?: data.previousVersion ?: return null
155164
val previousVersion = previousVersionHash.getValue(store)
156165
return CLVersion(previousVersion, store)
157166
}
158167

159168
val operations: Iterable<IOperation>
160169
get() {
161-
val operationsHash = data!!.operationsHash
170+
val operationsHash = data.operationsHash
162171
val ops = operationsHash?.getValue(store)?.getOperations(asyncStore)?.toList()?.getSynchronous()
163-
?: data!!.operations?.toList()
172+
?: data.operations?.toList()
164173
?: emptyList()
165174
return globalizeOps(ops)
166175
}
167176

168177
val numberOfOperations: Int
169-
get() = data!!.numberOfOperations
178+
get() = data.numberOfOperations
170179

171180
fun operationsInlined(): Boolean {
172-
return data!!.operations != null
181+
return data.operations != null
173182
}
174183

175-
fun isMerge() = this.data!!.mergedVersion1 != null
184+
fun isMerge() = this.data.mergedVersion1 != null
176185

177-
fun getMergedVersion1() = this.data!!.mergedVersion1?.let { CLVersion(it.getValue(store), store) }
178-
fun getMergedVersion2() = this.data!!.mergedVersion2?.let { CLVersion(it.getValue(store), store) }
186+
fun getMergedVersion1() = this.data.mergedVersion1?.let { CLVersion(it.getValue(store), store) }
187+
fun getMergedVersion2() = this.data.mergedVersion2?.let { CLVersion(it.getValue(store), store) }
179188

180189
fun write(): String {
181-
KVEntryReference(data!!).write(store)
190+
KVEntryReference(data).write(store)
182191
return hash
183192
}
184193

@@ -188,13 +197,13 @@ class CLVersion : IVersion {
188197

189198
other as CLVersion
190199

191-
if (data?.id != other.data?.id) return false
200+
if (data.id != other.data.id) return false
192201

193202
return true
194203
}
195204

196205
override fun hashCode(): Int {
197-
return data?.id?.hashCode() ?: 0
206+
return data.id.hashCode()
198207
}
199208

200209
override fun toString(): String {
@@ -313,6 +322,54 @@ class CLVersion : IVersion {
313322
}
314323
}
315324
}
325+
326+
fun getParents(stopAt: CLVersion?): List<CLVersion> {
327+
if (stopAt != null && this.getContentHash() == stopAt.getContentHash()) {
328+
return emptyList()
329+
}
330+
val ancestors = if (isMerge()) {
331+
listOf(getMergedVersion1()!!, getMergedVersion2()!!)
332+
} else {
333+
listOfNotNull(baseVersion)
334+
}
335+
return ancestors.filter { stopAt == null || it.getContentHash() != stopAt.getContentHash() }
336+
}
337+
338+
fun getAncestors(includeSelf: Boolean, stopAt: CLVersion?): List<CLVersion> {
339+
if (stopAt != null && this.getContentHash() == stopAt.getContentHash()) {
340+
return emptyList()
341+
}
342+
return if (includeSelf) {
343+
listOf(this) + getAncestors(false, stopAt)
344+
} else {
345+
getParents(stopAt).flatMap { it.getAncestors(true, stopAt) }
346+
}
347+
}
348+
}
349+
350+
fun CLVersion.fullDiff(baseVersion: CLVersion?): Observable<IKVValue> {
351+
val history = historyDiff(baseVersion)
352+
return history.concatWith(
353+
history.flatMap { version ->
354+
val baseVersion = version.baseVersion?.getValue(asyncStore) ?: singleOf(null)
355+
val currentVersion = version.treeHash.getValue(asyncStore)
356+
val treeDiff = currentVersion.zipWith(baseVersion) { v, b ->
357+
if (b == null) v.getAllObjects(asyncStore) else v.objectDiff(b, asyncStore)
358+
}.flatten()
359+
if (version.operationsHash != null) {
360+
val operations = version.operationsHash.getValue(asyncStore).flatMapObservable { it.getAllObjects(asyncStore) }
361+
treeDiff.concatWith(operations)
362+
} else {
363+
treeDiff
364+
}
365+
},
366+
)
367+
}
368+
369+
fun CLVersion.historyDiff(baseVersion: CLVersion?): Observable<CPVersion> {
370+
val commonBase = VersionMerger.commonBaseVersion(this, baseVersion)
371+
val history = getAncestors(true, commonBase).map { it.data }
372+
return history.asObservable()
316373
}
317374

318375
fun CLVersion.computeDelta(baseVersion: CLVersion?): Map<String, String> {

model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/WrittenEntry.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ class WrittenEntry<E : IKVValue>(
1414

1515
override fun getValue(store: IDeserializingKeyValueStore): E {
1616
return store.get(hash, deserializer)
17-
?: throw RuntimeException("Entry $hash not found")
17+
?: throw MissingEntryException("Entry $hash not found")
1818
}
1919

2020
override fun getValue(store: IAsyncObjectStore): Single<E> {
21-
return store.get(toObjectHash()).asSingleOrError { NoSuchElementException("Entry not found: $this") }
21+
return store.get(toObjectHash()).asSingleOrError { MissingEntryException("Entry not found: $this") }
2222
}
2323

2424
override fun getUnwrittenValue(): E {
@@ -35,3 +35,5 @@ class WrittenEntry<E : IKVValue>(
3535
return hash
3636
}
3737
}
38+
39+
class MissingEntryException(val hash: String) : NoSuchElementException("Entry not found: $hash")

0 commit comments

Comments
 (0)