Skip to content

Commit 3b24507

Browse files
benedekhslisson
authored andcommitted
feat(model-client): start ReplicatedModel with a lastKnownRemoteVersion explicitly
1 parent 7a9dc07 commit 3b24507

File tree

2 files changed

+216
-15
lines changed

2 files changed

+216
-15
lines changed

model-client/src/commonMain/kotlin/org/modelix/model/client2/ReplicatedModel.kt

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,29 +23,44 @@ import org.modelix.model.operations.OTBranch
2323

2424
/**
2525
* Dispose should be called on this, as otherwise a regular polling will go on.
26+
*
27+
* @property client the model client to connect to the model server
28+
* @property branchRef the model server branch to fetch the data from
29+
* @property providedScope the CoroutineScope to use for the suspendable tasks
30+
* @property initialRemoteVersion the last version on the server from which we want to start the synchronization
2631
*/
2732
class ReplicatedModel(
2833
val client: IModelClientV2,
2934
val branchRef: BranchReference,
3035
private val providedScope: CoroutineScope? = null,
36+
initialRemoteVersion: CLVersion? = null,
3137
) {
3238
private val scope = providedScope ?: CoroutineScope(Dispatchers.Default)
3339
private var state = State.New
34-
private lateinit var localModel: LocalModel
35-
private val remoteVersion = RemoteVersion(client, branchRef)
40+
private var localModel: LocalModel? = null
41+
private val remoteVersion = RemoteVersion(client, branchRef, initialRemoteVersion)
3642
private var pollingJob: Job? = null
3743

44+
init {
45+
if (initialRemoteVersion != null) {
46+
localModel = LocalModel(initialRemoteVersion, client.getIdGenerator()) { client.getUserId() }
47+
}
48+
}
49+
50+
private fun getLocalModel(): LocalModel = checkNotNull(localModel) { "Model is not initialized yet" }
51+
3852
fun getBranch(): IBranch {
39-
if (state != State.Started) throw IllegalStateException("state is $state")
40-
return localModel.otBranch
53+
return getLocalModel().otBranch
4154
}
4255

4356
suspend fun start(): IBranch {
4457
if (state != State.New) throw IllegalStateException("already started")
4558
state = State.Starting
4659

47-
val initialVersion = remoteVersion.pull()
48-
localModel = LocalModel(initialVersion, client.getIdGenerator(), { client.getUserId() })
60+
if (localModel == null) {
61+
val initialVersion = remoteVersion.pull()
62+
localModel = LocalModel(initialVersion, client.getIdGenerator()) { client.getUserId() }
63+
}
4964

5065
// receive changes from the server
5166
pollingJob = scope.launch {
@@ -66,7 +81,7 @@ class ReplicatedModel(
6681
}
6782
}
6883

69-
localModel.rawBranch.addListener(object : IBranchListener {
84+
getLocalModel().rawBranch.addListener(object : IBranchListener {
7085
override fun treeChanged(oldTree: ITree?, newTree: ITree) {
7186
if (isDisposed()) return
7287
scope.launch {
@@ -80,7 +95,7 @@ class ReplicatedModel(
8095
}
8196

8297
suspend fun resetToServerVersion() {
83-
localModel.resetToVersion(client.pull(branchRef, lastKnownVersion = null).upcast())
98+
getLocalModel().resetToVersion(client.pull(branchRef, lastKnownVersion = null).upcast())
8499
}
85100

86101
fun isDisposed(): Boolean = state == State.Disposed
@@ -102,11 +117,11 @@ class ReplicatedModel(
102117
if (isDisposed()) return
103118

104119
val mergedVersion = try {
105-
localModel.mergeRemoteVersion(newRemoteVersion)
120+
getLocalModel().mergeRemoteVersion(newRemoteVersion)
106121
} catch (ex: Exception) {
107-
val currentLocalVersion = localModel.getCurrentVersion()
122+
val currentLocalVersion = getLocalModel().getCurrentVersion()
108123
LOG.warn(ex) { "Failed to merge remote version $newRemoteVersion into local version $currentLocalVersion. Resetting to remote version." }
109-
localModel.resetToVersion(newRemoteVersion)
124+
getLocalModel().resetToVersion(newRemoteVersion)
110125
newRemoteVersion
111126
}
112127

@@ -121,15 +136,15 @@ class ReplicatedModel(
121136
private suspend fun pushLocalChanges() {
122137
if (isDisposed()) return
123138

124-
val version = localModel.createNewLocalVersion() ?: localModel.getCurrentVersion()
139+
val version = getLocalModel().createNewLocalVersion() ?: getLocalModel().getCurrentVersion()
125140
val received = remoteVersion.push(version)
126141
if (received.getContentHash() != version.getContentHash()) {
127142
remoteVersionReceived(received)
128143
}
129144
}
130145

131146
suspend fun getCurrentVersion(): CLVersion {
132-
return localModel.getCurrentVersion()
147+
return getLocalModel().getCurrentVersion()
133148
}
134149

135150
private enum class State {
@@ -250,8 +265,11 @@ private class LocalModel(initialVersion: CLVersion, val idGenerator: IIdGenerato
250265
}
251266
}
252267

253-
private class RemoteVersion(val client: IModelClientV2, val branchRef: BranchReference) {
254-
private var lastKnownRemoteVersion: CLVersion? = null
268+
private class RemoteVersion(
269+
val client: IModelClientV2,
270+
val branchRef: BranchReference,
271+
private var lastKnownRemoteVersion: CLVersion? = null,
272+
) {
255273
private val unconfirmedVersions: MutableSet<String> = LinkedHashSet()
256274

257275
fun getNumberOfUnconfirmed() = runSynchronized(unconfirmedVersions) { unconfirmedVersions.size }
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Copyright (c) 2024.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.modelix.model.server
18+
19+
import io.ktor.server.testing.ApplicationTestBuilder
20+
import io.ktor.server.testing.testApplication
21+
import io.ktor.util.reflect.instanceOf
22+
import kotlinx.coroutines.CoroutineScope
23+
import kotlinx.coroutines.Dispatchers
24+
import kotlinx.coroutines.launch
25+
import org.junit.Assert.assertFalse
26+
import org.modelix.authorization.installAuthentication
27+
import org.modelix.model.api.ChildLinkFromName
28+
import org.modelix.model.api.ConceptReference
29+
import org.modelix.model.api.IBranch
30+
import org.modelix.model.api.ITree
31+
import org.modelix.model.api.PBranch
32+
import org.modelix.model.api.getRootNode
33+
import org.modelix.model.client2.ModelClientV2
34+
import org.modelix.model.client2.ReplicatedModel
35+
import org.modelix.model.lazy.BranchReference
36+
import org.modelix.model.lazy.CLTree
37+
import org.modelix.model.lazy.CLVersion
38+
import org.modelix.model.lazy.RepositoryId
39+
import org.modelix.model.operations.OTBranch
40+
import org.modelix.model.server.handlers.ModelReplicationServer
41+
import org.modelix.model.server.store.InMemoryStoreClient
42+
import org.modelix.model.server.store.forContextRepository
43+
import java.util.UUID
44+
import java.util.concurrent.CountDownLatch
45+
import java.util.concurrent.TimeUnit
46+
import kotlin.test.Test
47+
import kotlin.test.assertEquals
48+
import kotlin.test.assertTrue
49+
50+
class ReplicatedModelTest {
51+
52+
@Test
53+
fun startsWithLatestVersion() = runTest {
54+
val client = createModelClient()
55+
val repositoryId = RepositoryId(UUID.randomUUID().toString())
56+
57+
// Step 1: prepare repository with two versions beside the initial version
58+
// Step 1.1: create an empty repository
59+
val initialVersion = client.initRepository(repositoryId) as CLVersion
60+
61+
// Step 1.2: add a new child node to get a new version
62+
val defaultBranchReference = repositoryId.getBranchReference()
63+
addHelloChild(initialVersion, client, defaultBranchReference)
64+
65+
// Step 2: in a new client, fetch the latest repository data
66+
val newClient = createModelClient()
67+
// we do not provide an initial version, so we expect to fetch the latest one (with one "hello" child)
68+
val scope = CoroutineScope(Dispatchers.Default)
69+
val replicatedModel = ReplicatedModel(newClient, defaultBranchReference, scope)
70+
try {
71+
replicatedModel.getBranch()
72+
// if we get here, then we missed an expected exception
73+
assertFalse(true)
74+
} catch (ex: Exception) {
75+
/*
76+
Expected exception, because we did not specify an initial version.
77+
So without an explicit start we do not expect anything useful here.
78+
*/
79+
assertTrue(ex.instanceOf(IllegalStateException::class))
80+
}
81+
82+
val branch = replicatedModel.start()
83+
// Step 3: wait a bit so replicated model can fetch the new versions from the server
84+
waitUntilChildArrives(branch, scope, 500)
85+
86+
// Step 4: check, eventually we must have the one "hello" child
87+
val children = getHelloChildrenOfRootNode(branch)
88+
assertEquals(1, children.size)
89+
}
90+
91+
@Test
92+
fun startsWithSpecificVersion() = runTest {
93+
val client = createModelClient()
94+
val repositoryId = RepositoryId(UUID.randomUUID().toString())
95+
val defaultBranchReference = repositoryId.getBranchReference()
96+
97+
// Step 1: prepare repository with two versions beside the initial version
98+
// Step 1.1: create an empty repository
99+
val initialVersion = client.initRepository(repositoryId) as CLVersion
100+
101+
// Step 1.2: add a new child node to get a new version
102+
addHelloChild(initialVersion, client, defaultBranchReference)
103+
104+
// Step 2: in a new client, fetch the oneHelloChildVersion
105+
val newClient = createModelClient()
106+
107+
// Step 2.1: to avoid version was not created by this client exception
108+
val initialVersionClone = newClient.loadVersion(repositoryId, initialVersion.getContentHash(), null)
109+
110+
val scope = CoroutineScope(Dispatchers.Default)
111+
// Step 2.2: we provide an initial version, so we expect to fetch it first (0 "hello" child)
112+
val replicatedModel = ReplicatedModel(
113+
newClient,
114+
defaultBranchReference,
115+
providedScope = scope,
116+
initialRemoteVersion = initialVersionClone as CLVersion,
117+
)
118+
val branch = replicatedModel.getBranch()
119+
120+
// Step 3: check, here we must have 0 "hello" child
121+
val emptyChildren = getHelloChildrenOfRootNode(branch)
122+
assertTrue(emptyChildren.isEmpty())
123+
124+
replicatedModel.start()
125+
// Step 4: wait a bit so replicated model can fetch the new versions from the server
126+
waitUntilChildArrives(branch, scope, 500)
127+
128+
// Step 5: check, eventually we must have 1 "hello" child
129+
val children = getHelloChildrenOfRootNode(branch)
130+
assertEquals(1, children.size)
131+
}
132+
133+
private fun runTest(block: suspend ApplicationTestBuilder.() -> Unit) = testApplication {
134+
application {
135+
installAuthentication(unitTestMode = true)
136+
installDefaultServerPlugins()
137+
ModelReplicationServer(InMemoryStoreClient().forContextRepository()).init(this)
138+
}
139+
block()
140+
}
141+
142+
private fun waitUntilChildArrives(branch: IBranch, scope: CoroutineScope, timeout: Long) {
143+
val barrier = CountDownLatch(1)
144+
scope.launch {
145+
var childArrived = false
146+
while (!childArrived) {
147+
childArrived = getHelloChildrenOfRootNode(branch).isNotEmpty()
148+
}
149+
barrier.countDown()
150+
}
151+
// wait at most timeout ms for the child to arrive
152+
barrier.await(timeout, TimeUnit.MILLISECONDS)
153+
}
154+
155+
private suspend fun addHelloChild(
156+
baseVersion: CLVersion,
157+
client: ModelClientV2,
158+
branchReference: BranchReference,
159+
): CLVersion {
160+
val branch =
161+
OTBranch(
162+
PBranch(baseVersion.getTree(), client.getIdGenerator()),
163+
client.getIdGenerator(),
164+
baseVersion.store,
165+
)
166+
branch.runWriteT { t ->
167+
t.addNewChild(ITree.ROOT_ID, "hello", -1, null as ConceptReference?)
168+
}
169+
val (ops, newTree) = branch.getPendingChanges()
170+
val newVersion = CLVersion.createRegularVersion(
171+
client.getIdGenerator().generate(),
172+
null,
173+
null,
174+
newTree as CLTree,
175+
baseVersion,
176+
ops.map { it.getOriginalOp() }.toTypedArray(),
177+
)
178+
return client.push(branchReference, newVersion, baseVersion) as CLVersion
179+
}
180+
181+
private fun getHelloChildrenOfRootNode(branch: IBranch) =
182+
branch.computeReadT { it.branch.getRootNode().getChildren(ChildLinkFromName("hello")).toList() }
183+
}

0 commit comments

Comments
 (0)