This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit c05e43b

Add inserted historical messages to /backfill response

1 parent baae5d8

3 files changed, +150 -32 lines

synapse/storage/databases/main/event_federation.py (67 additions, 6 deletions)

@@ -673,11 +673,11 @@ async def get_oldest_events_with_depth_in_room(self, room_id):
         )

     def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
-        sqlAsdf = "SELECT * FROM insertion_event_extremeties as i"
+        sqlAsdf = "SELECT * FROM insertion_event_edges as i"
         txn.execute(sqlAsdf)
         logger.info("wfeafewawafeawg %s", dict(txn))

-        sqlAsdf = "SELECT * FROM insertion_event_extremeties as i WHERE i.room_id = ?"
+        sqlAsdf = "SELECT * FROM insertion_event_edges as i WHERE i.room_id = ?"
         txn.execute(sqlAsdf, (room_id,))
         logger.info("awfeawefw %s", dict(txn))

@@ -688,7 +688,7 @@ def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
             # " INNER JOIN event_backward_extremities as b"
             # " ON g.prev_event_id = b.event_id"
             # TODO
-            " INNER JOIN insertion_event_extremeties as i"
+            " INNER JOIN insertion_event_edges as i"
             " ON e.event_id = i.insertion_prev_event_id"
             " WHERE i.room_id = ?"
             " GROUP BY i.insertion_event_id"
@@ -703,7 +703,7 @@ def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
             " INNER JOIN event_backward_extremities as b"
             " ON g.prev_event_id = b.event_id"
             # TODO
-            # " INNER JOIN insertion_event_extremeties as i"
+            # " INNER JOIN insertion_event_edges as i"
             # " ON g.event_id = i.insertion_prev_event_id"
             " WHERE b.room_id = ? AND g.is_state is ?"
             " GROUP BY b.event_id"
@@ -961,16 +961,50 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
         # We want to make sure that we do a breadth-first, "depth" ordered
         # search.

-        # TODO
+        # Look for the prev_event_id connected to the given event_id
         query = (
             "SELECT depth, prev_event_id FROM event_edges"
+            # Get the depth of the prev_event_id from the events table
             " INNER JOIN events"
             " ON prev_event_id = events.event_id"
+            # Find an event which matches the given event_id
             " WHERE event_edges.event_id = ?"
             " AND event_edges.is_state = ?"
             " LIMIT ?"
         )

+        # Look for the "insertion" events connected to the given event_id
+        # TODO: Do we need to worry about selecting only from the given room_id? The other query above doesn't
+        connected_insertion_event_query = (
+            "SELECT e.depth, i.insertion_event_id FROM insertion_event_edges AS i"
+            # Get the depth of the insertion event from the events table
+            " INNER JOIN events AS e"
+            " ON e.event_id = i.insertion_event_id"
+            # Find an insertion event which points via prev_events to the given event_id
+            " WHERE i.insertion_prev_event_id = ?"
+            " LIMIT ?"
+        )
+
+        # Find any chunk connections of a given insertion event
+        # TODO: Do we need to worry about selecting only from the given room_id? The other query above doesn't
+        chunk_connection_query = (
+            "SELECT e.depth, c.event_id FROM insertion_events AS i"
+            # Find the chunk that connects to the given insertion event
+            " INNER JOIN chunk_edges AS c"
+            " ON i.next_chunk_id = c.chunk_id"
+            # Get the depth of the chunk start event from the events table
+            " INNER JOIN events AS e"
+            " ON e.event_id = c.event_id"
+            # Find an insertion event which matches the given event_id
+            " WHERE i.insertion_event_id = ?"
+            " LIMIT ?"
+        )
+
+        # In a PriorityQueue, the lowest valued entries are retrieved first.
+        # We're using depth as the priority in the queue.
+        # Depth is lowest at the oldest-in-time message and highest at the
+        # newest-in-time message. We add events to the queue with a negative depth so that
+        # we process the newest-in-time messages first going backwards in time.
         queue = PriorityQueue()

         for event_id in event_list:
@@ -996,9 +1030,36 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):

             event_results.add(event_id)

+            txn.execute(
+                connected_insertion_event_query, (event_id, limit - len(event_results))
+            )
+            connected_insertion_event_id_results = list(txn)
+            logger.info(
+                "connected_insertion_event_query %s",
+                connected_insertion_event_id_results,
+            )
+            for row in connected_insertion_event_id_results:
+                if row[1] not in event_results:
+                    queue.put((-row[0], row[1]))
+
+                    # Find any chunk connections for the given insertion event
+                    txn.execute(
+                        chunk_connection_query, (row[1], limit - len(event_results))
+                    )
+                    chunk_start_event_id_results = list(txn)
+                    logger.info(
+                        "chunk_start_event_id_results %s",
+                        chunk_start_event_id_results,
+                    )
+                    for row in chunk_start_event_id_results:
+                        if row[1] not in event_results:
+                            queue.put((-row[0], row[1]))
+
             txn.execute(query, (event_id, False, limit - len(event_results)))
+            prev_event_id_results = list(txn)
+            logger.info("prev_event_ids %s", prev_event_id_results)

-            for row in txn:
+            for row in prev_event_id_results:
                 if row[1] not in event_results:
                     queue.put((-row[0], row[1]))
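The negative-depth trick in _get_backfill_events is easy to miss in the diff: PriorityQueue always pops the lowest value first, so negating the depth makes the newest-in-time event pop first while the walk heads backwards in time. A minimal, self-contained sketch of that pattern, using hypothetical event IDs and depths rather than Synapse's tables:

    from queue import PriorityQueue

    # Hypothetical graph: event_id -> list of (depth_of_prev, prev_event_id)
    # edges, mirroring the "SELECT depth, prev_event_id" query above.
    # Depth grows toward newer events: $A=1, $B=2, $C=3.
    EVENT_EDGES = {
        "$C": [(2, "$B")],
        "$B": [(1, "$A")],
        "$A": [],
    }

    def backfill(start_event_id, start_depth, limit):
        """Return up to `limit` events, newest first, walking back in time."""
        queue = PriorityQueue()
        event_results = []
        # PriorityQueue pops the lowest value first, so negating the depth
        # makes the highest-depth (newest-in-time) event come out first.
        queue.put((-start_depth, start_event_id))
        while not queue.empty() and len(event_results) < limit:
            _, event_id = queue.get()
            if event_id in event_results:
                continue
            event_results.append(event_id)
            for depth, prev_event_id in EVENT_EDGES.get(event_id, []):
                if prev_event_id not in event_results:
                    queue.put((-depth, prev_event_id))
        return event_results

    print(backfill("$C", 3, 10))  # ['$C', '$B', '$A'], newest to oldest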

synapse/storage/databases/main/events.py (66 additions, 19 deletions)

@@ -1506,6 +1506,7 @@ def _update_metadata_tables_txn(

         self._handle_insertion_event(txn, event)
         self._handle_marker_event(txn, event)
+        self._handle_chunk_id(txn, event)

         # Store the labels for this event.
         labels = event.content.get(EventContentFields.LABELS)
@@ -1773,10 +1774,27 @@ def _handle_insertion_event(self, txn, event):

         logger.info("_handle_insertion_event %s", event)

+        next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
+        if next_chunk_id is None:
+            # Invalid insertion event without next chunk ID
+            return
+
+        # Keep track of the insertion event and the chunk ID
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="insertion_events",
+            values={
+                "insertion_event_id": event.event_id,
+                "room_id": event.room_id,
+                "next_chunk_id": next_chunk_id,
+            },
+        )
+
+        # Insert an edge for every prev_event connection
         for prev_event_id in event.prev_events:
             self.db_pool.simple_insert_txn(
                 txn,
-                table="insertion_event_extremeties",
+                table="insertion_event_edges",
                 values={
                     "insertion_event_id": event.event_id,
                     "room_id": event.room_id,
@@ -1798,26 +1816,55 @@ def _handle_marker_event(self, txn, event):

         logger.info("_handle_marker_event %s", event)

-        insertion_event_id = event.content.get(
-            EventContentFields.MSC2716_MARKER_INSERTION
-        )
-        insertion_prev_event_ids = event.content.get(
-            EventContentFields.MSC2716_MARKER_INSERTION_PREV_EVENTS
-        )
-        if not insertion_event_id or not insertion_prev_event_ids:
-            # Invalid marker event
+        # TODO: We should attempt to backfill the insertion event instead
+        # of trying to pack all of the info in the marker event. Otherwise,
+        # we need to pack in the insertion_prev_events and insertion_next_chunk_id.
+
+        # insertion_event_id = event.content.get(
+        #     EventContentFields.MSC2716_MARKER_INSERTION
+        # )
+        # insertion_prev_event_ids = event.content.get(
+        #     EventContentFields.MSC2716_MARKER_INSERTION_PREV_EVENTS
+        # )
+        # if not insertion_event_id or not insertion_prev_event_ids:
+        #     # Invalid marker event
+        #     return
+
+        # for prev_event_id in insertion_prev_event_ids:
+        #     self.db_pool.simple_insert_txn(
+        #         txn,
+        #         table="insertion_event_edges",
+        #         values={
+        #             "insertion_event_id": insertion_event_id,
+        #             "room_id": event.room_id,
+        #             "insertion_prev_event_id": prev_event_id,
+        #         },
+        #     )
+
+    def _handle_chunk_id(self, txn, event):
+        """Handles inserting the chunk connections between the event at the
+        start of a chunk and an insertion event
+
+        Args: txn, event (EventBase)
+        """
+
+        chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
+        if chunk_id is None:
+            # No chunk connection to persist
             return

-        for prev_event_id in insertion_prev_event_ids:
-            self.db_pool.simple_insert_txn(
-                txn,
-                table="insertion_event_extremeties",
-                values={
-                    "insertion_event_id": insertion_event_id,
-                    "room_id": event.room_id,
-                    "insertion_prev_event_id": prev_event_id,
-                },
-            )
+        logger.info("_handle_chunk_id %s %s", chunk_id, event)
+
+        # Keep track of the chunk-start event and its chunk ID
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="chunk_edges",
+            values={
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "chunk_id": chunk_id,
+            },
+        )

     def _handle_redaction(self, txn, redacted_event_id):
         """Handles receiving a redaction and checking whether we need to remove

synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql (17 additions, 7 deletions)

@@ -18,23 +18,33 @@
 -- and we hit an event which matches `insertion_prev_event_id`, it should backfill
 -- the "insertion" event and start navigating from there.

+CREATE TABLE IF NOT EXISTS insertion_events(
+    insertion_event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    next_chunk_id TEXT NOT NULL,
+    UNIQUE (insertion_event_id, room_id, next_chunk_id)
+);
+
+CREATE INDEX IF NOT EXISTS insertion_events_insertion_room_id ON insertion_events(room_id);
+CREATE INDEX IF NOT EXISTS insertion_events_insertion_event_id ON insertion_events(insertion_event_id);
+CREATE INDEX IF NOT EXISTS insertion_events_next_chunk_id ON insertion_events(next_chunk_id);

-CREATE TABLE IF NOT EXISTS insertion_event_extremeties(
+CREATE TABLE IF NOT EXISTS insertion_event_edges(
     insertion_event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
     insertion_prev_event_id TEXT NOT NULL,
-    UNIQUE (insertion_event_id, room_id, room_id, insertion_prev_event_id)
+    UNIQUE (insertion_event_id, room_id, insertion_prev_event_id)
 );

-CREATE INDEX IF NOT EXISTS insertion_event_extremeties_insertion_room_id ON insertion_event_extremeties(room_id);
-CREATE INDEX IF NOT EXISTS insertion_event_extremeties_insertion_event_id ON insertion_event_extremeties(insertion_event_id);
-CREATE INDEX IF NOT EXISTS insertion_event_extremeties_insertion_prev_event_id ON insertion_event_extremeties(insertion_prev_event_id);
+CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_room_id ON insertion_event_edges(room_id);
+CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_event_id ON insertion_event_edges(insertion_event_id);
+CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_prev_event_id ON insertion_event_edges(insertion_prev_event_id);

-CREATE TABLE IF NOT EXISTS chunk_connections(
+CREATE TABLE IF NOT EXISTS chunk_edges(
     event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
     chunk_id TEXT NOT NULL,
     UNIQUE (event_id, room_id)
 );

-CREATE INDEX IF NOT EXISTS chunk_connections_insertion_chunk_id ON chunk_connections(chunk_id);
+CREATE INDEX IF NOT EXISTS chunk_edges_chunk_id ON chunk_edges(chunk_id);
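Taken together, the three tables let a backfill walk hop from a live event to the first event of a historical chunk: insertion_event_edges finds the insertion event, insertion_events supplies its next_chunk_id, and chunk_edges maps that chunk ID to the chunk's starting event. A self-contained sqlite3 sketch of that two-step lookup, with illustrative IDs and a schema trimmed to the joined columns (not Synapse code):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE insertion_events(insertion_event_id TEXT, room_id TEXT, next_chunk_id TEXT);
        CREATE TABLE insertion_event_edges(insertion_event_id TEXT, room_id TEXT, insertion_prev_event_id TEXT);
        CREATE TABLE chunk_edges(event_id TEXT, room_id TEXT, chunk_id TEXT);

        -- Insertion event $ins points at live event $live and advertises
        -- chunk_1; $hist_start is the first event of that historical chunk.
        INSERT INTO insertion_events VALUES ('$ins', '!room', 'chunk_1');
        INSERT INTO insertion_event_edges VALUES ('$ins', '!room', '$live');
        INSERT INTO chunk_edges VALUES ('$hist_start', '!room', 'chunk_1');
        """
    )

    # Step 1: given a live event reached during backfill, find the insertion
    # event that points at it (the connected_insertion_event_query shape).
    row = conn.execute(
        "SELECT i.insertion_event_id FROM insertion_event_edges AS i"
        " WHERE i.insertion_prev_event_id = ?",
        ("$live",),
    ).fetchone()

    # Step 2: follow next_chunk_id to the chunk's starting event
    # (the chunk_connection_query shape).
    chunk_start = conn.execute(
        "SELECT c.event_id FROM insertion_events AS i"
        " INNER JOIN chunk_edges AS c ON i.next_chunk_id = c.chunk_id"
        " WHERE i.insertion_event_id = ?",
        (row[0],),
    ).fetchone()

    print(chunk_start[0])  # $hist_start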
