Skip to content

Commit b454885

Browse files
committed
fix(model-client): fixed unstable ReplicatedRepositoryTest
1 parent 9497bc8 commit b454885

File tree

3 files changed

+493
-107
lines changed

3 files changed

+493
-107
lines changed

model-client/src/commonMain/kotlin/org/modelix/model/client2/ReplicatedModel.kt

Lines changed: 223 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -2,68 +2,61 @@ package org.modelix.model.client2
22

33
import kotlinx.coroutines.CoroutineScope
44
import kotlinx.coroutines.Dispatchers
5+
import kotlinx.coroutines.Job
56
import kotlinx.coroutines.cancel
67
import kotlinx.coroutines.delay
78
import kotlinx.coroutines.launch
89
import kotlinx.coroutines.sync.Mutex
910
import kotlinx.coroutines.sync.withLock
11+
import org.modelix.model.IVersion
1012
import org.modelix.model.VersionMerger
1113
import org.modelix.model.api.IBranch
1214
import org.modelix.model.api.IBranchListener
15+
import org.modelix.model.api.IIdGenerator
1316
import org.modelix.model.api.ITree
1417
import org.modelix.model.api.PBranch
18+
import org.modelix.model.api.runSynchronized
1519
import org.modelix.model.lazy.BranchReference
1620
import org.modelix.model.lazy.CLTree
1721
import org.modelix.model.lazy.CLVersion
1822
import org.modelix.model.operations.OTBranch
1923
import org.modelix.model.server.api.ModelQuery
20-
import kotlin.coroutines.cancellation.CancellationException
2124

22-
class ReplicatedModel(val client: IModelClientV2, val branchRef: BranchReference, val query: ModelQuery? = null) {
23-
private val scope = CoroutineScope(Dispatchers.Default)
25+
class ReplicatedModel(
26+
val client: IModelClientV2,
27+
val branchRef: BranchReference,
28+
private val providedScope: CoroutineScope? = null,
29+
@Deprecated("was never supported")
30+
val query: ModelQuery? = null,
31+
) {
32+
private val scope = providedScope ?: CoroutineScope(Dispatchers.Default)
2433
private var state = State.New
25-
private lateinit var otBranch: OTBranch
26-
private lateinit var rawBranch: IBranch
27-
28-
private lateinit var lastRemoteVersion: CLVersion
29-
private lateinit var localVersion: CLVersion
30-
private val mergeMutex = Mutex()
31-
32-
private var author: String? = null
34+
private lateinit var localModel: LocalModel
35+
private val remoteVersion = RemoteVersion(client, branchRef)
36+
private var pollingJob: Job? = null
3337

3438
fun getBranch(): IBranch {
3539
if (state != State.Started) throw IllegalStateException("state is $state")
36-
return otBranch
40+
return localModel.otBranch
3741
}
3842

3943
suspend fun start(): IBranch {
4044
if (state != State.New) throw IllegalStateException("already started")
4145
state = State.Starting
4246

43-
author = client.getUserId()
47+
val initialVersion = remoteVersion.pull()
48+
localModel = LocalModel(initialVersion, client.getIdGenerator(), { client.getUserId() })
4449

45-
lastRemoteVersion = if (query == null) {
46-
client.pull(branchRef, null)
47-
} else {
48-
client.pull(branchRef, null, query)
49-
} as CLVersion
50-
localVersion = lastRemoteVersion
51-
rawBranch = PBranch(lastRemoteVersion.getTree(), client.getIdGenerator())
52-
otBranch = OTBranch(rawBranch, client.getIdGenerator(), lastRemoteVersion.store)
53-
54-
scope.launch {
50+
// receive changes from the server
51+
pollingJob = scope.launch {
5552
var nextDelayMs: Long = 0
5653
while (state != State.Disposed) {
5754
if (nextDelayMs > 0) delay(nextDelayMs)
5855
try {
59-
val newRemoteVersion = if (query == null) {
60-
client.poll(branchRef, lastRemoteVersion)
61-
} else {
62-
client.poll(branchRef, lastRemoteVersion, query)
63-
} as CLVersion
56+
val newRemoteVersion = remoteVersion.poll()
6457
remoteVersionReceived(newRemoteVersion)
6558
nextDelayMs = 0
66-
} catch (ex: CancellationException) {
59+
} catch (ex: kotlinx.coroutines.CancellationException) {
6760
LOG.debug { "Stop to poll branch $branchRef after disposing." }
6861
throw ex
6962
} catch (ex: Throwable) {
@@ -73,79 +66,92 @@ class ReplicatedModel(val client: IModelClientV2, val branchRef: BranchReference
7366
}
7467
}
7568

76-
rawBranch.addListener(object : IBranchListener {
69+
// convergence watchdog
70+
// scope.launch {
71+
// var nextDelayMs: Long = 1000
72+
// while (state != State.Disposed) {
73+
// if (nextDelayMs > 0) delay(nextDelayMs)
74+
// try {
75+
// val newRemoteVersion = remoteVersion.pull()
76+
// remoteVersionReceived(newRemoteVersion)
77+
// nextDelayMs = 1000
78+
// } catch (ex: CancellationException) {
79+
// break
80+
// } catch (ex: Throwable) {
81+
// LOG.error(ex) { "Failed to pull branch $branchRef" }
82+
// nextDelayMs = (nextDelayMs * 3 / 2).coerceIn(1000, 30000)
83+
// }
84+
// }
85+
// }
86+
87+
localModel.rawBranch.addListener(object : IBranchListener {
7788
override fun treeChanged(oldTree: ITree?, newTree: ITree) {
78-
checkDisposed()
89+
if (isDisposed()) return
7990
scope.launch {
8091
pushLocalChanges()
92+
// while (state != State.Disposed) {
93+
// if (remoteVersion.getNumberOfUnconfirmed() == 0) {
94+
// pushLocalChanges()
95+
// break
96+
// } else {
97+
// delay(100.milliseconds)
98+
// }
99+
// }
81100
}
82101
}
83102
})
84103

85104
state = State.Started
86-
return otBranch
105+
return getBranch()
87106
}
88107

108+
suspend fun resetToServerVersion() {
109+
localModel.resetToVersion(client.pull(branchRef, lastKnownVersion = null).upcast())
110+
}
111+
112+
fun isDisposed(): Boolean = state == State.Disposed
113+
89114
private fun checkDisposed() {
90115
if (state == State.Disposed) throw IllegalStateException("disposed")
91116
}
92117

93118
fun dispose() {
94119
if (state == State.Disposed) return
120+
pollingJob?.cancel("disposed")
95121
state = State.Disposed
96-
scope.cancel("disposed")
122+
if (providedScope == null) {
123+
scope.cancel("disposed")
124+
}
97125
}
98126

99127
private suspend fun remoteVersionReceived(newRemoteVersion: CLVersion) {
100-
checkDisposed()
101-
if (lastRemoteVersion.getContentHash() == newRemoteVersion.getContentHash()) return
102-
lastRemoteVersion = newRemoteVersion
103-
mergeMutex.withLock {
104-
if (newRemoteVersion.getContentHash() != localVersion.getContentHash()) {
105-
otBranch.runWrite {
106-
applyPendingLocalChanges()
107-
try {
108-
localVersion = VersionMerger(newRemoteVersion.store, client.getIdGenerator())
109-
.mergeChange(localVersion, newRemoteVersion)
110-
} catch (ex: Exception) {
111-
LOG.warn(ex) { "Failed to merge remote version $newRemoteVersion into local version $localVersion. Resetting to remote version." }
112-
localVersion = newRemoteVersion
113-
}
114-
rawBranch.writeTransaction.tree = localVersion.tree
115-
}
128+
if (isDisposed()) return
129+
130+
val mergedVersion = try {
131+
localModel.mergeRemoteVersion(newRemoteVersion)
132+
} catch (ex: Exception) {
133+
val currentLocalVersion = localModel.getCurrentVersion()
134+
LOG.warn(ex) { "Failed to merge remote version $newRemoteVersion into local version $currentLocalVersion. Resetting to remote version." }
135+
localModel.resetToVersion(newRemoteVersion)
136+
newRemoteVersion
137+
}
138+
139+
if (mergedVersion.getContentHash() != newRemoteVersion.getContentHash()) {
140+
val received = remoteVersion.push(mergedVersion)
141+
if (received.getContentHash() != mergedVersion.getContentHash()) {
142+
remoteVersionReceived(received)
116143
}
117144
}
118145
}
119146

120147
private suspend fun pushLocalChanges() {
121-
checkDisposed()
122-
val createdVersion: CLVersion?
123-
mergeMutex.withLock {
124-
createdVersion = applyPendingLocalChanges()
125-
}
126-
if (createdVersion != null) {
127-
remoteVersionReceived(client.push(branchRef, createdVersion, baseVersion = lastRemoteVersion) as CLVersion)
128-
}
129-
}
148+
if (isDisposed()) return
130149

131-
private fun applyPendingLocalChanges(): CLVersion? {
132-
checkDisposed()
133-
require(mergeMutex.isLocked)
134-
val createdVersion = otBranch.computeRead {
135-
val (ops, newTree) = otBranch.operationsAndTree
136-
if (ops.isEmpty()) return@computeRead null
137-
CLVersion.createRegularVersion(
138-
id = client.getIdGenerator().generate(),
139-
author = author,
140-
tree = newTree as CLTree,
141-
baseVersion = localVersion,
142-
operations = ops.map { it.getOriginalOp() }.toTypedArray(),
143-
)
150+
val version = localModel.createNewLocalVersion() ?: localModel.getCurrentVersion()
151+
val received = remoteVersion.push(version)
152+
if (received.getContentHash() != version.getContentHash()) {
153+
remoteVersionReceived(received)
144154
}
145-
if (createdVersion != null) {
146-
localVersion = createdVersion
147-
}
148-
return createdVersion
149155
}
150156

151157
private enum class State {
@@ -164,7 +170,148 @@ fun IModelClientV2.getReplicatedModel(branchRef: BranchReference): ReplicatedMod
164170
return ReplicatedModel(this, branchRef)
165171
}
166172

173+
fun IModelClientV2.getReplicatedModel(branchRef: BranchReference, scope: CoroutineScope): ReplicatedModel {
174+
return ReplicatedModel(this, branchRef, scope)
175+
}
176+
167177
@Deprecated("ModelQuery is not supported and ignored", ReplaceWith("getReplicatedModel(branchRef)"))
168178
fun IModelClientV2.getReplicatedModel(branchRef: BranchReference, query: ModelQuery?): ReplicatedModel {
169-
return ReplicatedModel(this, branchRef, query)
179+
return ReplicatedModel(this, branchRef, providedScope = null, query = query)
180+
}
181+
182+
/**
183+
* Manages the locks during the creation and merge of versions.
184+
*/
185+
private class LocalModel(initialVersion: CLVersion, val idGenerator: IIdGenerator, val author: () -> String?) {
186+
187+
/**
188+
* The state of the local model is the state of localVersion.getTree() plus the pending changes in
189+
* OTBranch.completedChanges.
190+
*
191+
* All changes done to OTBranch assume that they are applied on top of the current value of localVersion.
192+
* This means, before updating localVersion ensure there are no pending changes in OTBranch and that there are no
193+
* active write transactions that will contribute to the pending changes when successful.
194+
*/
195+
private var localVersion: CLVersion = initialVersion
196+
get() {
197+
check(mutex.isLocked)
198+
return field
199+
}
200+
set(value) {
201+
check(mutex.isLocked)
202+
check(otBranch.canWrite()) { "Write transaction required to update the localVersion field" }
203+
field = value
204+
}
205+
206+
val rawBranch: IBranch = PBranch(initialVersion.getTree(), idGenerator)
207+
val otBranch = OTBranch(rawBranch, idGenerator, initialVersion.store)
208+
private val merger = VersionMerger(initialVersion.store, idGenerator)
209+
210+
private val mutex = Mutex()
211+
212+
suspend fun resetToVersion(version: CLVersion) {
213+
mutex.withLock {
214+
otBranch.computeWrite { // write transaction ensures there are no active changes done on an outdated version
215+
otBranch.getPendingChanges() // discard any pending changes
216+
localVersion = version
217+
rawBranch.writeTransaction.tree = version.getTree()
218+
}
219+
}
220+
}
221+
222+
suspend fun getCurrentVersion() = mutex.withLock { localVersion }
223+
224+
suspend fun mergeRemoteVersion(remoteVersion: CLVersion): CLVersion {
225+
return mutex.withLock {
226+
// Avoid triggering branch listeners (causing endless loops) if there is no change.
227+
if (localVersion.getContentHash() == remoteVersion.getContentHash()) return localVersion
228+
229+
otBranch.computeWrite {
230+
// Writing to localVersion requires that there are no pending operations in OTBranch. By creating a new
231+
// local version first, the pending operations become part of it.
232+
// Creating it inside a write transaction guarantees that the list of pending changes stays empty until
233+
// we are done.
234+
doCreateNewLocalVersion()
235+
236+
// Now we can merge the remote version and update the localVersion field without losing local changes.
237+
// TODO run the (potentially expensive) merge algorithm outside a write transaction to avoid blocking
238+
// the branch for too long. This requires rerunning the merge if new local changes were created in
239+
// the meantime.
240+
val mergedVersion = merger.mergeChange(localVersion, remoteVersion)
241+
242+
// The mutex guarantees that the localVersion field didn't change and the write transaction guarantees
243+
// that there are no local changes that would get lost. We are in a consistent state again.
244+
rawBranch.writeTransaction.tree = mergedVersion.getTree()
245+
localVersion = mergedVersion
246+
247+
// Return the new localVersion just for convenience.
248+
mergedVersion
249+
}
250+
}
251+
}
252+
253+
/**
254+
* @return null, if there are no pending changes and no new version was created.
255+
*/
256+
suspend fun createNewLocalVersion(): CLVersion? {
257+
return mutex.withLock { otBranch.computeWrite { doCreateNewLocalVersion() } }
258+
}
259+
260+
private fun doCreateNewLocalVersion(): CLVersion? {
261+
check(mutex.isLocked)
262+
val (ops, tree) = otBranch.getPendingChanges()
263+
check(tree is CLTree)
264+
265+
val baseVersion = localVersion
266+
267+
if (ops.isEmpty() && baseVersion.getTree().hash == tree.hash) return null
268+
val newVersion = CLVersion.createRegularVersion(
269+
id = idGenerator.generate(),
270+
author = author(),
271+
tree = tree,
272+
baseVersion = baseVersion,
273+
operations = ops.map { it.getOriginalOp() }.toTypedArray(),
274+
)
275+
localVersion = newVersion
276+
return newVersion
277+
}
278+
}
279+
280+
private class RemoteVersion(val client: IModelClientV2, val branchRef: BranchReference) {
281+
private var lastKnownRemoteVersion: CLVersion? = null
282+
private val unconfirmedVersions: MutableSet<String> = LinkedHashSet()
283+
284+
fun getNumberOfUnconfirmed() = runSynchronized(unconfirmedVersions) { unconfirmedVersions.size }
285+
286+
suspend fun pull(): CLVersion {
287+
return versionReceived(client.pull(branchRef, lastKnownVersion = lastKnownRemoteVersion).upcast())
288+
}
289+
290+
suspend fun poll(): CLVersion {
291+
return versionReceived(client.poll(branchRef, lastKnownVersion = lastKnownRemoteVersion).upcast())
292+
}
293+
294+
suspend fun push(version: CLVersion): CLVersion {
295+
if (lastKnownRemoteVersion?.getContentHash() == version.getContentHash()) return version
296+
runSynchronized(unconfirmedVersions) {
297+
if (!unconfirmedVersions.add(version.getContentHash())) return version
298+
}
299+
try {
300+
return versionReceived(client.push(branchRef, version, lastKnownRemoteVersion).upcast())
301+
} finally {
302+
runSynchronized(unconfirmedVersions) {
303+
unconfirmedVersions.remove(version.getContentHash())
304+
}
305+
}
306+
}
307+
308+
private fun versionReceived(v: CLVersion): CLVersion {
309+
runSynchronized(unconfirmedVersions) {
310+
unconfirmedVersions.remove(v.getContentHash())
311+
lastKnownRemoteVersion = v
312+
}
313+
return v
314+
}
170315
}
316+
317+
private fun IVersion.upcast(): CLVersion = this as CLVersion

0 commit comments

Comments
 (0)