2424 Optional ,
2525 Set ,
2626 Tuple ,
27+ cast ,
2728)
2829
29- from twisted .internet import defer
30-
3130from synapse .api .constants import ReceiptTypes
3231from synapse .replication .slave .storage ._slaved_id_tracker import SlavedIdTracker
3332from synapse .replication .tcp .streams import ReceiptsStream
3837 LoggingTransaction ,
3938)
4039from synapse .storage .engines import PostgresEngine
41- from synapse .storage .util .id_generators import MultiWriterIdGenerator , StreamIdGenerator
40+ from synapse .storage .util .id_generators import (
41+ AbstractStreamIdTracker ,
42+ MultiWriterIdGenerator ,
43+ StreamIdGenerator ,
44+ )
4245from synapse .types import JsonDict
4346from synapse .util import json_encoder
4447from synapse .util .caches .descriptors import cached , cachedList
@@ -58,6 +61,7 @@ def __init__(
5861 hs : "HomeServer" ,
5962 ):
6063 self ._instance_name = hs .get_instance_name ()
64+ self ._receipts_id_gen : AbstractStreamIdTracker
6165
6266 if isinstance (database .engine , PostgresEngine ):
6367 self ._can_write_to_receipts = (
@@ -161,7 +165,7 @@ def f(txn: LoggingTransaction) -> List[Tuple[str, str, int, int]]:
161165 " AND user_id = ?"
162166 )
163167 txn .execute (sql , (user_id ,))
164- return txn .fetchall ()
168+ return cast ( List [ Tuple [ str , str , int , int ]], txn .fetchall () )
165169
166170 rows = await self .db_pool .runInteraction (
167171 "get_receipts_for_user_with_orderings" , f
@@ -257,7 +261,7 @@ def f(txn: LoggingTransaction) -> List[Dict[str, Any]]:
257261 if not rows :
258262 return []
259263
260- content = {}
264+ content : JsonDict = {}
261265 for row in rows :
262266 content .setdefault (row ["event_id" ], {}).setdefault (row ["receipt_type" ], {})[
263267 row ["user_id" ]
@@ -305,7 +309,7 @@ def f(txn: LoggingTransaction) -> List[Dict[str, Any]]:
305309 "_get_linearized_receipts_for_rooms" , f
306310 )
307311
308- results = {}
312+ results : JsonDict = {}
309313 for row in txn_results :
310314 # We want a single event per room, since we want to batch the
311315 # receipts by room, event and type.
@@ -370,7 +374,7 @@ def f(txn: LoggingTransaction) -> List[Dict[str, Any]]:
370374 "get_linearized_receipts_for_all_rooms" , f
371375 )
372376
373- results = {}
377+ results : JsonDict = {}
374378 for row in txn_results :
375379 # We want a single event per room, since we want to batch the
376380 # receipts by room, event and type.
@@ -399,7 +403,7 @@ async def get_users_sent_receipts_between(
399403 """
400404
401405 if last_id == current_id :
402- return defer . succeed ([])
406+ return []
403407
404408 def _get_users_sent_receipts_between_txn (txn : LoggingTransaction ) -> List [str ]:
405409 sql = """
@@ -453,7 +457,10 @@ def get_all_updated_receipts_txn(
453457 """
454458 txn .execute (sql , (last_id , current_id , limit ))
455459
456- updates = [(r [0 ], r [1 :5 ] + (db_to_json (r [5 ]),)) for r in txn ]
460+ updates = cast (
461+ List [Tuple [int , list ]],
462+ [(r [0 ], r [1 :5 ] + (db_to_json (r [5 ]),)) for r in txn ],
463+ )
457464
458465 limited = False
459466 upper_bound = current_id
@@ -496,7 +503,13 @@ def invalidate_caches_for_receipt(
496503 self ._invalidate_get_users_with_receipts_in_room (room_id , receipt_type , user_id )
497504 self .get_receipts_for_room .invalidate ((room_id , receipt_type ))
498505
499- def process_replication_rows (self , stream_name , instance_name , token , rows ):
506+ def process_replication_rows (
507+ self ,
508+ stream_name : str ,
509+ instance_name : str ,
510+ token : int ,
511+ rows : Iterable [Any ],
512+ ) -> None :
500513 if stream_name == ReceiptsStream .NAME :
501514 self ._receipts_id_gen .advance (instance_name , token )
502515 for row in rows :
@@ -584,7 +597,7 @@ def insert_linearized_receipt_txn(
584597 )
585598
586599 if receipt_type == ReceiptTypes .READ and stream_ordering is not None :
587- self ._remove_old_push_actions_before_txn (
600+ self ._remove_old_push_actions_before_txn ( # type: ignore[attr-defined]
588601 txn , room_id = room_id , user_id = user_id , stream_ordering = stream_ordering
589602 )
590603
@@ -637,7 +650,7 @@ def graph_to_linear(txn: LoggingTransaction) -> str:
637650 "insert_receipt_conv" , graph_to_linear
638651 )
639652
640- async with self ._receipts_id_gen .get_next () as stream_id :
653+ async with self ._receipts_id_gen .get_next () as stream_id : # type: ignore[attr-defined]
641654 event_ts = await self .db_pool .runInteraction (
642655 "insert_linearized_receipt" ,
643656 self .insert_linearized_receipt_txn ,
0 commit comments