Skip to content

Commit 203d776

Browse files
committed
feat(model-server): validation and merge of pushed version is now done outside a transaction
Validation and merge can be expensive when the changes are big. This was done in a global transaction before, which blocks all other requests.
1 parent d1a7cff commit 203d776

File tree

3 files changed

+33
-7
lines changed

3 files changed

+33
-7
lines changed

model-server/src/main/kotlin/org/modelix/model/server/handlers/IRepositoriesManager.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ interface IRepositoriesManager {
5353

5454
@RequiresTransaction
5555
fun mergeChanges(branch: BranchReference, newVersionHash: String): String
56+
suspend fun mergeChangesWithoutPush(branch: BranchReference, newVersionHash: String): String
5657
suspend fun computeDelta(repository: RepositoryId?, versionHash: String, baseVersionHash: String?): ObjectData =
5758
computeDelta(repository, versionHash, ObjectDeltaFilter(knownVersions = setOfNotNull(baseVersionHash)))
5859
suspend fun computeDelta(repository: RepositoryId?, versionHash: String, filter: ObjectDeltaFilter): ObjectData

model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,9 +228,14 @@ class ModelReplicationServer(
228228
@OptIn(RequiresTransaction::class) // no transactions required for immutable store
229229
repositoriesManager.getStoreClient(RepositoryId(repository), true).putAll(objectsFromClient)
230230
}
231+
232+
// Run a merge outside a transaction to keep the transaction for the actual merge smaller.
233+
// If there are no concurrent pushes on the same branch, then all the work is done here.
234+
val preMergedVersion = repositoriesManager.mergeChangesWithoutPush(branchRef, deltaFromClient.versionHash)
235+
231236
@OptIn(RequiresTransaction::class)
232237
val mergedHash = runWrite {
233-
repositoriesManager.mergeChanges(branchRef, deltaFromClient.versionHash)
238+
repositoriesManager.mergeChanges(branchRef, preMergedVersion)
234239
}
235240
call.respondDelta(RepositoryId(repository), mergedHash, ObjectDeltaFilter(deltaFromClient.versionHash))
236241
}

model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import org.modelix.model.server.store.RequiresTransaction
2626
import org.modelix.model.server.store.StoreManager
2727
import org.modelix.model.server.store.assertWrite
2828
import org.modelix.model.server.store.pollEntry
29+
import org.modelix.model.server.store.runReadIO
2930
import org.modelix.streams.IExecutableStream
3031
import org.modelix.streams.IStream
3132
import org.modelix.streams.SimpleStreamExecutor
@@ -158,7 +159,8 @@ class RepositoriesManager(val stores: StoreManager) : IRepositoriesManager {
158159
id = stores.idGenerator.generate(),
159160
time = Clock.System.now().epochSeconds.toString(),
160161
author = userName,
161-
tree = CLTree(null, null, stores.getLegacyObjectStore(repositoryId.takeIf { isolated }), useRoleIds = useRoleIds),
162+
tree = CLTree.builder(stores.getAsyncStore(repositoryId.takeIf { isolated }))
163+
.useRoleIds(useRoleIds).build(),
162164
baseVersion = null,
163165
operations = emptyArray(),
164166
)
@@ -243,13 +245,33 @@ class RepositoriesManager(val stores: StoreManager) : IRepositoriesManager {
243245
override fun mergeChanges(branch: BranchReference, newVersionHash: String): String {
244246
val headHash = getVersionHash(branch)
245247
if (headHash == newVersionHash) return headHash
246-
val legacyObjectStore = getLegacyObjectStore(branch.repositoryId)
247-
val newVersion = CLVersion.loadFromHash(newVersionHash, legacyObjectStore)
248+
val mergedHash = validateAndMerge(newVersionHash, headHash, branch.repositoryId)
249+
ensureBranchInList(branch)
250+
putVersionHash(branch, mergedHash)
251+
return mergedHash
252+
}
253+
254+
override suspend fun mergeChangesWithoutPush(branch: BranchReference, newVersionHash: String): String {
255+
@OptIn(RequiresTransaction::class)
256+
val headHash = getTransactionManager().runReadIO {
257+
getVersionHash(branch)
258+
}
259+
return validateAndMerge(newVersionHash, headHash, branch.repositoryId)
260+
}
261+
262+
private fun validateAndMerge(
263+
newVersionHash: String,
264+
headHash: String?,
265+
repositoryId: RepositoryId,
266+
): String {
267+
if (headHash == newVersionHash) return headHash
268+
val asyncStore = getAsyncStore(repositoryId)
269+
val newVersion = CLVersion.loadFromHash(newVersionHash, asyncStore)
248270
val mergedHash = if (headHash == null) {
249271
validateVersion(newVersion, null)
250272
newVersionHash
251273
} else {
252-
val legacyObjectStore = getLegacyObjectStore(branch.repositoryId)
274+
val legacyObjectStore = getLegacyObjectStore(repositoryId)
253275
val headVersion = CLVersion.loadFromHash(headHash, legacyObjectStore)
254276
require(headVersion.getTree().getId() == newVersion.getTree().getId()) {
255277
"Attempt to merge a model with ID '${newVersion.getTree().getId()}'" +
@@ -260,8 +282,6 @@ class RepositoriesManager(val stores: StoreManager) : IRepositoriesManager {
260282
.mergeChange(headVersion, newVersion)
261283
mergedVersion.getContentHash()
262284
}
263-
ensureBranchInList(branch)
264-
putVersionHash(branch, mergedHash)
265285
return mergedHash
266286
}
267287

0 commit comments

Comments
 (0)