|
| 1 | +""" |
| 2 | +Copyright (c) 2021, Oracle and/or its affiliates. |
| 3 | +Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. |
| 4 | +""" |
| 5 | + |
| 6 | +from oracle.weblogic.deploy.util import PyOrderedDict |
| 7 | + |
| 8 | +from wlsdeploy.aliases import alias_utils |
| 9 | +from wlsdeploy.aliases.alias_constants import ALIAS_LIST_TYPES |
| 10 | +from wlsdeploy.aliases.alias_constants import PROPERTIES |
| 11 | +from wlsdeploy.aliases.location_context import LocationContext |
| 12 | +from wlsdeploy.aliases.model_constants import APPLICATION |
| 13 | +from wlsdeploy.aliases.model_constants import KUBERNETES |
| 14 | +from wlsdeploy.aliases.model_constants import LIBRARY |
| 15 | +from wlsdeploy.aliases.model_constants import SOURCE_PATH |
| 16 | +from wlsdeploy.exception import exception_helper |
| 17 | +from wlsdeploy.json.json_translator import COMMENT_MATCH |
| 18 | +from wlsdeploy.logging.platform_logger import PlatformLogger |
| 19 | +from wlsdeploy.util import dictionary_utils |
| 20 | +from wlsdeploy.util import model_helper |
| 21 | + |
| 22 | + |
| 23 | +class ModelComparer(object): |
| 24 | + """ |
| 25 | + Class for comparing two WDT models. |
| 26 | + """ |
| 27 | + _class_name = "ModelComparer" |
| 28 | + _logger = PlatformLogger('wlsdeploy.compare_model') |
| 29 | + |
| 30 | + SOURCE_PATH_FOLDERS = [APPLICATION, LIBRARY] |
| 31 | + |
| 32 | + def __init__(self, current_model_dict, past_model_dict, aliases, messages): |
| 33 | + """ |
| 34 | + :param current_model_dict: the new dictionary being compared |
| 35 | + :param past_model_dict: the old dictionary being compared |
| 36 | + :param aliases: a reference to an Aliases class instance |
| 37 | + :param messages: a Set to be updated with messages |
| 38 | + """ |
| 39 | + self._current_model_dict = current_model_dict |
| 40 | + self._past_model_dict = past_model_dict |
| 41 | + self._aliases = aliases |
| 42 | + self._messages = messages |
| 43 | + |
| 44 | + def compare_models(self): |
| 45 | + """ |
| 46 | + Compare the current an past models from the top level. |
| 47 | + :return: a dictionary of differences between these models |
| 48 | + """ |
| 49 | + self._messages.clear() |
| 50 | + location = None |
| 51 | + change_model_dict = self._compare_folders(self._current_model_dict, self._past_model_dict, location, location) |
| 52 | + return change_model_dict |
| 53 | + |
| 54 | + def _compare_folders(self, current_folder, past_folder, location, attributes_location): |
| 55 | + """ |
| 56 | + Compare folders after determining if the folder has named sub-folders. |
| 57 | + :param current_folder: a folder in the current model |
| 58 | + :param past_folder: corresponding folder in the past model |
| 59 | + :param location: the location for the specified folders |
| 60 | + :param attributes_location: the attribute location for the specified folders |
| 61 | + :return: a dictionary of differences between these folders |
| 62 | + """ |
| 63 | + _method_name = '_compare_folders' |
| 64 | + |
| 65 | + # determine if the specified location has named folders, such as topology/Server |
| 66 | + has_named_folders = False |
| 67 | + if (location is not None) and not self._aliases.is_artificial_type_folder(location): |
| 68 | + has_named_folders = self._aliases.supports_multiple_mbean_instances(location) or \ |
| 69 | + self._aliases.requires_artificial_type_subfolder_handling(location) |
| 70 | + |
| 71 | + if has_named_folders: |
| 72 | + return self._compare_named_folders(current_folder, past_folder, location, attributes_location) |
| 73 | + else: |
| 74 | + return self._compare_folder_contents(current_folder, past_folder, location, attributes_location) |
| 75 | + |
| 76 | + def _compare_named_folders(self, current_folder, past_folder, location, attributes_location): |
| 77 | + """ |
| 78 | + Compare current and past named folders using the specified locations. |
| 79 | + A named folder is a subfolder of a multiple-MBean folder, such as topology/Server/my-server |
| 80 | + :param current_folder: a folder in the current model |
| 81 | + :param past_folder: corresponding folder in the past model |
| 82 | + :param location: the location for the specified folders |
| 83 | + :param attributes_location: the attribute location for the specified folders |
| 84 | + :return: a dictionary of differences between these folders |
| 85 | + """ |
| 86 | + change_folder = PyOrderedDict() |
| 87 | + |
| 88 | + for name in current_folder: |
| 89 | + # check if name is present in both folders. |
| 90 | + # if found, compare the two folders recursively. |
| 91 | + if name in past_folder: |
| 92 | + next_current = current_folder[name] |
| 93 | + next_past = past_folder[name] |
| 94 | + location.add_name_token(self._aliases.get_name_token(location), name) |
| 95 | + attributes_location.add_name_token(self._aliases.get_name_token(attributes_location), name) |
| 96 | + changes = self._compare_folder_contents(next_current, next_past, location, attributes_location) |
| 97 | + if changes: |
| 98 | + change_folder[name] = changes |
| 99 | + |
| 100 | + # check for added names. |
| 101 | + # if found, add the entire folder contents. |
| 102 | + else: |
| 103 | + change_folder[name] = current_folder[name] |
| 104 | + pass |
| 105 | + |
| 106 | + # check for deleted names. |
| 107 | + # if name is not in the current folder, add its delete name. |
| 108 | + for name in past_folder: |
| 109 | + if name not in current_folder: |
| 110 | + delete_name = model_helper.get_delete_name(name) |
| 111 | + change_folder[delete_name] = PyOrderedDict() |
| 112 | + |
| 113 | + return change_folder |
| 114 | + |
| 115 | + def _compare_folder_contents(self, current_folder, past_folder, location, attributes_location): |
| 116 | + """ |
| 117 | + Compare the contents of current and past folders using the specified locations. |
| 118 | + :param current_folder: a folder in the current model |
| 119 | + :param past_folder: corresponding folder in the past model |
| 120 | + :param location: the location for the specified folders |
| 121 | + :param attributes_location: the attribute location for the specified folders |
| 122 | + :return: a dictionary of differences between these folders |
| 123 | + """ |
| 124 | + change_folder = PyOrderedDict() |
| 125 | + |
| 126 | + attribute_names = [] |
| 127 | + if attributes_location is not None: |
| 128 | + attribute_names = self._aliases.get_model_attribute_names(attributes_location) |
| 129 | + |
| 130 | + # check if keys in the current folder are present in the past folder |
| 131 | + for key in current_folder: |
| 132 | + if not self._check_key(key, location): |
| 133 | + continue |
| 134 | + |
| 135 | + if key in past_folder: |
| 136 | + current_value = current_folder[key] |
| 137 | + past_value = past_folder[key] |
| 138 | + |
| 139 | + if key in attribute_names: |
| 140 | + self._compare_attribute(current_value, past_value, attributes_location, key, change_folder) |
| 141 | + |
| 142 | + else: |
| 143 | + next_location, next_attributes_location = self._get_next_location(location, key) |
| 144 | + next_change = self._compare_folders(current_value, past_value, next_location, |
| 145 | + next_attributes_location) |
| 146 | + |
| 147 | + if next_change: |
| 148 | + change_folder[key] = next_change |
| 149 | + |
| 150 | + else: |
| 151 | + # key is present the current folder, not in the past folder. |
| 152 | + # just add to the change folder, no further recursion needed. |
| 153 | + change_folder[key] = current_folder[key] |
| 154 | + |
| 155 | + # check if keys in the past folder are not in the current folder |
| 156 | + for key in past_folder: |
| 157 | + if not self._check_key(key, location): |
| 158 | + continue |
| 159 | + |
| 160 | + if key not in current_folder: |
| 161 | + if key in attribute_names: |
| 162 | + # if an attribute was deleted, just add a message |
| 163 | + change_path = self._aliases.get_model_folder_path(location) + "/" + key |
| 164 | + self._messages.add(('WLSDPLY-05701', change_path)) |
| 165 | + |
| 166 | + else: |
| 167 | + # if a folder was deleted, keep recursing through the past model. |
| 168 | + # there may be named elements underneath that need to be deleted. |
| 169 | + current_value = PyOrderedDict() |
| 170 | + past_value = past_folder[key] |
| 171 | + next_location, next_attributes_location = self._get_next_location(location, key) |
| 172 | + next_change = self._compare_folders(current_value, past_value, next_location, |
| 173 | + next_attributes_location) |
| 174 | + |
| 175 | + if next_change: |
| 176 | + change_folder[key] = next_change |
| 177 | + |
| 178 | + self._finalize_folder(current_folder, past_folder, change_folder, location) |
| 179 | + return change_folder |
| 180 | + |
| 181 | + def _get_next_location(self, location, key): |
| 182 | + """ |
| 183 | + Get the next locations for the specified key and location. |
| 184 | + :param location: the current location (None indicates model root) |
| 185 | + :param key: the key of the next location |
| 186 | + :return: a tuple with the next location and the next attributes location |
| 187 | + """ |
| 188 | + if location is None: |
| 189 | + next_location = LocationContext() |
| 190 | + next_attributes_location = self._aliases.get_model_section_attribute_location(key) |
| 191 | + else: |
| 192 | + next_location = LocationContext(location) |
| 193 | + next_location.append_location(key) |
| 194 | + next_location.add_name_token(self._aliases.get_name_token(next_location), 'FOLDER') |
| 195 | + next_attributes_location = next_location |
| 196 | + |
| 197 | + return next_location, next_attributes_location |
| 198 | + |
| 199 | + def _compare_attribute(self, current_value, past_value, location, key, change_folder): |
| 200 | + """ |
| 201 | + Compare values of an attribute from the current and past folders. |
| 202 | + The change value and any comments will be added to the change folder. |
| 203 | + :param current_value: the value from the current model |
| 204 | + :param past_value: the value from the past model |
| 205 | + :param key: the key of the attribute |
| 206 | + :param change_folder: the folder in the change model to be updated |
| 207 | + :param location: the location for attributes in the specified folders |
| 208 | + """ |
| 209 | + if current_value != past_value: |
| 210 | + attribute_type = self._aliases.get_model_attribute_type(location, key) |
| 211 | + if attribute_type in ALIAS_LIST_TYPES: |
| 212 | + current_list = alias_utils.create_list(current_value, 'WLSDPLY-08001') |
| 213 | + previous_list = alias_utils.create_list(past_value, 'WLSDPLY-08000') |
| 214 | + |
| 215 | + change_list = list(previous_list) |
| 216 | + for item in current_list: |
| 217 | + if item in previous_list: |
| 218 | + change_list.remove(item) |
| 219 | + else: |
| 220 | + change_list.append(item) |
| 221 | + for item in previous_list: |
| 222 | + if item not in current_list: |
| 223 | + change_list.remove(item) |
| 224 | + change_list.append(model_helper.get_delete_name(item)) |
| 225 | + |
| 226 | + current_text = ','.join(current_list) |
| 227 | + previous_text = ','.join(previous_list) |
| 228 | + comment = key + ": '" + previous_text + "' -> '" + current_text + "'" |
| 229 | + _add_comment(comment, change_folder) |
| 230 | + change_folder[key] = ','.join(change_list) |
| 231 | + |
| 232 | + elif attribute_type == PROPERTIES: |
| 233 | + self._compare_properties(current_value, past_value, key, change_folder) |
| 234 | + |
| 235 | + else: |
| 236 | + if not isinstance(past_value, dict): |
| 237 | + comment = key + ": '" + str(past_value) + "'" |
| 238 | + _add_comment(comment, change_folder) |
| 239 | + change_folder[key] = current_value |
| 240 | + |
| 241 | + def _compare_properties(self, current_value, past_value, key, change_folder): |
| 242 | + """ |
| 243 | + Compare values of a properties attribute from the current and past folders. |
| 244 | + The change value and any comments will be added to the change folder. |
| 245 | + :param current_value: the value from the current model |
| 246 | + :param past_value: the value from the past model |
| 247 | + :param key: the key of the attribute |
| 248 | + :param change_folder: the folder in the change model to be updated |
| 249 | + """ |
| 250 | + property_dict = PyOrderedDict() |
| 251 | + for property_key in current_value: |
| 252 | + current_property_value = current_value[property_key] |
| 253 | + if property_key in past_value: |
| 254 | + past_property_value = past_value[property_key] |
| 255 | + if past_property_value != current_property_value: |
| 256 | + comment = property_key + ": '" + str(past_property_value) + "'" |
| 257 | + _add_comment(comment, property_dict) |
| 258 | + property_dict[property_key] = current_property_value |
| 259 | + else: |
| 260 | + property_dict[property_key] = current_property_value |
| 261 | + |
| 262 | + # property values don't support delete notation, |
| 263 | + # so any deleted keys in the current value will be ignored. |
| 264 | + |
| 265 | + if property_dict: |
| 266 | + change_folder[key] = property_dict |
| 267 | + |
| 268 | + def _check_key(self, key, location): |
| 269 | + """ |
| 270 | + Determine if the specified key and location will be compared. |
| 271 | + :param key: the key to be checked |
| 272 | + :param location: the location to be checked |
| 273 | + :return: True if the key and location will be compared, False otherwise |
| 274 | + """ |
| 275 | + _method_name = '_check_key' |
| 276 | + |
| 277 | + if (location is None) and (key == KUBERNETES): |
| 278 | + self._logger.info('WLSDPLY-05713', KUBERNETES, class_name=self._class_name, method_name=_method_name) |
| 279 | + return False |
| 280 | + return True |
| 281 | + |
| 282 | + def _finalize_folder(self, current_folder, past_folder, change_folder, location): |
| 283 | + """ |
| 284 | + Perform any adjustments after a folder has been evaluated. |
| 285 | + :param current_folder: folder in the current model |
| 286 | + :param past_folder: corresponding folder in the past model |
| 287 | + :param change_folder: the folder with the changed attributes and sub-folders |
| 288 | + :param location: the location for the specified folders |
| 289 | + """ |
| 290 | + _method_name = '_finalize_folder' |
| 291 | + |
| 292 | + folder_path = [] |
| 293 | + if location is not None: |
| 294 | + folder_path = location.get_model_folders() |
| 295 | + |
| 296 | + # Application and Library should include SourcePath if they have any other elements |
| 297 | + if (len(folder_path) == 1) and (folder_path[0] in self.SOURCE_PATH_FOLDERS): |
| 298 | + if change_folder and (SOURCE_PATH not in change_folder): |
| 299 | + # if SourcePath not present, past and current folder had matching values |
| 300 | + source_path = dictionary_utils.get_element(current_folder, SOURCE_PATH) |
| 301 | + if source_path is not None: |
| 302 | + comment = exception_helper.get_message('WLSDPLY-05714', SOURCE_PATH) |
| 303 | + _add_comment(comment, change_folder) |
| 304 | + change_folder[SOURCE_PATH] = source_path |
| 305 | + |
| 306 | + |
| 307 | +def _add_comment(comment, dictionary): |
| 308 | + """ |
| 309 | + Add a comment to the specified dictionary |
| 310 | + :param comment: the comment text |
| 311 | + :param dictionary: the dictionary to be appended |
| 312 | + """ |
| 313 | + # make comment key unique, key will not appear in output |
| 314 | + comment_key = COMMENT_MATCH + comment |
| 315 | + dictionary[comment_key] = comment |
0 commit comments