Skip to content

Commit 0ca7b32

Browse files
sampaccoudjoehybird
authored andcommitted
✨ (backend) Add async triggers to enable document indexation with find.
Signed-off-by: Fabre Florian <[email protected]>
1 parent 8220fc1 commit 0ca7b32

File tree

9 files changed

+381
-20
lines changed

9 files changed

+381
-20
lines changed

src/backend/core/api/viewsets.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -980,6 +980,11 @@ def duplicate(self, request, *args, **kwargs):
980980
{"id": str(duplicated_document.id)}, status=status.HTTP_201_CREATED
981981
)
982982

983+
# TODO
984+
# @drf.decorators.action(detail=False, methods=["get"])
985+
# def search(self, request, *args, **kwargs):
986+
# index.search()
987+
983988
@drf.decorators.action(detail=True, methods=["get"], url_path="versions")
984989
def versions_list(self, request, *args, **kwargs):
985990
"""

src/backend/core/management/commands/index.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,4 @@ def handle(self, *args, **options):
2525
FindDocumentIndexer().index()
2626

2727
duration = time.perf_counter() - start
28-
logger.info(f"Search index regenerated in {duration:.2f} seconds.")
28+
logger.info("Search index regenerated in %.2f seconds.", duration)

src/backend/core/models.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
from django.core.files.storage import default_storage
2121
from django.core.mail import send_mail
2222
from django.db import models, transaction
23+
from django.db.models import signals
2324
from django.db.models.functions import Left, Length
25+
from django.dispatch import receiver
2426
from django.template.loader import render_to_string
2527
from django.utils import timezone
2628
from django.utils.functional import cached_property
@@ -39,6 +41,7 @@
3941
RoleChoices,
4042
get_equivalent_link_definition,
4143
)
44+
from .tasks.find import trigger_document_indexer
4245

4346
logger = getLogger(__name__)
4447

@@ -949,6 +952,16 @@ def restore(self):
949952
)
950953

951954

955+
@receiver(signals.post_save, sender=Document)
956+
def document_post_save(sender, instance, **kwargs):
957+
"""
958+
Asynchronous call to the document indexer at the end of the transaction.
959+
Note : Within the transaction we can have an empty content and a serialization
960+
error.
961+
"""
962+
trigger_document_indexer(instance, on_commit=True)
963+
964+
952965
class LinkTrace(BaseModel):
953966
"""
954967
Relation model to trace accesses to a document via a link by a logged-in user.
@@ -1174,6 +1187,15 @@ def get_abilities(self, user):
11741187
}
11751188

11761189

1190+
@receiver(signals.post_save, sender=DocumentAccess)
1191+
def document_access_post_save(sender, instance, created, **kwargs):
1192+
"""
1193+
Asynchronous call to the document indexer at the end of the transaction.
1194+
"""
1195+
if not created:
1196+
trigger_document_indexer(instance.document, on_commit=True)
1197+
1198+
11771199
class DocumentAskForAccess(BaseModel):
11781200
"""Relation model to ask for access to a document."""
11791201

src/backend/core/services/search_indexers.py

Lines changed: 101 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from collections import defaultdict
66

77
from django.conf import settings
8+
from django.contrib.auth.models import AnonymousUser
89

910
import requests
1011

@@ -18,11 +19,13 @@ def get_batch_accesses_by_users_and_teams(paths):
1819
Get accesses related to a list of document paths,
1920
grouped by users and teams, including all ancestor paths.
2021
"""
21-
print("paths: ", paths)
22-
ancestor_map = utils.get_ancestor_to_descendants_map(paths, steplen=models.Document.steplen)
22+
# print("paths: ", paths)
23+
ancestor_map = utils.get_ancestor_to_descendants_map(
24+
paths, steplen=models.Document.steplen
25+
)
2326
ancestor_paths = list(ancestor_map.keys())
24-
print("ancestor map: ", ancestor_map)
25-
print("ancestor paths: ", ancestor_paths)
27+
# print("ancestor map: ", ancestor_map)
28+
# print("ancestor paths: ", ancestor_paths)
2629

2730
access_qs = models.DocumentAccess.objects.filter(
2831
document__path__in=ancestor_paths
@@ -44,6 +47,22 @@ def get_batch_accesses_by_users_and_teams(paths):
4447
return dict(access_by_document_path)
4548

4649

50+
def get_visited_document_ids_of(user):
51+
if isinstance(user, AnonymousUser):
52+
return []
53+
54+
# TODO : exclude links when user already have a specific access to the doc
55+
qs = models.LinkTrace.objects.filter(
56+
user=user
57+
).exclude(
58+
document__accesses__user=user,
59+
)
60+
61+
return list({
62+
str(id) for id in qs.values_list("document_id", flat=True)
63+
})
64+
65+
4766
class BaseDocumentIndexer(ABC):
4867
"""
4968
Base class for document indexers.
@@ -84,6 +103,7 @@ def index(self):
84103
serialized_batch = [
85104
self.serialize_document(document, accesses_by_document_path)
86105
for document in documents_batch
106+
if document.content
87107
]
88108
self.push(serialized_batch)
89109

@@ -103,6 +123,38 @@ def push(self, data):
103123
Must be implemented by subclasses.
104124
"""
105125

126+
def search(self, text, user, token):
127+
"""
128+
Search for documents in Find app.
129+
"""
130+
visited_ids = get_visited_document_ids_of(user)
131+
132+
response = self.search_query(data={
133+
"q": text,
134+
"visited": visited_ids,
135+
"services": ["docs"],
136+
}, token=token)
137+
138+
print(response)
139+
140+
return self.format_response(response)
141+
142+
@abstractmethod
143+
def search_query(self, data, token) -> dict:
144+
"""
145+
Retreive documents from the Find app API.
146+
147+
Must be implemented by subclasses.
148+
"""
149+
150+
@abstractmethod
151+
def format_response(self, data: dict):
152+
"""
153+
Convert the JSON response from Find app as document queryset.
154+
155+
Must be implemented by subclasses.
156+
"""
157+
106158

107159
class FindDocumentIndexer(BaseDocumentIndexer):
108160
"""
@@ -121,10 +173,12 @@ def serialize_document(self, document, accesses):
121173
dict: A JSON-serializable dictionary.
122174
"""
123175
doc_path = document.path
124-
text_content = utils.base64_yjs_to_text(document.content)
176+
doc_content = document.content
177+
text_content = utils.base64_yjs_to_text(doc_content) if doc_content else ""
178+
125179
return {
126180
"id": str(document.id),
127-
"title": document.title,
181+
"title": document.title or "",
128182
"content": text_content,
129183
"depth": document.depth,
130184
"path": document.path,
@@ -138,6 +192,46 @@ def serialize_document(self, document, accesses):
138192
"is_active": not bool(document.ancestors_deleted_at),
139193
}
140194

195+
def search_query(self, data, token) -> requests.Response:
196+
"""
197+
Retrieve documents from the Find app API.
198+
199+
Args:
200+
data (dict): search data
201+
token (str): OICD token
202+
203+
Returns:
204+
dict: A JSON-serializable dictionary.
205+
"""
206+
url = getattr(settings, "SEARCH_INDEXER_QUERY_URL", None)
207+
208+
if not url:
209+
raise RuntimeError(
210+
"SEARCH_INDEXER_QUERY_URL must be set in Django settings before indexing."
211+
)
212+
213+
try:
214+
response = requests.post(
215+
url,
216+
json=data,
217+
headers={"Authorization": f"Bearer {token}"},
218+
timeout=10,
219+
)
220+
response.raise_for_status()
221+
return response.json()
222+
except requests.exceptions.HTTPError as e:
223+
logger.error("HTTPError: %s", e)
224+
logger.error("Response content: %s", response.text) # type: ignore
225+
raise
226+
227+
def format_response(self, data: dict):
228+
"""
229+
Retrieve documents ids from Find app response and return a queryset.
230+
"""
231+
return models.Document.objects.filter(pk__in=[
232+
d['_id'] for d in data
233+
])
234+
141235
def push(self, data):
142236
"""
143237
Push a batch of documents to the Find backend.
@@ -156,6 +250,7 @@ def push(self, data):
156250
raise RuntimeError(
157251
"SEARCH_INDEXER_SECRET must be set in Django settings before indexing."
158252
)
253+
159254
try:
160255
response = requests.post(
161256
url,

src/backend/core/tasks/find.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
"""Trigger document indexation using celery task."""
2+
3+
from logging import getLogger
4+
5+
from django.conf import settings
6+
from django.core.cache import cache
7+
from django.db import transaction
8+
9+
from core import models
10+
from core.services.search_indexers import (
11+
FindDocumentIndexer,
12+
get_batch_accesses_by_users_and_teams,
13+
)
14+
15+
from impress.celery_app import app
16+
17+
logger = getLogger(__file__)
18+
19+
20+
def document_indexer_debounce_key(document_id):
21+
"""Returns debounce cache key"""
22+
return f"doc-indexer-debounce-{document_id}"
23+
24+
25+
def incr_counter(key):
26+
"""Increase or reset counter"""
27+
try:
28+
return cache.incr(key)
29+
except ValueError:
30+
cache.set(key, 1)
31+
return 1
32+
33+
34+
def decr_counter(key):
35+
"""Decrease or reset counter"""
36+
try:
37+
return cache.decr(key)
38+
except ValueError:
39+
cache.set(key, 0)
40+
return 0
41+
42+
43+
@app.task
44+
def document_indexer_task(document_id):
45+
"""Send indexation query for a document using celery task."""
46+
key = document_indexer_debounce_key(document_id)
47+
48+
# check if the counter : if still up, skip the task. only the last one
49+
# within the countdown delay will do the query.
50+
if decr_counter(key) > 0:
51+
logger.info("Skip document %s indexation", document_id)
52+
return
53+
54+
doc = models.Document.objects.get(pk=document_id)
55+
indexer = FindDocumentIndexer()
56+
accesses = get_batch_accesses_by_users_and_teams((doc.path,))
57+
58+
data = indexer.serialize_document(document=doc, accesses=accesses)
59+
60+
logger.info("Start document %s indexation", document_id)
61+
indexer.push(data)
62+
63+
64+
def trigger_document_indexer(document, on_commit=False):
65+
"""
66+
Trigger indexation task with debounce a delay set by the SEARCH_INDEXER_COUNTDOWN setting.
67+
68+
Args:
69+
document (Document): The document instance.
70+
on_commit (bool): Wait for the end of the transaction before starting the task
71+
(some fields may be in wrong state within the transaction)
72+
"""
73+
74+
if document.deleted_at or document.ancestors_deleted_at:
75+
pass
76+
77+
if on_commit:
78+
79+
def _aux():
80+
trigger_document_indexer(document, on_commit=False)
81+
82+
transaction.on_commit(_aux)
83+
else:
84+
key = document_indexer_debounce_key(document.pk)
85+
countdown = getattr(settings, "SEARCH_INDEXER_COUNTDOWN", 1)
86+
87+
logger.info(
88+
"Add task for document %s indexation in %.2f seconds",
89+
document.pk, countdown
90+
)
91+
92+
# Each time this method is called during the countdown, we increment the
93+
# counter and each task decrease it, so the index be run only once.
94+
incr_counter(key)
95+
96+
document_indexer_task.apply_async(args=[document.pk], countdown=countdown)

0 commit comments

Comments
 (0)