Skip to content

Commit 1defd3f

Browse files
committed
move indexer to a proper module
1 parent 8d965c5 commit 1defd3f

File tree

14 files changed

+182
-43
lines changed

14 files changed

+182
-43
lines changed
File renamed without changes.
File renamed without changes.
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2019 CERN.
4+
#
5+
# inspirehep is free software; you can redistribute it and/or modify it under
6+
# the terms of the MIT License; see LICENSE file for more details.
7+
8+
import structlog
9+
from celery import shared_task
10+
from elasticsearch import NotFoundError
11+
from sqlalchemy.exc import (
12+
DisconnectionError,
13+
OperationalError,
14+
ResourceClosedError,
15+
TimeoutError,
16+
UnboundExecutionError,
17+
)
18+
from sqlalchemy.orm.exc import NoResultFound, StaleDataError
19+
20+
from inspirehep.indexer.base import InspireRecordIndexer
21+
from inspirehep.indexer.utils import get_record
22+
from inspirehep.records.api import AuthorsRecord, LiteratureRecord
23+
24+
LOGGER = structlog.getLogger()
25+
26+
27+
CELERY_INDEX_RECORD_RETRY_ON_EXCEPTIONS = (
28+
NoResultFound,
29+
StaleDataError,
30+
DisconnectionError,
31+
TimeoutError,
32+
UnboundExecutionError,
33+
ResourceClosedError,
34+
OperationalError,
35+
)
36+
37+
38+
@shared_task(ignore_result=False, bind=True)
39+
def batch_index(self, records_uuids, request_timeout=None):
40+
"""Process all provided references and index them in bulk.
41+
Be sure that uuids are not duplicated in batch.
42+
Args:
43+
records_uuids (list): list of uuids to process. All duplicates will be removed.
44+
request_timeout: Timeout in which ES should respond. Otherwise break.
45+
46+
Returns:
47+
dict: dict with success count and failure list
48+
(with uuids of failed records)
49+
"""
50+
LOGGER.info(f"Starting task `batch_index for {len(records_uuids)} records")
51+
return InspireRecordIndexer().bulk_index(records_uuids, request_timeout)
52+
53+
54+
def process_references_for_record(record):
55+
"""Tries to find differences in record references.
56+
57+
Gets all references from reference field and publication_info.conference_record
58+
field and forces a reindex of the records whose references changed, to update
59+
their statistics.
60+
61+
Args:
62+
record: Record object in which references have changed.
63+
(not possible to pass this when called as a celery task)
64+
65+
Returns:
66+
list(str): Statistics from the job.
67+
"""
68+
uuids = record.get_modified_references()
69+
uuids.extend(record.get_newest_linked_conferences_uuid())
70+
uuids = list(set(uuids))
71+
if uuids:
72+
LOGGER.info(
73+
f"Found {len(uuids)} references changed, indexing them", uuid=str(record.id)
74+
)
75+
return batch_index(uuids)
76+
LOGGER.info("No references changed", uuid=str(record.id))
77+
78+
79+
def process_author_papers_if_author_changed_name(record):
80+
"""Checks if author has changed his name and reindexes all his papers if he did
81+
82+
Checks `name` dictionary to check if name or preferred name changed.
83+
84+
Args:
85+
record(AuthorsRecord): Author record for which name could change.
86+
87+
Returns:
88+
list(str): Statistics from the job.
89+
"""
90+
if record.get("name") == record._previous_version.get("name"):
91+
return None
92+
# This is not 100% safe as it might happen that paper will be in the middle
93+
# of indexing (with author loaded before name changes) but not yet in ES.
94+
# This might result in paper not re-indexed with proper data.
95+
# The chances of this happening are extremely small, but non-zero.
96+
# For now we should try this solution as it's faster and cheaper,
97+
# but if we notice records which are not updated,
98+
# we should consider a more complex approach.
99+
# A solution to this would be to create a table, similar to the citations table, which would
100+
# hold relation between papers and authors
101+
# and it would be source for papers of author.
102+
uuids = record.get_papers_uuids()
103+
if uuids:
104+
LOGGER.info(
105+
f"Found {len(uuids)} papers assigned to author whose name changed. "
106+
f"Indexing all of them.",
107+
uuid=str(record.id),
108+
)
109+
return batch_index(uuids)
110+
111+
112+
@shared_task(
113+
ignore_result=True,
114+
bind=True,
115+
retry_backoff=2,
116+
retry_kwargs={"max_retries": 6},
117+
autoretry_for=CELERY_INDEX_RECORD_RETRY_ON_EXCEPTIONS,
118+
)
119+
def index_record(self, uuid, record_version=None, force_delete=None):
120+
"""Record indexing.
121+
122+
Args:
123+
self: task instance (bound automatically)
124+
uuid (str): UUID of the record which should be reindexed.
125+
record_version (int): Version of the record to reindex (will be checked).
126+
force_delete (bool): if set to True will delete record from es even if
127+
metadata says that record is not deleted.
128+
Returns:
129+
list(dict): Statistics from processing references.
130+
"""
131+
LOGGER.debug("Indexing record", uuid=str(uuid), version=record_version)
132+
record = get_record(uuid, record_version)
133+
134+
if not force_delete:
135+
deleted = record.get("deleted", False)
136+
137+
if force_delete or deleted:
138+
try:
139+
InspireRecordIndexer().delete(record)
140+
LOGGER.debug("Record removed from ES", uuid=str(uuid))
141+
except NotFoundError:
142+
LOGGER.debug("Record to delete not found", uuid=str(uuid))
143+
else:
144+
InspireRecordIndexer().index(record)
145+
146+
if isinstance(record, LiteratureRecord):
147+
process_references_for_record(record=record)
148+
if isinstance(record, AuthorsRecord):
149+
process_author_papers_if_author_changed_name(record=record)
File renamed without changes.

backend/inspirehep/migrator/tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from invenio_pidstore.models import PersistentIdentifier
2727
from jsonschema import ValidationError
2828

29+
from inspirehep.indexer.tasks import batch_index
2930
from inspirehep.migrator.models import LegacyRecordsMirror
3031
from inspirehep.migrator.utils import (
3132
cache_afs_file_locations,
@@ -37,7 +38,6 @@
3738
from inspirehep.pidstore.api import PidStoreBase
3839
from inspirehep.records.api import InspireRecord, LiteratureRecord
3940
from inspirehep.records.errors import DownloadFileError
40-
from inspirehep.records.indexer.tasks import batch_index
4141
from inspirehep.records.receivers import index_after_commit
4242
from inspirehep.records.tasks import update_records_relations
4343
from inspirehep.utils import chunker

backend/inspirehep/records/api/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
from sqlalchemy import tuple_
2424
from sqlalchemy.orm.attributes import flag_modified
2525

26+
from inspirehep.indexer.base import InspireRecordIndexer
2627
from inspirehep.pidstore.api import PidStoreBase
2728
from inspirehep.records.errors import MissingSerializerError, WrongRecordSubclass
28-
from inspirehep.records.indexer.base import InspireRecordIndexer
2929

3030
LOGGER = structlog.getLogger()
3131

@@ -377,7 +377,7 @@ def index(self, force_delete=None, delay=True):
377377
If not set, tries to determine automatically if record should be deleted
378378
delay: if True will start the index task asynchronously, otherwise synchronously.
379379
"""
380-
from inspirehep.records.indexer.tasks import index_record
380+
from inspirehep.indexer.tasks import index_record
381381

382382
arguments = {
383383
"uuid": str(self.id),

backend/inspirehep/records/indexer/tasks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
)
1818
from sqlalchemy.orm.exc import NoResultFound, StaleDataError
1919

20+
from inspirehep.indexer.base import InspireRecordIndexer
21+
from inspirehep.indexer.utils import get_record
2022
from inspirehep.records.api import AuthorsRecord, LiteratureRecord
21-
from inspirehep.records.indexer.base import InspireRecordIndexer
22-
from inspirehep.records.indexer.utils import get_record
2323

2424
LOGGER = structlog.getLogger()
2525

backend/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@
9292
"inspirehep_migrator = inspirehep.migrator.models",
9393
],
9494
"invenio_celery.tasks": [
95-
"inspirehep_indexer = inspirehep.records.indexer.tasks",
95+
"inspirehep_indexer = inspirehep.indexer.tasks",
9696
"inspirehep_migrator = inspirehep.migrator.tasks",
9797
"inspirehep_orcid = inspirehep.orcid.tasks",
9898
"inspirehep_submissions = inspirehep.submissions.tasks",

backend/tests/integration-async/records/indexer/test_indexer_cli.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from invenio_records.models import RecordMetadata
1010
from mock import patch
1111

12+
from inspirehep.indexer.cli import get_query_records_to_index, reindex_records
1213
from inspirehep.records.api import (
1314
AuthorsRecord,
1415
ConferencesRecord,
@@ -19,7 +20,6 @@
1920
JournalsRecord,
2021
LiteratureRecord,
2122
)
22-
from inspirehep.records.indexer.cli import get_query_records_to_index, reindex_records
2323

2424

2525
@pytest.fixture
@@ -68,7 +68,7 @@ def test_reindex_record_lit_fails_with_invalid_record(
6868
check_n_records_reindex_for_pidtype,
6969
):
7070
broken_field = {"_desy_bookkeeping": {"date": '"2013-01-14_final'}}
71-
with patch("inspirehep.records.indexer.base.InspireRecordIndexer"):
71+
with patch("inspirehep.indexer.base.InspireRecordIndexer"):
7272
with patch("inspirehep.records.api.base.schema_validate"):
7373
generate_records(count=1, data=broken_field, skip_validation=True)
7474

@@ -84,7 +84,7 @@ def test_reindex_record_lit_fails_with_invalid_field_content(
8484
):
8585
invalid_field = {"titles": ["i am not an object"]}
8686

87-
with patch("inspirehep.records.indexer.base.InspireRecordIndexer"):
87+
with patch("inspirehep.indexer.base.InspireRecordIndexer"):
8888
with patch("inspirehep.records.api.base.schema_validate"):
8989
generate_records(count=1, data=invalid_field, skip_validation=True)
9090

@@ -101,7 +101,7 @@ def test_reindex_records_lit_one_fails_and_two_ok(
101101
invalid_field = {"titles": ["i am not an object"]}
102102

103103
generate_records(count=2)
104-
with patch("inspirehep.records.indexer.base.InspireRecordIndexer"):
104+
with patch("inspirehep.indexer.base.InspireRecordIndexer"):
105105
with patch("inspirehep.records.api.base.schema_validate"):
106106
generate_records(count=1, data=invalid_field, skip_validation=True)
107107

0 commit comments

Comments
 (0)