Skip to content

Commit f1df675

Browse files
committed
fix(model-client): ReplicatedRepository failed to sync with the server
fixes MODELIX-515
1 parent 2d3d76a commit f1df675

File tree

11 files changed

+259
-154
lines changed

11 files changed

+259
-154
lines changed

model-client/src/commonMain/kotlin/org/modelix/model/VersionMerger.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ class VersionMerger(private val storeCache: IDeserializingKeyValueStore, private
3232
if (newVersion.hash == lastMergedVersion.hash) {
3333
return lastMergedVersion
3434
}
35-
val merged = mergeHistory(lastMergedVersion, newVersion)
3635
checkRepositoryIds(lastMergedVersion, newVersion)
36+
val merged = mergeHistory(lastMergedVersion, newVersion)
3737
return merged
3838
}
3939

model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ interface IModelClientV2 {
3434
/**
3535
* The pushed version is merged automatically by the server with the current head.
3636
* The merge result is returned.
37+
* @param baseVersion Some version that is known to exist on the server.
38+
* Is used for optimizing the amount of data sent to the server.
3739
*/
38-
suspend fun push(branch: BranchReference, version: IVersion): IVersion
40+
suspend fun push(branch: BranchReference, version: IVersion, baseVersion: IVersion?): IVersion
3941

4042
suspend fun pull(branch: BranchReference, lastKnownVersion: IVersion?): IVersion
4143
suspend fun pull(branch: BranchReference, lastKnownVersion: IVersion?, filter: ModelQuery): IVersion

model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ import org.modelix.model.lazy.BranchReference
3939
import org.modelix.model.lazy.CLVersion
4040
import org.modelix.model.lazy.ObjectStoreCache
4141
import org.modelix.model.lazy.RepositoryId
42+
import org.modelix.model.lazy.computeDelta
4243
import org.modelix.model.persistent.HashUtil
4344
import org.modelix.model.persistent.MapBaseStore
4445
import org.modelix.model.server.api.ModelQuery
4546
import org.modelix.model.server.api.v2.VersionDelta
46-
import kotlin.jvm.Synchronized
4747
import kotlin.time.Duration.Companion.seconds
4848

4949
class ModelClientV2(
@@ -53,7 +53,7 @@ class ModelClientV2(
5353
private var clientId: Int = 0
5454
private var idGenerator: IIdGenerator = IdGeneratorDummy()
5555
private var userId: String? = null
56-
private val kvStore = UncommitedEntriesStore()
56+
private val kvStore = MapBaseStore()
5757
val store = ObjectStoreCache(kvStore) // TODO the store will accumulate garbage
5858

5959
suspend fun init() {
@@ -129,17 +129,19 @@ class ModelClientV2(
129129
return createVersion(null, delta)
130130
}
131131

132-
override suspend fun push(branch: BranchReference, version: IVersion): IVersion {
132+
override suspend fun push(branch: BranchReference, version: IVersion, baseVersion: IVersion?): IVersion {
133+
LOG.debug { "${clientId.toString(16)}.push($branch, $version, $baseVersion)" }
133134
require(version is CLVersion)
135+
require(baseVersion is CLVersion?)
134136
version.write()
135-
val objects = kvStore.getUncommitedEntries().values.filterNotNull().toSet()
137+
val objects = version.computeDelta(baseVersion).values.filterNotNull().toSet()
136138
val response = httpClient.post {
137139
url {
138140
takeFrom(baseUrl)
139141
appendPathSegments("repositories", branch.repositoryId.id, "branches", branch.branchName)
140142
}
141143
contentType(ContentType.Application.Json)
142-
val body = VersionDelta(version.hash, null, objects)
144+
val body = VersionDelta(version.getContentHash(), null, objects)
143145
setBody(body)
144146
}
145147
val mergedVersionDelta = response.body<VersionDelta>()
@@ -157,7 +159,9 @@ class ModelClientV2(
157159
}
158160
}
159161
}
160-
return createVersion(lastKnownVersion, response.body())
162+
val receivedVersion = createVersion(lastKnownVersion, response.body())
163+
LOG.debug { "${clientId.toString(16)}.pull($branch, $lastKnownVersion) -> $receivedVersion" }
164+
return receivedVersion
161165
}
162166

163167
override suspend fun poll(branch: BranchReference, lastKnownVersion: IVersion?): IVersion {
@@ -171,7 +175,9 @@ class ModelClientV2(
171175
}
172176
}
173177
}
174-
return createVersion(lastKnownVersion, response.body())
178+
val receivedVersion = createVersion(lastKnownVersion, response.body())
179+
LOG.debug { "${clientId.toString(16)}.poll($branch, $lastKnownVersion) -> $receivedVersion" }
180+
return receivedVersion
175181
}
176182

177183
override suspend fun pull(branch: BranchReference, lastKnownVersion: IVersion?, filter: ModelQuery): IVersion {
@@ -201,6 +207,7 @@ class ModelClientV2(
201207
}
202208

203209
companion object {
210+
private val LOG = mu.KotlinLogging.logger {}
204211
fun builder(): ModelClientV2Builder = ModelClientV2PlatformSpecificBuilder()
205212
}
206213
}
@@ -268,20 +275,3 @@ abstract class ModelClientV2Builder {
268275
}
269276

270277
expect class ModelClientV2PlatformSpecificBuilder() : ModelClientV2Builder
271-
272-
private class UncommitedEntriesStore() : MapBaseStore() {
273-
private var uncommitedEntries: MutableMap<String, String?> = HashMap()
274-
275-
@Synchronized
276-
fun getUncommitedEntries(): Map<String, String?> {
277-
val result = uncommitedEntries
278-
uncommitedEntries = HashMap()
279-
return result
280-
}
281-
282-
@Synchronized
283-
override fun putAll(entries: Map<String, String?>) {
284-
uncommitedEntries.putAll(entries)
285-
super.putAll(entries)
286-
}
287-
}

model-client/src/commonMain/kotlin/org/modelix/model/client2/ReplicatedModel.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ class ReplicatedModel(val client: IModelClientV2, val branchRef: BranchReference
111111
createdVersion = applyPendingLocalChanges()
112112
}
113113
if (createdVersion != null) {
114-
remoteVersionReceived(client.push(branchRef, createdVersion) as CLVersion)
114+
remoteVersionReceived(client.push(branchRef, createdVersion, baseVersion = lastRemoteVersion) as CLVersion)
115115
}
116116
}
117117

@@ -147,6 +147,11 @@ class ReplicatedModel(val client: IModelClientV2, val branchRef: BranchReference
147147
}
148148
}
149149

150-
fun IModelClientV2.getReplicatedModel(branchRef: BranchReference, query: ModelQuery? = null): ReplicatedModel {
150+
fun IModelClientV2.getReplicatedModel(branchRef: BranchReference): ReplicatedModel {
151+
return ReplicatedModel(this, branchRef)
152+
}
153+
154+
@Deprecated("ModelQuery is not supported and ignored", ReplaceWith("getReplicatedModel(branchRef)"))
155+
fun IModelClientV2.getReplicatedModel(branchRef: BranchReference, query: ModelQuery?): ReplicatedModel {
151156
return ReplicatedModel(this, branchRef, query)
152157
}

model-client/src/commonMain/kotlin/org/modelix/model/lazy/CLVersion.kt

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@ import kotlinx.datetime.Clock
1919
import kotlinx.datetime.Instant
2020
import kotlinx.datetime.TimeZone
2121
import kotlinx.datetime.toInstant
22+
import org.modelix.model.IKeyListener
23+
import org.modelix.model.IKeyValueStore
2224
import org.modelix.model.IVersion
25+
import org.modelix.model.LinearHistory
2326
import org.modelix.model.api.INodeReference
2427
import org.modelix.model.api.LocalPNodeReference
2528
import org.modelix.model.api.PNodeReference
2629
import org.modelix.model.operations.IOperation
2730
import org.modelix.model.operations.SetReferenceOp
31+
import org.modelix.model.persistent.CPNode
2832
import org.modelix.model.persistent.CPOperationsList
2933
import org.modelix.model.persistent.CPTree
3034
import org.modelix.model.persistent.CPVersion
@@ -280,3 +284,147 @@ class CLVersion : IVersion {
280284
}
281285
}
282286
}
287+
288+
fun CLVersion.computeDelta(baseVersion: CLVersion?): Map<String, String?> {
289+
return computeDelta(store.keyValueStore, this.getContentHash(), baseVersion?.getContentHash())
290+
}
291+
292+
private fun computeDelta(keyValueStore: IKeyValueStore, versionHash: String, baseVersionHash: String?): Map<String, String?> {
293+
val changedNodeIds = HashSet<Long>()
294+
val oldAndNewEntries: Map<String, String?> = trackAccessedEntries(keyValueStore) { store ->
295+
val version = CLVersion(versionHash, store)
296+
// generateSequence(version) { it.baseVersion }.map { it.getTree() }.count()
297+
298+
val visitedVersions = HashSet<String>()
299+
if (baseVersionHash != null) visitedVersions += baseVersionHash
300+
fun iterateHistory(v: CLVersion?) {
301+
if (v == null) return
302+
if (v.getContentHash() == baseVersionHash) return
303+
if (visitedVersions.contains(v.getContentHash())) return
304+
visitedVersions += v.getContentHash()
305+
val tree = v.getTree()
306+
v.operations.forEach {
307+
// we only need to record the required entries
308+
runCatching {
309+
it.captureIntend(tree, store)
310+
}
311+
}
312+
iterateHistory(v.baseVersion)
313+
iterateHistory(v.getMergedVersion1())
314+
iterateHistory(v.getMergedVersion2())
315+
}
316+
iterateHistory(version)
317+
318+
val baseVersion = baseVersionHash?.let { CLVersion(it, store) }
319+
// if (baseVersion != null) {
320+
// VersionMerger(store, IdGenerator.newInstance(0)).mergeChange(version, baseVersion)
321+
// }
322+
323+
val history = LinearHistory(baseVersionHash).load(version)
324+
val bulkQuery = BulkQuery(store)
325+
var v1 = baseVersion
326+
for (v2 in history) {
327+
v2.operations // include them in the result
328+
329+
if (v1 == null) {
330+
v2.getTree().root?.getDescendants(BulkQuery(store), true)?.execute()
331+
continue
332+
}
333+
334+
val oldTree = v1.getTree()
335+
v2.getTree().nodesMap!!.visitChanges(
336+
oldTree.nodesMap!!,
337+
object : CLHamtNode.IChangeVisitor {
338+
override fun visitChangesOnly(): Boolean = false
339+
override fun entryAdded(key: Long, value: KVEntryReference<CPNode>?) {
340+
changedNodeIds += key
341+
if (value != null) bulkQuery.query(value, {})
342+
}
343+
344+
override fun entryRemoved(key: Long, value: KVEntryReference<CPNode>?) {
345+
changedNodeIds += key
346+
}
347+
348+
override fun entryChanged(
349+
key: Long,
350+
oldValue: KVEntryReference<CPNode>?,
351+
newValue: KVEntryReference<CPNode>?,
352+
) {
353+
changedNodeIds += key
354+
if (newValue != null) bulkQuery.query(newValue, {})
355+
}
356+
},
357+
bulkQuery,
358+
)
359+
v1 = v2
360+
}
361+
bulkQuery.process()
362+
}
363+
val oldEntries: Map<String, String?> = trackAccessedEntries(keyValueStore) { store ->
364+
if (baseVersionHash == null) return@trackAccessedEntries
365+
366+
// record read access on the version data itself
367+
val baseVersion = CLVersion(baseVersionHash, store)
368+
baseVersion.operations
369+
370+
val oldTree = baseVersion.getTree()
371+
val bulkQuery = BulkQuery(store)
372+
373+
val nodesMap = oldTree.nodesMap!!
374+
changedNodeIds.forEach { changedNodeId ->
375+
nodesMap.get(changedNodeId, 0, bulkQuery).onSuccess { nodeRef: KVEntryReference<CPNode>? ->
376+
if (nodeRef != null) bulkQuery.query(nodeRef) { a: CPNode? -> }
377+
}
378+
}
379+
380+
bulkQuery.process()
381+
}
382+
return oldAndNewEntries - oldEntries.keys
383+
}
384+
385+
private fun trackAccessedEntries(store: IKeyValueStore, body: (IDeserializingKeyValueStore) -> Unit): Map<String, String?> {
386+
val accessTrackingStore = AccessTrackingStore(store)
387+
val objectStore = ObjectStoreCache(accessTrackingStore)
388+
body(objectStore)
389+
return accessTrackingStore.accessedEntries
390+
}
391+
392+
private class AccessTrackingStore(val store: IKeyValueStore) : IKeyValueStore {
393+
val accessedEntries: MutableMap<String, String?> = HashMap()
394+
395+
override fun get(key: String): String? {
396+
val value = store.get(key)
397+
accessedEntries.put(key, value)
398+
return value
399+
}
400+
401+
override fun put(key: String, value: String?) {
402+
TODO("Not yet implemented")
403+
}
404+
405+
override fun getAll(keys: Iterable<String>): Map<String, String?> {
406+
val entries = store.getAll(keys)
407+
accessedEntries.putAll(entries)
408+
return entries
409+
}
410+
411+
override fun putAll(entries: Map<String, String?>) {
412+
TODO("Not yet implemented")
413+
}
414+
415+
override fun prefetch(key: String) {
416+
TODO("Not yet implemented")
417+
}
418+
419+
override fun listen(key: String, listener: IKeyListener) {
420+
TODO("Not yet implemented")
421+
}
422+
423+
override fun removeListener(key: String, listener: IKeyListener) {
424+
TODO("Not yet implemented")
425+
}
426+
427+
override fun getPendingSize(): Int {
428+
TODO("Not yet implemented")
429+
}
430+
}

model-client/src/commonMain/kotlin/org/modelix/model/lazy/WrittenEntry.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ class WrittenEntry<E : IKVValue>(
2222
override fun getHash(): String = hash
2323

2424
override fun getValue(store: IDeserializingKeyValueStore): E {
25-
return store.get(hash, deserializer) ?: throw RuntimeException("Entry $hash not found")
25+
return store.get(hash, deserializer)
26+
?: throw RuntimeException("Entry $hash not found")
2627
}
2728

2829
override fun write(store: IDeserializingKeyValueStore) {}

model-server/src/main/kotlin/org/modelix/model/server/handlers/AccessTrackingStore.kt

Lines changed: 0 additions & 57 deletions
This file was deleted.

0 commit comments

Comments
 (0)