5959from synapse .events .snapshot import EventContext
6060from synapse .federation .federation_client import InvalidResponseError
6161from synapse .logging .context import nested_logging_context
62- from synapse .logging .opentracing import trace
62+ from synapse .logging .opentracing import (
63+ SynapseTags ,
64+ set_tag ,
65+ start_active_span ,
66+ tag_args ,
67+ trace ,
68+ )
6369from synapse .metrics .background_process_metrics import run_as_background_process
6470from synapse .replication .http .devices import ReplicationUserDevicesResyncRestServlet
6571from synapse .replication .http .federation import (
@@ -410,6 +416,7 @@ async def check_join_restrictions(
410416 prev_member_event ,
411417 )
412418
419+ @trace
413420 async def process_remote_join (
414421 self ,
415422 origin : str ,
@@ -715,7 +722,7 @@ async def _get_missing_events_for_pdu(
715722
716723 @trace
717724 async def _process_pulled_events (
718- self , origin : str , events : Iterable [EventBase ], backfilled : bool
725+ self , origin : str , events : Collection [EventBase ], backfilled : bool
719726 ) -> None :
720727 """Process a batch of events we have pulled from a remote server
721728
@@ -730,6 +737,15 @@ async def _process_pulled_events(
730737 backfilled: True if this is part of a historical batch of events (inhibits
731738 notification to clients, and validation of device keys.)
732739 """
740+ set_tag (
741+ SynapseTags .FUNC_ARG_PREFIX + "event_ids" ,
742+ str ([event .event_id for event in events ]),
743+ )
744+ set_tag (
745+ SynapseTags .FUNC_ARG_PREFIX + "event_ids.length" ,
746+ str (len (events )),
747+ )
748+ set_tag (SynapseTags .FUNC_ARG_PREFIX + "backfilled" , str (backfilled ))
733749 logger .debug (
734750 "processing pulled backfilled=%s events=%s" ,
735751 backfilled ,
@@ -753,6 +769,7 @@ async def _process_pulled_events(
753769 await self ._process_pulled_event (origin , ev , backfilled = backfilled )
754770
755771 @trace
772+ @tag_args
756773 async def _process_pulled_event (
757774 self , origin : str , event : EventBase , backfilled : bool
758775 ) -> None :
@@ -854,6 +871,7 @@ async def _process_pulled_event(
854871 else :
855872 raise
856873
874+ @trace
857875 async def _compute_event_context_with_maybe_missing_prevs (
858876 self , dest : str , event : EventBase
859877 ) -> EventContext :
@@ -970,6 +988,8 @@ async def _compute_event_context_with_maybe_missing_prevs(
970988 event , state_ids_before_event = state_map , partial_state = partial_state
971989 )
972990
991+ @trace
992+ @tag_args
973993 async def _get_state_ids_after_missing_prev_event (
974994 self ,
975995 destination : str ,
@@ -1009,10 +1029,10 @@ async def _get_state_ids_after_missing_prev_event(
10091029 logger .debug ("Fetching %i events from cache/store" , len (desired_events ))
10101030 have_events = await self ._store .have_seen_events (room_id , desired_events )
10111031
1012- missing_desired_events = desired_events - have_events
1032+ missing_desired_event_ids = desired_events - have_events
10131033 logger .debug (
10141034 "We are missing %i events (got %i)" ,
1015- len (missing_desired_events ),
1035+ len (missing_desired_event_ids ),
10161036 len (have_events ),
10171037 )
10181038
@@ -1024,13 +1044,30 @@ async def _get_state_ids_after_missing_prev_event(
10241044 # already have a bunch of the state events. It would be nice if the
10251045 # federation api gave us a way of finding out which we actually need.
10261046
1027- missing_auth_events = set (auth_event_ids ) - have_events
1028- missing_auth_events .difference_update (
1029- await self ._store .have_seen_events (room_id , missing_auth_events )
1047+ missing_auth_event_ids = set (auth_event_ids ) - have_events
1048+ missing_auth_event_ids .difference_update (
1049+ await self ._store .have_seen_events (room_id , missing_auth_event_ids )
10301050 )
1031- logger .debug ("We are also missing %i auth events" , len (missing_auth_events ))
1051+ logger .debug ("We are also missing %i auth events" , len (missing_auth_event_ids ))
10321052
1033- missing_events = missing_desired_events | missing_auth_events
1053+ missing_event_ids = missing_desired_event_ids | missing_auth_event_ids
1054+
1055+ set_tag (
1056+ SynapseTags .RESULT_PREFIX + "missing_auth_event_ids" ,
1057+ str (missing_auth_event_ids ),
1058+ )
1059+ set_tag (
1060+ SynapseTags .RESULT_PREFIX + "missing_auth_event_ids.length" ,
1061+ str (len (missing_auth_event_ids )),
1062+ )
1063+ set_tag (
1064+ SynapseTags .RESULT_PREFIX + "missing_desired_event_ids" ,
1065+ str (missing_desired_event_ids ),
1066+ )
1067+ set_tag (
1068+ SynapseTags .RESULT_PREFIX + "missing_desired_event_ids.length" ,
1069+ str (len (missing_desired_event_ids )),
1070+ )
10341071
10351072 # Making an individual request for each of 1000s of events has a lot of
10361073 # overhead. On the other hand, we don't really want to fetch all of the events
@@ -1041,13 +1078,13 @@ async def _get_state_ids_after_missing_prev_event(
10411078 #
10421079 # TODO: might it be better to have an API which lets us do an aggregate event
10431080 # request
1044- if (len (missing_events ) * 10 ) >= len (auth_event_ids ) + len (state_event_ids ):
1081+ if (len (missing_event_ids ) * 10 ) >= len (auth_event_ids ) + len (state_event_ids ):
10451082 logger .debug ("Requesting complete state from remote" )
10461083 await self ._get_state_and_persist (destination , room_id , event_id )
10471084 else :
1048- logger .debug ("Fetching %i events from remote" , len (missing_events ))
1085+ logger .debug ("Fetching %i events from remote" , len (missing_event_ids ))
10491086 await self ._get_events_and_persist (
1050- destination = destination , room_id = room_id , event_ids = missing_events
1087+ destination = destination , room_id = room_id , event_ids = missing_event_ids
10511088 )
10521089
10531090 # We now need to fill out the state map, which involves fetching the
@@ -1104,6 +1141,14 @@ async def _get_state_ids_after_missing_prev_event(
11041141 event_id ,
11051142 failed_to_fetch ,
11061143 )
1144+ set_tag (
1145+ SynapseTags .RESULT_PREFIX + "failed_to_fetch" ,
1146+ str (failed_to_fetch ),
1147+ )
1148+ set_tag (
1149+ SynapseTags .RESULT_PREFIX + "failed_to_fetch.length" ,
1150+ str (len (failed_to_fetch )),
1151+ )
11071152
11081153 if remote_event .is_state () and remote_event .rejected_reason is None :
11091154 state_map [
@@ -1112,6 +1157,8 @@ async def _get_state_ids_after_missing_prev_event(
11121157
11131158 return state_map
11141159
1160+ @trace
1161+ @tag_args
11151162 async def _get_state_and_persist (
11161163 self , destination : str , room_id : str , event_id : str
11171164 ) -> None :
@@ -1133,6 +1180,7 @@ async def _get_state_and_persist(
11331180 destination = destination , room_id = room_id , event_ids = (event_id ,)
11341181 )
11351182
1183+ @trace
11361184 async def _process_received_pdu (
11371185 self ,
11381186 origin : str ,
@@ -1283,6 +1331,7 @@ async def _resync_device(self, sender: str) -> None:
12831331 except Exception :
12841332 logger .exception ("Failed to resync device for %s" , sender )
12851333
1334+ @trace
12861335 async def _handle_marker_event (self , origin : str , marker_event : EventBase ) -> None :
12871336 """Handles backfilling the insertion event when we receive a marker
12881337 event that points to one.
@@ -1414,6 +1463,8 @@ async def backfill_event_id(
14141463
14151464 return event_from_response
14161465
1466+ @trace
1467+ @tag_args
14171468 async def _get_events_and_persist (
14181469 self , destination : str , room_id : str , event_ids : Collection [str ]
14191470 ) -> None :
@@ -1459,6 +1510,7 @@ async def get_event(event_id: str) -> None:
14591510 logger .info ("Fetched %i events of %i requested" , len (events ), len (event_ids ))
14601511 await self ._auth_and_persist_outliers (room_id , events )
14611512
1513+ @trace
14621514 async def _auth_and_persist_outliers (
14631515 self , room_id : str , events : Iterable [EventBase ]
14641516 ) -> None :
@@ -1477,6 +1529,16 @@ async def _auth_and_persist_outliers(
14771529 """
14781530 event_map = {event .event_id : event for event in events }
14791531
1532+ event_ids = event_map .keys ()
1533+ set_tag (
1534+ SynapseTags .FUNC_ARG_PREFIX + "event_ids" ,
1535+ str (event_ids ),
1536+ )
1537+ set_tag (
1538+ SynapseTags .FUNC_ARG_PREFIX + "event_ids.length" ,
1539+ str (len (event_ids )),
1540+ )
1541+
14801542 # filter out any events we have already seen. This might happen because
14811543 # the events were eagerly pushed to us (eg, during a room join), or because
14821544 # another thread has raced against us since we decided to request the event.
@@ -1593,6 +1655,7 @@ async def prep(event: EventBase) -> None:
15931655 backfilled = True ,
15941656 )
15951657
1658+ @trace
15961659 async def _check_event_auth (
15971660 self , origin : Optional [str ], event : EventBase , context : EventContext
15981661 ) -> None :
@@ -1631,6 +1694,14 @@ async def _check_event_auth(
16311694 claimed_auth_events = await self ._load_or_fetch_auth_events_for_event (
16321695 origin , event
16331696 )
1697+ set_tag (
1698+ SynapseTags .RESULT_PREFIX + "claimed_auth_events" ,
1699+ str ([ev .event_id for ev in claimed_auth_events ]),
1700+ )
1701+ set_tag (
1702+ SynapseTags .RESULT_PREFIX + "claimed_auth_events.length" ,
1703+ str (len (claimed_auth_events )),
1704+ )
16341705
16351706 # ... and check that the event passes auth at those auth events.
16361707 # https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
@@ -1728,6 +1799,7 @@ async def _check_event_auth(
17281799 )
17291800 context .rejected = RejectedReason .AUTH_ERROR
17301801
1802+ @trace
17311803 async def _maybe_kick_guest_users (self , event : EventBase ) -> None :
17321804 if event .type != EventTypes .GuestAccess :
17331805 return
@@ -1935,6 +2007,8 @@ async def _load_or_fetch_auth_events_for_event(
19352007 # instead we raise an AuthError, which will make the caller ignore it.
19362008 raise AuthError (code = HTTPStatus .FORBIDDEN , msg = "Auth events could not be found" )
19372009
2010+ @trace
2011+ @tag_args
19382012 async def _get_remote_auth_chain_for_event (
19392013 self , destination : str , room_id : str , event_id : str
19402014 ) -> None :
@@ -1963,6 +2037,7 @@ async def _get_remote_auth_chain_for_event(
19632037
19642038 await self ._auth_and_persist_outliers (room_id , remote_auth_events )
19652039
2040+ @trace
19662041 async def _run_push_actions_and_persist_event (
19672042 self , event : EventBase , context : EventContext , backfilled : bool = False
19682043 ) -> None :
@@ -2071,8 +2146,17 @@ async def persist_events_and_notify(
20712146 self ._message_handler .maybe_schedule_expiry (event )
20722147
20732148 if not backfilled : # Never notify for backfilled events
2074- for event in events :
2075- await self ._notify_persisted_event (event , max_stream_token )
2149+ with start_active_span ("notify_persisted_events" ):
2150+ set_tag (
2151+ SynapseTags .RESULT_PREFIX + "event_ids" ,
2152+ str ([ev .event_id for ev in events ]),
2153+ )
2154+ set_tag (
2155+ SynapseTags .RESULT_PREFIX + "event_ids.length" ,
2156+ str (len (events )),
2157+ )
2158+ for event in events :
2159+ await self ._notify_persisted_event (event , max_stream_token )
20762160
20772161 return max_stream_token .stream
20782162
0 commit comments