Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 1b238e8

Browse files
authored
Speed up persisting large number of outliers (#16649)
Recalculating the roots tuple every iteration could be very expensive, so instead let's do a topological sort.
1 parent fef08cb commit 1b238e8

File tree

4 files changed

+134
-12
lines changed

4 files changed

+134
-12
lines changed

changelog.d/16649.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Speed up persisting large number of outliers.

synapse/handlers/federation_event.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@
8888
)
8989
from synapse.types.state import StateFilter
9090
from synapse.util.async_helpers import Linearizer, concurrently_execute
91-
from synapse.util.iterutils import batch_iter, partition
91+
from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched
9292
from synapse.util.retryutils import NotRetryingDestination
9393
from synapse.util.stringutils import shortstr
9494

@@ -1669,14 +1669,13 @@ async def _auth_and_persist_outliers(
16691669

16701670
# XXX: it might be possible to kick this process off in parallel with fetching
16711671
# the events.
1672-
while event_map:
1673-
# build a list of events whose auth events are not in the queue.
1674-
roots = tuple(
1675-
ev
1676-
for ev in event_map.values()
1677-
if not any(aid in event_map for aid in ev.auth_event_ids())
1678-
)
16791672

1673+
# We need to persist an event's auth events before the event.
1674+
auth_graph = {
1675+
ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map]
1676+
for ev in event_map.values()
1677+
}
1678+
for roots in sorted_topologically_batched(event_map.values(), auth_graph):
16801679
if not roots:
16811680
# if *none* of the remaining events are ready, that means
16821681
# we have a loop. This either means a bug in our logic, or that
@@ -1698,9 +1697,6 @@ async def _auth_and_persist_outliers(
16981697

16991698
await self._auth_and_persist_outliers_inner(room_id, roots)
17001699

1701-
for ev in roots:
1702-
del event_map[ev.event_id]
1703-
17041700
async def _auth_and_persist_outliers_inner(
17051701
self, room_id: str, fetched_events: Collection[EventBase]
17061702
) -> None:

synapse/util/iterutils.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,3 +135,54 @@ def sorted_topologically(
135135
degree_map[edge] -= 1
136136
if degree_map[edge] == 0:
137137
heapq.heappush(zero_degree, edge)
138+
139+
140+
def sorted_topologically_batched(
141+
nodes: Iterable[T],
142+
graph: Mapping[T, Collection[T]],
143+
) -> Generator[Collection[T], None, None]:
144+
r"""Walk the graph topologically, returning batches of nodes where all nodes
145+
that references it have been previously returned.
146+
147+
For example, given the following graph:
148+
149+
A
150+
/ \
151+
B C
152+
\ /
153+
D
154+
155+
This function will return: `[[A], [B, C], [D]]`.
156+
157+
This function is useful for e.g. batch persisting events in an auth chain,
158+
where we can only persist an event if all its auth events have already been
159+
persisted.
160+
"""
161+
162+
degree_map = {node: 0 for node in nodes}
163+
reverse_graph: Dict[T, Set[T]] = {}
164+
165+
for node, edges in graph.items():
166+
if node not in degree_map:
167+
continue
168+
169+
for edge in set(edges):
170+
if edge in degree_map:
171+
degree_map[node] += 1
172+
173+
reverse_graph.setdefault(edge, set()).add(node)
174+
reverse_graph.setdefault(node, set())
175+
176+
zero_degree = [node for node, degree in degree_map.items() if degree == 0]
177+
178+
while zero_degree:
179+
new_zero_degree = []
180+
for node in zero_degree:
181+
for edge in reverse_graph.get(node, []):
182+
if edge in degree_map:
183+
degree_map[edge] -= 1
184+
if degree_map[edge] == 0:
185+
new_zero_degree.append(edge)
186+
187+
yield zero_degree
188+
zero_degree = new_zero_degree

tests/util/test_itertools.py

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@
1313
# limitations under the License.
1414
from typing import Dict, Iterable, List, Sequence
1515

16-
from synapse.util.iterutils import chunk_seq, sorted_topologically
16+
from synapse.util.iterutils import (
17+
chunk_seq,
18+
sorted_topologically,
19+
sorted_topologically_batched,
20+
)
1721

1822
from tests.unittest import TestCase
1923

@@ -107,3 +111,73 @@ def test_multiple_paths(self) -> None:
107111
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]}
108112

109113
self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4])
114+
115+
116+
class SortTopologicallyBatched(TestCase):
117+
"Test cases for `sorted_topologically_batched`"
118+
119+
def test_empty(self) -> None:
120+
"Test that an empty graph works correctly"
121+
122+
graph: Dict[int, List[int]] = {}
123+
self.assertEqual(list(sorted_topologically_batched([], graph)), [])
124+
125+
def test_handle_empty_graph(self) -> None:
126+
"Test that a graph where a node doesn't have an entry is treated as empty"
127+
128+
graph: Dict[int, List[int]] = {}
129+
130+
# For disconnected nodes the output is simply sorted.
131+
self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]])
132+
133+
def test_disconnected(self) -> None:
134+
"Test that a graph with no edges work"
135+
136+
graph: Dict[int, List[int]] = {1: [], 2: []}
137+
138+
# For disconnected nodes the output is simply sorted.
139+
self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]])
140+
141+
def test_linear(self) -> None:
142+
"Test that a simple `4 -> 3 -> 2 -> 1` graph works"
143+
144+
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}
145+
146+
self.assertEqual(
147+
list(sorted_topologically_batched([4, 3, 2, 1], graph)),
148+
[[1], [2], [3], [4]],
149+
)
150+
151+
def test_subset(self) -> None:
152+
"Test that only sorting a subset of the graph works"
153+
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}
154+
155+
self.assertEqual(list(sorted_topologically_batched([4, 3], graph)), [[3], [4]])
156+
157+
def test_fork(self) -> None:
158+
"Test that a forked graph works"
159+
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [1], 4: [2, 3]}
160+
161+
# Valid orderings are `[1, 3, 2, 4]` or `[1, 2, 3, 4]`, but we should
162+
# always get the same one.
163+
self.assertEqual(
164+
list(sorted_topologically_batched([4, 3, 2, 1], graph)), [[1], [2, 3], [4]]
165+
)
166+
167+
def test_duplicates(self) -> None:
168+
"Test that a graph with duplicate edges work"
169+
graph: Dict[int, List[int]] = {1: [], 2: [1, 1], 3: [2, 2], 4: [3]}
170+
171+
self.assertEqual(
172+
list(sorted_topologically_batched([4, 3, 2, 1], graph)),
173+
[[1], [2], [3], [4]],
174+
)
175+
176+
def test_multiple_paths(self) -> None:
177+
"Test that a graph with multiple paths between two nodes work"
178+
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]}
179+
180+
self.assertEqual(
181+
list(sorted_topologically_batched([4, 3, 2, 1], graph)),
182+
[[1], [2], [3], [4]],
183+
)

0 commit comments

Comments
 (0)