Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions src/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1667,6 +1667,19 @@ def _get_statement(self, entity: dict) -> Optional[dict]:
return statement
return None

def _get_statements(self, entity: dict) -> List[dict]:
"""
Returns the statements that matches the command's value.

Returns an empty list if there is no matching statement.
"""
statements = entity["statements"].setdefault(self.prop, [])
matching = []
for i, statement in enumerate(statements):
if statement["value"] == self.statement_api_value:
matching.append(statement)
return matching

def _update_entity_statements(self, entity: dict):
"""
Modifies the entity json statements in-place.
Expand All @@ -1688,21 +1701,28 @@ def _remove_qualifier_or_reference(self, entity: dict):
"""
Removes a qualifier or a reference from the entity.
"""
statement = self._get_statement(entity)
statement = statement if statement else {}
statements = self._get_statements(entity)
found_qualifier = False
found_ref_part = False
for i, qual in enumerate(statement.get("qualifiers", [])):
if self.is_in_qualifiers(qual):
statement["qualifiers"].pop(i)
found_qualifier = True
for statement in statements:
for i, qual in enumerate(statement.get("qualifiers", [])):
if self.is_in_qualifiers(qual):
statement["qualifiers"].pop(i)
found_qualifier = True
break
if found_qualifier:
break
for i, ref in enumerate(statement.get("references", [])):
for j, part in enumerate(ref["parts"]):
if self.is_part_in_references(part):
statement["references"][i]["parts"].pop(j)
found_ref_part = True
for statement in statements:
for i, ref in enumerate(statement.get("references", [])):
for j, part in enumerate(ref["parts"]):
if self.is_part_in_references(part):
statement["references"][i]["parts"].pop(j)
found_ref_part = True
break
if found_ref_part:
break
# any better way to do this? :P
# (without refactoring into a different function...)
if found_ref_part:
break
if not found_qualifier and len(self.qualifiers_for_api()) > 0:
Expand Down
122 changes: 122 additions & 0 deletions src/core/tests/test_entity_patching.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ def assertQualCount(self, entity: dict, property_id: str, length: int, i: int =

def assertRefCount(self, entity: dict, property_id: str, length: int, i: int = 0):
refs = entity["statements"][property_id][i].get("references", [])
# FIXME: remove empty refs for now, original code should be doing that
refs = [r for r in refs if len(r["parts"]) > 0]
self.assertEqual(len(refs), length)

def assertRefPartsCount(
Expand Down Expand Up @@ -244,6 +246,126 @@ def test_remove_qualifier(self):
remove_nothing.update_entity_json(entity)
self.assertQualCount(entity, "P65", 1)

def test_remove_from_available_statement(self):
"""
this will create a new statement, which will be in the second position
then, it will try to remove the qualifier and the reference
from that new stamement,
both of which do not exist on the first
"""
text = """
REMOVE_QUAL|Q12345678|P65|42|P84267|123
REMOVE_REF|Q12345678|P65|42|S93|"https://pt.wikipedia.org/"

+Q12345678|P65|42|P84267|123|S93|"https://pt.wikipedia.org/"

REMOVE_QUAL|Q12345678|P65|42|P84267|123
REMOVE_REF|Q12345678|P65|42|S93|"https://pt.wikipedia.org/"
"""
batch = self.parse(text)
entity = copy.deepcopy(self.INITIAL)
# -----
remove_qual = batch.commands()[0]
with self.assertRaises(NoQualifiers):
remove_qual.update_entity_json(entity)
# -----
remove_ref_part = batch.commands()[1]
with self.assertRaises(NoReferenceParts):
remove_ref_part.update_entity_json(entity)
# -----
create_statement = batch.commands()[2]
self.assertStmtnCount(entity, "P65", 1)
create_statement.update_entity_json(entity)
self.assertStmtnCount(entity, "P65", 2)
# -----
remove_qual = batch.commands()[3]
self.assertQualCount(entity, "P65", 2, i=0)
self.assertQualCount(entity, "P65", 1, i=1)
# now it should try to remove from the first, then the second
remove_qual.update_entity_json(entity)
self.assertQualCount(entity, "P65", 2, i=0)
self.assertQualCount(entity, "P65", 0, i=1)
# -----
remove_ref_part = batch.commands()[4]
self.assertRefCount(entity, "P65", 0, i=0)
self.assertRefCount(entity, "P65", 1, i=1)
# now it should try to remove from the first, then the second
remove_ref_part.update_entity_json(entity)
self.assertRefCount(entity, "P65", 0, i=0)
self.assertRefCount(entity, "P65", 0, i=1)

def test_remove_qualifier_from_first_statement(self):
"""
this will create a new statement, which will be in the second position
then, it will try to remove the qualifier and the reference
that is equal on both
it will remove from the first, then the second, then error
"""
text = """
+Q12345678|P31|somevalue|P18|+2025-01-15T00:00:00Z/11|S93|"https://kernel.org/"
REMOVE_QUAL|Q12345678|P31|somevalue|P18|+2025-01-15T00:00:00Z/11
REMOVE_QUAL|Q12345678|P31|somevalue|P18|+2025-01-15T00:00:00Z/11
REMOVE_QUAL|Q12345678|P31|somevalue|P18|+2025-01-15T00:00:00Z/11
REMOVE_REF|Q12345678|P31|somevalue|S93|"https://kernel.org/"
REMOVE_REF|Q12345678|P31|somevalue|S93|"https://kernel.org/"
REMOVE_REF|Q12345678|P31|somevalue|S93|"https://kernel.org/"
"""
batch = self.parse(text)
entity = copy.deepcopy(self.INITIAL)
create_statement = batch.commands()[0]
self.assertStmtnCount(entity, "P31", 1)
create_statement.update_entity_json(entity)
self.assertStmtnCount(entity, "P31", 2)
# -----
remove_qual = batch.commands()[1]
self.assertQualCount(entity, "P31", 1, i=0)
self.assertQualCount(entity, "P31", 1, i=1)
remove_qual.update_entity_json(entity)
self.assertQualCount(entity, "P31", 0, i=0)
self.assertQualCount(entity, "P31", 1, i=1)
# -----
remove_qual = batch.commands()[2]
self.assertQualCount(entity, "P31", 0, i=0)
self.assertQualCount(entity, "P31", 1, i=1)
remove_qual.update_entity_json(entity)
self.assertQualCount(entity, "P31", 0, i=0)
self.assertQualCount(entity, "P31", 0, i=1)
# -----
remove_qual = batch.commands()[3]
with self.assertRaises(NoQualifiers):
remove_qual.update_entity_json(entity)
# -----
remove_ref_part = batch.commands()[4]
self.assertRefCount(entity, "P31", 2)
self.assertRefPartsCount(entity, "P31", 2, ipart=0)
self.assertRefPartsCount(entity, "P31", 3, ipart=1)
self.assertRefCount(entity, "P31", 1, i=1)
self.assertRefPartsCount(entity, "P31", 1, i=1, ipart=0)
remove_ref_part.update_entity_json(entity)
self.assertRefCount(entity, "P31", 2)
self.assertRefPartsCount(entity, "P31", 1, ipart=0)
self.assertRefPartsCount(entity, "P31", 3, ipart=1)
self.assertRefCount(entity, "P31", 1, i=1)
self.assertRefPartsCount(entity, "P31", 1, i=1, ipart=0)
# -----
remove_ref_part = batch.commands()[5]
self.assertRefCount(entity, "P31", 2)
self.assertRefPartsCount(entity, "P31", 1, ipart=0)
self.assertRefPartsCount(entity, "P31", 3, ipart=1)
self.assertRefCount(entity, "P31", 1, i=1)
self.assertRefPartsCount(entity, "P31", 1, i=1, ipart=0)
remove_ref_part.update_entity_json(entity)
self.assertRefCount(entity, "P31", 2)
self.assertRefPartsCount(entity, "P31", 1, ipart=0)
self.assertRefPartsCount(entity, "P31", 3, ipart=1)
self.assertRefCount(entity, "P31", 0, i=1)
self.assertRefPartsCount(entity, "P31", 0, i=1, ipart=0)
# -----
remove_ref_part = batch.commands()[6]
with self.assertRaises(NoReferenceParts):
remove_ref_part.update_entity_json(entity)


def test_remove_reference(self):
text = """
REMOVE_REF|Q12345678|P65|42|S31|somevalue
Expand Down