1313# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1414# See the License for the specific language governing permissions and
1515# limitations under the License.
16-
16+ import heapq
1717import logging
18- from collections import namedtuple
18+ from collections import defaultdict , namedtuple
1919from typing import (
20+ Any ,
2021 Awaitable ,
22+ Callable ,
23+ DefaultDict ,
2124 Dict ,
2225 Iterable ,
2326 List ,
2427 Optional ,
2528 Sequence ,
2629 Set ,
30+ Tuple ,
2731 Union ,
2832 overload ,
2933)
3034
3135import attr
3236from frozendict import frozendict
33- from prometheus_client import Histogram
37+ from prometheus_client import Counter , Histogram
3438from typing_extensions import Literal
3539
3640from synapse .api .constants import EventTypes
3741from synapse .api .room_versions import KNOWN_ROOM_VERSIONS , StateResolutionVersions
3842from synapse .events import EventBase
3943from synapse .events .snapshot import EventContext
44+ from synapse .logging .context import ContextResourceUsage
4045from synapse .logging .utils import log_function
4146from synapse .state import v1 , v2
4247from synapse .storage .databases .main .events_worker import EventRedactBehaviour
4348from synapse .storage .roommember import ProfileInfo
4449from synapse .types import Collection , StateMap
45- from synapse .util import Clock
4650from synapse .util .async_helpers import Linearizer
4751from synapse .util .caches .expiringcache import ExpiringCache
4852from synapse .util .metrics import Measure , measure_func
4953
5054logger = logging .getLogger (__name__ )
51-
55+ metrics_logger = logging . getLogger ( "synapse.state.metrics" )
5256
5357# Metrics for number of state groups involved in a resolution.
5458state_groups_histogram = Histogram (
@@ -448,19 +452,44 @@ async def resolve_events(
448452
449453 state_map = {ev .event_id : ev for st in state_sets for ev in st }
450454
451- with Measure (self .clock , "state._resolve_events" ):
452- new_state = await resolve_events_with_store (
453- self .clock ,
454- event .room_id ,
455- room_version ,
456- state_set_ids ,
457- event_map = state_map ,
458- state_res_store = StateResolutionStore (self .store ),
459- )
455+ new_state = await self ._state_resolution_handler .resolve_events_with_store (
456+ event .room_id ,
457+ room_version ,
458+ state_set_ids ,
459+ event_map = state_map ,
460+ state_res_store = StateResolutionStore (self .store ),
461+ )
460462
461463 return {key : state_map [ev_id ] for key , ev_id in new_state .items ()}
462464
463465
466+ @attr .s (slots = True )
467+ class _StateResMetrics :
468+ """Keeps track of some usage metrics about state res."""
469+
470+ # System and User CPU time, in seconds
471+ cpu_time = attr .ib (type = float , default = 0.0 )
472+
473+ # time spent on database transactions (excluding scheduling time). This roughly
474+ # corresponds to the amount of work done on the db server, excluding event fetches.
475+ db_time = attr .ib (type = float , default = 0.0 )
476+
477+ # number of events fetched from the db.
478+ db_events = attr .ib (type = int , default = 0 )
479+
480+
481+ _biggest_room_by_cpu_counter = Counter (
482+ "synapse_state_res_cpu_for_biggest_room_seconds" ,
483+ "CPU time spent performing state resolution for the single most expensive "
484+ "room for state resolution" ,
485+ )
486+ _biggest_room_by_db_counter = Counter (
487+ "synapse_state_res_db_for_biggest_room_seconds" ,
488+ "Database time spent performing state resolution for the single most "
489+ "expensive room for state resolution" ,
490+ )
491+
492+
464493class StateResolutionHandler :
465494 """Responsible for doing state conflict resolution.
466495
@@ -483,6 +512,17 @@ def __init__(self, hs):
483512 reset_expiry_on_get = True ,
484513 )
485514
515+ #
516+ # stuff for tracking time spent on state-res by room
517+ #
518+
519+ # tracks the amount of work done on state res per room
520+ self ._state_res_metrics = defaultdict (
521+ _StateResMetrics
522+ ) # type: DefaultDict[str, _StateResMetrics]
523+
524+ self .clock .looping_call (self ._report_metrics , 120 * 1000 )
525+
486526 @log_function
487527 async def resolve_state_groups (
488528 self ,
@@ -530,15 +570,13 @@ async def resolve_state_groups(
530570
531571 state_groups_histogram .observe (len (state_groups_ids ))
532572
533- with Measure (self .clock , "state._resolve_events" ):
534- new_state = await resolve_events_with_store (
535- self .clock ,
536- room_id ,
537- room_version ,
538- list (state_groups_ids .values ()),
539- event_map = event_map ,
540- state_res_store = state_res_store ,
541- )
573+ new_state = await self .resolve_events_with_store (
574+ room_id ,
575+ room_version ,
576+ list (state_groups_ids .values ()),
577+ event_map = event_map ,
578+ state_res_store = state_res_store ,
579+ )
542580
543581 # if the new state matches any of the input state groups, we can
544582 # use that state group again. Otherwise we will generate a state_id
@@ -552,6 +590,114 @@ async def resolve_state_groups(
552590
553591 return cache
554592
593+ async def resolve_events_with_store (
594+ self ,
595+ room_id : str ,
596+ room_version : str ,
597+ state_sets : Sequence [StateMap [str ]],
598+ event_map : Optional [Dict [str , EventBase ]],
599+ state_res_store : "StateResolutionStore" ,
600+ ) -> StateMap [str ]:
601+ """
602+ Args:
603+ room_id: the room we are working in
604+
605+ room_version: Version of the room
606+
607+ state_sets: List of dicts of (type, state_key) -> event_id,
608+ which are the different state groups to resolve.
609+
610+ event_map:
611+ a dict from event_id to event, for any events that we happen to
612+ have in flight (eg, those currently being persisted). This will be
613+ used as a starting point fof finding the state we need; any missing
614+ events will be requested via state_map_factory.
615+
616+ If None, all events will be fetched via state_res_store.
617+
618+ state_res_store: a place to fetch events from
619+
620+ Returns:
621+ a map from (type, state_key) to event_id.
622+ """
623+ try :
624+ with Measure (self .clock , "state._resolve_events" ) as m :
625+ v = KNOWN_ROOM_VERSIONS [room_version ]
626+ if v .state_res == StateResolutionVersions .V1 :
627+ return await v1 .resolve_events_with_store (
628+ room_id , state_sets , event_map , state_res_store .get_events
629+ )
630+ else :
631+ return await v2 .resolve_events_with_store (
632+ self .clock ,
633+ room_id ,
634+ room_version ,
635+ state_sets ,
636+ event_map ,
637+ state_res_store ,
638+ )
639+ finally :
640+ self ._record_state_res_metrics (room_id , m .get_resource_usage ())
641+
642+ def _record_state_res_metrics (self , room_id : str , rusage : ContextResourceUsage ):
643+ room_metrics = self ._state_res_metrics [room_id ]
644+ room_metrics .cpu_time += rusage .ru_utime + rusage .ru_stime
645+ room_metrics .db_time += rusage .db_txn_duration_sec
646+ room_metrics .db_events += rusage .evt_db_fetch_count
647+
648+ def _report_metrics (self ):
649+ if not self ._state_res_metrics :
650+ # no state res has happened since the last iteration: don't bother logging.
651+ return
652+
653+ self ._report_biggest (
654+ lambda i : i .cpu_time , "CPU time" , _biggest_room_by_cpu_counter ,
655+ )
656+
657+ self ._report_biggest (
658+ lambda i : i .db_time , "DB time" , _biggest_room_by_db_counter ,
659+ )
660+
661+ self ._state_res_metrics .clear ()
662+
663+ def _report_biggest (
664+ self ,
665+ extract_key : Callable [[_StateResMetrics ], Any ],
666+ metric_name : str ,
667+ prometheus_counter_metric : Counter ,
668+ ) -> None :
669+ """Report metrics on the biggest rooms for state res
670+
671+ Args:
672+ extract_key: a callable which, given a _StateResMetrics, extracts a single
673+ metric to sort by.
674+ metric_name: the name of the metric we have extracted, for the log line
675+ prometheus_counter_metric: a prometheus metric recording the sum of the
676+ the extracted metric
677+ """
678+ n_to_log = 10
679+ if not metrics_logger .isEnabledFor (logging .DEBUG ):
680+ # only need the most expensive if we don't have debug logging, which
681+ # allows nlargest() to degrade to max()
682+ n_to_log = 1
683+
684+ items = self ._state_res_metrics .items ()
685+
686+ # log the N biggest rooms
687+ biggest = heapq .nlargest (
688+ n_to_log , items , key = lambda i : extract_key (i [1 ])
689+ ) # type: List[Tuple[str, _StateResMetrics]]
690+ metrics_logger .debug (
691+ "%i biggest rooms for state-res by %s: %s" ,
692+ len (biggest ),
693+ metric_name ,
694+ ["%s (%gs)" % (r , extract_key (m )) for (r , m ) in biggest ],
695+ )
696+
697+ # report info on the single biggest to prometheus
698+ _ , biggest_metrics = biggest [0 ]
699+ prometheus_counter_metric .inc (extract_key (biggest_metrics ))
700+
555701
556702def _make_state_cache_entry (
557703 new_state : StateMap [str ], state_groups_ids : Dict [int , StateMap [str ]]
@@ -605,47 +751,6 @@ def _make_state_cache_entry(
605751 )
606752
607753
608- def resolve_events_with_store (
609- clock : Clock ,
610- room_id : str ,
611- room_version : str ,
612- state_sets : Sequence [StateMap [str ]],
613- event_map : Optional [Dict [str , EventBase ]],
614- state_res_store : "StateResolutionStore" ,
615- ) -> Awaitable [StateMap [str ]]:
616- """
617- Args:
618- room_id: the room we are working in
619-
620- room_version: Version of the room
621-
622- state_sets: List of dicts of (type, state_key) -> event_id,
623- which are the different state groups to resolve.
624-
625- event_map:
626- a dict from event_id to event, for any events that we happen to
627- have in flight (eg, those currently being persisted). This will be
628- used as a starting point fof finding the state we need; any missing
629- events will be requested via state_map_factory.
630-
631- If None, all events will be fetched via state_res_store.
632-
633- state_res_store: a place to fetch events from
634-
635- Returns:
636- a map from (type, state_key) to event_id.
637- """
638- v = KNOWN_ROOM_VERSIONS [room_version ]
639- if v .state_res == StateResolutionVersions .V1 :
640- return v1 .resolve_events_with_store (
641- room_id , state_sets , event_map , state_res_store .get_events
642- )
643- else :
644- return v2 .resolve_events_with_store (
645- clock , room_id , room_version , state_sets , event_map , state_res_store
646- )
647-
648-
649754@attr .s (slots = True )
650755class StateResolutionStore :
651756 """Interface that allows state resolution algorithms to access the database
0 commit comments