1313# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1414# See the License for the specific language governing permissions and
1515# limitations under the License.
16-
16+ import heapq
1717import logging
18- from collections import namedtuple
18+ from collections import defaultdict , namedtuple
1919from typing import (
20+ Any ,
2021 Awaitable ,
22+ Callable ,
23+ DefaultDict ,
2124 Dict ,
2225 Iterable ,
2326 List ,
2427 Optional ,
2528 Sequence ,
2629 Set ,
30+ Tuple ,
2731 Union ,
2832 overload ,
2933)
3034
3135import attr
3236from frozendict import frozendict
33- from prometheus_client import Histogram
37+ from prometheus_client import Counter , Histogram
3438from typing_extensions import Literal
3539
3640from synapse .api .constants import EventTypes
3741from synapse .api .room_versions import KNOWN_ROOM_VERSIONS , StateResolutionVersions
3842from synapse .events import EventBase
3943from synapse .events .snapshot import EventContext
44+ from synapse .logging .context import ContextResourceUsage
4045from synapse .logging .utils import log_function
4146from synapse .state import v1 , v2
4247from synapse .storage .databases .main .events_worker import EventRedactBehaviour
4348from synapse .storage .roommember import ProfileInfo
4449from synapse .types import Collection , StateMap
45- from synapse .util import Clock
4650from synapse .util .async_helpers import Linearizer
4751from synapse .util .caches .expiringcache import ExpiringCache
4852from synapse .util .metrics import Measure , measure_func
4953
5054logger = logging .getLogger (__name__ )
51-
55+ metrics_logger = logging . getLogger ( "synapse.state.metrics" )
5256
5357# Metrics for number of state groups involved in a resolution.
5458state_groups_histogram = Histogram (
@@ -459,6 +463,33 @@ async def resolve_events(
459463 return {key : state_map [ev_id ] for key , ev_id in new_state .items ()}
460464
461465
466+ @attr .s (slots = True )
467+ class _StateResMetrics :
468+ """Keeps track of some usage metrics about state res."""
469+
470+ # System and User CPU time, in seconds
471+ cpu_time = attr .ib (type = float , default = 0.0 )
472+
473+ # time spent on database transactions (excluding scheduling time). This roughly
474+ # corresponds to the amount of work done on the db server, excluding event fetches.
475+ db_time = attr .ib (type = float , default = 0.0 )
476+
477+ # number of events fetched from the db.
478+ db_events = attr .ib (type = int , default = 0 )
479+
480+
481+ _biggest_room_by_cpu_counter = Counter (
482+ "synapse_state_res_cpu_for_biggest_room_seconds" ,
483+ "CPU time spent performing state resolution for the single most expensive "
484+ "room for state resolution" ,
485+ )
486+ _biggest_room_by_db_counter = Counter (
487+ "synapse_state_res_db_for_biggest_room_seconds" ,
488+ "Database time spent performing state resolution for the single most "
489+ "expensive room for state resolution" ,
490+ )
491+
492+
462493class StateResolutionHandler :
463494 """Responsible for doing state conflict resolution.
464495
@@ -481,6 +512,17 @@ def __init__(self, hs):
481512 reset_expiry_on_get = True ,
482513 )
483514
515+ #
516+ # stuff for tracking time spent on state-res by room
517+ #
518+
519+ # tracks the amount of work done on state res per room
520+ self ._state_res_metrics = defaultdict (
521+ _StateResMetrics
522+ ) # type: DefaultDict[str, _StateResMetrics]
523+
524+ self .clock .looping_call (self ._report_metrics , 120 * 1000 )
525+
484526 @log_function
485527 async def resolve_state_groups (
486528 self ,
@@ -578,21 +620,83 @@ async def resolve_events_with_store(
578620 Returns:
579621 a map from (type, state_key) to event_id.
580622 """
581- with Measure (self .clock , "state._resolve_events" ):
582- v = KNOWN_ROOM_VERSIONS [room_version ]
583- if v .state_res == StateResolutionVersions .V1 :
584- return await v1 .resolve_events_with_store (
585- room_id , state_sets , event_map , state_res_store .get_events
586- )
587- else :
588- return await v2 .resolve_events_with_store (
589- self .clock ,
590- room_id ,
591- room_version ,
592- state_sets ,
593- event_map ,
594- state_res_store ,
595- )
623+ try :
624+ with Measure (self .clock , "state._resolve_events" ) as m :
625+ v = KNOWN_ROOM_VERSIONS [room_version ]
626+ if v .state_res == StateResolutionVersions .V1 :
627+ return await v1 .resolve_events_with_store (
628+ room_id , state_sets , event_map , state_res_store .get_events
629+ )
630+ else :
631+ return await v2 .resolve_events_with_store (
632+ self .clock ,
633+ room_id ,
634+ room_version ,
635+ state_sets ,
636+ event_map ,
637+ state_res_store ,
638+ )
639+ finally :
640+ self ._record_state_res_metrics (room_id , m .get_resource_usage ())
641+
642+ def _record_state_res_metrics (self , room_id : str , rusage : ContextResourceUsage ):
643+ room_metrics = self ._state_res_metrics [room_id ]
644+ room_metrics .cpu_time += rusage .ru_utime + rusage .ru_stime
645+ room_metrics .db_time += rusage .db_txn_duration_sec
646+ room_metrics .db_events += rusage .evt_db_fetch_count
647+
648+ def _report_metrics (self ):
649+ if not self ._state_res_metrics :
650+ # no state res has happened since the last iteration: don't bother logging.
651+ return
652+
653+ self ._report_biggest (
654+ lambda i : i .cpu_time , "CPU time" , _biggest_room_by_cpu_counter ,
655+ )
656+
657+ self ._report_biggest (
658+ lambda i : i .db_time , "DB time" , _biggest_room_by_db_counter ,
659+ )
660+
661+ self ._state_res_metrics .clear ()
662+
663+ def _report_biggest (
664+ self ,
665+ extract_key : Callable [[_StateResMetrics ], Any ],
666+ metric_name : str ,
667+ prometheus_counter_metric : Counter ,
668+ ) -> None :
669+ """Report metrics on the biggest rooms for state res
670+
671+ Args:
672+ extract_key: a callable which, given a _StateResMetrics, extracts a single
673+ metric to sort by.
674+ metric_name: the name of the metric we have extracted, for the log line
675+ prometheus_counter_metric: a prometheus metric recording the sum of the
676+ the extracted metric
677+ """
678+ n_to_log = 10
679+ if not metrics_logger .isEnabledFor (logging .DEBUG ):
680+ # only need the most expensive if we don't have debug logging, which
681+ # allows nlargest() to degrade to max()
682+ n_to_log = 1
683+
684+ items = self ._state_res_metrics .items ()
685+
686+ # log the N biggest rooms
687+ biggest = heapq .nlargest (
688+ n_to_log , items , key = lambda i : extract_key (i [1 ])
689+ ) # type: List[Tuple[str, _StateResMetrics]]
690+ metrics_logger .debug (
691+ "%i biggest rooms for state-res by %s: %s" ,
692+ len (biggest ),
693+ metric_name ,
694+ ["%s (%gs)" % (r , extract_key (m )) for (r , m ) in biggest ],
695+ )
696+
697+ # report info on the single biggest to prometheus
698+ _ , biggest_metrics = biggest [0 ]
699+ prometheus_counter_metric .inc (extract_key (biggest_metrics ))
596700
597701
598702def _make_state_cache_entry (
0 commit comments