|
15 | 15 | import logging |
16 | 16 | from typing import ( |
17 | 17 | TYPE_CHECKING, |
18 | | - Callable, |
19 | 18 | Collection, |
20 | 19 | Dict, |
21 | 20 | FrozenSet, |
|
52 | 51 | from synapse.util.async_helpers import Linearizer |
53 | 52 | from synapse.util.caches import intern_string |
54 | 53 | from synapse.util.caches.descriptors import _CacheContext, cached, cachedList |
55 | | -from synapse.util.cancellation import cancellable |
56 | 54 | from synapse.util.iterutils import batch_iter |
57 | 55 | from synapse.util.metrics import Measure |
58 | 56 |
|
@@ -600,58 +598,6 @@ def _get_rooms_for_user_with_stream_ordering_txn( |
600 | 598 | for room_id, instance, stream_id in txn |
601 | 599 | ) |
602 | 600 |
|
603 | | - @cachedList( |
604 | | - cached_method_name="get_rooms_for_user_with_stream_ordering", |
605 | | - list_name="user_ids", |
606 | | - ) |
607 | | - async def get_rooms_for_users_with_stream_ordering( |
608 | | - self, user_ids: Collection[str] |
609 | | - ) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]: |
610 | | - """A batched version of `get_rooms_for_user_with_stream_ordering`. |
611 | | -
|
612 | | - Returns: |
613 | | - Map from user_id to set of rooms that is currently in. |
614 | | - """ |
615 | | - return await self.db_pool.runInteraction( |
616 | | - "get_rooms_for_users_with_stream_ordering", |
617 | | - self._get_rooms_for_users_with_stream_ordering_txn, |
618 | | - user_ids, |
619 | | - ) |
620 | | - |
621 | | - def _get_rooms_for_users_with_stream_ordering_txn( |
622 | | - self, txn: LoggingTransaction, user_ids: Collection[str] |
623 | | - ) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]: |
624 | | - |
625 | | - clause, args = make_in_list_sql_clause( |
626 | | - self.database_engine, |
627 | | - "c.state_key", |
628 | | - user_ids, |
629 | | - ) |
630 | | - |
631 | | - sql = f""" |
632 | | - SELECT c.state_key, room_id, e.instance_name, e.stream_ordering |
633 | | - FROM current_state_events AS c |
634 | | - INNER JOIN events AS e USING (room_id, event_id) |
635 | | - WHERE |
636 | | - c.type = 'm.room.member' |
637 | | - AND c.membership = ? |
638 | | - AND {clause} |
639 | | - """ |
640 | | - |
641 | | - txn.execute(sql, [Membership.JOIN] + args) |
642 | | - |
643 | | - result: Dict[str, Set[GetRoomsForUserWithStreamOrdering]] = { |
644 | | - user_id: set() for user_id in user_ids |
645 | | - } |
646 | | - for user_id, room_id, instance, stream_id in txn: |
647 | | - result[user_id].add( |
648 | | - GetRoomsForUserWithStreamOrdering( |
649 | | - room_id, PersistedEventPosition(instance, stream_id) |
650 | | - ) |
651 | | - ) |
652 | | - |
653 | | - return {user_id: frozenset(v) for user_id, v in result.items()} |
654 | | - |
655 | 601 | async def get_users_server_still_shares_room_with( |
656 | 602 | self, user_ids: Collection[str] |
657 | 603 | ) -> Set[str]: |
@@ -693,19 +639,68 @@ def get_users_server_still_shares_room_with_txn( |
693 | 639 |
|
694 | 640 | return {row[0] for row in txn} |
695 | 641 |
|
696 | | - @cancellable |
697 | | - async def get_rooms_for_user( |
698 | | - self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None |
699 | | - ) -> FrozenSet[str]: |
| 642 | + @cached(max_entries=500000, iterable=True) |
| 643 | + async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]: |
700 | 644 | """Returns a set of room_ids the user is currently joined to. |
701 | 645 |
|
702 | 646 | If a remote user only returns rooms this server is currently |
703 | 647 | participating in. |
704 | 648 | """ |
705 | | - rooms = await self.get_rooms_for_user_with_stream_ordering( |
706 | | - user_id, on_invalidate=on_invalidate |
| 649 | + rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate( |
| 650 | + (user_id,), |
| 651 | + None, |
| 652 | + update_metrics=False, |
| 653 | + ) |
| 654 | + if rooms: |
| 655 | + return frozenset(r.room_id for r in rooms) |
| 656 | + |
| 657 | + room_ids = await self.db_pool.simple_select_onecol( |
| 658 | + table="current_state_events", |
| 659 | + keyvalues={ |
| 660 | + "type": EventTypes.Member, |
| 661 | + "membership": Membership.JOIN, |
| 662 | + "state_key": user_id, |
| 663 | + }, |
| 664 | + retcol="room_id", |
| 665 | + desc="get_rooms_for_user", |
707 | 666 | ) |
708 | | - return frozenset(r.room_id for r in rooms) |
| 667 | + |
| 668 | + return frozenset(room_ids) |
| 669 | + |
| 670 | + @cachedList( |
| 671 | + cached_method_name="get_rooms_for_user", |
| 672 | + list_name="user_ids", |
| 673 | + ) |
| 674 | + async def get_rooms_for_users( |
| 675 | + self, user_ids: Collection[str] |
| 676 | + ) -> Dict[str, FrozenSet[str]]: |
| 677 | + """A batched version of `get_rooms_for_user`. |
| 678 | +
|
| 679 | + Returns: |
| 680 | + Map from user_id to set of rooms that is currently in. |
| 681 | + """ |
| 682 | + |
| 683 | + rows = await self.db_pool.simple_select_many_batch( |
| 684 | + table="current_state_events", |
| 685 | + column="state_key", |
| 686 | + iterable=user_ids, |
| 687 | + retcols=( |
| 688 | + "state_key", |
| 689 | + "room_id", |
| 690 | + ), |
| 691 | + keyvalues={ |
| 692 | + "type": EventTypes.Member, |
| 693 | + "membership": Membership.JOIN, |
| 694 | + }, |
| 695 | + desc="get_rooms_for_users", |
| 696 | + ) |
| 697 | + |
| 698 | + user_rooms: Dict[str, Set[str]] = {user_id: set() for user_id in user_ids} |
| 699 | + |
| 700 | + for row in rows: |
| 701 | + user_rooms[row["state_key"]].add(row["room_id"]) |
| 702 | + |
| 703 | + return {key: frozenset(rooms) for key, rooms in user_rooms.items()} |
709 | 704 |
|
710 | 705 | @cached(max_entries=10000) |
711 | 706 | async def does_pair_of_users_share_a_room( |
|
0 commit comments