Skip to content

Commit 037c8be

Browse files
committed
feat(model-client): new filter parameter for pull
Clients can now specify which objects they need, instead of receiving the whole history.
1 parent 71dc86b commit 037c8be

File tree

10 files changed

+131
-64
lines changed

10 files changed

+131
-64
lines changed

model-client/src/commonMain/kotlin/org/modelix/model/client2/IModelClientV2.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package org.modelix.model.client2
22

33
import org.modelix.kotlin.utils.DeprecationInfo
44
import org.modelix.model.IVersion
5+
import org.modelix.model.ObjectDeltaFilter
56
import org.modelix.model.api.IIdGenerator
67
import org.modelix.model.api.INode
78
import org.modelix.model.lazy.BranchReference
@@ -55,7 +56,11 @@ interface IModelClientV2 {
5556
*/
5657
suspend fun push(branch: BranchReference, version: IVersion, baseVersion: IVersion?): IVersion
5758

58-
suspend fun pull(branch: BranchReference, lastKnownVersion: IVersion?): IVersion
59+
suspend fun pull(
60+
branch: BranchReference,
61+
lastKnownVersion: IVersion?,
62+
filter: ObjectDeltaFilter = ObjectDeltaFilter(),
63+
): IVersion
5964

6065
suspend fun pullIfExists(branch: BranchReference): IVersion?
6166

model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import kotlinx.coroutines.flow.flow
3838
import kotlinx.coroutines.launch
3939
import org.modelix.kotlin.utils.DeprecationInfo
4040
import org.modelix.model.IVersion
41+
import org.modelix.model.ObjectDeltaFilter
4142
import org.modelix.model.api.IBranch
4243
import org.modelix.model.api.IIdGenerator
4344
import org.modelix.model.api.INode
@@ -371,7 +372,7 @@ class ModelClientV2(
371372
return chunkEntries
372373
}
373374

374-
override suspend fun pull(branch: BranchReference, lastKnownVersion: IVersion?): IVersion {
375+
override suspend fun pull(branch: BranchReference, lastKnownVersion: IVersion?, filter: ObjectDeltaFilter): IVersion {
375376
require(lastKnownVersion is CLVersion?)
376377
return httpClient.prepareGet {
377378
url {
@@ -380,6 +381,9 @@ class ModelClientV2(
380381
if (lastKnownVersion != null) {
381382
parameters["lastKnown"] = lastKnownVersion.getContentHash()
382383
}
384+
if (filter != ObjectDeltaFilter()) {
385+
parameters["filter"] = filter.toJson()
386+
}
383387
}
384388
useVersionStreamFormat()
385389
}.execute { response ->

model-datastructure/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
plugins {
22
`maven-publish`
33
`modelix-kotlin-multiplatform`
4+
alias(libs.plugins.kotlin.serialization)
45
}
56

67
kotlin {
@@ -12,6 +13,7 @@ kotlin {
1213
implementation(libs.kotlin.coroutines.core)
1314
implementation(libs.kotlin.logging)
1415
implementation(libs.kotlin.datetime)
16+
implementation(libs.kotlin.serialization.json)
1517
}
1618
}
1719
val commonTest by getting {
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package org.modelix.model
2+
3+
import kotlinx.serialization.Serializable
4+
import kotlinx.serialization.json.Json
5+
6+
@Serializable
7+
data class ObjectDeltaFilter(
8+
/**
9+
* Hashes of version objects that are already available and should be excluded.
10+
* The traversal stops at these hashes and all transitively referenced objects are also excluded.
11+
*/
12+
val knownVersions: Set<String> = emptySet(),
13+
14+
/**
15+
* Objects required to access the operations of a version. They are usually only required for merging versions.
16+
* If a client doesn't do local merges, but let the server do the merge, the operations are never accessed and can
17+
* be excluded.
18+
*/
19+
val includeOperations: Boolean = true,
20+
21+
/**
22+
* If false, then the requested version is the only included version object.
23+
*/
24+
val includeHistory: Boolean = true,
25+
26+
/**
27+
* If false, then only the version objects are included, but not their tree data.
28+
*/
29+
val includeTrees: Boolean = true,
30+
) {
31+
constructor(baseVersion: String?) : this(knownVersions = setOfNotNull(baseVersion))
32+
33+
fun toJson() = Json.encodeToString(this)
34+
35+
companion object {
36+
fun fromJson(serializedJson: String): ObjectDeltaFilter = Json.decodeFromString(serializedJson)
37+
}
38+
}

model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLVersion.kt

Lines changed: 32 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import kotlinx.datetime.Instant
55
import kotlinx.datetime.TimeZone
66
import kotlinx.datetime.toInstant
77
import org.modelix.model.IVersion
8+
import org.modelix.model.ObjectDeltaFilter
89
import org.modelix.model.VersionMerger
910
import org.modelix.model.api.IIdGenerator
1011
import org.modelix.model.api.INodeReference
@@ -26,7 +27,6 @@ import org.modelix.model.persistent.OperationsList
2627
import org.modelix.model.persistent.getAllObjects
2728
import org.modelix.streams.IStream
2829
import org.modelix.streams.flatten
29-
import org.modelix.streams.notNull
3030
import org.modelix.streams.plus
3131
import kotlin.jvm.JvmName
3232

@@ -303,52 +303,51 @@ class CLVersion : IVersion {
303303
}
304304
}
305305

306-
fun getParents(stopAt: CLVersion?): List<CLVersion> {
307-
if (stopAt != null && this.getContentHash() == stopAt.getContentHash()) {
308-
return emptyList()
309-
}
310-
val ancestors = if (isMerge()) {
311-
listOf(getMergedVersion1()!!, getMergedVersion2()!!)
312-
} else {
313-
listOfNotNull(baseVersion)
314-
}
315-
return ancestors.filter { stopAt == null || it.getContentHash() != stopAt.getContentHash() }
316-
}
317-
318-
fun collectAncestors(stopAt: CLVersion?, result: MutableMap<String, CLVersion>) {
319-
if (stopAt != null && this.getContentHash() == stopAt.getContentHash()) return
306+
fun collectAncestors(stopAt: Set<String>, result: MutableMap<String, CLVersion>) {
307+
if (stopAt.contains(this.getContentHash())) return
320308
if (result.contains(getContentHash())) return
321309
result[getContentHash()] = this
322-
for (parent in getParents(stopAt)) {
310+
for (parent in listOfNotNull(baseVersion, getMergedVersion1(), getMergedVersion2())) {
323311
parent.collectAncestors(stopAt, result)
324312
}
325313
}
326314
}
327315

328316
fun CLVersion.fullDiff(baseVersion: CLVersion?): IStream.Many<IKVValue> {
329-
val history = historyDiff(baseVersion)
330-
return history.plus(
331-
history.flatMap { version ->
317+
return diff(ObjectDeltaFilter(knownVersions = setOfNotNull(baseVersion?.getContentHash())))
318+
}
319+
320+
fun CLVersion.diff(filter: ObjectDeltaFilter): IStream.Many<IKVValue> {
321+
val history = historyDiff(filter)
322+
return history.flatMap { version ->
323+
var result: IStream.Many<IKVValue> = IStream.of(version)
324+
if (filter.includeTrees) {
332325
val baseVersion = version.baseVersion?.getValue(asyncStore) ?: IStream.of(null)
333326
val currentVersion = version.treeHash.getValue(asyncStore)
334-
val treeDiff = currentVersion.zipWith(baseVersion) { v, b ->
327+
result += currentVersion.zipWith(baseVersion) { v, b ->
335328
if (b == null) v.getAllObjects(asyncStore) else v.objectDiff(b, asyncStore)
336329
}.flatten()
337-
if (version.operationsHash != null) {
338-
val operations = version.operationsHash.getValue(asyncStore).flatMap { it.getAllObjects(asyncStore) }
339-
treeDiff.plus(operations)
340-
} else {
341-
treeDiff
342-
}
343-
},
344-
)
330+
}
331+
if (filter.includeOperations && version.operationsHash != null) {
332+
result += version.operationsHash.getValue(asyncStore).flatMap { it.getAllObjects(asyncStore) }
333+
}
334+
result
335+
}
345336
}
346337

347-
fun CLVersion.historyDiff(baseVersion: CLVersion?): IStream.Many<CPVersion> {
348-
val commonBase = VersionMerger.commonBaseVersion(this, baseVersion)
349-
val history = LinkedHashMap<String, CLVersion>()
350-
collectAncestors(commonBase, history)
351-
return IStream.many(history.values.map { it.data })
338+
fun CLVersion.historyDiff(filter: ObjectDeltaFilter): IStream.Many<CPVersion> {
339+
if (filter.includeHistory) {
340+
val knownVersions = asyncStore.getStreamExecutor().query {
341+
IStream.many(filter.knownVersions).flatMap { CLVersion.tryLoadFromHash(it, asyncStore) }.toList()
342+
}
343+
val commonBases = knownVersions.mapNotNull { VersionMerger.commonBaseVersion(this, it) }
344+
.map { it.getContentHash() }.toSet()
345+
val history = LinkedHashMap<String, CLVersion>()
346+
collectAncestors(stopAt = commonBases, result = history)
347+
return IStream.many(history.values.map { it.data })
348+
} else {
349+
return IStream.of(this.data)
350+
}
352351
}
353352

354353
@Suppress("UNCHECKED_CAST")

model-server-openapi/specifications/model-server-v2.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,11 @@ paths:
145145
required: false
146146
schema:
147147
type: string
148+
- name: filter
149+
in: "query"
150+
required: false
151+
schema:
152+
type: string
148153
- name: repository
149154
in: "path"
150155
required: true

model-server/src/main/kotlin/org/modelix/model/server/handlers/IRepositoriesManager.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.modelix.model.server.handlers
22

3+
import org.modelix.model.ObjectDeltaFilter
34
import org.modelix.model.async.IAsyncObjectStore
45
import org.modelix.model.lazy.BranchReference
56
import org.modelix.model.lazy.CLVersion
@@ -52,7 +53,9 @@ interface IRepositoriesManager {
5253

5354
@RequiresTransaction
5455
fun mergeChanges(branch: BranchReference, newVersionHash: String): String
55-
suspend fun computeDelta(repository: RepositoryId?, versionHash: String, baseVersionHash: String?): ObjectData
56+
suspend fun computeDelta(repository: RepositoryId?, versionHash: String, baseVersionHash: String?): ObjectData =
57+
computeDelta(repository, versionHash, ObjectDeltaFilter(knownVersions = setOfNotNull(baseVersionHash)))
58+
suspend fun computeDelta(repository: RepositoryId?, versionHash: String, filter: ObjectDeltaFilter): ObjectData
5659

5760
/**
5861
* The data of a repository is stored separately from other repositories, but that wasn't always the case.

model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.modelix.authorization.checkPermission
2727
import org.modelix.authorization.getUserName
2828
import org.modelix.authorization.hasPermission
2929
import org.modelix.authorization.requiresLogin
30+
import org.modelix.model.ObjectDeltaFilter
3031
import org.modelix.model.api.IBranch
3132
import org.modelix.model.api.PBranch
3233
import org.modelix.model.api.TreeAsBranch
@@ -110,6 +111,7 @@ class ModelReplicationServer(
110111
repository: String,
111112
branch: String,
112113
lastKnown: String?,
114+
filter: String?,
113115
) {
114116
checkPermission(ModelServerPermissionSchema.repository(repository).branch(branch).pull)
115117
val branchRef = repositoryId(repository).getBranchReference(branch)
@@ -118,13 +120,20 @@ class ModelReplicationServer(
118120
val versionHash = runRead {
119121
repositoriesManager.getVersionHash(branchRef) ?: throw BranchNotFoundException(branchRef)
120122
}
121-
call.respondDelta(RepositoryId(repository), versionHash, lastKnown)
123+
call.respondDelta(RepositoryId(repository), versionHash, parseFilter(filter, lastKnown))
124+
}
125+
126+
private fun parseFilter(filterAsJson: String?, lastKnown: String?): ObjectDeltaFilter {
127+
return (filterAsJson?.let { ObjectDeltaFilter.fromJson(it) } ?: ObjectDeltaFilter()).let {
128+
if (lastKnown == null) it else it.copy(knownVersions = it.knownVersions + lastKnown)
129+
}
122130
}
123131

124132
override suspend fun RoutingContext.getRepositoryBranchV1(
125133
repository: String,
126134
branch: String,
127135
lastKnown: String?,
136+
filter: String?,
128137
) {
129138
checkPermission(ModelServerPermissionSchema.repository(repository).branch(branch).pull)
130139
val branchRef = repositoryId(repository).getBranchReference(branch)
@@ -189,7 +198,7 @@ class ModelReplicationServer(
189198
legacyGlobalStorage ?: false,
190199
)
191200
}
192-
call.respondDelta(RepositoryId(repository), initialVersion.getContentHash(), null)
201+
call.respondDelta(RepositoryId(repository), initialVersion.getContentHash(), ObjectDeltaFilter())
193202
}
194203

195204
override suspend fun RoutingContext.deleteRepository(repository: String) {
@@ -223,7 +232,7 @@ class ModelReplicationServer(
223232
val mergedHash = runWrite {
224233
repositoriesManager.mergeChanges(branchRef, deltaFromClient.versionHash)
225234
}
226-
call.respondDelta(RepositoryId(repository), mergedHash, deltaFromClient.versionHash)
235+
call.respondDelta(RepositoryId(repository), mergedHash, ObjectDeltaFilter(deltaFromClient.versionHash))
227236
}
228237

229238
override suspend fun RoutingContext.pollRepositoryBranch(
@@ -234,7 +243,7 @@ class ModelReplicationServer(
234243
checkPermission(ModelServerPermissionSchema.repository(repository).branch(branch).pull)
235244
val branchRef = repositoryId(repository).getBranchReference(branch)
236245
val newVersionHash = repositoriesManager.pollVersionHash(branchRef, lastKnown)
237-
call.respondDelta(RepositoryId(repository), newVersionHash, lastKnown)
246+
call.respondDelta(RepositoryId(repository), newVersionHash, ObjectDeltaFilter(lastKnown))
238247
}
239248

240249
override suspend fun RoutingContext.postRepositoryObjectsGetAll(repository: String) {
@@ -282,7 +291,7 @@ class ModelReplicationServer(
282291
if (repositoriesManager.getVersion(repositoryId(repository), versionHash) == null) {
283292
throw VersionNotFoundException(versionHash)
284293
}
285-
call.respondDelta(RepositoryId(repository), versionHash, lastKnown)
294+
call.respondDelta(RepositoryId(repository), versionHash, ObjectDeltaFilter(lastKnown))
286295
}
287296

288297
override suspend fun RoutingContext.postRepositoryBranchQuery(
@@ -384,40 +393,40 @@ class ModelReplicationServer(
384393
if (runRead { stores.getGlobalStoreClient()[versionHash] } == null) {
385394
throw VersionNotFoundException(versionHash)
386395
}
387-
call.respondDelta(null, versionHash, lastKnown)
396+
call.respondDelta(null, versionHash, ObjectDeltaFilter(lastKnown))
388397
}
389398

390-
private suspend fun ApplicationCall.respondDelta(repositoryId: RepositoryId?, versionHash: String, baseVersionHash: String?) {
399+
private suspend fun ApplicationCall.respondDelta(repositoryId: RepositoryId?, versionHash: String, filter: ObjectDeltaFilter) {
391400
val expectedTypes = request.acceptItems().map { ContentType.parse(it.value) }
392401
return if (expectedTypes.any { it.match(VersionDeltaStreamV2.CONTENT_TYPE) }) {
393-
respondDeltaAsObjectStreamV2(repositoryId, versionHash, baseVersionHash)
402+
respondDeltaAsObjectStreamV2(repositoryId, versionHash, filter)
394403
} else if (expectedTypes.any { it.match(VersionDeltaStream.CONTENT_TYPE) }) {
395-
respondDeltaAsObjectStreamV1(repositoryId, versionHash, baseVersionHash, false)
404+
respondDeltaAsObjectStreamV1(repositoryId, versionHash, filter, false)
396405
} else if (expectedTypes.any { it.match(ContentType.Application.Json) }) {
397-
respondDeltaAsJson(repositoryId, versionHash, baseVersionHash)
406+
respondDeltaAsJson(repositoryId, versionHash, filter)
398407
} else {
399-
respondDeltaAsObjectStreamV1(repositoryId, versionHash, baseVersionHash, true)
408+
respondDeltaAsObjectStreamV1(repositoryId, versionHash, filter, true)
400409
}
401410
}
402411

403-
private suspend fun ApplicationCall.respondDeltaAsJson(repositoryId: RepositoryId?, versionHash: String, baseVersionHash: String?) {
412+
private suspend fun ApplicationCall.respondDeltaAsJson(repositoryId: RepositoryId?, versionHash: String, filter: ObjectDeltaFilter) {
404413
val delta = VersionDelta(
405414
versionHash,
406-
baseVersionHash,
407-
objectsMap = repositoriesManager.computeDelta(repositoryId, versionHash, baseVersionHash).asMap(),
415+
filter.knownVersions.firstOrNull(),
416+
objectsMap = repositoriesManager.computeDelta(repositoryId, versionHash, filter).asMap(),
408417
)
409418
respond(delta)
410419
}
411420

412421
private suspend fun ApplicationCall.respondDeltaAsObjectStreamV1(
413422
repositoryId: RepositoryId?,
414423
versionHash: String,
415-
baseVersionHash: String?,
424+
filter: ObjectDeltaFilter,
416425
plainText: Boolean,
417426
) {
418427
// Call `computeDelta` before starting to respond.
419428
// It could already throw an exception, and in that case we do not want a successful response status.
420-
val objectData = repositoriesManager.computeDelta(repositoryId, versionHash, baseVersionHash)
429+
val objectData = repositoriesManager.computeDelta(repositoryId, versionHash, filter)
421430
val contentType = if (plainText) ContentType.Text.Plain else VersionDeltaStream.CONTENT_TYPE
422431
respondBytesWriter(contentType) {
423432
this.useClosingWithoutCause {
@@ -434,8 +443,8 @@ class ModelReplicationServer(
434443
}
435444
}
436445

437-
private suspend fun ApplicationCall.respondDeltaAsObjectStreamV2(repositoryId: RepositoryId?, versionHash: String, baseVersionHash: String?) {
438-
val objectData = repositoriesManager.computeDelta(repositoryId, versionHash, baseVersionHash)
446+
private suspend fun ApplicationCall.respondDeltaAsObjectStreamV2(repositoryId: RepositoryId?, versionHash: String, filter: ObjectDeltaFilter) {
447+
val objectData = repositoriesManager.computeDelta(repositoryId, versionHash, filter)
439448
respondBytesWriter(VersionDeltaStreamV2.CONTENT_TYPE) {
440449
this.useClosingWithoutCause {
441450
VersionDeltaStreamV2.encodeVersionDeltaStreamV2(this, versionHash, objectData.asStream())

0 commit comments

Comments
 (0)