Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit a47ee14

Browse files
committed
add a function to store state groups for batched events/contexts
1 parent 03bccd5 commit a47ee14

File tree

1 file changed

+107
-5
lines changed

1 file changed

+107
-5
lines changed

synapse/storage/databases/state/store.py

Lines changed: 107 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import attr
1919

2020
from synapse.api.constants import EventTypes
21+
from synapse.events import EventBase
22+
from synapse.events.snapshot import EventContext
2123
from synapse.storage._base import SQLBaseStore
2224
from synapse.storage.database import (
2325
DatabasePool,
@@ -404,6 +406,111 @@ def _insert_into_cache(
404406
fetched_keys=non_member_types,
405407
)
406408

409+
async def store_state_deltas_for_batched(
410+
self,
411+
events_and_context: List[Tuple[EventBase, EventContext]],
412+
room_id: str,
413+
prev_group: int,
414+
) -> List[int]:
415+
"""Generate and store state deltas for a group of events and contexts created to be
416+
batch persisted.
417+
418+
Args:
419+
events_and_context: the events to generate and store a state groups for
420+
and their associated contexts
421+
room_id: the id of the room the events were created for
422+
prev_group: the state group of the last event persisted before the batched events
423+
were created
424+
"""
425+
426+
def insert_deltas_group_txn(
427+
txn: LoggingTransaction,
428+
events_and_context: List[Tuple[EventBase, EventContext]],
429+
prev_group: int,
430+
) -> List[int]:
431+
"""Generate and store state groups for the provided events and contexts.
432+
433+
Requires that we have the state as a delta from the last persisted state group.
434+
435+
Returns:
436+
A list of state groups
437+
"""
438+
is_in_db = self.db_pool.simple_select_one_onecol_txn(
439+
txn,
440+
table="state_groups",
441+
keyvalues={"id": prev_group},
442+
retcol="id",
443+
allow_none=True,
444+
)
445+
if not is_in_db:
446+
raise Exception(
447+
"Trying to persist state with unpersisted prev_group: %r"
448+
% (prev_group,)
449+
)
450+
451+
num_state_groups = len(events_and_context)
452+
453+
state_groups = self._state_group_seq_gen.get_next_mult_txn(
454+
txn, num_state_groups
455+
)
456+
457+
index = 0
458+
for event, context in events_and_context:
459+
context._state_group = state_groups[index]
460+
# The first prev_group will be the last persisted state group, which is passed in
461+
# else it will be the group most recently assigned
462+
if index > 0:
463+
context.prev_group = state_groups[index - 1]
464+
context.state_group_before_event = state_groups[index - 1]
465+
else:
466+
context.prev_group = prev_group
467+
context.state_group_before_event = prev_group
468+
context.delta_ids = {(event.type, event.state_key): event.event_id}
469+
context._state_delta_due_to_event = {
470+
(event.type, event.state_key): event.event_id
471+
}
472+
index += 1
473+
474+
self.db_pool.simple_insert_many_txn(
475+
txn,
476+
table="state_groups",
477+
keys=("id", "room_id", "event_id"),
478+
values=[
479+
(context._state_group, room_id, event.event_id)
480+
for event, context in events_and_context
481+
],
482+
)
483+
484+
self.db_pool.simple_insert_many_txn(
485+
txn,
486+
table="state_group_edges",
487+
keys=("state_group", "prev_state_group"),
488+
values=[
489+
(context._state_group, context.prev_group)
490+
for _, context in events_and_context
491+
],
492+
)
493+
494+
for _, context in events_and_context:
495+
assert context.delta_ids is not None
496+
self.db_pool.simple_insert_many_txn(
497+
txn,
498+
table="state_groups_state",
499+
keys=("state_group", "room_id", "type", "state_key", "event_id"),
500+
values=[
501+
(context._state_group, room_id, key[0], key[1], state_id)
502+
for key, state_id in context.delta_ids.items()
503+
],
504+
)
505+
return state_groups
506+
507+
return await self.db_pool.runInteraction(
508+
"store_state_deltas_for_batched.insert_deltas_group",
509+
insert_deltas_group_txn,
510+
events_and_context,
511+
prev_group,
512+
)
513+
407514
async def store_state_group(
408515
self,
409516
event_id: str,
@@ -413,10 +520,8 @@ async def store_state_group(
413520
current_state_ids: Optional[StateMap[str]],
414521
) -> int:
415522
"""Store a new set of state, returning a newly assigned state group.
416-
417523
At least one of `current_state_ids` and `prev_group` must be provided. Whenever
418524
`prev_group` is not None, `delta_ids` must also not be None.
419-
420525
Args:
421526
event_id: The event ID for which the state was calculated
422527
room_id
@@ -426,7 +531,6 @@ async def store_state_group(
426531
`current_state_ids`.
427532
current_state_ids: The state to store. Map of (type, state_key)
428533
to event_id.
429-
430534
Returns:
431535
The state group ID
432536
"""
@@ -441,9 +545,7 @@ def insert_delta_group_txn(
441545
txn: LoggingTransaction, prev_group: int, delta_ids: StateMap[str]
442546
) -> Optional[int]:
443547
"""Try and persist the new group as a delta.
444-
445548
Requires that we have the state as a delta from a previous state group.
446-
447549
Returns:
448550
The state group if successfully created, or None if the state
449551
needs to be persisted as a full state.

0 commit comments

Comments
 (0)