|
| 1 | +from abc import ABC, abstractmethod |
| 2 | +from dataclasses import dataclass |
| 3 | +from typing import Callable, List, Optional, TypeVar, Union |
| 4 | + |
| 5 | +from package_parser.processing.api.model import ( |
| 6 | + API, |
| 7 | + Attribute, |
| 8 | + Class, |
| 9 | + Function, |
| 10 | + Parameter, |
| 11 | + Result, |
| 12 | +) |
| 13 | + |
| 14 | +from ._differ import AbstractDiffer |
| 15 | + |
| 16 | +api_element = Union[Attribute, Class, Function, Parameter, Result] |
| 17 | +API_ELEMENTS = TypeVar("API_ELEMENTS", Attribute, Class, Function, Parameter, Result) |
| 18 | + |
| 19 | + |
| 20 | +@dataclass |
| 21 | +class Mapping(ABC): |
| 22 | + similarity: float |
| 23 | + |
| 24 | + @abstractmethod |
| 25 | + def get_apiv1_elements(self) -> list[api_element]: |
| 26 | + pass |
| 27 | + |
| 28 | + @abstractmethod |
| 29 | + def get_apiv2_elements(self) -> list[api_element]: |
| 30 | + pass |
| 31 | + |
| 32 | + def get_similarity(self) -> float: |
| 33 | + return self.similarity |
| 34 | + |
| 35 | + |
| 36 | +@dataclass |
| 37 | +class OneToOneMapping(Mapping): |
| 38 | + apiv1_element: api_element |
| 39 | + apiv2_element: api_element |
| 40 | + |
| 41 | + def get_apiv1_elements(self) -> list[api_element]: |
| 42 | + return [self.apiv1_element] |
| 43 | + |
| 44 | + def get_apiv2_elements(self) -> list[api_element]: |
| 45 | + return [self.apiv2_element] |
| 46 | + |
| 47 | + |
| 48 | +@dataclass |
| 49 | +class OneToManyMapping(Mapping): |
| 50 | + apiv1_element: api_element |
| 51 | + apiv2_elements: list[api_element] |
| 52 | + |
| 53 | + def get_apiv1_elements(self) -> list[api_element]: |
| 54 | + return [self.apiv1_element] |
| 55 | + |
| 56 | + def get_apiv2_elements(self) -> list[api_element]: |
| 57 | + return self.apiv2_elements |
| 58 | + |
| 59 | + |
| 60 | +@dataclass |
| 61 | +class ManyToOneMapping(Mapping): |
| 62 | + apiv1_elements: list[api_element] |
| 63 | + apiv2_element: api_element |
| 64 | + |
| 65 | + def get_apiv1_elements(self) -> list[api_element]: |
| 66 | + return self.apiv1_elements |
| 67 | + |
| 68 | + def get_apiv2_elements(self) -> list[api_element]: |
| 69 | + return [self.apiv2_element] |
| 70 | + |
| 71 | + |
| 72 | +@dataclass |
| 73 | +class ManyToManyMapping(Mapping): |
| 74 | + apiv1_elements: list[api_element] |
| 75 | + apiv2_elements: list[api_element] |
| 76 | + |
| 77 | + def get_apiv1_elements(self) -> list[api_element]: |
| 78 | + return self.apiv1_elements |
| 79 | + |
| 80 | + def get_apiv2_elements(self) -> list[api_element]: |
| 81 | + return self.apiv2_elements |
| 82 | + |
| 83 | + |
| 84 | +def merge_mappings(mapping_a: Mapping, mapping_b: Mapping) -> Mapping: |
| 85 | + similarity = (mapping_a.similarity + mapping_b.similarity) / 2 |
| 86 | + codomain: list[api_element] = list( |
| 87 | + set(mapping_a.get_apiv2_elements()) | set(mapping_b.get_apiv2_elements()) |
| 88 | + ) |
| 89 | + domain: list[api_element] = list( |
| 90 | + set(mapping_a.get_apiv1_elements()) | set(mapping_b.get_apiv1_elements()) |
| 91 | + ) |
| 92 | + if len(domain) == 1 and len(codomain) == 1: |
| 93 | + return OneToOneMapping(similarity, domain[0], codomain[0]) |
| 94 | + if len(domain) == 1: |
| 95 | + return OneToManyMapping(similarity, domain[0], codomain) |
| 96 | + if len(codomain) == 1: |
| 97 | + return ManyToOneMapping(similarity, domain, codomain[0]) |
| 98 | + return ManyToManyMapping(similarity, domain, codomain) |
| 99 | + |
| 100 | + |
| 101 | +class APIMapping: |
| 102 | + threshold_of_similarity_between_mappings: float |
| 103 | + threshold_of_similarity_for_creation_of_mappings: float |
| 104 | + apiv1: API |
| 105 | + apiv2: API |
| 106 | + differ: AbstractDiffer |
| 107 | + |
| 108 | + def __init__( |
| 109 | + self, |
| 110 | + apiv1: API, |
| 111 | + apiv2: API, |
| 112 | + differ: AbstractDiffer, |
| 113 | + threshold_of_similarity_for_creation_of_mappings=0.5, |
| 114 | + threshold_of_similarity_between_mappings=0.05, |
| 115 | + ): |
| 116 | + self.apiv1 = apiv1 |
| 117 | + self.apiv2 = apiv2 |
| 118 | + self.differ = differ |
| 119 | + self.threshold_of_similarity_for_creation_of_mappings = ( |
| 120 | + threshold_of_similarity_for_creation_of_mappings |
| 121 | + ) |
| 122 | + self.threshold_of_similarity_between_mappings = ( |
| 123 | + threshold_of_similarity_between_mappings |
| 124 | + ) |
| 125 | + |
| 126 | + def _get_mappings_for_api_elements( |
| 127 | + self, |
| 128 | + api_elementv1_list: List[API_ELEMENTS], |
| 129 | + api_elementv2_list: List[API_ELEMENTS], |
| 130 | + compute_similarity: Callable[[API_ELEMENTS, API_ELEMENTS], float], |
| 131 | + ) -> list[Mapping]: |
| 132 | + element_mappings: list[Mapping] = [] |
| 133 | + for api_elementv1 in api_elementv1_list: |
| 134 | + mapping_for_class_1: list[Mapping] = [] |
| 135 | + for api_elementv2 in api_elementv2_list: |
| 136 | + similarity = compute_similarity(api_elementv1, api_elementv2) |
| 137 | + if similarity >= self.threshold_of_similarity_for_creation_of_mappings: |
| 138 | + mapping_for_class_1.append( |
| 139 | + OneToOneMapping(similarity, api_elementv1, api_elementv2) |
| 140 | + ) |
| 141 | + mapping_for_class_1.sort(key=Mapping.get_similarity, reverse=True) |
| 142 | + new_mapping = self._merge_similar_mappings(mapping_for_class_1) |
| 143 | + if new_mapping is not None: |
| 144 | + self._merge_mappings_with_same_elements(new_mapping, element_mappings) |
| 145 | + return element_mappings |
| 146 | + |
| 147 | + def map_api(self) -> List[Mapping]: |
| 148 | + mappings: List[Mapping] = [] |
| 149 | + mappings.extend( |
| 150 | + self._get_mappings_for_api_elements( |
| 151 | + list(self.apiv1.classes.values()), |
| 152 | + list(self.apiv2.classes.values()), |
| 153 | + self.differ.compute_class_similarity, |
| 154 | + ) |
| 155 | + ) |
| 156 | + mappings.extend( |
| 157 | + self._get_mappings_for_api_elements( |
| 158 | + list(self.apiv1.functions.values()), |
| 159 | + list(self.apiv2.functions.values()), |
| 160 | + self.differ.compute_function_similarity, |
| 161 | + ) |
| 162 | + ) |
| 163 | + mappings.extend( |
| 164 | + self._get_mappings_for_api_elements( |
| 165 | + list(self.apiv1.parameters().values()), |
| 166 | + list(self.apiv2.parameters().values()), |
| 167 | + self.differ.compute_parameter_similarity, |
| 168 | + ) |
| 169 | + ) |
| 170 | + |
| 171 | + mappings.extend( |
| 172 | + self._get_mappings_for_api_elements( |
| 173 | + [ |
| 174 | + attribute |
| 175 | + for class_ in self.apiv1.classes.values() |
| 176 | + for attribute in class_.instance_attributes |
| 177 | + ], |
| 178 | + [ |
| 179 | + attribute |
| 180 | + for class_ in self.apiv2.classes.values() |
| 181 | + for attribute in class_.instance_attributes |
| 182 | + ], |
| 183 | + self.differ.compute_attribute_similarity, |
| 184 | + ) |
| 185 | + ) |
| 186 | + |
| 187 | + mappings.extend( |
| 188 | + self._get_mappings_for_api_elements( |
| 189 | + [ |
| 190 | + result |
| 191 | + for function in self.apiv1.functions.values() |
| 192 | + for result in function.results |
| 193 | + ], |
| 194 | + [ |
| 195 | + result |
| 196 | + for function in self.apiv2.functions.values() |
| 197 | + for result in function.results |
| 198 | + ], |
| 199 | + self.differ.compute_result_similarity, |
| 200 | + ) |
| 201 | + ) |
| 202 | + |
| 203 | + mappings.sort(key=Mapping.get_similarity, reverse=True) |
| 204 | + return mappings |
| 205 | + |
| 206 | + def _merge_similar_mappings(self, mappings: List[Mapping]) -> Optional[Mapping]: |
| 207 | + """ |
| 208 | + Given a list of OneToOne(Many)Mappings which apiv1 element is the same, this method returns the best mapping |
| 209 | + from this apiv1 element to apiv2 elements by merging the first and second elements recursively, |
| 210 | + if the difference in similarity is smaller than THRESHOLD_OF_SIMILARITY_BETWEEN_MAPPINGS. |
| 211 | +
|
| 212 | + :param mappings: mappings sorted by decreasing similarity, which apiv1 element is the same |
| 213 | + :return: the first element of the sorted list that could be a result of merged similar mappings |
| 214 | + """ |
| 215 | + if len(mappings) == 0: |
| 216 | + return None |
| 217 | + if len(mappings) == 1: |
| 218 | + return mappings[0] |
| 219 | + if ( |
| 220 | + mappings[0].similarity - mappings[1].similarity |
| 221 | + < self.threshold_of_similarity_between_mappings |
| 222 | + ): |
| 223 | + mappings[0] = merge_mappings(mappings[0], mappings[1]) |
| 224 | + mappings.pop(1) |
| 225 | + return self._merge_similar_mappings(mappings) |
| 226 | + return mappings[0] |
| 227 | + |
| 228 | + def _merge_mappings_with_same_elements( |
| 229 | + self, mapping_to_be_appended: Mapping, mappings: list[Mapping] |
| 230 | + ): |
| 231 | + """ |
| 232 | + This method prevents that an element in a mapping appears multiple times in a list of mappings |
| 233 | + by merging the affected mappings and include the result in the list. If there is no such element, |
| 234 | + the mapping will be included without any merge. |
| 235 | +
|
| 236 | + :param mapping_to_be_appended: the mapping that should be included in mappings |
| 237 | + :param mappings: the list, in which mapping_to_be_appended should be appended |
| 238 | + """ |
| 239 | + duplicated: list[Mapping] = [] |
| 240 | + for mapping in mappings: |
| 241 | + duplicated_element = False |
| 242 | + for element in mapping.get_apiv2_elements(): |
| 243 | + for element_2 in mapping_to_be_appended.get_apiv2_elements(): |
| 244 | + if element == element_2: |
| 245 | + duplicated_element = True |
| 246 | + break |
| 247 | + if duplicated_element: |
| 248 | + duplicated.append(mapping) |
| 249 | + |
| 250 | + if len(duplicated) == 0: |
| 251 | + mappings.append(mapping_to_be_appended) |
| 252 | + return |
| 253 | + |
| 254 | + for conflicted_mapping in duplicated: |
| 255 | + mapping_to_be_appended = merge_mappings( |
| 256 | + mapping_to_be_appended, conflicted_mapping |
| 257 | + ) |
| 258 | + mappings.remove(conflicted_mapping) |
| 259 | + |
| 260 | + mappings.append(mapping_to_be_appended) |
0 commit comments