Skip to content

Commit 33e0c25

Browse files
authored
Clean up old device_federation_inbox rows (element-hq#18546)
Fixes element-hq#17370
1 parent 73a3838 commit 33e0c25

File tree

5 files changed

+250
-1
lines changed

5 files changed

+250
-1
lines changed

changelog.d/18546.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Clean up old, unused rows from the `device_federation_inbox` table.

synapse/storage/databases/main/deviceinbox.py

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
start_active_span,
4343
trace,
4444
)
45+
from synapse.metrics.background_process_metrics import run_as_background_process
4546
from synapse.replication.tcp.streams import ToDeviceStream
4647
from synapse.storage._base import SQLBaseStore, db_to_json
4748
from synapse.storage.database import (
@@ -52,7 +53,7 @@
5253
)
5354
from synapse.storage.util.id_generators import MultiWriterIdGenerator
5455
from synapse.types import JsonDict
55-
from synapse.util import json_encoder
56+
from synapse.util import Duration, json_encoder
5657
from synapse.util.caches.expiringcache import ExpiringCache
5758
from synapse.util.caches.stream_change_cache import StreamChangeCache
5859
from synapse.util.stringutils import parse_and_validate_server_name
@@ -63,6 +64,18 @@
6364
logger = logging.getLogger(__name__)
6465

6566

67+
# How long to keep messages in the device federation inbox before deleting them.
68+
DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS = 7 * Duration.DAY_MS
69+
70+
# How often to run the task to clean up old device_federation_inbox rows.
71+
DEVICE_FEDERATION_INBOX_CLEANUP_INTERVAL_MS = 5 * Duration.MINUTE_MS
72+
73+
# Update name for the device federation inbox received timestamp index.
74+
DEVICE_FEDERATION_INBOX_RECEIVED_INDEX_UPDATE = (
75+
"device_federation_inbox_received_ts_index"
76+
)
77+
78+
6679
class DeviceInboxWorkerStore(SQLBaseStore):
6780
def __init__(
6881
self,
@@ -134,6 +147,14 @@ def __init__(
134147
prefilled_cache=device_outbox_prefill,
135148
)
136149

150+
if hs.config.worker.run_background_tasks:
151+
self._clock.looping_call(
152+
run_as_background_process,
153+
DEVICE_FEDERATION_INBOX_CLEANUP_INTERVAL_MS,
154+
"_delete_old_federation_inbox_rows",
155+
self._delete_old_federation_inbox_rows,
156+
)
157+
137158
def process_replication_rows(
138159
self,
139160
stream_name: str,
@@ -960,6 +981,52 @@ def _add_messages_to_local_device_inbox_txn(
960981
],
961982
)
962983

984+
async def _delete_old_federation_inbox_rows(self, batch_size: int = 1000) -> None:
    """Delete old rows from the `device_federation_inbox` table.

    Rows whose `received_ts` is more than
    `DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS` in the past are deleted in
    batches, oldest first, sleeping between batches. The method returns
    once a batch deletes fewer than `batch_size` rows.

    Nothing is deleted until the background update that creates the index
    on `received_ts` has completed.

    Args:
        batch_size: The maximum number of rows to delete per database call.

    Raises:
        ValueError: If `batch_size` is not positive. Such a value could
            never satisfy the "short batch" stop condition below, so the
            loop would never terminate.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    # We wait until we have the index on `received_ts`, otherwise the query
    # will take a very long time.
    if not await self.db_pool.updates.has_completed_background_update(
        DEVICE_FEDERATION_INBOX_RECEIVED_INDEX_UPDATE
    ):
        return

    def _delete_old_federation_inbox_rows_txn(txn: LoggingTransaction) -> bool:
        # We delete at most `batch_size` rows that are older than
        # DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS
        delete_before_ts = (
            self._clock.time_msec() - DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS
        )
        sql = """
            WITH to_delete AS (
                SELECT origin, message_id
                FROM device_federation_inbox
                WHERE received_ts < ?
                ORDER BY received_ts ASC
                LIMIT ?
            )
            DELETE FROM device_federation_inbox
            WHERE
                (origin, message_id) IN (
                    SELECT origin, message_id FROM to_delete
                )
        """
        txn.execute(sql, (delete_before_ts, batch_size))

        # A short batch means there was nothing more to delete.
        return txn.rowcount < batch_size

    while True:
        finished = await self.db_pool.runInteraction(
            "_delete_old_federation_inbox_rows",
            _delete_old_federation_inbox_rows_txn,
            db_autocommit=True,  # We don't need to run in a transaction
        )
        if finished:
            return

        # We sleep a bit so that we don't hammer the database in a tight
        # loop first time we run this. The sleep must be awaited: calling
        # it without `await` does not pause the loop at all.
        await self._clock.sleep(1)
1029+
9631030

9641031
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
9651032
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
@@ -995,6 +1062,13 @@ def __init__(
9951062
self._cleanup_device_federation_outbox,
9961063
)
9971064

1065+
self.db_pool.updates.register_background_index_update(
1066+
update_name=DEVICE_FEDERATION_INBOX_RECEIVED_INDEX_UPDATE,
1067+
index_name="device_federation_inbox_received_ts_index",
1068+
table="device_federation_inbox",
1069+
columns=["received_ts"],
1070+
)
1071+
9981072
async def _background_drop_index_device_inbox(
9991073
self, progress: JsonDict, batch_size: int
10001074
) -> int:
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
--
2+
-- This file is licensed under the Affero General Public License (AGPL) version 3.
3+
--
4+
-- Copyright (C) 2025 New Vector, Ltd
5+
--
6+
-- This program is free software: you can redistribute it and/or modify
7+
-- it under the terms of the GNU Affero General Public License as
8+
-- published by the Free Software Foundation, either version 3 of the
9+
-- License, or (at your option) any later version.
10+
--
11+
-- See the GNU Affero General Public License for more details:
12+
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
13+
14+
-- Schedule the background update that builds an index on
-- `device_federation_inbox.received_ts`.
INSERT INTO background_updates (ordering, update_name, progress_json)
VALUES (9206, 'device_federation_inbox_received_ts_index', '{}');

synapse/util/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,15 @@
5555
logger = logging.getLogger(__name__)
5656

5757

58+
class Duration:
    """Namespace of commonly used time spans, each expressed in milliseconds."""

    # Each constant is built from the one before it.
    MINUTE_MS = 1000 * 60
    HOUR_MS = MINUTE_MS * 60
    DAY_MS = HOUR_MS * 24
65+
66+
5867
def _reject_invalid_json(val: Any) -> None:
5968
"""Do not allow Infinity, -Infinity, or NaN values in JSON."""
6069
raise ValueError("Invalid JSON value: '%s'" % val)

tests/storage/databases/main/test_deviceinbox.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,17 @@
1919
#
2020
#
2121

22+
from unittest.mock import patch
23+
24+
from twisted.internet import defer
2225
from twisted.test.proto_helpers import MemoryReactor
2326

2427
from synapse.rest import admin
2528
from synapse.rest.client import devices
2629
from synapse.server import HomeServer
30+
from synapse.storage.databases.main.deviceinbox import (
31+
DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS,
32+
)
2733
from synapse.util import Clock
2834

2935
from tests.unittest import HomeserverTestCase
@@ -172,3 +178,146 @@ def test_background_remove_hidden_devices_from_device_inbox(self) -> None:
172178
)
173179
self.assertEqual(1, len(res))
174180
self.assertEqual(res[0], "cur_device")
181+
182+
183+
class DeviceInboxFederationInboxCleanupTestCase(HomeserverTestCase):
    """Tests for the clean-up of old `device_federation_inbox` rows."""

    servlets = [
        admin.register_servlets,
        devices.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.store = hs.get_datastores().main
        self.db_pool = self.store.db_pool

        # Start every test with the clock already past the cleanup delay.
        self.reactor.advance(DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS * 2 / 1000)

    def _insert_inbox_row(self, origin: str, message_id: str, received_ts: int) -> None:
        """Insert a single row into `device_federation_inbox`."""
        row = {
            "origin": origin,
            "message_id": message_id,
            "received_ts": received_ts,
        }
        self.get_success(self.db_pool.simple_insert("device_federation_inbox", row))

    def test_delete_old_federation_inbox_rows_skips_if_no_index(self) -> None:
        """Rows must be left alone while the index has not been created yet."""

        # Seed the table with rows carrying a `received_ts` of 0.
        for i in range(5):
            self._insert_inbox_row("example.com", f"msg_{i}", 0)

        # Pretend the index background update has not completed.
        with patch(
            "synapse.storage.background_updates.BackgroundUpdater.has_completed_background_update"
        ) as has_completed:
            has_completed.return_value = False

            self.get_success(self.store._delete_old_federation_inbox_rows())

        # Every seeded row should still be present.
        remaining = self.get_success(
            self.db_pool.simple_select_list(
                "device_federation_inbox",
                keyvalues={},
                retcols=["origin", "message_id", "received_ts"],
            )
        )
        self.assertEqual(
            len(remaining), 5, "Expected no rows to be deleted when index is missing"
        )

    def test_delete_old_federation_inbox_rows(self) -> None:
        """Old rows are removed from device_federation_inbox; new ones stay."""

        # Rows stamped with the current time, which become "old" once the
        # clock is advanced below.
        for i in range(5):
            self._insert_inbox_row(
                "old.example.com", f"old_msg_{i}", self.clock.time_msec()
            )

        self.reactor.advance(2 * DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS / 1000)

        # Rows stamped after the jump in time, i.e. still fresh.
        for i in range(5):
            self._insert_inbox_row(
                "new.example.com", f"new_msg_{i}", self.clock.time_msec()
            )

        # Run the cleanup
        self.get_success(self.store._delete_old_federation_inbox_rows())

        # Only the fresh rows should have survived.
        remaining_origins = self.get_success(
            self.db_pool.simple_select_onecol(
                "device_federation_inbox",
                keyvalues={},
                retcol="origin",
            )
        )

        self.assertEqual(
            len(remaining_origins), 5, "Expected only 5 new messages to remain"
        )
        for remaining_origin in remaining_origins:
            self.assertEqual(remaining_origin, "new.example.com")

    def test_delete_old_federation_inbox_rows_batch_limit(self) -> None:
        """Deletion is carried out in batches."""

        # Ten old rows: twice the batch size of 5 used below.
        for i in range(10):
            self._insert_inbox_row(
                "old.example.com", f"old_msg_{i}", self.clock.time_msec()
            )

        # Move the clock past the cleanup delay so those rows count as old.
        self.reactor.advance(2 * DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS / 1000)

        # Kick off the cleanup; it should work through several batches,
        # sleeping in between.
        cleanup_d = defer.ensureDeferred(
            self.store._delete_old_federation_inbox_rows(batch_size=5)
        )

        # It must not have finished straight away.
        self.assertFalse(cleanup_d.called)

        # Let time pass so the remaining batches can run to completion.
        self.reactor.advance(2)
        self.get_success(cleanup_d)

        # Nothing should be left once all batches have run.
        remaining = self.get_success(
            self.db_pool.simple_select_list(
                "device_federation_inbox",
                keyvalues={},
                retcols=["origin", "message_id"],
            )
        )

        self.assertEqual(
            len(remaining),
            0,
            "Expected all messages to be deleted after multiple batches",
        )

0 commit comments

Comments
 (0)