Skip to content

Commit 7a8cf44

Browse files
authored
⚡(search) use bulk API and prefetching for reindex_all (#595)
The previous implementation indexed each thread and message with individual OpenSearch HTTP calls and triggered N+1 DB queries for accesses, recipients and sender lookups. This rewrites reindex_all to batch documents via opensearchpy.helpers.bulk and prefetch related objects in a single queryset, drastically reducing both DB round-trips and HTTP overhead.
1 parent 1767e17 commit 7a8cf44

File tree

5 files changed

+532
-238
lines changed

5 files changed

+532
-238
lines changed

src/backend/core/management/commands/search_reindex.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from core.services.search import create_index_if_not_exists, delete_index
99
from core.services.search.tasks import (
1010
_reindex_all_base,
11+
_reindex_mailbox_base,
1112
reindex_all,
1213
reindex_mailbox_task,
1314
reindex_thread_task,
@@ -152,7 +153,15 @@ def _reindex_mailbox(self, mailbox_id, async_mode):
152153
self.style.SUCCESS(f"Reindexing task scheduled (ID: {task.id})")
153154
)
154155
else:
155-
result = reindex_mailbox_task(mailbox_id) # pylint: disable=no-value-for-parameter
156+
157+
def update_progress(current, total, success_count, failure_count):
158+
"""Update progress in the console."""
159+
self.stdout.write(
160+
f"Progress: {current}/{total} threads processed "
161+
f"({success_count} succeeded, {failure_count} failed)"
162+
)
163+
164+
result = _reindex_mailbox_base(str(mailbox_uuid), update_progress)
156165
self.stdout.write(
157166
self.style.SUCCESS(
158167
f"Reindexing completed: {result.get('success_count', 0)} succeeded, "

src/backend/core/models.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1306,10 +1306,8 @@ def thread_unread_filter(user, mailbox_id=None):
13061306

13071307
@staticmethod
13081308
def unread_filter():
1309-
"""Return a `Q` for filtering a `ThreadAccess` queryset to unread entries.
1310-
1311-
Used by `MailboxSerializer._get_cached_counts()` and
1312-
`compute_unread_mailboxes()` in the search index.
1309+
"""
1310+
Return a `Q` for filtering a `ThreadAccess` queryset to unread entries.
13131311
"""
13141312
return Q(read_at__isnull=True, thread__messaged_at__isnull=False) | Q(
13151313
read_at__lt=F("thread__messaged_at")
@@ -1347,7 +1345,7 @@ def thread_starred_filter(user, mailbox_id=None):
13471345
def starred_filter():
13481346
"""Return a `Q` for filtering a `ThreadAccess` queryset to starred entries.
13491347
1350-
Used by `compute_starred_mailboxes()` in the search index.
1348+
Used by `_compute_unread_starred_from_accesses()` in the search index.
13511349
"""
13521350
return Q(starred_at__isnull=False)
13531351

0 commit comments

Comments
 (0)