|
| 1 | +/* |
| 2 | + * Copyright (c) 2024. |
| 3 | + * |
| 4 | + * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | + * you may not use this file except in compliance with the License. |
| 6 | + * You may obtain a copy of the License at |
| 7 | + * |
| 8 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | + * |
| 10 | + * Unless required by applicable law or agreed to in writing, software |
| 11 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | + * See the License for the specific language governing permissions and |
| 14 | + * limitations under the License. |
| 15 | + */ |
| 16 | + |
| 17 | +package org.modelix.model.sync.bulk |
| 18 | + |
| 19 | +import mu.KotlinLogging |
| 20 | +import org.modelix.model.api.ConceptReference |
| 21 | +import org.modelix.model.api.INode |
| 22 | +import org.modelix.model.api.INodeReference |
| 23 | +import org.modelix.model.api.IReferenceLink |
| 24 | +import org.modelix.model.api.IReplaceableNode |
| 25 | +import org.modelix.model.api.IRole |
| 26 | +import org.modelix.model.api.isChildRoleOrdered |
| 27 | +import org.modelix.model.api.remove |
| 28 | +import org.modelix.model.data.NodeData |
| 29 | + |
| 30 | +/** |
| 31 | + * Similar to [ModelImporter], but the input is two [INode] instances instead of [INode] and [NodeData]. |
| 32 | + * |
| 33 | + * Changes to the behaviour of this class should also reflected in [ModelImporter]. |
| 34 | + * |
| 35 | + * @param filter determines which nodes need synchronization. |
| 36 | + * Nodes that do not match the filter are skipped and will remain unchanged. |
| 37 | + * @param sourceRoot root of the tree containing the expected nodes |
| 38 | + * @param targetRoot root of the tree that needs changes |
| 39 | + * @param nodeAssociation mapping between source and target nodes, that is used for internal optimizations |
| 40 | + */ |
| 41 | +class ModelSynchronizer( |
| 42 | + val filter: IFilter, |
| 43 | + val sourceRoot: INode, |
| 44 | + val targetRoot: INode, |
| 45 | + val nodeAssociation: INodeAssociation, |
| 46 | +) { |
| 47 | + private val nodesToRemove: MutableSet<INode> = HashSet() |
| 48 | + private val pendingReferences: MutableList<PendingReference> = ArrayList() |
| 49 | + private val logger = KotlinLogging.logger {} |
| 50 | + |
| 51 | + fun synchronize() { |
| 52 | + logger.info { "Synchronizing nodes..." } |
| 53 | + synchronizeNode(sourceRoot, targetRoot) |
| 54 | + logger.info { "Synchronizing pending references..." } |
| 55 | + pendingReferences.forEach { it.trySyncReference() } |
| 56 | + logger.info { "Removing extra nodes..." } |
| 57 | + nodesToRemove.filter { it.isValid }.forEach { it.remove() } |
| 58 | + logger.info { "Synchronization finished." } |
| 59 | + } |
| 60 | + |
| 61 | + private fun synchronizeNode(sourceNode: INode, targetNode: INode) { |
| 62 | + nodeAssociation.associate(sourceNode, targetNode) |
| 63 | + if (filter.needsSynchronization(sourceNode)) { |
| 64 | + logger.info { "Synchronizing changed node. sourceNode = $sourceNode" } |
| 65 | + synchronizeProperties(sourceNode, targetNode) |
| 66 | + synchronizeReferences(sourceNode, targetNode) |
| 67 | + |
| 68 | + val sourceConcept = sourceNode.getConceptReference() |
| 69 | + val targetConcept = targetNode.getConceptReference() |
| 70 | + |
| 71 | + val conceptCorrectedTargetNode = if (sourceConcept != targetConcept && targetNode is IReplaceableNode) { |
| 72 | + targetNode.replaceNode(sourceConcept?.getUID()?.let { ConceptReference(it) }) |
| 73 | + } else { |
| 74 | + targetNode |
| 75 | + } |
| 76 | + |
| 77 | + syncChildren(sourceNode, conceptCorrectedTargetNode) |
| 78 | + } else if (filter.needsDescentIntoSubtree(sourceNode)) { |
| 79 | + for (sourceChild in sourceNode.allChildren) { |
| 80 | + val targetChild = nodeAssociation.resolveTarget(sourceChild) ?: error("Expected target node was not found. sourceChild=$sourceChild") |
| 81 | + synchronizeNode(sourceChild, targetChild) |
| 82 | + } |
| 83 | + } else { |
| 84 | + logger.info { "Skipping subtree due to filter. root = $sourceNode" } |
| 85 | + } |
| 86 | + } |
| 87 | + |
| 88 | + private fun synchronizeReferences( |
| 89 | + sourceNode: INode, |
| 90 | + targetNode: INode, |
| 91 | + ) { |
| 92 | + iterateMergedRoles(sourceNode.getReferenceLinks(), targetNode.getReferenceLinks()) { role -> |
| 93 | + val pendingReference = PendingReference(sourceNode, targetNode, role) |
| 94 | + |
| 95 | + // If the reference target already exist we can synchronize it immediately and save memory between the |
| 96 | + // two synchronization phases. |
| 97 | + if (!pendingReference.trySyncReference()) { |
| 98 | + pendingReferences += pendingReference |
| 99 | + } |
| 100 | + } |
| 101 | + } |
| 102 | + |
| 103 | + private fun synchronizeProperties( |
| 104 | + sourceNode: INode, |
| 105 | + targetNode: INode, |
| 106 | + ) { |
| 107 | + iterateMergedRoles(sourceNode.getPropertyLinks(), targetNode.getPropertyLinks()) { role -> |
| 108 | + val oldValue = targetNode.getPropertyValue(role.preferTarget()) |
| 109 | + val newValue = sourceNode.getPropertyValue(role.preferSource()) |
| 110 | + if (oldValue != newValue) { |
| 111 | + targetNode.setPropertyValue(role.preferTarget(), newValue) |
| 112 | + } |
| 113 | + } |
| 114 | + } |
| 115 | + |
| 116 | + private fun syncChildren(sourceParent: INode, targetParent: INode) { |
| 117 | + val allRoles = (sourceParent.allChildren.map { it.roleInParent } + targetParent.allChildren.map { it.roleInParent }).distinct() |
| 118 | + for (role in allRoles) { |
| 119 | + val sourceNodes = sourceParent.getChildren(role).toList() |
| 120 | + val targetNodes = targetParent.getChildren(role).toList() |
| 121 | + |
| 122 | + val allExpectedNodesDoNotExist by lazy { |
| 123 | + sourceNodes.all { sourceNode -> |
| 124 | + val originalId = sourceNode.originalId() |
| 125 | + checkNotNull(originalId) { "Specified node '$sourceNode' has no ID." } |
| 126 | + nodeAssociation.resolveTarget(sourceNode) == null |
| 127 | + } |
| 128 | + } |
| 129 | + |
| 130 | + // optimization that uses the bulk operation .addNewChildren |
| 131 | + if (targetNodes.isEmpty() && allExpectedNodesDoNotExist) { |
| 132 | + targetParent.addNewChildren(role, -1, sourceNodes.map { it.getConceptReference() }) |
| 133 | + .zip(sourceNodes) |
| 134 | + .forEach { (newChild, sourceChild) -> |
| 135 | + val expectedId = sourceChild.originalId() |
| 136 | + checkNotNull(expectedId) { "Specified node '$sourceChild' has no ID." } |
| 137 | + nodeAssociation.associate(sourceChild, newChild) |
| 138 | + synchronizeNode(sourceChild, newChild) |
| 139 | + } |
| 140 | + continue |
| 141 | + } |
| 142 | + |
| 143 | + // optimization for when there is no change in the child list |
| 144 | + // size check first to avoid querying the original ID |
| 145 | + if (sourceNodes.size == targetNodes.size && sourceNodes.map { it.originalId() } == targetNodes.map { it.originalId() }) { |
| 146 | + sourceNodes.zip(targetNodes).forEach { synchronizeNode(it.first, it.second) } |
| 147 | + continue |
| 148 | + } |
| 149 | + |
| 150 | + val isOrdered = targetParent.isChildRoleOrdered(role) |
| 151 | + |
| 152 | + val newlyCreatedIds = mutableSetOf<String>() |
| 153 | + |
| 154 | + sourceNodes.forEachIndexed { indexInImport, expected -> |
| 155 | + val existingChildren = targetParent.getChildren(role).toList() |
| 156 | + val expectedId = checkNotNull(expected.originalId()) { "Specified node '$expected' has no id" } |
| 157 | + // newIndex is the index on which to import the expected child. |
| 158 | + // It might be -1 if the child does not exist and should be added at the end. |
| 159 | + val newIndex = if (isOrdered) { |
| 160 | + indexInImport |
| 161 | + } else { |
| 162 | + // The `existingChildren` are only searched once for the expected element before changing. |
| 163 | + // Therefore, indexing existing children will not be more efficient than iterating once. |
| 164 | + // (For the moment, this is fine because as we expect unordered children to be the exception, |
| 165 | + // Reusable indexing would be possible if we switch from |
| 166 | + // a depth-first import to a breadth-first import.) |
| 167 | + existingChildren |
| 168 | + .indexOfFirst { existingChild -> existingChild.originalId() == expected.originalId() } |
| 169 | + } |
| 170 | + // existingChildren.getOrNull handles `-1` as needed by returning `null`. |
| 171 | + val nodeAtIndex = existingChildren.getOrNull(newIndex) |
| 172 | + val expectedConcept = expected.getConceptReference() |
| 173 | + val childNode = if (nodeAtIndex?.originalId() != expectedId) { |
| 174 | + val existingNode = nodeAssociation.resolveTarget(expected) |
| 175 | + if (existingNode == null) { |
| 176 | + val newChild = targetParent.addNewChild(role, newIndex, expectedConcept) |
| 177 | + if (newChild.originalId() == null) { |
| 178 | + newChild.setPropertyValue(NodeData.idPropertyKey, expectedId) |
| 179 | + } |
| 180 | + newChild.originalId()?.let { newlyCreatedIds.add(it) } |
| 181 | + nodeAssociation.associate(expected, newChild) |
| 182 | + newChild |
| 183 | + } else { |
| 184 | + // The existing child node is not only moved to a new index, |
| 185 | + // it is potentially moved to a new parent and role. |
| 186 | + targetParent.moveChild(role, newIndex, existingNode) |
| 187 | + // If the old parent and old role synchronized before the move operation, |
| 188 | + // the existing child node would have been marked as to be deleted. |
| 189 | + // Now that it is used, it should not be deleted. |
| 190 | + nodesToRemove.remove(existingNode) |
| 191 | + existingNode |
| 192 | + } |
| 193 | + } else { |
| 194 | + nodeAtIndex |
| 195 | + } |
| 196 | + |
| 197 | + synchronizeNode(expected, childNode) |
| 198 | + } |
| 199 | + |
| 200 | + val expectedNodesIds = sourceNodes.map { it.originalId() }.toSet() |
| 201 | + // Do not use existingNodes, but call node.getChildren(role) because |
| 202 | + // the recursive synchronization in the meantime already removed some nodes from node.getChildren(role). |
| 203 | + nodesToRemove += targetParent.getChildren(role).filterNot { existingNode -> |
| 204 | + val id = existingNode.originalId() |
| 205 | + expectedNodesIds.contains(id) || newlyCreatedIds.contains(id) |
| 206 | + } |
| 207 | + } |
| 208 | + } |
| 209 | + |
| 210 | + inner class PendingReference(val sourceNode: INode, val targetNode: INode, val role: MergedRole<IReferenceLink>) { |
| 211 | + fun trySyncReference(): Boolean { |
| 212 | + val expectedRef = sourceNode.getReferenceTargetRef(role.preferSource()) |
| 213 | + if (expectedRef == null) { |
| 214 | + targetNode.setReferenceTarget(role.preferTarget(), null as INodeReference?) |
| 215 | + return true |
| 216 | + } |
| 217 | + val actualRef = targetNode.getReferenceTargetRef(role.preferTarget()) |
| 218 | + |
| 219 | + // Some reference targets may be excluded from the sync, |
| 220 | + // in that case a serialized reference is stored and no lookup of the target is required. |
| 221 | + if (actualRef?.serialize() == expectedRef.serialize()) { |
| 222 | + // already up to date |
| 223 | + return true |
| 224 | + } |
| 225 | + |
| 226 | + val referenceTargetInSource = sourceNode.getReferenceTarget(role.preferSource()) |
| 227 | + checkNotNull(referenceTargetInSource) { "Failed to resolve $expectedRef referenced by $sourceNode.${role.preferSource()}" } |
| 228 | + |
| 229 | + val referenceTargetInTarget = nodeAssociation.resolveTarget(referenceTargetInSource) |
| 230 | + ?: return false // Target cannot be resolved right now but might become resolvable later. |
| 231 | + |
| 232 | + if (referenceTargetInTarget.reference.serialize() != actualRef?.serialize()) { |
| 233 | + targetNode.setReferenceTarget(role.preferTarget(), referenceTargetInTarget) |
| 234 | + } |
| 235 | + return true |
| 236 | + } |
| 237 | + } |
| 238 | + |
| 239 | + private fun <T : IRole> iterateMergedRoles( |
| 240 | + sourceRoles: Iterable<T>, |
| 241 | + targetRoles: Iterable<T>, |
| 242 | + body: (role: MergedRole<T>) -> Unit, |
| 243 | + ) = iterateMergedRoles(sourceRoles.asSequence(), targetRoles.asSequence(), body) |
| 244 | + |
| 245 | + private fun <T : IRole> iterateMergedRoles( |
| 246 | + sourceRoles: Sequence<T>, |
| 247 | + targetRoles: Sequence<T>, |
| 248 | + body: (role: MergedRole<T>) -> Unit, |
| 249 | + ) { |
| 250 | + val sourceRolesMap = sourceRoles.filter { it.getUID() != NodeData.ID_PROPERTY_KEY }.associateBy { it.getUID() } |
| 251 | + val targetRolesMap = targetRoles.associateBy { it.getUID() } |
| 252 | + val roleUIDs = (sourceRolesMap.keys + targetRolesMap.keys).toSet() |
| 253 | + for (roleUID in roleUIDs) { |
| 254 | + val sourceRole = sourceRolesMap[roleUID] |
| 255 | + val targetRole = targetRolesMap[roleUID] |
| 256 | + body(MergedRole(sourceRole, targetRole)) |
| 257 | + } |
| 258 | + } |
| 259 | + |
| 260 | + class MergedRole<E : IRole>( |
| 261 | + private val source: E?, |
| 262 | + private val target: E?, |
| 263 | + ) { |
| 264 | + fun preferTarget(): E = (target ?: source)!! |
| 265 | + fun preferSource() = (source ?: target)!! |
| 266 | + } |
| 267 | + |
| 268 | + /** |
| 269 | + * Determines, which nodes need synchronization and which can be skipped. |
| 270 | + * |
| 271 | + * It is valid for [needsDescentIntoSubtree] and [needsSynchronization] to return true for the same node. |
| 272 | + */ |
| 273 | + interface IFilter { |
| 274 | + /** |
| 275 | + * Checks if a subtree needs synchronization. |
| 276 | + * |
| 277 | + * @param subtreeRoot root of the subtree to be checked |
| 278 | + * @return true iff the subtree must not be skipped |
| 279 | + */ |
| 280 | + fun needsDescentIntoSubtree(subtreeRoot: INode): Boolean |
| 281 | + |
| 282 | + /** |
| 283 | + * Checks if a single node needs synchronization. |
| 284 | + * |
| 285 | + * @param node node to be checked |
| 286 | + * @return true iff the node must not be skipped |
| 287 | + */ |
| 288 | + fun needsSynchronization(node: INode): Boolean |
| 289 | + } |
| 290 | +} |
0 commit comments