119119]
120120
121121
122+ @attr .s (slots = True , auto_attribs = True )
123+ class _RoomReceipt :
124+ """
125+ HttpPushAction instances include the information used to generate HTTP
126+ requests to a push gateway.
127+ """
128+
129+ unthreaded_stream_ordering : int = 0
130+ # threaded_stream_ordering includes the main pseudo-thread.
131+ threaded_stream_ordering : Dict [str , int ] = attr .Factory (dict )
132+
133+ def is_unread (self , thread_id : str , stream_ordering : int ) -> bool :
134+ """Returns True if the stream ordering is unread according to the receipt information."""
135+
136+ # Only include push actions with a stream ordering after both the unthreaded
137+ # and threaded receipt. Properly handles a user without any receipts present.
138+ return (
139+ self .unthreaded_stream_ordering < stream_ordering
140+ and self .threaded_stream_ordering .get (thread_id , 0 ) < stream_ordering
141+ )
142+
143+
144+ # A _RoomReceipt with no receipts in it.
145+ MISSING_ROOM_RECEIPT = _RoomReceipt ()
146+
147+
122148@attr .s (slots = True , frozen = True , auto_attribs = True )
123149class HttpPushAction :
124150 """
@@ -716,7 +742,7 @@ def f(txn: LoggingTransaction) -> List[str]:
716742
717743 def _get_receipts_by_room_txn (
718744 self , txn : LoggingTransaction , user_id : str
719- ) -> Dict [str , int ]:
745+ ) -> Dict [str , _RoomReceipt ]:
720746 """
721747 Generate a map of room ID to the latest stream ordering that has been
722748 read by the given user.
@@ -726,7 +752,8 @@ def _get_receipts_by_room_txn(
726752 user_id: The user to fetch receipts for.
727753
728754 Returns:
729- A map of room ID to stream ordering for all rooms the user has a receipt in.
755+ A map including all rooms the user is in with a receipt. It maps
756+ room IDs to _RoomReceipt instances
730757 """
731758 receipt_types_clause , args = make_in_list_sql_clause (
732759 self .database_engine ,
@@ -735,20 +762,26 @@ def _get_receipts_by_room_txn(
735762 )
736763
737764 sql = f"""
738- SELECT room_id, MAX(stream_ordering)
765+ SELECT room_id, thread_id, MAX(stream_ordering)
739766 FROM receipts_linearized
740767 INNER JOIN events USING (room_id, event_id)
741768 WHERE { receipt_types_clause }
742769 AND user_id = ?
743- GROUP BY room_id
770+ GROUP BY room_id, thread_id
744771 """
745772
746773 args .extend ((user_id ,))
747774 txn .execute (sql , args )
748- return {
749- room_id : latest_stream_ordering
750- for room_id , latest_stream_ordering in txn .fetchall ()
751- }
775+
776+ result : Dict [str , _RoomReceipt ] = {}
777+ for room_id , thread_id , stream_ordering in txn :
778+ room_receipt = result .setdefault (room_id , _RoomReceipt ())
779+ if thread_id is None :
780+ room_receipt .unthreaded_stream_ordering = stream_ordering
781+ else :
782+ room_receipt .threaded_stream_ordering [thread_id ] = stream_ordering
783+
784+ return result
752785
753786 async def get_unread_push_actions_for_user_in_range_for_http (
754787 self ,
@@ -781,9 +814,10 @@ async def get_unread_push_actions_for_user_in_range_for_http(
781814
782815 def get_push_actions_txn (
783816 txn : LoggingTransaction ,
784- ) -> List [Tuple [str , str , int , str , bool ]]:
817+ ) -> List [Tuple [str , str , str , int , str , bool ]]:
785818 sql = """
786- SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight
819+ SELECT ep.event_id, ep.room_id, ep.thread_id, ep.stream_ordering,
820+ ep.actions, ep.highlight
787821 FROM event_push_actions AS ep
788822 WHERE
789823 ep.user_id = ?
@@ -793,7 +827,7 @@ def get_push_actions_txn(
793827 ORDER BY ep.stream_ordering ASC LIMIT ?
794828 """
795829 txn .execute (sql , (user_id , min_stream_ordering , max_stream_ordering , limit ))
796- return cast (List [Tuple [str , str , int , str , bool ]], txn .fetchall ())
830+ return cast (List [Tuple [str , str , str , int , str , bool ]], txn .fetchall ())
797831
798832 push_actions = await self .db_pool .runInteraction (
799833 "get_unread_push_actions_for_user_in_range_http" , get_push_actions_txn
@@ -806,10 +840,10 @@ def get_push_actions_txn(
806840 stream_ordering = stream_ordering ,
807841 actions = _deserialize_action (actions , highlight ),
808842 )
809- for event_id , room_id , stream_ordering , actions , highlight in push_actions
810- # Only include push actions with a stream ordering after any receipt, or without any
811- # receipt present (invited to but never read rooms).
812- if stream_ordering > receipts_by_room . get ( room_id , 0 )
843+ for event_id , room_id , thread_id , stream_ordering , actions , highlight in push_actions
844+ if receipts_by_room . get ( room_id , MISSING_ROOM_RECEIPT ). is_unread (
845+ thread_id , stream_ordering
846+ )
813847 ]
814848
815849 # Now sort it so it's ordered correctly, since currently it will
@@ -853,10 +887,10 @@ async def get_unread_push_actions_for_user_in_range_for_email(
853887
854888 def get_push_actions_txn (
855889 txn : LoggingTransaction ,
856- ) -> List [Tuple [str , str , int , str , bool , int ]]:
890+ ) -> List [Tuple [str , str , str , int , str , bool , int ]]:
857891 sql = """
858- SELECT ep.event_id, ep.room_id, ep.stream_ordering , ep.actions ,
859- ep.highlight, e.received_ts
892+ SELECT ep.event_id, ep.room_id, ep.thread_id , ep.stream_ordering ,
893+ ep.actions, ep. highlight, e.received_ts
860894 FROM event_push_actions AS ep
861895 INNER JOIN events AS e USING (room_id, event_id)
862896 WHERE
@@ -867,7 +901,7 @@ def get_push_actions_txn(
867901 ORDER BY ep.stream_ordering DESC LIMIT ?
868902 """
869903 txn .execute (sql , (user_id , min_stream_ordering , max_stream_ordering , limit ))
870- return cast (List [Tuple [str , str , int , str , bool , int ]], txn .fetchall ())
904+ return cast (List [Tuple [str , str , str , int , str , bool , int ]], txn .fetchall ())
871905
872906 push_actions = await self .db_pool .runInteraction (
873907 "get_unread_push_actions_for_user_in_range_email" , get_push_actions_txn
@@ -882,10 +916,10 @@ def get_push_actions_txn(
882916 actions = _deserialize_action (actions , highlight ),
883917 received_ts = received_ts ,
884918 )
885- for event_id , room_id , stream_ordering , actions , highlight , received_ts in push_actions
886- # Only include push actions with a stream ordering after any receipt, or without any
887- # receipt present (invited to but never read rooms).
888- if stream_ordering > receipts_by_room . get ( room_id , 0 )
919+ for event_id , room_id , thread_id , stream_ordering , actions , highlight , received_ts in push_actions
920+ if receipts_by_room . get ( room_id , MISSING_ROOM_RECEIPT ). is_unread (
921+ thread_id , stream_ordering
922+ )
889923 ]
890924
891925 # Now sort it so it's ordered correctly, since currently it will
0 commit comments