Skip to content
This repository was archived by the owner on Apr 12, 2024. It is now read-only.

Commit cbabb31

Browse files
authored
Use async with for ID gens (#8383)
This will allow us to hit the DB after we've finished using the generated stream ID.
1 parent 916bb9d commit cbabb31

File tree

15 files changed

+144
-105
lines changed

15 files changed

+144
-105
lines changed

changelog.d/8383.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Refactor ID generators to use `async with` syntax.

synapse/storage/databases/main/account_data.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ async def add_account_data_to_room(
339339
"""
340340
content_json = json_encoder.encode(content)
341341

342-
with await self._account_data_id_gen.get_next() as next_id:
342+
async with self._account_data_id_gen.get_next() as next_id:
343343
# no need to lock here as room_account_data has a unique constraint
344344
# on (user_id, room_id, account_data_type) so simple_upsert will
345345
# retry if there is a conflict.
@@ -387,7 +387,7 @@ async def add_account_data_for_user(
387387
"""
388388
content_json = json_encoder.encode(content)
389389

390-
with await self._account_data_id_gen.get_next() as next_id:
390+
async with self._account_data_id_gen.get_next() as next_id:
391391
# no need to lock here as account_data has a unique constraint on
392392
# (user_id, account_data_type) so simple_upsert will retry if
393393
# there is a conflict.

synapse/storage/databases/main/deviceinbox.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ def add_messages_txn(txn, now_ms, stream_id):
362362
rows.append((destination, stream_id, now_ms, edu_json))
363363
txn.executemany(sql, rows)
364364

365-
with await self._device_inbox_id_gen.get_next() as stream_id:
365+
async with self._device_inbox_id_gen.get_next() as stream_id:
366366
now_ms = self.clock.time_msec()
367367
await self.db_pool.runInteraction(
368368
"add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id
@@ -411,7 +411,7 @@ def add_messages_txn(txn, now_ms, stream_id):
411411
txn, stream_id, local_messages_by_user_then_device
412412
)
413413

414-
with await self._device_inbox_id_gen.get_next() as stream_id:
414+
async with self._device_inbox_id_gen.get_next() as stream_id:
415415
now_ms = self.clock.time_msec()
416416
await self.db_pool.runInteraction(
417417
"add_messages_from_remote_to_device_inbox",

synapse/storage/databases/main/devices.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ async def add_user_signature_change_to_streams(
377377
THe new stream ID.
378378
"""
379379

380-
with await self._device_list_id_gen.get_next() as stream_id:
380+
async with self._device_list_id_gen.get_next() as stream_id:
381381
await self.db_pool.runInteraction(
382382
"add_user_sig_change_to_streams",
383383
self._add_user_signature_change_txn,
@@ -1093,7 +1093,7 @@ async def add_device_change_to_streams(
10931093
if not device_ids:
10941094
return
10951095

1096-
with await self._device_list_id_gen.get_next_mult(
1096+
async with self._device_list_id_gen.get_next_mult(
10971097
len(device_ids)
10981098
) as stream_ids:
10991099
await self.db_pool.runInteraction(
@@ -1108,7 +1108,7 @@ async def add_device_change_to_streams(
11081108
return stream_ids[-1]
11091109

11101110
context = get_active_span_text_map()
1111-
with await self._device_list_id_gen.get_next_mult(
1111+
async with self._device_list_id_gen.get_next_mult(
11121112
len(hosts) * len(device_ids)
11131113
) as stream_ids:
11141114
await self.db_pool.runInteraction(

synapse/storage/databases/main/end_to_end_keys.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -831,7 +831,7 @@ async def set_e2e_cross_signing_key(self, user_id, key_type, key):
831831
key (dict): the key data
832832
"""
833833

834-
with await self._cross_signing_id_gen.get_next() as stream_id:
834+
async with self._cross_signing_id_gen.get_next() as stream_id:
835835
return await self.db_pool.runInteraction(
836836
"add_e2e_cross_signing_key",
837837
self._set_e2e_cross_signing_key_txn,

synapse/storage/databases/main/events.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,15 +156,15 @@ async def _persist_events_and_state_updates(
156156
# Note: Multiple instances of this function cannot be in flight at
157157
# the same time for the same room.
158158
if backfilled:
159-
stream_ordering_manager = await self._backfill_id_gen.get_next_mult(
159+
stream_ordering_manager = self._backfill_id_gen.get_next_mult(
160160
len(events_and_contexts)
161161
)
162162
else:
163-
stream_ordering_manager = await self._stream_id_gen.get_next_mult(
163+
stream_ordering_manager = self._stream_id_gen.get_next_mult(
164164
len(events_and_contexts)
165165
)
166166

167-
with stream_ordering_manager as stream_orderings:
167+
async with stream_ordering_manager as stream_orderings:
168168
for (event, context), stream in zip(events_and_contexts, stream_orderings):
169169
event.internal_metadata.stream_ordering = stream
170170

synapse/storage/databases/main/group_server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1265,7 +1265,7 @@ def _register_user_group_membership_txn(txn, next_id):
12651265

12661266
return next_id
12671267

1268-
with await self._group_updates_id_gen.get_next() as next_id:
1268+
async with self._group_updates_id_gen.get_next() as next_id:
12691269
res = await self.db_pool.runInteraction(
12701270
"register_user_group_membership",
12711271
_register_user_group_membership_txn,

synapse/storage/databases/main/presence.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@
2323

2424
class PresenceStore(SQLBaseStore):
2525
async def update_presence(self, presence_states):
26-
stream_ordering_manager = await self._presence_id_gen.get_next_mult(
26+
stream_ordering_manager = self._presence_id_gen.get_next_mult(
2727
len(presence_states)
2828
)
2929

30-
with stream_ordering_manager as stream_orderings:
30+
async with stream_ordering_manager as stream_orderings:
3131
await self.db_pool.runInteraction(
3232
"update_presence",
3333
self._update_presence_txn,

synapse/storage/databases/main/push_rule.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ async def add_push_rule(
338338
) -> None:
339339
conditions_json = json_encoder.encode(conditions)
340340
actions_json = json_encoder.encode(actions)
341-
with await self._push_rules_stream_id_gen.get_next() as stream_id:
341+
async with self._push_rules_stream_id_gen.get_next() as stream_id:
342342
event_stream_ordering = self._stream_id_gen.get_current_token()
343343

344344
if before or after:
@@ -585,7 +585,7 @@ def delete_push_rule_txn(txn, stream_id, event_stream_ordering):
585585
txn, stream_id, event_stream_ordering, user_id, rule_id, op="DELETE"
586586
)
587587

588-
with await self._push_rules_stream_id_gen.get_next() as stream_id:
588+
async with self._push_rules_stream_id_gen.get_next() as stream_id:
589589
event_stream_ordering = self._stream_id_gen.get_current_token()
590590

591591
await self.db_pool.runInteraction(
@@ -616,7 +616,7 @@ async def set_push_rule_enabled(
616616
Raises:
617617
NotFoundError if the rule does not exist.
618618
"""
619-
with await self._push_rules_stream_id_gen.get_next() as stream_id:
619+
async with self._push_rules_stream_id_gen.get_next() as stream_id:
620620
event_stream_ordering = self._stream_id_gen.get_current_token()
621621
await self.db_pool.runInteraction(
622622
"_set_push_rule_enabled_txn",
@@ -754,7 +754,7 @@ def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering):
754754
data={"actions": actions_json},
755755
)
756756

757-
with await self._push_rules_stream_id_gen.get_next() as stream_id:
757+
async with self._push_rules_stream_id_gen.get_next() as stream_id:
758758
event_stream_ordering = self._stream_id_gen.get_current_token()
759759

760760
await self.db_pool.runInteraction(

synapse/storage/databases/main/pusher.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ async def add_pusher(
281281
last_stream_ordering,
282282
profile_tag="",
283283
) -> None:
284-
with await self._pushers_id_gen.get_next() as stream_id:
284+
async with self._pushers_id_gen.get_next() as stream_id:
285285
# no need to lock because `pushers` has a unique key on
286286
# (app_id, pushkey, user_name) so simple_upsert will retry
287287
await self.db_pool.simple_upsert(
@@ -344,7 +344,7 @@ def delete_pusher_txn(txn, stream_id):
344344
},
345345
)
346346

347-
with await self._pushers_id_gen.get_next() as stream_id:
347+
async with self._pushers_id_gen.get_next() as stream_id:
348348
await self.db_pool.runInteraction(
349349
"delete_pusher", delete_pusher_txn, stream_id
350350
)

0 commit comments

Comments
 (0)