Skip to content

Commit 16f9f93

Browse files
authored
Make deleting stale pushers a background update (matrix-org#9536)
1 parent a5daae2 commit 16f9f93

File tree

3 files changed

+55
-1
lines changed

3 files changed

+55
-1
lines changed

changelog.d/9536.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix deleting pushers when using sharded pushers.

synapse/storage/databases/main/pusher.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"
4444
self._remove_deactivated_pushers,
4545
)
4646

47+
self.db_pool.updates.register_background_update_handler(
48+
"remove_stale_pushers",
49+
self._remove_stale_pushers,
50+
)
51+
4752
def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
4853
"""JSON-decode the data in the rows returned from the `pushers` table
4954
@@ -337,6 +342,53 @@ def _delete_pushers(txn) -> int:
337342

338343
return number_deleted
339344

345+
async def _remove_stale_pushers(self, progress: dict, batch_size: int) -> int:
346+
"""A background update that deletes all pushers for logged out devices.
347+
348+
Note that we don't proacively tell the pusherpool that we've deleted
349+
these (just because its a bit off a faff to do from here), but they will
350+
get cleaned up at the next restart
351+
"""
352+
353+
last_pusher = progress.get("last_pusher", 0)
354+
355+
def _delete_pushers(txn) -> int:
356+
357+
sql = """
358+
SELECT p.id, access_token FROM pushers AS p
359+
LEFT JOIN access_tokens AS a ON (p.access_token = a.id)
360+
WHERE p.id > ?
361+
ORDER BY p.id ASC
362+
LIMIT ?
363+
"""
364+
365+
txn.execute(sql, (last_pusher, batch_size))
366+
pushers = [(row[0], row[1]) for row in txn]
367+
368+
self.db_pool.simple_delete_many_txn(
369+
txn,
370+
table="pushers",
371+
column="id",
372+
iterable=(pusher_id for pusher_id, token in pushers if token is None),
373+
keyvalues={},
374+
)
375+
376+
if pushers:
377+
self.db_pool.updates._background_update_progress_txn(
378+
txn, "remove_stale_pushers", {"last_pusher": pushers[-1][0]}
379+
)
380+
381+
return len(pushers)
382+
383+
number_deleted = await self.db_pool.runInteraction(
384+
"_remove_stale_pushers", _delete_pushers
385+
)
386+
387+
if number_deleted < batch_size:
388+
await self.db_pool.updates._end_background_update("remove_stale_pushers")
389+
390+
return number_deleted
391+
340392

341393
class PusherStore(PusherWorkerStore):
342394
def get_pushers_stream_token(self) -> int:

synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@
1616

1717
-- Delete all pushers associated with deleted devices. This is to clear up after
1818
-- a bug where they weren't correctly deleted when using workers.
19-
DELETE FROM pushers WHERE access_token NOT IN (SELECT id FROM access_tokens);
19+
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
20+
(5908, 'remove_stale_pushers', '{}');

0 commit comments

Comments
 (0)