|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import copy |
| 4 | +from typing import Any, Callable, Dict, List, Optional, Union |
| 5 | + |
| 6 | +import wikibaseintegrator.models.basemodel |
| 7 | +from wikibaseintegrator.models.qualifiers import Qualifiers |
| 8 | +from wikibaseintegrator.models.reference import Reference |
| 9 | +from wikibaseintegrator.models.references import References |
| 10 | +from wikibaseintegrator.models.snak import Snak |
| 11 | +from wikibaseintegrator.models.snaks import Snaks |
| 12 | +from wikibaseintegrator.wbi_enums import WikibaseRank |
| 13 | + |
| 14 | + |
| 15 | +class Claim(wikibaseintegrator.models.basemodel.BaseModel): |
| 16 | + DTYPE = 'claim' |
| 17 | + |
| 18 | + def __init__(self, qualifiers: Qualifiers = None, rank: WikibaseRank = None, references: Union[References, List[Union[Claim, List[Claim]]]] = None) -> None: |
| 19 | + """ |
| 20 | +
|
| 21 | + :param qualifiers: |
| 22 | + :param rank: |
| 23 | + :param references: A References object, a list of Claim object or a list of list of Claim object |
| 24 | + """ |
| 25 | + self.mainsnak = Snak(datatype=self.DTYPE) |
| 26 | + self.type = 'statement' |
| 27 | + self.qualifiers = qualifiers or Qualifiers() |
| 28 | + self.qualifiers_order = [] |
| 29 | + self.id = None |
| 30 | + self.rank = rank or WikibaseRank.NORMAL |
| 31 | + self.removed = False |
| 32 | + |
| 33 | + self.references = References() |
| 34 | + |
| 35 | + if isinstance(references, References): |
| 36 | + self.references = references |
| 37 | + elif isinstance(references, list): |
| 38 | + for ref_list in references: |
| 39 | + ref = Reference() |
| 40 | + if isinstance(ref_list, list): |
| 41 | + snaks = Snaks() |
| 42 | + for ref_claim in ref_list: |
| 43 | + if isinstance(ref_claim, Claim): |
| 44 | + snaks.add(Snak().from_json(ref_claim.get_json()['mainsnak'])) |
| 45 | + else: |
| 46 | + raise ValueError("The references must be a References object or a list of Claim object") |
| 47 | + ref.snaks = snaks |
| 48 | + elif isinstance(ref_list, Claim): |
| 49 | + ref.snaks = Snaks().add(Snak().from_json(ref_list.get_json()['mainsnak'])) |
| 50 | + elif isinstance(ref_list, Reference): |
| 51 | + ref = ref_list |
| 52 | + self.references.add(reference=ref) |
| 53 | + elif references is not None: |
| 54 | + raise ValueError("The references must be a References object or a list of Claim object") |
| 55 | + |
| 56 | + @property |
| 57 | + def mainsnak(self) -> Snak: |
| 58 | + return self.__mainsnak |
| 59 | + |
| 60 | + @mainsnak.setter |
| 61 | + def mainsnak(self, value: Snak): |
| 62 | + self.__mainsnak = value |
| 63 | + |
| 64 | + @property |
| 65 | + def type(self) -> Union[str, Dict]: |
| 66 | + return self.__type |
| 67 | + |
| 68 | + @type.setter |
| 69 | + def type(self, value: Union[str, Dict]): |
| 70 | + self.__type = value |
| 71 | + |
| 72 | + @property |
| 73 | + def qualifiers(self) -> Qualifiers: |
| 74 | + return self.__qualifiers |
| 75 | + |
| 76 | + @qualifiers.setter |
| 77 | + def qualifiers(self, value: Qualifiers) -> None: |
| 78 | + assert isinstance(value, (Qualifiers, list)) |
| 79 | + self.__qualifiers: Qualifiers = Qualifiers().set(value) if isinstance(value, list) else value |
| 80 | + |
| 81 | + @property |
| 82 | + def qualifiers_order(self) -> List[str]: |
| 83 | + return self.__qualifiers_order |
| 84 | + |
| 85 | + @qualifiers_order.setter |
| 86 | + def qualifiers_order(self, value: List[str]): |
| 87 | + self.__qualifiers_order = value |
| 88 | + |
| 89 | + @property |
| 90 | + def id(self) -> Optional[str]: |
| 91 | + return self.__id |
| 92 | + |
| 93 | + @id.setter |
| 94 | + def id(self, value: Optional[str]): |
| 95 | + self.__id = value |
| 96 | + |
| 97 | + @property |
| 98 | + def rank(self) -> WikibaseRank: |
| 99 | + return self.__rank |
| 100 | + |
| 101 | + @rank.setter |
| 102 | + def rank(self, value: WikibaseRank): |
| 103 | + """Parse the rank. The enum thows an error if it is not one of the recognized values""" |
| 104 | + self.__rank = WikibaseRank(value) |
| 105 | + |
| 106 | + @property |
| 107 | + def references(self) -> References: |
| 108 | + return self.__references |
| 109 | + |
| 110 | + @references.setter |
| 111 | + def references(self, value: References): |
| 112 | + self.__references = value |
| 113 | + |
| 114 | + @property |
| 115 | + def removed(self) -> bool: |
| 116 | + return self.__removed |
| 117 | + |
| 118 | + @removed.setter |
| 119 | + def removed(self, value: bool): |
| 120 | + self.__removed = value |
| 121 | + |
| 122 | + def remove(self, remove=True) -> None: |
| 123 | + self.removed = remove |
| 124 | + |
| 125 | + def update(self, claim: Claim) -> None: |
| 126 | + self.mainsnak = claim.mainsnak |
| 127 | + self.qualifiers = claim.qualifiers |
| 128 | + self.qualifiers_order = claim.qualifiers_order |
| 129 | + self.rank = claim.rank |
| 130 | + self.references = claim.references |
| 131 | + |
| 132 | + def from_json(self, json_data: Dict[str, Any]) -> Claim: |
| 133 | + """ |
| 134 | +
|
| 135 | + :param json_data: a JSON representation of a Claim |
| 136 | + """ |
| 137 | + self.mainsnak = Snak().from_json(json_data['mainsnak']) |
| 138 | + self.type = str(json_data['type']) |
| 139 | + if 'qualifiers' in json_data: |
| 140 | + self.qualifiers = Qualifiers().from_json(json_data['qualifiers']) |
| 141 | + if 'qualifiers-order' in json_data: |
| 142 | + self.qualifiers_order = list(json_data['qualifiers-order']) |
| 143 | + self.id = str(json_data['id']) |
| 144 | + self.rank: WikibaseRank = WikibaseRank(json_data['rank']) |
| 145 | + if 'references' in json_data: |
| 146 | + self.references = References().from_json(json_data['references']) |
| 147 | + |
| 148 | + return self |
| 149 | + |
| 150 | + def get_json(self) -> Dict[str, Any]: |
| 151 | + json_data: Dict[str, Union[str, List[Dict], List[str], Dict[str, str], Dict[str, List], None]] = { |
| 152 | + 'mainsnak': self.mainsnak.get_json(), |
| 153 | + 'type': self.type, |
| 154 | + 'id': self.id, |
| 155 | + 'rank': self.rank.value |
| 156 | + } |
| 157 | + # Remove id if it's a temporary one |
| 158 | + if not self.id: |
| 159 | + del json_data['id'] |
| 160 | + if len(self.qualifiers) > 0: |
| 161 | + json_data['qualifiers'] = self.qualifiers.get_json() |
| 162 | + json_data['qualifiers-order'] = list(self.qualifiers_order) |
| 163 | + if len(self.references) > 0: |
| 164 | + json_data['references'] = self.references.get_json() |
| 165 | + if self.removed: |
| 166 | + if self.id: |
| 167 | + json_data['remove'] = '' |
| 168 | + return json_data |
| 169 | + |
| 170 | + def has_equal_qualifiers(self, other: Claim) -> bool: |
| 171 | + # check if the qualifiers are equal with the 'other' object |
| 172 | + self_qualifiers = copy.deepcopy(self.qualifiers) |
| 173 | + other_qualifiers = copy.deepcopy(other.qualifiers) |
| 174 | + |
| 175 | + if len(self_qualifiers) != len(other_qualifiers): |
| 176 | + return False |
| 177 | + |
| 178 | + for property_number in self_qualifiers.qualifiers: |
| 179 | + if property_number not in other_qualifiers.qualifiers: |
| 180 | + return False |
| 181 | + |
| 182 | + if len(self_qualifiers.qualifiers[property_number]) != len(other_qualifiers.qualifiers[property_number]): |
| 183 | + return False |
| 184 | + |
| 185 | + flg = [False for _ in range(len(self_qualifiers.qualifiers[property_number]))] |
| 186 | + for count, i in enumerate(self_qualifiers.qualifiers[property_number]): |
| 187 | + for q in other_qualifiers: |
| 188 | + if i == q: |
| 189 | + flg[count] = True |
| 190 | + if not all(flg): |
| 191 | + return False |
| 192 | + |
| 193 | + return True |
| 194 | + |
| 195 | + # TODO: rewrite this? |
| 196 | + def __contains__(self, item): |
| 197 | + if isinstance(item, Claim): |
| 198 | + return self == item |
| 199 | + |
| 200 | + if isinstance(item, str): |
| 201 | + return self.mainsnak.datavalue == item |
| 202 | + |
| 203 | + return super().__contains__(item) |
| 204 | + |
| 205 | + def __eq__(self, other): |
| 206 | + if isinstance(other, Claim): |
| 207 | + return self.mainsnak.datavalue == other.mainsnak.datavalue and self.mainsnak.property_number == other.mainsnak.property_number and self.has_equal_qualifiers(other) |
| 208 | + |
| 209 | + if isinstance(other, str): |
| 210 | + return self.mainsnak.property_number == other |
| 211 | + |
| 212 | + raise super().__eq__(other) |
| 213 | + |
| 214 | + def equals(self, that: Claim, include_ref: bool = False, fref: Callable = None) -> bool: |
| 215 | + """ |
| 216 | + Tests for equality of two statements. |
| 217 | + If comparing references, the order of the arguments matters!!! |
| 218 | + self is the current statement, the next argument is the new statement. |
| 219 | + Allows passing in a function to use to compare the references 'fref'. Default is equality. |
| 220 | + fref accepts two arguments 'oldrefs' and 'newrefs', each of which are a list of references, |
| 221 | + where each reference is a list of statements |
| 222 | + """ |
| 223 | + |
| 224 | + if not include_ref: |
| 225 | + # return the result of BaseDataType.__eq__, which is testing for equality of value and qualifiers |
| 226 | + return self == that |
| 227 | + |
| 228 | + if self != that: |
| 229 | + return False |
| 230 | + |
| 231 | + if fref is None: |
| 232 | + return Claim.refs_equal(self, that) |
| 233 | + |
| 234 | + return fref(self, that) |
| 235 | + |
| 236 | + @staticmethod |
| 237 | + def refs_equal(olditem: Claim, newitem: Claim) -> bool: |
| 238 | + """ |
| 239 | + tests for exactly identical references |
| 240 | + """ |
| 241 | + |
| 242 | + oldrefs = olditem.references |
| 243 | + newrefs = newitem.references |
| 244 | + |
| 245 | + def ref_equal(oldref: References, newref: References) -> bool: |
| 246 | + return (len(oldref) == len(newref)) and all(x in oldref for x in newref) |
| 247 | + |
| 248 | + return len(oldrefs) == len(newrefs) and all(any(ref_equal(oldref, newref) for oldref in oldrefs) for newref in newrefs) |
| 249 | + |
| 250 | + def get_sparql_value(self) -> str: |
| 251 | + pass |
0 commit comments