From 148b73aa9a34c6bfe260eeecf822142fca4c9517 Mon Sep 17 00:00:00 2001 From: Myst <1592048+LeMyst@users.noreply.github.com> Date: Sat, 18 Dec 2021 01:41:30 +0100 Subject: [PATCH] Update wbi_helpers.py --- wikibaseintegrator/wbi_helpers.py | 61 +++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/wikibaseintegrator/wbi_helpers.py b/wikibaseintegrator/wbi_helpers.py index 0012bfd2..78ab3eb3 100644 --- a/wikibaseintegrator/wbi_helpers.py +++ b/wikibaseintegrator/wbi_helpers.py @@ -15,6 +15,8 @@ import ujson from requests import Session +from wikibaseintegrator.datatypes import ExternalID, Form, Item, Lexeme, Property, Quantity, Sense, String +from wikibaseintegrator.models import Claim from wikibaseintegrator.wbi_backoff import wbi_backoff from wikibaseintegrator.wbi_config import config from wikibaseintegrator.wbi_exceptions import MaxRetriesReachedException, ModificationFailed, MWApiError, NonExistentEntityError, SearchError @@ -743,6 +745,65 @@ def generate_entity_instances(entities: str | list[str], allow_anonymous: bool = return entity_instances +def wb_cirrus_search(haswbstatement=None, hasnotwbstatement=None, use_statement_quantity=False, hasdescription=None, haslabel=None, max_results=500, dict_result=False, + allow_anonymous=True, **kwargs): + if isinstance(haswbstatement, Claim): + haswbstatement = [haswbstatement] + + search_string = "" + + for statement in haswbstatement: + if isinstance(statement, (ExternalID, Form, Item, Lexeme, Property, Sense, String)): + search_string += f" haswbstatement:{statement.mainsnak.property_number}={statement.mainsnak.datavalue}" + elif isinstance(statement, Quantity): + search_string += f" wbstatementquantity:{statement.mainsnak.property_number}={statement.mainsnak.datavalue}" + else: + raise ValueError(f"Instance type {statement.DTYPE} is not supported.") + + params = { + 'action': 'query', + 'list': 'search', + 'srsearch': search_string, + 'srlimit': 50, + 'format': 'json' + } + + cont_count = 0 + results = [] + + while True: + params.update({'continue': cont_count}) + + search_results = mediawiki_api_call_helper(data=params, allow_anonymous=allow_anonymous, **kwargs) + + if search_results['success'] != 1: + raise SearchError('Wikibase API wbsearchentities failed') + + for i in search_results['search']: + if dict_result: + description = i['description'] if 'description' in i else None + aliases = i['aliases'] if 'aliases' in i else None + results.append({ + 'id': i['id'], + 'label': i['label'], + 'match': i['match'], + 'description': description, + 'aliases': aliases + }) + else: + results.append(i['id']) + + if 'search-continue' not in search_results: + break + + cont_count = search_results['search-continue'] + + if cont_count >= max_results: + break + + return results + + def delete_page(title: str | None = None, pageid: int | None = None, reason: str | None = None, deletetalk: bool = False, watchlist: str = 'preferences', watchlistexpiry: str | None = None, login: _Login | None = None, **kwargs: Any) -> dict: """