Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit d2ac767

Browse files
authored
Convert ReadWriteLock to async/await. (#8202)
1 parent b4826d6 commit d2ac767

File tree

4 files changed

+39
-33
lines changed

4 files changed

+39
-33
lines changed

changelog.d/8202.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Convert various parts of the codebase to async/await.

synapse/handlers/pagination.py

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,18 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616
import logging
17+
from typing import Any, Dict, Optional
1718

1819
from twisted.python.failure import Failure
1920

2021
from synapse.api.constants import EventTypes, Membership
2122
from synapse.api.errors import SynapseError
23+
from synapse.api.filtering import Filter
2224
from synapse.logging.context import run_in_background
2325
from synapse.metrics.background_process_metrics import run_as_background_process
2426
from synapse.storage.state import StateFilter
25-
from synapse.types import RoomStreamToken
27+
from synapse.streams.config import PaginationConfig
28+
from synapse.types import Requester, RoomStreamToken
2629
from synapse.util.async_helpers import ReadWriteLock
2730
from synapse.util.stringutils import random_string
2831
from synapse.visibility import filter_events_for_client
@@ -247,15 +250,16 @@ def start_purge_history(self, room_id, token, delete_local_events=False):
247250
)
248251
return purge_id
249252

250-
async def _purge_history(self, purge_id, room_id, token, delete_local_events):
253+
async def _purge_history(
254+
self, purge_id: str, room_id: str, token: str, delete_local_events: bool
255+
) -> None:
251256
"""Carry out a history purge on a room.
252257
253258
Args:
254-
purge_id (str): The id for this purge
255-
room_id (str): The room to purge from
256-
token (str): topological token to delete events before
257-
delete_local_events (bool): True to delete local events as well as
258-
remote ones
259+
purge_id: The id for this purge
260+
room_id: The room to purge from
261+
token: topological token to delete events before
262+
delete_local_events: True to delete local events as well as remote ones
259263
"""
260264
self._purges_in_progress_by_room.add(room_id)
261265
try:
@@ -291,9 +295,9 @@ def get_purge_status(self, purge_id):
291295
"""
292296
return self._purges_by_id.get(purge_id)
293297

294-
async def purge_room(self, room_id):
298+
async def purge_room(self, room_id: str) -> None:
295299
"""Purge the given room from the database"""
296-
with (await self.pagination_lock.write(room_id)):
300+
with await self.pagination_lock.write(room_id):
297301
# check we know about the room
298302
await self.store.get_room_version_id(room_id)
299303

@@ -307,23 +311,22 @@ async def purge_room(self, room_id):
307311

308312
async def get_messages(
309313
self,
310-
requester,
311-
room_id=None,
312-
pagin_config=None,
313-
as_client_event=True,
314-
event_filter=None,
315-
):
314+
requester: Requester,
315+
room_id: Optional[str] = None,
316+
pagin_config: Optional[PaginationConfig] = None,
317+
as_client_event: bool = True,
318+
event_filter: Optional[Filter] = None,
319+
) -> Dict[str, Any]:
316320
"""Get messages in a room.
317321
318322
Args:
319-
requester (Requester): The user requesting messages.
320-
room_id (str): The room they want messages from.
321-
pagin_config (synapse.api.streams.PaginationConfig): The pagination
322-
config rules to apply, if any.
323-
as_client_event (bool): True to get events in client-server format.
324-
event_filter (Filter): Filter to apply to results or None
323+
requester: The user requesting messages.
324+
room_id: The room they want messages from.
325+
pagin_config: The pagination config rules to apply, if any.
326+
as_client_event: True to get events in client-server format.
327+
event_filter: Filter to apply to results or None
325328
Returns:
326-
dict: Pagination API results
329+
Pagination API results
327330
"""
328331
user_id = requester.user.to_string()
329332

@@ -343,7 +346,7 @@ async def get_messages(
343346

344347
source_config = pagin_config.get_source_config("room")
345348

346-
with (await self.pagination_lock.read(room_id)):
349+
with await self.pagination_lock.read(room_id):
347350
(
348351
membership,
349352
member_event_id,

synapse/util/async_helpers.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from typing import Dict, Sequence, Set, Union
2121

2222
import attr
23+
from typing_extensions import ContextManager
2324

2425
from twisted.internet import defer
2526
from twisted.internet.defer import CancelledError
@@ -338,11 +339,11 @@ def eb(e):
338339

339340

340341
class ReadWriteLock(object):
341-
"""A deferred style read write lock.
342+
"""An async read write lock.
342343
343344
Example:
344345
345-
with (yield read_write_lock.read("test_key")):
346+
with await read_write_lock.read("test_key"):
346347
# do some work
347348
"""
348349

@@ -365,8 +366,7 @@ def __init__(self):
365366
# Latest writer queued
366367
self.key_to_current_writer = {} # type: Dict[str, defer.Deferred]
367368

368-
@defer.inlineCallbacks
369-
def read(self, key):
369+
async def read(self, key: str) -> ContextManager:
370370
new_defer = defer.Deferred()
371371

372372
curr_readers = self.key_to_current_readers.setdefault(key, set())
@@ -376,7 +376,8 @@ def read(self, key):
376376

377377
# We wait for the latest writer to finish writing. We can safely ignore
378378
# any existing readers... as they're readers.
379-
yield make_deferred_yieldable(curr_writer)
379+
if curr_writer:
380+
await make_deferred_yieldable(curr_writer)
380381

381382
@contextmanager
382383
def _ctx_manager():
@@ -388,8 +389,7 @@ def _ctx_manager():
388389

389390
return _ctx_manager()
390391

391-
@defer.inlineCallbacks
392-
def write(self, key):
392+
async def write(self, key: str) -> ContextManager:
393393
new_defer = defer.Deferred()
394394

395395
curr_readers = self.key_to_current_readers.get(key, set())
@@ -405,7 +405,7 @@ def write(self, key):
405405
curr_readers.clear()
406406
self.key_to_current_writer[key] = new_defer
407407

408-
yield make_deferred_yieldable(defer.gatherResults(to_wait_on))
408+
await make_deferred_yieldable(defer.gatherResults(to_wait_on))
409409

410410
@contextmanager
411411
def _ctx_manager():

tests/util/test_rwlock.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
from twisted.internet import defer
1617

1718
from synapse.util.async_helpers import ReadWriteLock
1819

@@ -43,6 +44,7 @@ def test_rwlock(self):
4344
rwlock.read(key), # 5
4445
rwlock.write(key), # 6
4546
]
47+
ds = [defer.ensureDeferred(d) for d in ds]
4648

4749
self._assert_called_before_not_after(ds, 2)
4850

@@ -73,12 +75,12 @@ def test_rwlock(self):
7375
with ds[6].result:
7476
pass
7577

76-
d = rwlock.write(key)
78+
d = defer.ensureDeferred(rwlock.write(key))
7779
self.assertTrue(d.called)
7880
with d.result:
7981
pass
8082

81-
d = rwlock.read(key)
83+
d = defer.ensureDeferred(rwlock.read(key))
8284
self.assertTrue(d.called)
8385
with d.result:
8486
pass

0 commit comments

Comments
 (0)