11from __future__ import annotations
22
33import asyncio
4+ import time
45import warnings
56from functools import partial
67from typing import TYPE_CHECKING
2223
2324_logger = logistro .getLogger (__name__ )
2425
26+ PERFS_MAX = 5000 # maximum number of entries in the perf dicts
27+ TRIM_SIZE = 500 # what to save after trimming it
28+
2529
2630class UnhandledMessageWarning (UserWarning ):
2731 pass
@@ -49,6 +53,9 @@ class Broker:
4953 ]
5054 """A mapping of session id: subscription: list[futures]"""
5155
56+ write_perfs : MutableMapping [protocol .MessageKey , tuple [float , float ]]
57+ read_perfs : MutableMapping [protocol .MessageKey , float ]
58+
5259 def __init__ (self , browser : Browser , channel : ChannelInterface ) -> None :
5360 """
5461 Construct a broker for a synchronous arragenment w/ both ends.
@@ -66,6 +73,8 @@ def __init__(self, browser: Browser, channel: ChannelInterface) -> None:
6673 # if its a user task, can cancel
6774 self ._current_read_task : asyncio .Task [Any ] | None = None
6875 self .futures = {}
76+ self .write_perfs = {}
77+ self .read_perfs = {}
6978 self ._subscriptions_futures = {}
7079
7180 self ._write_lock = asyncio .Lock ()
@@ -223,6 +232,14 @@ async def read_loop() -> None: # noqa: PLR0912, PLR0915, C901
223232 raise RuntimeError (f"Couldn't find a future for key: { key } " )
224233 if not future .done ():
225234 future .set_result (response )
235+ self .read_perfs [key ] = time .perf_counter ()
236+ if len (self .write_perfs ) > PERFS_MAX :
237+ self .write_perfs = dict (
238+ list (self .write_perfs .items ())[TRIM_SIZE :],
239+ )
240+ self .read_perfs = dict (
241+ list (self .read_perfs .items ())[TRIM_SIZE :],
242+ )
226243 else :
227244 warnings .warn (
228245 f"Unhandled message type:{ response !s} " ,
@@ -237,6 +254,16 @@ async def read_loop() -> None: # noqa: PLR0912, PLR0915, C901
237254 read_task .add_done_callback (check_read_loop_error )
238255 self ._current_read_task = read_task
239256
257+ def get_perf (
258+ self ,
259+ obj : protocol .BrowserCommand ,
260+ ) -> tuple [float , float , float ]:
261+ """Get the performance tuple for a certain BrowserCommand."""
262+ key = protocol .calculate_message_key (obj )
263+ if not key :
264+ return (0 , 0 , 0 )
265+ return (* self .write_perfs [key ], self .read_perfs [key ])
266+
240267 async def write_json (
241268 self ,
242269 obj : protocol .BrowserCommand ,
@@ -254,13 +281,15 @@ async def write_json(
254281 self .futures [key ] = future
255282 _logger .debug (f"Created future: { key } { future } " )
256283 try :
284+ perf_start = time .perf_counter ()
257285 async with self ._write_lock : # this should be a queue not a lock
258286 loop = asyncio .get_running_loop ()
259287 await loop .run_in_executor (
260288 self ._executor ,
261289 self ._channel .write_json ,
262290 obj ,
263291 )
292+ self .write_perfs [key ] = (perf_start , time .perf_counter ())
264293 except (_manual_thread_pool .ExecutorClosedError , asyncio .CancelledError ) as e :
265294 if not future .cancel () or not future .cancelled ():
266295 await future # it wasn't canceled, so listen to it before raising
0 commit comments