From b8748da1ce53f761574a136898c73686a597e1b0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 2 Feb 2022 14:54:56 -0500 Subject: [PATCH 01/11] Split search function in half. --- synapse/handlers/search.py | 74 +++++++++++++++++++++++++++++++++++++- 1 file changed, 73 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 41cb80907893..42f2f257b8aa 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -100,7 +100,7 @@ async def search( """Performs a full text search for a user. Args: - user + user: The user performing the search. content: Search parameters batch: The next_batch parameter. Used for pagination. @@ -156,6 +156,7 @@ async def search( # Include context around each event? event_context = room_cat.get("event_context", None) + before_limit = after_limit = include_profile = None # Group results together? May allow clients to paginate within a # group @@ -182,6 +183,73 @@ async def search( % (set(group_keys) - {"room_id", "sender"},), ) + return await self._search( + user, + batch_group, + batch_group_key, + batch_token, + search_term, + keys, + filter_dict, + order_by, + include_state, + group_keys, + event_context, + before_limit, + after_limit, + include_profile, + ) + + async def _search( + self, + user: UserID, + batch_group: Optional[str], + batch_group_key: Optional[str], + batch_token: Optional[str], + search_term: str, + keys: List[str], + filter_dict: JsonDict, + order_by: str, + include_state: bool, + group_keys: List[str], + event_context: Optional[bool], + before_limit: Optional[int], + after_limit: Optional[int], + include_profile: Optional[bool], + ) -> JsonDict: + """Performs a full text search for a user. + + Args: + user: The user performing the search. + batch_group: Pagination information. + batch_group_key: Pagination information. + batch_token: Pagination information. + search_term: Search term to search for + keys: List of keys to search in, currently supports + "content.body", "content.name", "content.topic" + filter_dict: The JSON to build a filter out of. + order_by: How to order the results. Valid values ore "rank" and "recent". + include_state: True if the state of the room at each result should + be included. + group_keys: A list of ways to group the results. Valid values are + "room_id" and "sender". + event_context: True to include contextual events around results. + before_limit: + The number of events before a result to include as context. + + Only used if event_context is True. + after_limit: + The number of events after a result to include as context. + + Only used if event_context is True. + include_profile: True if historical profile information should be + included in the event context. + + Only used if event_context is True. + + Returns: + dict to be returned to the client with results of search + """ search_filter = Filter(self.hs, filter_dict) # TODO: Search through left rooms too @@ -353,6 +421,10 @@ async def search( if event_context is not None: now_token = self.hs.get_event_sources().get_current_token() + # Note that before and after limit must be set in this case. + assert before_limit is not None + assert after_limit is not None + contexts = {} for event in allowed_events: res = await self.store.get_events_around( From 47dcbc028ff420e48ed5713717d47ffbdc632286 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 2 Feb 2022 15:36:33 -0500 Subject: [PATCH 02/11] Split out search methods. --- synapse/handlers/search.py | 369 +++++++++++++++-------- synapse/storage/databases/main/search.py | 17 +- 2 files changed, 257 insertions(+), 129 deletions(-) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 42f2f257b8aa..3d77266d7eb5 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -14,7 +14,7 @@ import itertools import logging -from typing import TYPE_CHECKING, Dict, Iterable, List, Optional +from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple from unpaddedbase64 import decode_base64, encode_base64 @@ -284,131 +284,42 @@ async def _search( } } - rank_map = {} # event_id -> rank of event - allowed_events = [] - # Holds result of grouping by room, if applicable - room_groups: Dict[str, JsonDict] = {} - # Holds result of grouping by sender, if applicable - sender_group: Dict[str, JsonDict] = {} - - # Holds the next_batch for the entire result set if one of those exists - global_next_batch = None - - highlights = set() - - count = None + sender_group: Optional[Dict[str, JsonDict]] if order_by == "rank": - search_result = await self.store.search_msgs(room_ids, search_term, keys) - - count = search_result["count"] - - if search_result["highlights"]: - highlights.update(search_result["highlights"]) - - results = search_result["results"] - - rank_map.update({r["event"].event_id: r["rank"] for r in results}) - - filtered_events = await search_filter.filter([r["event"] for r in results]) - - events = await filter_events_for_client( - self.storage, user.to_string(), filtered_events + ( + count, + rank_map, + allowed_events, + room_groups, + highlights, + sender_group, + ) = await self._search_by_rank( + user, room_ids, search_term, keys, search_filter ) - - events.sort(key=lambda e: -rank_map[e.event_id]) - allowed_events = events[: search_filter.limit] - - for e in allowed_events: - rm = room_groups.setdefault( - e.room_id, {"results": [], "order": rank_map[e.event_id]} - ) - rm["results"].append(e.event_id) - - s = sender_group.setdefault( - e.sender, {"results": [], "order": rank_map[e.event_id]} - ) - s["results"].append(e.event_id) + # Unused return values for rank search. + global_next_batch = None elif order_by == "recent": - room_events: List[EventBase] = [] - i = 0 - - pagination_token = batch_token - - # We keep looping and we keep filtering until we reach the limit - # or we run out of things. - # But only go around 5 times since otherwise synapse will be sad. - while len(room_events) < search_filter.limit and i < 5: - i += 1 - search_result = await self.store.search_rooms( - room_ids, - search_term, - keys, - search_filter.limit * 2, - pagination_token=pagination_token, - ) - - if search_result["highlights"]: - highlights.update(search_result["highlights"]) - - count = search_result["count"] - - results = search_result["results"] - - results_map = {r["event"].event_id: r for r in results} - - rank_map.update({r["event"].event_id: r["rank"] for r in results}) - - filtered_events = await search_filter.filter( - [r["event"] for r in results] - ) - - events = await filter_events_for_client( - self.storage, user.to_string(), filtered_events - ) - - room_events.extend(events) - room_events = room_events[: search_filter.limit] - - if len(results) < search_filter.limit * 2: - pagination_token = None - break - else: - pagination_token = results[-1]["pagination_token"] - - for event in room_events: - group = room_groups.setdefault(event.room_id, {"results": []}) - group["results"].append(event.event_id) - - if room_events and len(room_events) >= search_filter.limit: - last_event_id = room_events[-1].event_id - pagination_token = results_map[last_event_id]["pagination_token"] - - # We want to respect the given batch group and group keys so - # that if people blindly use the top level `next_batch` token - # it returns more from the same group (if applicable) rather - # than reverting to searching all results again. - if batch_group and batch_group_key: - global_next_batch = encode_base64( - ( - "%s\n%s\n%s" - % (batch_group, batch_group_key, pagination_token) - ).encode("ascii") - ) - else: - global_next_batch = encode_base64( - ("%s\n%s\n%s" % ("all", "", pagination_token)).encode("ascii") - ) - - for room_id, group in room_groups.items(): - group["next_batch"] = encode_base64( - ("%s\n%s\n%s" % ("room_id", room_id, pagination_token)).encode( - "ascii" - ) - ) - - allowed_events.extend(room_events) + ( + count, + rank_map, + allowed_events, + room_groups, + highlights, + global_next_batch, + ) = await self._search_by_recent( + user, + room_ids, + search_term, + keys, + search_filter, + batch_group, + batch_group_key, + batch_token, + ) + # Unused return values for recent search. + sender_group = None else: # We should never get here due to the guard earlier. @@ -538,7 +449,7 @@ async def _search( } ) - rooms_cat_res = { + rooms_cat_res: JsonDict = { "results": results, "count": count, "highlights": list(highlights), @@ -563,3 +474,217 @@ async def _search( rooms_cat_res["next_batch"] = global_next_batch return {"search_categories": {"room_events": rooms_cat_res}} + + async def _search_by_rank( + self, + user: UserID, + room_ids: Collection[str], + search_term: str, + keys: Iterable[str], + search_filter: Filter, + ) -> Tuple[ + int, + Dict[str, int], + List[EventBase], + Dict[str, JsonDict], + Set[str], + Dict[str, JsonDict], + ]: + """ + Performs a full text search for a user ordering by rank. + + Args: + user: The user performing the search. + room_ids: List of room ids to search in + search_term: Search term to search for + keys: List of keys to search in, currently supports + "content.body", "content.name", "content.topic" + search_filter: The event filter to use. + + Returns: + A tuple of: + The count of results. + A mapping of event ID to the rank of that event. + A list of the resulting events. + A map of room ID to results. + A set of event IDs to highlight. + A map of sender ID to results. + """ + rank_map = {} # event_id -> rank of event + # Holds result of grouping by room, if applicable + room_groups: Dict[str, JsonDict] = {} + # Holds result of grouping by sender, if applicable + sender_group: Dict[str, JsonDict] = {} + + highlights = set() + + search_result = await self.store.search_msgs(room_ids, search_term, keys) + + count = search_result["count"] + + if search_result["highlights"]: + highlights.update(search_result["highlights"]) + + results = search_result["results"] + + results_map = {r["event"].event_id: r for r in results} + + rank_map.update({r["event"].event_id: r["rank"] for r in results}) + + filtered_events = await search_filter.filter([r["event"] for r in results]) + + events = await filter_events_for_client( + self.storage, user.to_string(), filtered_events + ) + + events.sort(key=lambda e: -rank_map[e.event_id]) + allowed_events = events[: search_filter.limit] + + for e in allowed_events: + rm = room_groups.setdefault( + e.room_id, {"results": [], "order": rank_map[e.event_id]} + ) + rm["results"].append(e.event_id) + + s = sender_group.setdefault( + e.sender, {"results": [], "order": rank_map[e.event_id]} + ) + s["results"].append(e.event_id) + + return count, rank_map, allowed_events, room_groups, highlights, sender_group + + async def _search_by_recent( + self, + user: UserID, + room_ids: Collection[str], + search_term: str, + keys: Iterable[str], + search_filter: Filter, + batch_group: Optional[str], + batch_group_key: Optional[str], + batch_token: Optional[str], + ) -> Tuple[ + int, + Dict[str, int], + List[EventBase], + Dict[str, JsonDict], + Set[str], + Optional[str], + ]: + """ + Performs a full text search for a user ordering by recent. + + Args: + user: The user performing the search. + room_ids: List of room ids to search in + search_term: Search term to search for + keys: List of keys to search in, currently supports + "content.body", "content.name", "content.topic" + search_filter: The event filter to use. + batch_group: Pagination information. + batch_group_key: Pagination information. + batch_token: Pagination information. + + Returns: + A tuple of: + The count of results. + A mapping of event ID to the rank of that event. + A list of the resulting events. + A map of room ID to results. + A set of event IDs to highlight. + Optionally, a pagination token. + """ + rank_map = {} # event_id -> rank of event + allowed_events: List[EventBase] = [] + # Holds result of grouping by room, if applicable + room_groups: Dict[str, JsonDict] = {} + + # Holds the next_batch for the entire result set if one of those exists + global_next_batch = None + + highlights = set() + + room_events: List[EventBase] = [] + i = 0 + + pagination_token = batch_token + + # We keep looping and we keep filtering until we reach the limit + # or we run out of things. + # But only go around 5 times since otherwise synapse will be sad. + while len(room_events) < search_filter.limit and i < 5: + i += 1 + search_result = await self.store.search_rooms( + room_ids, + search_term, + keys, + search_filter.limit * 2, + pagination_token=pagination_token, + ) + + if search_result["highlights"]: + highlights.update(search_result["highlights"]) + + count = search_result["count"] + + results = search_result["results"] + + results_map = {r["event"].event_id: r for r in results} + + rank_map.update({r["event"].event_id: r["rank"] for r in results}) + + filtered_events = await search_filter.filter([r["event"] for r in results]) + + events = await filter_events_for_client( + self.storage, user.to_string(), filtered_events + ) + + room_events.extend(events) + room_events = room_events[: search_filter.limit] + + if len(results) < search_filter.limit * 2: + pagination_token = None + break + else: + pagination_token = results[-1]["pagination_token"] + + for event in room_events: + group = room_groups.setdefault(event.room_id, {"results": []}) + group["results"].append(event.event_id) + + if room_events and len(room_events) >= search_filter.limit: + last_event_id = room_events[-1].event_id + pagination_token = results_map[last_event_id]["pagination_token"] + + # We want to respect the given batch group and group keys so + # that if people blindly use the top level `next_batch` token + # it returns more from the same group (if applicable) rather + # than reverting to searching all results again. + if batch_group and batch_group_key: + global_next_batch = encode_base64( + ( + "%s\n%s\n%s" % (batch_group, batch_group_key, pagination_token) + ).encode("ascii") + ) + else: + global_next_batch = encode_base64( + ("%s\n%s\n%s" % ("all", "", pagination_token)).encode("ascii") + ) + + for room_id, group in room_groups.items(): + group["next_batch"] = encode_base64( + ("%s\n%s\n%s" % ("room_id", room_id, pagination_token)).encode( + "ascii" + ) + ) + + allowed_events.extend(room_events) + + return ( + count, + rank_map, + allowed_events, + room_groups, + highlights, + global_next_batch, + ) diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py index 2d085a5764a7..acea300ed343 100644 --- a/synapse/storage/databases/main/search.py +++ b/synapse/storage/databases/main/search.py @@ -28,6 +28,7 @@ ) from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.engines import PostgresEngine, Sqlite3Engine +from synapse.types import JsonDict if TYPE_CHECKING: from synapse.server import HomeServer @@ -381,17 +382,19 @@ def __init__( ): super().__init__(database, db_conn, hs) - async def search_msgs(self, room_ids, search_term, keys): + async def search_msgs( + self, room_ids: Collection[str], search_term: str, keys: Iterable[str] + ) -> JsonDict: """Performs a full text search over events with given keys. Args: - room_ids (list): List of room ids to search in - search_term (str): Search term to search for - keys (list): List of keys to search in, currently supports + room_ids: List of room ids to search in + search_term: Search term to search for + keys: List of keys to search in, currently supports "content.body", "content.name", "content.topic" Returns: - list of dicts + Dictionary of results """ clauses = [] @@ -499,10 +502,10 @@ async def search_rooms( self, room_ids: Collection[str], search_term: str, - keys: List[str], + keys: Iterable[str], limit, pagination_token: Optional[str] = None, - ) -> List[dict]: + ) -> JsonDict: """Performs a full text search over events with given keys. Args: From 0185e2967aee88de27217c14af591c15b05af8df Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 2 Feb 2022 15:41:58 -0500 Subject: [PATCH 03/11] Split out calculating contexts. --- synapse/handlers/search.py | 159 ++++++++++++++++++++++--------------- 1 file changed, 95 insertions(+), 64 deletions(-) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 3d77266d7eb5..16295bf78de0 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -156,7 +156,8 @@ async def search( # Include context around each event? event_context = room_cat.get("event_context", None) - before_limit = after_limit = include_profile = None + before_limit = after_limit = None + include_profile = False # Group results together? May allow clients to paginate within a # group @@ -215,7 +216,7 @@ async def _search( event_context: Optional[bool], before_limit: Optional[int], after_limit: Optional[int], - include_profile: Optional[bool], + include_profile: bool, ) -> JsonDict: """Performs a full text search for a user. @@ -330,72 +331,13 @@ async def _search( # If client has asked for "context" for each event (i.e. some surrounding # events and state), fetch that if event_context is not None: - now_token = self.hs.get_event_sources().get_current_token() - # Note that before and after limit must be set in this case. assert before_limit is not None assert after_limit is not None - contexts = {} - for event in allowed_events: - res = await self.store.get_events_around( - event.room_id, event.event_id, before_limit, after_limit - ) - - logger.info( - "Context for search returned %d and %d events", - len(res.events_before), - len(res.events_after), - ) - - events_before = await filter_events_for_client( - self.storage, user.to_string(), res.events_before - ) - - events_after = await filter_events_for_client( - self.storage, user.to_string(), res.events_after - ) - - context = { - "events_before": events_before, - "events_after": events_after, - "start": await now_token.copy_and_replace( - "room_key", res.start - ).to_string(self.store), - "end": await now_token.copy_and_replace( - "room_key", res.end - ).to_string(self.store), - } - - if include_profile: - senders = { - ev.sender - for ev in itertools.chain(events_before, [event], events_after) - } - - if events_after: - last_event_id = events_after[-1].event_id - else: - last_event_id = event.event_id - - state_filter = StateFilter.from_types( - [(EventTypes.Member, sender) for sender in senders] - ) - - state = await self.state_store.get_state_for_event( - last_event_id, state_filter - ) - - context["profile_info"] = { - s.state_key: { - "displayname": s.content.get("displayname", None), - "avatar_url": s.content.get("avatar_url", None), - } - for s in state.values() - if s.type == EventTypes.Member and s.state_key in senders - } - - contexts[event.event_id] = context + contexts = await self._calculate_event_contexts( + user, allowed_events, before_limit, after_limit, include_profile + ) else: contexts = {} @@ -688,3 +630,92 @@ async def _search_by_recent( highlights, global_next_batch, ) + + async def _calculate_event_contexts( + self, + user: UserID, + allowed_events: List[EventBase], + before_limit: int, + after_limit: int, + include_profile: bool, + ) -> Dict[str, JsonDict]: + """ + Calculates the contextual events for any search results. + + Args: + user: The user performing the search. + allowed_events: The search results. + before_limit: + The number of events before a result to include as context. + after_limit: + The number of events after a result to include as context. + include_profile: True if historical profile information should be + included in the event context. + + Returns: + A map of event ID to contextual information. + """ + now_token = self.hs.get_event_sources().get_current_token() + + contexts = {} + for event in allowed_events: + res = await self.store.get_events_around( + event.room_id, event.event_id, before_limit, after_limit + ) + + logger.info( + "Context for search returned %d and %d events", + len(res.events_before), + len(res.events_after), + ) + + events_before = await filter_events_for_client( + self.storage, user.to_string(), res.events_before + ) + + events_after = await filter_events_for_client( + self.storage, user.to_string(), res.events_after + ) + + context = { + "events_before": events_before, + "events_after": events_after, + "start": await now_token.copy_and_replace( + "room_key", res.start + ).to_string(self.store), + "end": await now_token.copy_and_replace("room_key", res.end).to_string( + self.store + ), + } + + if include_profile: + senders = { + ev.sender + for ev in itertools.chain(events_before, [event], events_after) + } + + if events_after: + last_event_id = events_after[-1].event_id + else: + last_event_id = event.event_id + + state_filter = StateFilter.from_types( + [(EventTypes.Member, sender) for sender in senders] + ) + + state = await self.state_store.get_state_for_event( + last_event_id, state_filter + ) + + context["profile_info"] = { + s.state_key: { + "displayname": s.content.get("displayname", None), + "avatar_url": s.content.get("avatar_url", None), + } + for s in state.values() + if s.type == EventTypes.Member and s.state_key in senders + } + + contexts[event.event_id] = context + + return contexts From 60641e9bd0cea7831093e31de7d6a9f6ddf47d38 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 2 Feb 2022 15:43:00 -0500 Subject: [PATCH 04/11] Remove unused variables. --- synapse/handlers/search.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 16295bf78de0..bca560399af1 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -469,8 +469,6 @@ async def _search_by_rank( results = search_result["results"] - results_map = {r["event"].event_id: r for r in results} - rank_map.update({r["event"].event_id: r["rank"] for r in results}) filtered_events = await search_filter.filter([r["event"] for r in results]) @@ -585,7 +583,6 @@ async def _search_by_recent( room_events = room_events[: search_filter.limit] if len(results) < search_filter.limit * 2: - pagination_token = None break else: pagination_token = results[-1]["pagination_token"] From 01c0127a52c92c7b0e56a6377b3cfc245e034fe1 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 2 Feb 2022 15:47:52 -0500 Subject: [PATCH 05/11] Light refactoring to avoid overwriting / updating variables a single time. --- synapse/handlers/search.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index bca560399af1..51cd3a63ddfd 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -458,18 +458,17 @@ async def _search_by_rank( # Holds result of grouping by sender, if applicable sender_group: Dict[str, JsonDict] = {} - highlights = set() - search_result = await self.store.search_msgs(room_ids, search_term, keys) - count = search_result["count"] - if search_result["highlights"]: - highlights.update(search_result["highlights"]) + highlights = search_result["highlights"] + else: + highlights = set() results = search_result["results"] - rank_map.update({r["event"].event_id: r["rank"] for r in results}) + # event_id -> rank of event + rank_map = {r["event"].event_id: r["rank"] for r in results} filtered_events = await search_filter.filter([r["event"] for r in results]) @@ -491,7 +490,14 @@ async def _search_by_rank( ) s["results"].append(e.event_id) - return count, rank_map, allowed_events, room_groups, highlights, sender_group + return ( + search_result["count"], + rank_map, + allowed_events, + room_groups, + highlights, + sender_group, + ) async def _search_by_recent( self, @@ -535,7 +541,6 @@ async def _search_by_recent( Optionally, a pagination token. """ rank_map = {} # event_id -> rank of event - allowed_events: List[EventBase] = [] # Holds result of grouping by room, if applicable room_groups: Dict[str, JsonDict] = {} @@ -617,12 +622,10 @@ async def _search_by_recent( ) ) - allowed_events.extend(room_events) - return ( count, rank_map, - allowed_events, + room_events, room_groups, highlights, global_next_batch, From 1426aaf10bbc58bbd82226f43a0b98657beb7032 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 2 Feb 2022 15:53:53 -0500 Subject: [PATCH 06/11] Use some comprehensions to simplify code. --- synapse/handlers/search.py | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 51cd3a63ddfd..dc304ff52c85 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -379,17 +379,16 @@ async def _search( # We're now about to serialize the events. We should not make any # blocking calls after this. Otherwise the 'age' will be wrong - results = [] - for e in allowed_events: - results.append( - { - "rank": rank_map[e.event_id], - "result": self._event_serializer.serialize_event( - e, time_now, bundle_aggregations=aggregations - ), - "context": contexts.get(e.event_id, {}), - } - ) + results = [ + { + "rank": rank_map[e.event_id], + "result": self._event_serializer.serialize_event( + e, time_now, bundle_aggregations=aggregations + ), + "context": contexts.get(e.event_id, {}), + } + for e in allowed_events + ] rooms_cat_res: JsonDict = { "results": results, @@ -398,13 +397,10 @@ async def _search( } if state_results: - s = {} - for room_id, state_events in state_results.items(): - s[room_id] = self._event_serializer.serialize_events( - state_events, time_now - ) - - rooms_cat_res["state"] = s + rooms_cat_res["state"] = { + room_id: self._event_serializer.serialize_events(state_events, time_now) + for room_id, state_events in state_results.items() + } if room_groups and "room_id" in group_keys: rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups From 84c2497508a22e8fc072251fc9f52614061daa27 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 2 Feb 2022 15:51:16 -0500 Subject: [PATCH 07/11] Make a comment about serialization true. --- synapse/handlers/search.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index dc304ff52c85..d1da0118ab24 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -343,6 +343,15 @@ async def _search( # TODO: Add a limit + state_results = {} + if include_state: + for room_id in {e.room_id for e in allowed_events}: + state = await self.state_handler.get_current_state(room_id) + state_results[room_id] = list(state.values()) + + # We're now about to serialize the events. We should not make any + # blocking calls after this. Otherwise the 'age' will be wrong + time_now = self.clock.time_msec() aggregations = None @@ -370,15 +379,6 @@ async def _search( context["events_after"], time_now, bundle_aggregations=aggregations # type: ignore[arg-type] ) - state_results = {} - if include_state: - for room_id in {e.room_id for e in allowed_events}: - state = await self.state_handler.get_current_state(room_id) - state_results[room_id] = list(state.values()) - - # We're now about to serialize the events. We should not make any - # blocking calls after this. Otherwise the 'age' will be wrong - results = [ { "rank": rank_map[e.event_id], From 78f5bc2d0a6010d5e3aa930bfea0d4a7c97c9b01 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 14 Feb 2022 14:35:42 -0500 Subject: [PATCH 08/11] Newsfragment --- changelog.d/11991.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/11991.misc diff --git a/changelog.d/11991.misc b/changelog.d/11991.misc new file mode 100644 index 000000000000..34a3b3a6b9df --- /dev/null +++ b/changelog.d/11991.misc @@ -0,0 +1 @@ +Refactor the search code for improved readability. From f32dbd02ebb24fc118ed251b9a6f2c096929f940 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 15 Feb 2022 08:01:15 -0500 Subject: [PATCH 09/11] Use attrs for search return results. --- synapse/handlers/search.py | 89 +++++++++++++++----------------------- 1 file changed, 35 insertions(+), 54 deletions(-) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index d1da0118ab24..eeb54600612e 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -16,6 +16,7 @@ import logging from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple +import attr from unpaddedbase64 import decode_base64, encode_base64 from synapse.api.constants import EventTypes, Membership @@ -32,6 +33,20 @@ logger = logging.getLogger(__name__) +@attr.s(slots=True, frozen=True, auto_attribs=True) +class _SearchResult: + # The count of results. + count: int + # A mapping of event ID to the rank of that event. + rank_map: Dict[str, int] + # A list of the resulting events. + allowed_events: List[EventBase] + # A map of room ID to results. + room_groups: Dict[str, JsonDict] + # A set of event IDs to highlight. + highlights: Set[str] + + class SearchHandler: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() @@ -288,28 +303,13 @@ async def _search( sender_group: Optional[Dict[str, JsonDict]] if order_by == "rank": - ( - count, - rank_map, - allowed_events, - room_groups, - highlights, - sender_group, - ) = await self._search_by_rank( + search_result, sender_group = await self._search_by_rank( user, room_ids, search_term, keys, search_filter ) # Unused return values for rank search. global_next_batch = None - elif order_by == "recent": - ( - count, - rank_map, - allowed_events, - room_groups, - highlights, - global_next_batch, - ) = await self._search_by_recent( + search_result, global_next_batch = await self._search_by_recent( user, room_ids, search_term, @@ -321,11 +321,16 @@ async def _search( ) # Unused return values for recent search. sender_group = None - else: # We should never get here due to the guard earlier. raise NotImplementedError() + count = search_result.count + rank_map = search_result.rank_map + allowed_events = search_result.allowed_events + room_groups = search_result.room_groups + highlights = search_result.highlights + logger.info("Found %d events to return", len(allowed_events)) # If client has asked for "context" for each event (i.e. some surrounding @@ -420,14 +425,7 @@ async def _search_by_rank( search_term: str, keys: Iterable[str], search_filter: Filter, - ) -> Tuple[ - int, - Dict[str, int], - List[EventBase], - Dict[str, JsonDict], - Set[str], - Dict[str, JsonDict], - ]: + ) -> Tuple[_SearchResult, Dict[str, JsonDict]]: """ Performs a full text search for a user ordering by rank. @@ -441,11 +439,7 @@ async def _search_by_rank( Returns: A tuple of: - The count of results. - A mapping of event ID to the rank of that event. - A list of the resulting events. - A map of room ID to results. - A set of event IDs to highlight. + The search results. A map of sender ID to results. """ rank_map = {} # event_id -> rank of event @@ -487,11 +481,13 @@ async def _search_by_rank( s["results"].append(e.event_id) return ( - search_result["count"], - rank_map, - allowed_events, - room_groups, - highlights, + _SearchResult( + search_result["count"], + rank_map, + allowed_events, + room_groups, + highlights, + ), sender_group, ) @@ -505,14 +501,7 @@ async def _search_by_recent( batch_group: Optional[str], batch_group_key: Optional[str], batch_token: Optional[str], - ) -> Tuple[ - int, - Dict[str, int], - List[EventBase], - Dict[str, JsonDict], - Set[str], - Optional[str], - ]: + ) -> Tuple[_SearchResult, Optional[str]]: """ Performs a full text search for a user ordering by recent. @@ -529,11 +518,7 @@ async def _search_by_recent( Returns: A tuple of: - The count of results. - A mapping of event ID to the rank of that event. - A list of the resulting events. - A map of room ID to results. - A set of event IDs to highlight. + The search results. Optionally, a pagination token. """ rank_map = {} # event_id -> rank of event @@ -619,11 +604,7 @@ async def _search_by_recent( ) return ( - count, - rank_map, - room_events, - room_groups, - highlights, + _SearchResult(count, rank_map, room_events, room_groups, highlights), global_next_batch, ) From a4b27ffe95fdcf6fa8cbd86f2dddd821763ee0de Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 15 Feb 2022 08:07:50 -0500 Subject: [PATCH 10/11] Directly use attrs results instead of decomposing. --- synapse/handlers/search.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index eeb54600612e..4b3d421a1887 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -325,13 +325,7 @@ async def _search( # We should never get here due to the guard earlier. raise NotImplementedError() - count = search_result.count - rank_map = search_result.rank_map - allowed_events = search_result.allowed_events - room_groups = search_result.room_groups - highlights = search_result.highlights - - logger.info("Found %d events to return", len(allowed_events)) + logger.info("Found %d events to return", len(search_result.allowed_events)) # If client has asked for "context" for each event (i.e. some surrounding # events and state), fetch that @@ -341,7 +335,11 @@ async def _search( assert after_limit is not None contexts = await self._calculate_event_contexts( - user, allowed_events, before_limit, after_limit, include_profile + user, + search_result.allowed_events, + before_limit, + after_limit, + include_profile, ) else: contexts = {} @@ -350,7 +348,7 @@ async def _search( state_results = {} if include_state: - for room_id in {e.room_id for e in allowed_events}: + for room_id in {e.room_id for e in search_result.allowed_events}: state = await self.state_handler.get_current_state(room_id) state_results[room_id] = list(state.values()) @@ -371,7 +369,7 @@ async def _search( for context in contexts.values() ), # The returned events. - allowed_events, + search_result.allowed_events, ), user.to_string(), ) @@ -386,19 +384,19 @@ async def _search( results = [ { - "rank": rank_map[e.event_id], + "rank": search_result.rank_map[e.event_id], "result": self._event_serializer.serialize_event( e, time_now, bundle_aggregations=aggregations ), "context": contexts.get(e.event_id, {}), } - for e in allowed_events + for e in search_result.allowed_events ] rooms_cat_res: JsonDict = { "results": results, - "count": count, - "highlights": list(highlights), + "count": search_result.count, + "highlights": list(search_result.highlights), } if state_results: @@ -407,8 +405,10 @@ async def _search( for room_id, state_events in state_results.items() } - if room_groups and "room_id" in group_keys: - rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups + if search_result.room_groups and "room_id" in group_keys: + rooms_cat_res.setdefault("groups", {})[ + "room_id" + ] = search_result.room_groups if sender_group and "sender" in group_keys: rooms_cat_res.setdefault("groups", {})["sender"] = sender_group From c0e5e4b97f5eb33b3032fb467eb9f9eda6678502 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 15 Feb 2022 08:16:57 -0500 Subject: [PATCH 11/11] Move fetching of aggregations above generating now. --- synapse/handlers/search.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 4b3d421a1887..afd14da11222 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -352,11 +352,6 @@ async def _search( state = await self.state_handler.get_current_state(room_id) state_results[room_id] = list(state.values()) - # We're now about to serialize the events. We should not make any - # blocking calls after this. Otherwise the 'age' will be wrong - - time_now = self.clock.time_msec() - aggregations = None if self._msc3666_enabled: aggregations = await self.store.get_bundled_aggregations( @@ -374,6 +369,11 @@ async def _search( user.to_string(), ) + # We're now about to serialize the events. We should not make any + # blocking calls after this. Otherwise, the 'age' will be wrong. + + time_now = self.clock.time_msec() + for context in contexts.values(): context["events_before"] = self._event_serializer.serialize_events( context["events_before"], time_now, bundle_aggregations=aggregations # type: ignore[arg-type]