Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 1c95ddd

Browse files
authored
Batch up storing state groups when creating new room (#14918)
1 parent 335f52d commit 1c95ddd

File tree

14 files changed

+371
-49
lines changed

14 files changed

+371
-49
lines changed

changelog.d/14918.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Batch up storing state groups when creating a new room.

synapse/events/snapshot.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
if TYPE_CHECKING:
2525
from synapse.storage.controllers import StorageControllers
26+
from synapse.storage.databases import StateGroupDataStore
2627
from synapse.storage.databases.main import DataStore
2728
from synapse.types.state import StateFilter
2829

@@ -348,6 +349,54 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
348349
partial_state: bool
349350
state_map_before_event: Optional[StateMap[str]] = None
350351

352+
@classmethod
353+
async def batch_persist_unpersisted_contexts(
354+
cls,
355+
events_and_context: List[Tuple[EventBase, "UnpersistedEventContextBase"]],
356+
room_id: str,
357+
last_known_state_group: int,
358+
datastore: "StateGroupDataStore",
359+
) -> List[Tuple[EventBase, EventContext]]:
360+
"""
361+
Takes a list of events and their associated unpersisted contexts and persists
362+
the unpersisted contexts, returning a list of events and persisted contexts.
363+
Note that all the events must be in a linear chain (ie a <- b <- c).
364+
365+
Args:
366+
events_and_context: A list of events and their unpersisted contexts
367+
room_id: the room_id for the events
368+
last_known_state_group: the last persisted state group
369+
datastore: a state datastore
370+
"""
371+
amended_events_and_context = await datastore.store_state_deltas_for_batched(
372+
events_and_context, room_id, last_known_state_group
373+
)
374+
375+
events_and_persisted_context = []
376+
for event, unpersisted_context in amended_events_and_context:
377+
if event.is_state():
378+
context = EventContext(
379+
storage=unpersisted_context._storage,
380+
state_group=unpersisted_context.state_group_after_event,
381+
state_group_before_event=unpersisted_context.state_group_before_event,
382+
state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
383+
partial_state=unpersisted_context.partial_state,
384+
prev_group=unpersisted_context.state_group_before_event,
385+
delta_ids=unpersisted_context.state_delta_due_to_event,
386+
)
387+
else:
388+
context = EventContext(
389+
storage=unpersisted_context._storage,
390+
state_group=unpersisted_context.state_group_after_event,
391+
state_group_before_event=unpersisted_context.state_group_before_event,
392+
state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
393+
partial_state=unpersisted_context.partial_state,
394+
prev_group=unpersisted_context.prev_group_for_state_group_before_event,
395+
delta_ids=unpersisted_context.delta_ids_to_state_group_before_event,
396+
)
397+
events_and_persisted_context.append((event, context))
398+
return events_and_persisted_context
399+
351400
async def get_prev_state_ids(
352401
self, state_filter: Optional["StateFilter"] = None
353402
) -> StateMap[str]:

synapse/handlers/message.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@ async def create_event(
574574
state_map: Optional[StateMap[str]] = None,
575575
for_batch: bool = False,
576576
current_state_group: Optional[int] = None,
577-
) -> Tuple[EventBase, EventContext]:
577+
) -> Tuple[EventBase, UnpersistedEventContextBase]:
578578
"""
579579
Given a dict from a client, create a new event. If bool for_batch is true, will
580580
create an event using the prev_event_ids, and will create an event context for
@@ -721,8 +721,6 @@ async def create_event(
721721
current_state_group=current_state_group,
722722
)
723723

724-
context = await unpersisted_context.persist(event)
725-
726724
# In an ideal world we wouldn't need the second part of this condition. However,
727725
# this behaviour isn't spec'd yet, meaning we should be able to deactivate this
728726
# behaviour. Another reason is that this code is also evaluated each time a new
@@ -739,7 +737,7 @@ async def create_event(
739737
assert state_map is not None
740738
prev_event_id = state_map.get((EventTypes.Member, event.sender))
741739
else:
742-
prev_state_ids = await context.get_prev_state_ids(
740+
prev_state_ids = await unpersisted_context.get_prev_state_ids(
743741
StateFilter.from_types([(EventTypes.Member, None)])
744742
)
745743
prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
@@ -764,8 +762,7 @@ async def create_event(
764762
)
765763

766764
self.validator.validate_new(event, self.config)
767-
768-
return event, context
765+
return event, unpersisted_context
769766

770767
async def _is_exempt_from_privacy_policy(
771768
self, builder: EventBuilder, requester: Requester
@@ -1005,7 +1002,7 @@ async def create_and_send_nonmember_event(
10051002
max_retries = 5
10061003
for i in range(max_retries):
10071004
try:
1008-
event, context = await self.create_event(
1005+
event, unpersisted_context = await self.create_event(
10091006
requester,
10101007
event_dict,
10111008
txn_id=txn_id,
@@ -1016,6 +1013,7 @@ async def create_and_send_nonmember_event(
10161013
historical=historical,
10171014
depth=depth,
10181015
)
1016+
context = await unpersisted_context.persist(event)
10191017

10201018
assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
10211019
event.sender,
@@ -1190,7 +1188,6 @@ async def create_new_client_event(
11901188
if for_batch:
11911189
assert prev_event_ids is not None
11921190
assert state_map is not None
1193-
assert current_state_group is not None
11941191
auth_ids = self._event_auth_handler.compute_auth_events(builder, state_map)
11951192
event = await builder.build(
11961193
prev_event_ids=prev_event_ids, auth_event_ids=auth_ids, depth=depth
@@ -2046,7 +2043,7 @@ async def _send_dummy_event_for_room(self, room_id: str) -> bool:
20462043
max_retries = 5
20472044
for i in range(max_retries):
20482045
try:
2049-
event, context = await self.create_event(
2046+
event, unpersisted_context = await self.create_event(
20502047
requester,
20512048
{
20522049
"type": EventTypes.Dummy,
@@ -2055,6 +2052,7 @@ async def _send_dummy_event_for_room(self, room_id: str) -> bool:
20552052
"sender": user_id,
20562053
},
20572054
)
2055+
context = await unpersisted_context.persist(event)
20582056

20592057
event.internal_metadata.proactively_send = False
20602058

synapse/handlers/room.py

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
5252
from synapse.event_auth import validate_event_for_room_version
5353
from synapse.events import EventBase
54+
from synapse.events.snapshot import UnpersistedEventContext
5455
from synapse.events.utils import copy_and_fixup_power_levels_contents
5556
from synapse.handlers.relations import BundledAggregations
5657
from synapse.module_api import NOT_SPAM
@@ -211,7 +212,7 @@ async def upgrade_room(
211212
# the required power level to send the tombstone event.
212213
(
213214
tombstone_event,
214-
tombstone_context,
215+
tombstone_unpersisted_context,
215216
) = await self.event_creation_handler.create_event(
216217
requester,
217218
{
@@ -225,6 +226,9 @@ async def upgrade_room(
225226
},
226227
},
227228
)
229+
tombstone_context = await tombstone_unpersisted_context.persist(
230+
tombstone_event
231+
)
228232
validate_event_for_room_version(tombstone_event)
229233
await self._event_auth_handler.check_auth_rules_from_context(
230234
tombstone_event
@@ -1092,7 +1096,7 @@ async def create_event(
10921096
content: JsonDict,
10931097
for_batch: bool,
10941098
**kwargs: Any,
1095-
) -> Tuple[EventBase, synapse.events.snapshot.EventContext]:
1099+
) -> Tuple[EventBase, synapse.events.snapshot.UnpersistedEventContextBase]:
10961100
"""
10971101
Creates an event and associated event context.
10981102
Args:
@@ -1111,20 +1115,23 @@ async def create_event(
11111115

11121116
event_dict = create_event_dict(etype, content, **kwargs)
11131117

1114-
new_event, new_context = await self.event_creation_handler.create_event(
1118+
(
1119+
new_event,
1120+
new_unpersisted_context,
1121+
) = await self.event_creation_handler.create_event(
11151122
creator,
11161123
event_dict,
11171124
prev_event_ids=prev_event,
11181125
depth=depth,
11191126
state_map=state_map,
11201127
for_batch=for_batch,
1121-
current_state_group=current_state_group,
11221128
)
1129+
11231130
depth += 1
11241131
prev_event = [new_event.event_id]
11251132
state_map[(new_event.type, new_event.state_key)] = new_event.event_id
11261133

1127-
return new_event, new_context
1134+
return new_event, new_unpersisted_context
11281135

11291136
try:
11301137
config = self._presets_dict[preset_config]
@@ -1134,10 +1141,10 @@ async def create_event(
11341141
)
11351142

11361143
creation_content.update({"creator": creator_id})
1137-
creation_event, creation_context = await create_event(
1144+
creation_event, unpersisted_creation_context = await create_event(
11381145
EventTypes.Create, creation_content, False
11391146
)
1140-
1147+
creation_context = await unpersisted_creation_context.persist(creation_event)
11411148
logger.debug("Sending %s in new room", EventTypes.Member)
11421149
ev = await self.event_creation_handler.handle_new_client_event(
11431150
requester=creator,
@@ -1181,7 +1188,6 @@ async def create_event(
11811188
power_event, power_context = await create_event(
11821189
EventTypes.PowerLevels, pl_content, True
11831190
)
1184-
current_state_group = power_context._state_group
11851191
events_to_send.append((power_event, power_context))
11861192
else:
11871193
power_level_content: JsonDict = {
@@ -1230,14 +1236,12 @@ async def create_event(
12301236
power_level_content,
12311237
True,
12321238
)
1233-
current_state_group = pl_context._state_group
12341239
events_to_send.append((pl_event, pl_context))
12351240

12361241
if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
12371242
room_alias_event, room_alias_context = await create_event(
12381243
EventTypes.CanonicalAlias, {"alias": room_alias.to_string()}, True
12391244
)
1240-
current_state_group = room_alias_context._state_group
12411245
events_to_send.append((room_alias_event, room_alias_context))
12421246

12431247
if (EventTypes.JoinRules, "") not in initial_state:
@@ -1246,7 +1250,6 @@ async def create_event(
12461250
{"join_rule": config["join_rules"]},
12471251
True,
12481252
)
1249-
current_state_group = join_rules_context._state_group
12501253
events_to_send.append((join_rules_event, join_rules_context))
12511254

12521255
if (EventTypes.RoomHistoryVisibility, "") not in initial_state:
@@ -1255,7 +1258,6 @@ async def create_event(
12551258
{"history_visibility": config["history_visibility"]},
12561259
True,
12571260
)
1258-
current_state_group = visibility_context._state_group
12591261
events_to_send.append((visibility_event, visibility_context))
12601262

12611263
if config["guest_can_join"]:
@@ -1265,14 +1267,12 @@ async def create_event(
12651267
{EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
12661268
True,
12671269
)
1268-
current_state_group = guest_access_context._state_group
12691270
events_to_send.append((guest_access_event, guest_access_context))
12701271

12711272
for (etype, state_key), content in initial_state.items():
12721273
event, context = await create_event(
12731274
etype, content, True, state_key=state_key
12741275
)
1275-
current_state_group = context._state_group
12761276
events_to_send.append((event, context))
12771277

12781278
if config["encrypted"]:
@@ -1284,9 +1284,16 @@ async def create_event(
12841284
)
12851285
events_to_send.append((encryption_event, encryption_context))
12861286

1287+
datastore = self.hs.get_datastores().state
1288+
events_and_context = (
1289+
await UnpersistedEventContext.batch_persist_unpersisted_contexts(
1290+
events_to_send, room_id, current_state_group, datastore
1291+
)
1292+
)
1293+
12871294
last_event = await self.event_creation_handler.handle_new_client_event(
12881295
creator,
1289-
events_to_send,
1296+
events_and_context,
12901297
ignore_shadow_ban=True,
12911298
ratelimit=False,
12921299
)

synapse/handlers/room_batch.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ async def persist_historical_events(
327327
# Mark all events as historical
328328
event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True
329329

330-
event, context = await self.event_creation_handler.create_event(
330+
event, unpersisted_context = await self.event_creation_handler.create_event(
331331
await self.create_requester_for_user_id_from_app_service(
332332
ev["sender"], app_service_requester.app_service
333333
),
@@ -345,7 +345,7 @@ async def persist_historical_events(
345345
historical=True,
346346
depth=inherited_depth,
347347
)
348-
348+
context = await unpersisted_context.persist(event)
349349
assert context._state_group
350350

351351
# Normally this is done when persisting the event but we have to

synapse/handlers/room_member.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,10 @@ async def _local_membership_update(
414414
max_retries = 5
415415
for i in range(max_retries):
416416
try:
417-
event, context = await self.event_creation_handler.create_event(
417+
(
418+
event,
419+
unpersisted_context,
420+
) = await self.event_creation_handler.create_event(
418421
requester,
419422
{
420423
"type": EventTypes.Member,
@@ -435,7 +438,7 @@ async def _local_membership_update(
435438
outlier=outlier,
436439
historical=historical,
437440
)
438-
441+
context = await unpersisted_context.persist(event)
439442
prev_state_ids = await context.get_prev_state_ids(
440443
StateFilter.from_types([(EventTypes.Member, None)])
441444
)
@@ -1944,14 +1947,18 @@ async def _generate_local_out_of_band_leave(
19441947
max_retries = 5
19451948
for i in range(max_retries):
19461949
try:
1947-
event, context = await self.event_creation_handler.create_event(
1950+
(
1951+
event,
1952+
unpersisted_context,
1953+
) = await self.event_creation_handler.create_event(
19481954
requester,
19491955
event_dict,
19501956
txn_id=txn_id,
19511957
prev_event_ids=prev_event_ids,
19521958
auth_event_ids=auth_event_ids,
19531959
outlier=True,
19541960
)
1961+
context = await unpersisted_context.persist(event)
19551962
event.internal_metadata.out_of_band_membership = True
19561963

19571964
result_event = (

0 commit comments

Comments
 (0)