Skip to content

Commit 99f3eb2

Browse files
committed
fixfix: correct misunderstanding, handle conflicts
- messages can _only_ be ack'd thru the channel they were received by -- trying to recover a channel to ack thru does nothing
- handle easily-detectable delete_by_query conflicts (so the indexer doesn't fall over each time)
1 parent c85fb0d commit 99f3eb2

File tree

3 files changed

+29
-37
lines changed

3 files changed

+29
-37
lines changed

share/search/daemon.py

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import threading
99
import time
1010

11-
import amqp.exceptions
1211
from django.conf import settings
1312
import kombu
1413
from kombu.mixins import ConsumerMixin
@@ -61,8 +60,6 @@ def start_daemonthreads_for_strategy(self, index_strategy):
6160
index_strategy=index_strategy,
6261
message_callback=_daemon.on_message,
6362
)
64-
# give the daemon a more robust callback for ack-ing
65-
_daemon.ack_callback = _consumer.ensure_ack
6663
# spin up daemonthreads, ready for messages
6764
self._daemonthreads.extend(_daemon.start())
6865
# start a thread to consume messages from this strategy's queues
@@ -130,28 +127,9 @@ def consume(self, *args, **kwargs):
130127
consume = self.connection.ensure(self.connection, super().consume)
131128
return consume(*args, **kwargs)
132129

133-
def ensure_ack(self, daemon_message: messages.DaemonMessage):
134-
# if the connection the message came thru is no longer usable,
135-
# use `kombu.Connection.autoretry` to revive it for an ack
136-
try:
137-
daemon_message.ack()
138-
except (ConnectionError, amqp.exceptions.ConnectionError):
139-
@self.connection.autoretry
140-
def _do_ack(*, channel):
141-
try:
142-
channel.basic_ack(daemon_message.kombu_message.delivery_tag)
143-
finally:
144-
channel.close()
145-
_do_ack()
146-
147-
148-
def _default_ack_callback(daemon_message: messages.DaemonMessage) -> None:
149-
daemon_message.ack()
150-
151130

152131
class IndexerDaemon:
153132
MAX_LOCAL_QUEUE_SIZE = 5000
154-
ack_callback: Callable[[messages.DaemonMessage], None]
155133

156134
def __init__(self, index_strategy, *, stop_event=None, daemonthread_context=None):
157135
self.stop_event = (
@@ -163,7 +141,6 @@ def __init__(self, index_strategy, *, stop_event=None, daemonthread_context=None
163141
self.__daemonthread_context = daemonthread_context or contextlib.nullcontext
164142
self.__local_message_queues = {}
165143
self.__started = False
166-
self.ack_callback = _default_ack_callback
167144

168145
def start(self) -> list[threading.Thread]:
169146
if self.__started:
@@ -192,7 +169,6 @@ def start_typed_loop_and_queue(self, message_type) -> threading.Thread:
192169
local_message_queue=_queue_from_rabbit_to_daemon,
193170
log_prefix=f'{repr(self)} MessageHandlingLoop: ',
194171
daemonthread_context=self.__daemonthread_context,
195-
ack_callback=self.ack_callback,
196172
)
197173
return _handling_loop.start_thread()
198174

@@ -226,7 +202,6 @@ class MessageHandlingLoop:
226202
local_message_queue: queue.Queue
227203
log_prefix: str
228204
daemonthread_context: Callable[[], contextlib.AbstractContextManager]
229-
ack_callback: Callable[[messages.DaemonMessage], None]
230205
_leftover_daemon_messages_by_target_id = None
231206

232207
def __post_init__(self):
@@ -310,7 +285,7 @@ def _handle_some_messages(self):
310285
sentry_sdk.capture_message('error handling message', extras={'message_response': message_response})
311286
target_id = message_response.index_message.target_id
312287
for daemon_message in daemon_messages_by_target_id.pop(target_id, ()):
313-
self.ack_callback(daemon_message)
288+
daemon_message.ack() # finally set it free
314289
if daemon_messages_by_target_id: # should be empty by now
315290
logger.error('%sUnhandled messages?? %s', self.log_prefix, len(daemon_messages_by_target_id))
316291
sentry_sdk.capture_message(

share/search/index_strategy/trovesearch_denorm.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from collections import abc, defaultdict
33
import dataclasses
44
import functools
5+
import itertools
56
import json
67
import logging
78
import re
@@ -154,16 +155,25 @@ def _paths_and_values_mappings(self):
154155

155156
# override method from Elastic8IndexStrategy
156157
def after_chunk(self, messages_chunk: messages.MessagesChunk, indexnames: Iterable[str]):
    """Delete leftover docs for the cards in this chunk, retrying on version conflicts.

    Runs a delete-by-query for docs whose `card.card_pk` is among
    `messages_chunk.target_ids_chunk` and whose `chunk_timestamp` is older
    than `messages_chunk.timestamp` (i.e. docs that belong to cards in this
    chunk but weren't touched by indexing).

    The query is sent with `conflicts=proceed`, so version conflicts are
    counted in the response instead of halting the request; whenever the
    response reports any, the indexes are refreshed and the delete is tried
    again. Loops until a response reports no conflicts.

    Args:
        messages_chunk: the chunk that was just handled.
        indexnames: names of the indexes to clean up (any iterable,
            including a one-shot iterator).
    """
    # materialize once: `indexnames` may be a one-shot iterator, and the
    # names are needed again for every refresh and every retry
    _indexnames = list(indexnames)
    _stale_docs_query = {'bool': {'must': [
        {'terms': {'card.card_pk': messages_chunk.target_ids_chunk}},
        {'range': {'chunk_timestamp': {'lt': messages_chunk.timestamp}}},
    ]}}
    for _trycount in itertools.count(1):  # keep trying until it works
        # delete any docs that belong to cards in this chunk but weren't touched by indexing
        _delete_resp = self.es8_client.delete_by_query(
            index=_indexnames,
            query=_stale_docs_query,
            params={
                'slices': 'auto',
                'conflicts': 'proceed',  # count conflicts instead of halting
            },
        )
        if _delete_resp.get('version_conflicts', 0):
            # refresh to avoid further conflicts and try again
            self.es8_client.indices.refresh(index=','.join(_indexnames))
        else:  # success!
            # pass format args separately (not as one tuple) so both
            # `%s` placeholders are filled when the record is formatted
            logger.debug('%s: after_chunk succeeded after %s tries', self, _trycount)
            return
167177

168178
# abstract method from Elastic8IndexStrategy
169179
def build_elastic_actions(self, messages_chunk: messages.MessagesChunk):

share/search/messages.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import time
77
import typing
88

9+
import amqp.exceptions
10+
911
from share.search import exceptions
1012
from share.util import chunked
1113

@@ -142,7 +144,12 @@ def __init__(self, *, kombu_message=None):
142144
def ack(self):
    """Acknowledge the wrapped kombu message, tolerating a dead connection.

    Raises:
        exceptions.DaemonMessageError: if there is no kombu message to ack.
    """
    _kombu_message = self.kombu_message
    if _kombu_message is None:
        raise exceptions.DaemonMessageError('ack! called DaemonMessage.ack() but there is nothing to ack')
    try:
        _kombu_message.ack()
    except (ConnectionError, amqp.exceptions.ConnectionError):
        # an ack only counts on the channel the message arrived thru --
        # if that channel failed, the message already got requeued,
        # so there is nothing left to do here
        return None
146153

147154
def requeue(self):
148155
if self.kombu_message is None:

0 commit comments

Comments
 (0)