diff --git a/docker-compose.yml b/docker-compose.yml
index f1437bebf..98e63509d 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -184,7 +184,10 @@ services:
   worker:
     image: quay.io/centerforopenscience/share:develop
-    command: /usr/local/bin/celery --app project worker --uid daemon -l INFO
+    # run through a shell so the `&&` chaining takes effect
+    command: >
+      /bin/sh -c 'chown -R daemon:daemon /elastic8_certs/
+      && /usr/local/bin/celery --app project worker --uid daemon -l INFO'
     depends_on:
       - postgres
       - rabbitmq

diff --git a/project/settings.py b/project/settings.py
index 0dafab53f..7b092fc10 100644
--- a/project/settings.py
+++ b/project/settings.py
@@ -314,6 +314,7 @@ def split(string, delim):
     'TIMEOUT': int(os.environ.get('ELASTICSEARCH_TIMEOUT', '45')),
     'CHUNK_SIZE': int(os.environ.get('ELASTICSEARCH_CHUNK_SIZE', 2000)),
     'MAX_RETRIES': int(os.environ.get('ELASTICSEARCH_MAX_RETRIES', 7)),
+    'POST_INDEX_DELAY': int(os.environ.get('ELASTICSEARCH_POST_INDEX_DELAY', 3)),
 }
 ELASTICSEARCH5_URL = (
     os.environ.get('ELASTICSEARCH5_URL')
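
A quick sketch of how the new setting flows at runtime (parsing copied from the hunk above; the example value is illustrative): the env var is read once at startup, in seconds, and later becomes the celery `countdown` for the followup cleanup task added in trovesearch_denorm.py below.

    import os

    # same parsing as the settings hunk above; e.g. ELASTICSEARCH_POST_INDEX_DELAY=10
    # gives elasticsearch a ten-second head start to refresh before cleanup runs
    _post_index_delay = int(os.environ.get('ELASTICSEARCH_POST_INDEX_DELAY', 3))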
diff --git a/share/search/daemon.py b/share/search/daemon.py
index 017fede33..1fa7cce23 100644
--- a/share/search/daemon.py
+++ b/share/search/daemon.py
@@ -8,7 +8,6 @@
 import threading
 import time

-import amqp.exceptions
 from django.conf import settings
 import kombu
 from kombu.mixins import ConsumerMixin
@@ -61,8 +60,6 @@ def start_daemonthreads_for_strategy(self, index_strategy):
             index_strategy=index_strategy,
             message_callback=_daemon.on_message,
         )
-        # give the daemon a more robust callback for ack-ing
-        _daemon.ack_callback = _consumer.ensure_ack
         # spin up daemonthreads, ready for messages
         self._daemonthreads.extend(_daemon.start())
         # start a thread to consume messages from this strategy's queues
@@ -82,7 +79,7 @@ def stop_daemonthreads(self, *, wait=False):


 class KombuMessageConsumer(ConsumerMixin):
-    PREFETCH_COUNT = 7500
+    PREFETCH_COUNT = settings.ELASTICSEARCH['CHUNK_SIZE']

     should_stop: bool  # (from ConsumerMixin)

@@ -130,28 +127,9 @@ def consume(self, *args, **kwargs):
         consume = self.connection.ensure(self.connection, super().consume)
         return consume(*args, **kwargs)

-    def ensure_ack(self, daemon_message: messages.DaemonMessage):
-        # if the connection the message came thru is no longer usable,
-        # use `kombu.Connection.autoretry` to revive it for an ack
-        try:
-            daemon_message.ack()
-        except (ConnectionError, amqp.exceptions.ConnectionError):
-            @self.connection.autoretry
-            def _do_ack(*, channel):
-                try:
-                    channel.basic_ack(daemon_message.kombu_message.delivery_tag)
-                finally:
-                    channel.close()
-            _do_ack()
-
-
-def _default_ack_callback(daemon_message: messages.DaemonMessage) -> None:
-    daemon_message.ack()
-

 class IndexerDaemon:
-    MAX_LOCAL_QUEUE_SIZE = 5000
-    ack_callback: Callable[[messages.DaemonMessage], None]
+    MAX_LOCAL_QUEUE_SIZE = settings.ELASTICSEARCH['CHUNK_SIZE']

     def __init__(self, index_strategy, *, stop_event=None, daemonthread_context=None):
         self.stop_event = (
@@ -163,7 +141,6 @@ def __init__(self, index_strategy, *, stop_event=None, daemonthread_context=None):
         self.__daemonthread_context = daemonthread_context or contextlib.nullcontext
         self.__local_message_queues = {}
         self.__started = False
-        self.ack_callback = _default_ack_callback

     def start(self) -> list[threading.Thread]:
         if self.__started:
@@ -192,7 +169,6 @@ def start_typed_loop_and_queue(self, message_type) -> threading.Thread:
             local_message_queue=_queue_from_rabbit_to_daemon,
             log_prefix=f'{repr(self)} MessageHandlingLoop: ',
             daemonthread_context=self.__daemonthread_context,
-            ack_callback=self.ack_callback,
         )
         return _handling_loop.start_thread()

@@ -226,7 +202,6 @@ class MessageHandlingLoop:
     local_message_queue: queue.Queue
     log_prefix: str
     daemonthread_context: Callable[[], contextlib.AbstractContextManager]
-    ack_callback: Callable[[messages.DaemonMessage], None]
     _leftover_daemon_messages_by_target_id = None

     def __post_init__(self):
@@ -310,7 +285,7 @@ def _handle_some_messages(self):
                 sentry_sdk.capture_message('error handling message', extras={'message_response': message_response})
             target_id = message_response.index_message.target_id
             for daemon_message in daemon_messages_by_target_id.pop(target_id, ()):
-                self.ack_callback(daemon_message)
+                daemon_message.ack()  # finally set it free
         if daemon_messages_by_target_id:  # should be empty by now
             logger.error('%sUnhandled messages?? %s', self.log_prefix, len(daemon_messages_by_target_id))
             sentry_sdk.capture_message(
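
Why tie the consumer's prefetch to the indexer's chunk size (a sketch, not SHARE code -- the queue name and broker url are made up): rabbitmq delivers at most `prefetch_count` unacked messages, and with the simplified ack path above a message stays unacked until its chunk is flushed, so a prefetch equal to CHUNK_SIZE keeps the local queue bounded without starving a full bulk request.

    import queue
    import kombu

    CHUNK_SIZE = 2000  # mirrors settings.ELASTICSEARCH['CHUNK_SIZE']
    _local_queue: queue.Queue = queue.Queue(maxsize=CHUNK_SIZE)

    with kombu.Connection('amqp://guest:guest@localhost//') as _conn:
        with _conn.Consumer(
            kombu.Queue('hypothetical-index-queue'),
            callbacks=[lambda body, message: _local_queue.put(message)],
            prefetch_count=CHUNK_SIZE,  # never more unacked than one chunk
        ):
            _conn.drain_events(timeout=5)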
diff --git a/share/search/index_strategy/elastic8.py b/share/search/index_strategy/elastic8.py
index 052eadfdb..546889e9f 100644
--- a/share/search/index_strategy/elastic8.py
+++ b/share/search/index_strategy/elastic8.py
@@ -60,8 +60,11 @@ def index_mappings(self):
         raise NotImplementedError

     @abc.abstractmethod
-    def build_elastic_actions(self, messages_chunk: messages.MessagesChunk) -> typing.Iterable[tuple[int, dict]]:
-        # yield (message_target_id, elastic_action) pairs
+    def build_elastic_actions(
+        self,
+        messages_chunk: messages.MessagesChunk,
+    ) -> typing.Iterable[tuple[int, dict | typing.Iterable[dict]]]:
+        # yield (message_target_id, elastic_actions) pairs -- one action dict or an iterable of them
         raise NotImplementedError

     def before_chunk(
@@ -148,10 +151,17 @@ def pls_handle_messages_chunk(self, messages_chunk):
             _indexname = _response_body['_index']
             _is_done = _ok or (_op_type == 'delete' and _status == 404)
             if _is_done:
-                _action_tracker.action_done(_indexname, _docid)
+                _finished_message_id = _action_tracker.action_done(_indexname, _docid)
+                if _finished_message_id is not None:
+                    yield messages.IndexMessageResponse(
+                        is_done=True,
+                        index_message=messages.IndexMessage(messages_chunk.message_type, _finished_message_id),
+                        status_code=HTTPStatus.OK.value,
+                        error_text=None,
+                    )
+                    _action_tracker.forget_message(_finished_message_id)
             else:
                 _action_tracker.action_errored(_indexname, _docid)
-                # yield error responses immediately
                 yield messages.IndexMessageResponse(
                     is_done=False,
                     index_message=messages.IndexMessage(
@@ -161,16 +171,14 @@ def pls_handle_messages_chunk(self, messages_chunk):
                     status_code=_status,
                     error_text=str(_response_body),
                 )
-        self.after_chunk(messages_chunk, _indexnames)
-        # yield successes after the whole chunk completes
-        # (since one message may involve several actions)
-        for _messageid in _action_tracker.all_done_messages():
+        for _message_id in _action_tracker.remaining_done_messages():
             yield messages.IndexMessageResponse(
                 is_done=True,
-                index_message=messages.IndexMessage(messages_chunk.message_type, _messageid),
+                index_message=messages.IndexMessage(messages_chunk.message_type, _message_id),
                 status_code=HTTPStatus.OK.value,
                 error_text=None,
             )
+        self.after_chunk(messages_chunk, _indexnames)

     # abstract method from IndexStrategy
     def pls_make_default_for_searching(self, specific_index: IndexStrategy.SpecificIndex):
@@ -202,14 +210,18 @@ def _alias_for_keeping_live(self):
     def _elastic_actions_with_index(self, messages_chunk, indexnames, action_tracker: _ActionTracker):
         if not indexnames:
             raise ValueError('cannot index to no indexes')
-        for _message_target_id, _elastic_action in self.build_elastic_actions(messages_chunk):
-            _docid = _elastic_action['_id']
-            for _indexname in indexnames:
-                action_tracker.add_action(_message_target_id, _indexname, _docid)
-                yield {
-                    **_elastic_action,
-                    '_index': _indexname,
-                }
+        for _message_target_id, _elastic_actions in self.build_elastic_actions(messages_chunk):
+            if isinstance(_elastic_actions, dict):  # allow a single action
+                _elastic_actions = [_elastic_actions]
+            for _elastic_action in _elastic_actions:
+                _docid = _elastic_action['_id']
+                for _indexname in indexnames:
+                    action_tracker.add_action(_message_target_id, _indexname, _docid)
+                    yield {
+                        **_elastic_action,
+                        '_index': _indexname,
+                    }
+            action_tracker.done_scheduling(_message_target_id)

     def _get_indexnames_for_alias(self, alias_name) -> set[str]:
         try:
@@ -371,24 +383,37 @@ class _ActionTracker:
         default_factory=lambda: collections.defaultdict(set),
     )
     errored_messageids: set[int] = dataclasses.field(default_factory=set)
+    fully_scheduled_messageids: set[int] = dataclasses.field(default_factory=set)

     def add_action(self, message_id: int, index_name: str, doc_id: str):
         self.messageid_by_docid[doc_id] = message_id
         self.actions_by_messageid[message_id].add((index_name, doc_id))

-    def action_done(self, index_name: str, doc_id: str):
-        _messageid = self.messageid_by_docid[doc_id]
-        _message_actions = self.actions_by_messageid[_messageid]
-        _message_actions.discard((index_name, doc_id))
+    def action_done(self, index_name: str, doc_id: str) -> int | None:
+        _messageid = self.get_message_id(doc_id)
+        _remaining_message_actions = self.actions_by_messageid[_messageid]
+        _remaining_message_actions.discard((index_name, doc_id))
+        # return the message id only if this was the last action for that message
+        return (
+            None
+            if _remaining_message_actions or (_messageid not in self.fully_scheduled_messageids)
+            else _messageid
+        )

     def action_errored(self, index_name: str, doc_id: str):
         _messageid = self.messageid_by_docid[doc_id]
         self.errored_messageids.add(_messageid)

+    def done_scheduling(self, message_id: int):
+        self.fully_scheduled_messageids.add(message_id)
+
+    def forget_message(self, message_id: int):
+        del self.actions_by_messageid[message_id]
+
     def get_message_id(self, doc_id: str):
         return self.messageid_by_docid[doc_id]

-    def all_done_messages(self):
+    def remaining_done_messages(self):
         for _messageid, _actions in self.actions_by_messageid.items():
             if _messageid not in self.errored_messageids:
                 assert not _actions
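
A walkthrough of the reworked bookkeeping with made-up ids (semantics as defined by the _ActionTracker methods above): a message is reported finished only after every one of its actions succeeds *and* `done_scheduling` has been called for it, which is what lets successes stream out mid-chunk instead of waiting for the whole bulk pass.

    from share.search.index_strategy.elastic8 import _ActionTracker

    _tracker = _ActionTracker()
    _tracker.add_action(1, 'index-a', 'doc-x')
    _tracker.add_action(1, 'index-a', 'doc-y')
    _tracker.done_scheduling(1)  # all actions for message 1 are now queued

    assert _tracker.action_done('index-a', 'doc-x') is None  # doc-y still in flight
    assert _tracker.action_done('index-a', 'doc-y') == 1     # last one -- message 1 done
    _tracker.forget_message(1)  # drop bookkeeping once the ack goes out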
diff --git a/share/search/index_strategy/trovesearch_denorm.py b/share/search/index_strategy/trovesearch_denorm.py
index 8bbbbc7c5..83402272e 100644
--- a/share/search/index_strategy/trovesearch_denorm.py
+++ b/share/search/index_strategy/trovesearch_denorm.py
@@ -11,6 +11,7 @@
     Literal,
 )

+import celery
 from django.conf import settings
 import elasticsearch8
 from primitive_metadata import primitive_rdf as rdf
@@ -154,15 +155,14 @@ def _paths_and_values_mappings(self):

     # override method from Elastic8IndexStrategy
     def after_chunk(self, messages_chunk: messages.MessagesChunk, indexnames: Iterable[str]):
-        # refresh to avoid delete-by-query conflicts
-        self.es8_client.indices.refresh(index=','.join(indexnames))
-        # delete any docs that belong to cards in this chunk but weren't touched by indexing
-        self.es8_client.delete_by_query(
-            index=list(indexnames),
-            query={'bool': {'must': [
-                {'terms': {'card.card_pk': messages_chunk.target_ids_chunk}},
-                {'range': {'chunk_timestamp': {'lt': messages_chunk.timestamp}}},
-            ]}},
+        task__delete_iri_value_scraps.apply_async(
+            kwargs={
+                'index_strategy_name': self.name,
+                'indexnames': list(indexnames),
+                'card_pks': messages_chunk.target_ids_chunk,
+                'timestamp': messages_chunk.timestamp,
+            },
+            countdown=settings.ELASTICSEARCH['POST_INDEX_DELAY'],
         )

     # abstract method from Elastic8IndexStrategy
@@ -173,12 +173,13 @@ def build_elastic_actions(self, messages_chunk: messages.MessagesChunk):
             _docbuilder = self._SourcedocBuilder(_indexcard_rdf, messages_chunk.timestamp)
             if not _docbuilder.should_skip():  # if skipped, will be deleted
                 _indexcard_pk = _indexcard_rdf.indexcard_id
-                for _doc_id, _doc in _docbuilder.build_docs():
-                    _index_action = self.build_index_action(
+                yield _indexcard_pk, (
+                    self.build_index_action(
                         doc_id=_doc_id,
                         doc_source=_doc,
                     )
-                    yield _indexcard_pk, _index_action
+                    for _doc_id, _doc in _docbuilder.build_docs()
+                )
                 _remaining_indexcard_pks.discard(_indexcard_pk)
         # delete any that were skipped for any reason
         for _indexcard_pk in _remaining_indexcard_pks:
@@ -279,7 +280,10 @@ def should_skip(self) -> bool:

         def build_docs(self) -> Iterator[tuple[str, dict]]:
             # index once without `iri_value`
-            yield self._doc_id(), {'card': self._card_subdoc}
+            yield self._doc_id(), {
+                'card': self._card_subdoc,
+                'chunk_timestamp': self.chunk_timestamp,
+            }
             for _iri in self._fullwalk.paths_by_iri:
                 yield self._doc_id(_iri), {
                     'card': self._card_subdoc,
@@ -888,3 +892,44 @@ def _any_query(queries: abc.Collection[dict]):
         (_query,) = queries
         return _query
     return {'bool': {'should': list(queries), 'minimum_should_match': 1}}
+
+
+@celery.shared_task(
+    name='share.search.index_strategy.trovesearch_denorm.task__delete_iri_value_scraps',
+    max_retries=None,  # retries only on delete_by_query conflicts -- should work eventually!
+    retry_backoff=True,
+    bind=True,  # for explicit retry
+)
+def task__delete_iri_value_scraps(
+    task: celery.Task,
+    index_strategy_name: str,
+    card_pks: list[int],
+    indexnames: list[str],
+    timestamp: int,
+):
+    '''followup task to delete value-docs no longer present
+
+    each time an index-card is updated, value-docs are created (or updated) for each iri value
+    present in the card's contents -- if some values are absent from a later update, the
+    corresponding docs will remain untouched
+
+    this task deletes those untouched value-docs after the index has refreshed at its own pace
+    (allowing a slightly longer delay for items to _stop_ matching queries for removed values)
+    '''
+    from share.search.index_strategy import get_index_strategy
+    _index_strategy = get_index_strategy(index_strategy_name)
+    assert isinstance(_index_strategy, Elastic8IndexStrategy)
+    # delete any docs that belong to cards in this chunk but weren't touched by indexing
+    _delete_resp = _index_strategy.es8_client.delete_by_query(
+        index=indexnames,
+        query={'bool': {'must': [
+            {'terms': {'card.card_pk': card_pks}},
+            {'range': {'chunk_timestamp': {'lt': timestamp}}},
+        ]}},
+        slices='auto',
+        conflicts='proceed',  # count conflicts instead of halting
+        request_cache=False,
+    )
+    _conflict_count = _delete_resp.get('version_conflicts', 0)
+    if _conflict_count > 0:
+        raise task.retry()
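
Why the timestamp filter in the task above works: every doc written for a card in a given chunk carries that chunk's `chunk_timestamp`, so any doc for the same card with a smaller stamp is a scrap from an earlier version of the card. With made-up pks and timestamps:

    # mirrors the query built in task__delete_iri_value_scraps above
    _cleanup_query = {'bool': {'must': [
        {'terms': {'card.card_pk': [101, 102]}},  # cards in the just-indexed chunk
        {'range': {'chunk_timestamp': {'lt': 1700000000}}},  # this chunk's stamp
    ]}}
    # docs (re)written by this chunk carry chunk_timestamp == 1700000000 and survive;
    # value-docs for iris the cards no longer mention keep an older stamp and match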
diff --git a/share/search/messages.py b/share/search/messages.py
index 5ba2e466a..ae6fbc670 100644
--- a/share/search/messages.py
+++ b/share/search/messages.py
@@ -142,7 +142,7 @@ def __init__(self, *, kombu_message=None):
     def ack(self):
         if self.kombu_message is None:
             raise exceptions.DaemonMessageError('ack! called DaemonMessage.ack() but there is nothing to ack')
-        return self.kombu_message.ack()
+        self.kombu_message.ack()

     def requeue(self):
         if self.kombu_message is None:
diff --git a/tests/share/search/index_strategy/_with_real_services.py b/tests/share/search/index_strategy/_with_real_services.py
index 46f133121..11c24594d 100644
--- a/tests/share/search/index_strategy/_with_real_services.py
+++ b/tests/share/search/index_strategy/_with_real_services.py
@@ -24,6 +24,16 @@ def setUp(self):
         super().setUp()
         self.enterContext(mock.patch('share.models.core._setup_user_token_and_groups'))
         self.index_strategy = self.get_index_strategy()
+
+        def _fake_get_index_strategy(name):
+            if self.index_strategy.name == name:
+                return self.index_strategy
+            raise ValueError(f'unknown index strategy in test: {name}')
+
+        self.enterContext(mock.patch(
+            'share.search.index_strategy.get_index_strategy',
+            new=_fake_get_index_strategy,
+        ))
         self.index_messenger = IndexMessenger(
             celery_app=celery_app,
             index_strategys=[self.index_strategy],
diff --git a/tests/share/search/index_strategy/test_trovesearch_denorm.py b/tests/share/search/index_strategy/test_trovesearch_denorm.py
index 60a0e9771..9a94928d3 100644
--- a/tests/share/search/index_strategy/test_trovesearch_denorm.py
+++ b/tests/share/search/index_strategy/test_trovesearch_denorm.py
@@ -1,9 +1,29 @@
-from share.search.index_strategy.trovesearch_denorm import TrovesearchDenormIndexStrategy
+from unittest import mock
+
+from share.search.index_strategy.trovesearch_denorm import (
+    TrovesearchDenormIndexStrategy,
+    task__delete_iri_value_scraps,
+)

 from . import _common_trovesearch_tests


-class TestTroveIndexcardFlats(_common_trovesearch_tests.CommonTrovesearchTests):
+class TestTrovesearchDenorm(_common_trovesearch_tests.CommonTrovesearchTests):
+    def setUp(self):
+        super().setUp()
+
+        # make the followup delete task eager
+        def _fake_apply_async(*args, **kwargs):
+            kwargs['countdown'] = 0  # don't wait
+            task__delete_iri_value_scraps.apply(*args, **kwargs)
+        self.enterContext(
+            mock.patch.object(
+                task__delete_iri_value_scraps,
+                'apply_async',
+                new=_fake_apply_async,
+            )
+        )
+
     # for RealElasticTestCase
     def get_index_strategy(self):
         return TrovesearchDenormIndexStrategy('test_trovesearch_denorm')
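
The patching trick in the test above, reduced to a generic sketch (the task and values here are hypothetical): swapping a task's `apply_async` for a wrapper around `apply` runs the followup inline, so assertions can run as soon as the code under test returns.

    from unittest import mock
    import celery

    @celery.shared_task
    def some_followup(x):  # hypothetical task
        return x + 1

    def _eager_apply_async(*args, **kwargs):
        kwargs.pop('countdown', None)  # a scheduling delay means nothing inline
        return some_followup.apply(*args, **kwargs)

    with mock.patch.object(some_followup, 'apply_async', new=_eager_apply_async):
        assert some_followup.apply_async(kwargs={'x': 1}).get() == 2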