Skip to content
This repository was archived by the owner on Mar 26, 2024. It is now read-only.

Commit 9791865

Browse files
committed
Rewrite get push actions queries (matrix-org#13597)
1 parent 84eef5b commit 9791865

File tree

2 files changed

+69
-160
lines changed

2 files changed

+69
-160
lines changed

changelog.d/13597.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Optimise push action fetching queries. Contributed by Nick @ Beeper (@fizzadar).

synapse/storage/databases/main/event_push_actions.py

Lines changed: 68 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,32 @@ def f(txn: LoggingTransaction) -> List[str]:
429429

430430
return await self.db_pool.runInteraction("get_push_action_users_in_range", f)
431431

432+
def _get_receipts_by_room_txn(
433+
self, txn: LoggingTransaction, user_id: str
434+
) -> List[Tuple[str, int]]:
435+
receipt_types_clause, args = make_in_list_sql_clause(
436+
self.database_engine,
437+
"receipt_type",
438+
(
439+
ReceiptTypes.READ,
440+
ReceiptTypes.READ_PRIVATE,
441+
ReceiptTypes.UNSTABLE_READ_PRIVATE,
442+
),
443+
)
444+
445+
sql = f"""
446+
SELECT room_id, MAX(stream_ordering)
447+
FROM receipts_linearized
448+
INNER JOIN events USING (room_id, event_id)
449+
WHERE {receipt_types_clause}
450+
AND user_id = ?
451+
GROUP BY room_id
452+
"""
453+
454+
args.extend((user_id,))
455+
txn.execute(sql, args)
456+
return cast(List[Tuple[str, int]], txn.fetchall())
457+
432458
async def get_unread_push_actions_for_user_in_range_for_http(
433459
self,
434460
user_id: str,
@@ -452,106 +478,45 @@ async def get_unread_push_actions_for_user_in_range_for_http(
452478
The list will have between 0~limit entries.
453479
"""
454480

455-
# find rooms that have a read receipt in them and return the next
456-
# push actions
457-
def get_after_receipt(
458-
txn: LoggingTransaction,
459-
) -> List[Tuple[str, str, int, str, bool]]:
460-
# find rooms that have a read receipt in them and return the next
461-
# push actions
462-
463-
receipt_types_clause, args = make_in_list_sql_clause(
464-
self.database_engine,
465-
"receipt_type",
466-
(
467-
ReceiptTypes.READ,
468-
ReceiptTypes.READ_PRIVATE,
469-
ReceiptTypes.UNSTABLE_READ_PRIVATE,
470-
),
471-
)
472-
473-
sql = f"""
474-
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
475-
ep.highlight
476-
FROM (
477-
SELECT room_id,
478-
MAX(stream_ordering) as stream_ordering
479-
FROM events
480-
INNER JOIN receipts_linearized USING (room_id, event_id)
481-
WHERE {receipt_types_clause} AND user_id = ?
482-
GROUP BY room_id
483-
) AS rl,
484-
event_push_actions AS ep
485-
WHERE
486-
ep.room_id = rl.room_id
487-
AND ep.stream_ordering > rl.stream_ordering
488-
AND ep.user_id = ?
489-
AND ep.stream_ordering > ?
490-
AND ep.stream_ordering <= ?
491-
AND ep.notif = 1
492-
ORDER BY ep.stream_ordering ASC LIMIT ?
493-
"""
494-
args.extend(
495-
(user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
496-
)
497-
txn.execute(sql, args)
498-
return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())
499-
500-
after_read_receipt = await self.db_pool.runInteraction(
501-
"get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt
481+
receipts_by_room = dict(
482+
await self.db_pool.runInteraction(
483+
"get_unread_push_actions_for_user_in_range_http_receipts",
484+
self._get_receipts_by_room_txn,
485+
user_id=user_id,
486+
),
502487
)
503488

504-
# There are rooms with push actions in them but you don't have a read receipt in
505-
# them e.g. rooms you've been invited to, so get push actions for rooms which do
506-
# not have read receipts in them too.
507-
def get_no_receipt(
489+
def get_push_actions_txn(
508490
txn: LoggingTransaction,
509491
) -> List[Tuple[str, str, int, str, bool]]:
510-
receipt_types_clause, args = make_in_list_sql_clause(
511-
self.database_engine,
512-
"receipt_type",
513-
(
514-
ReceiptTypes.READ,
515-
ReceiptTypes.READ_PRIVATE,
516-
ReceiptTypes.UNSTABLE_READ_PRIVATE,
517-
),
518-
)
519-
520-
sql = f"""
521-
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
522-
ep.highlight
492+
sql = """
493+
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight
523494
FROM event_push_actions AS ep
524-
INNER JOIN events AS e USING (room_id, event_id)
525495
WHERE
526-
ep.room_id NOT IN (
527-
SELECT room_id FROM receipts_linearized
528-
WHERE {receipt_types_clause} AND user_id = ?
529-
GROUP BY room_id
530-
)
531-
AND ep.user_id = ?
496+
ep.user_id = ?
532497
AND ep.stream_ordering > ?
533498
AND ep.stream_ordering <= ?
534499
AND ep.notif = 1
535500
ORDER BY ep.stream_ordering ASC LIMIT ?
536501
"""
537-
args.extend(
538-
(user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
539-
)
540-
txn.execute(sql, args)
502+
txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit))
541503
return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())
542504

543-
no_read_receipt = await self.db_pool.runInteraction(
544-
"get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt
505+
push_actions = await self.db_pool.runInteraction(
506+
"get_unread_push_actions_for_user_in_range_http", get_push_actions_txn
545507
)
546508

547509
notifs = [
548510
HttpPushAction(
549-
event_id=row[0],
550-
room_id=row[1],
551-
stream_ordering=row[2],
552-
actions=_deserialize_action(row[3], row[4]),
511+
event_id=event_id,
512+
room_id=room_id,
513+
stream_ordering=stream_ordering,
514+
actions=_deserialize_action(actions, highlight),
553515
)
554-
for row in after_read_receipt + no_read_receipt
516+
for event_id, room_id, stream_ordering, actions, highlight in push_actions
517+
# Only include push actions with a stream ordering after any receipt, or without any
518+
# receipt present (invited to but never read rooms).
519+
if stream_ordering > receipts_by_room.get(room_id, 0)
555520
]
556521

557522
# Now sort it so it's ordered correctly, since currently it will
@@ -587,106 +552,49 @@ async def get_unread_push_actions_for_user_in_range_for_email(
587552
The list will have between 0~limit entries.
588553
"""
589554

590-
# find rooms that have a read receipt in them and return the most recent
591-
# push actions
592-
def get_after_receipt(
593-
txn: LoggingTransaction,
594-
) -> List[Tuple[str, str, int, str, bool, int]]:
595-
receipt_types_clause, args = make_in_list_sql_clause(
596-
self.database_engine,
597-
"receipt_type",
598-
(
599-
ReceiptTypes.READ,
600-
ReceiptTypes.READ_PRIVATE,
601-
ReceiptTypes.UNSTABLE_READ_PRIVATE,
602-
),
603-
)
604-
605-
sql = f"""
606-
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
607-
ep.highlight, e.received_ts
608-
FROM (
609-
SELECT room_id,
610-
MAX(stream_ordering) as stream_ordering
611-
FROM events
612-
INNER JOIN receipts_linearized USING (room_id, event_id)
613-
WHERE {receipt_types_clause} AND user_id = ?
614-
GROUP BY room_id
615-
) AS rl,
616-
event_push_actions AS ep
617-
INNER JOIN events AS e USING (room_id, event_id)
618-
WHERE
619-
ep.room_id = rl.room_id
620-
AND ep.stream_ordering > rl.stream_ordering
621-
AND ep.user_id = ?
622-
AND ep.stream_ordering > ?
623-
AND ep.stream_ordering <= ?
624-
AND ep.notif = 1
625-
ORDER BY ep.stream_ordering DESC LIMIT ?
626-
"""
627-
args.extend(
628-
(user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
629-
)
630-
txn.execute(sql, args)
631-
return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())
632-
633-
after_read_receipt = await self.db_pool.runInteraction(
634-
"get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt
555+
receipts_by_room = dict(
556+
await self.db_pool.runInteraction(
557+
"get_unread_push_actions_for_user_in_range_email_receipts",
558+
self._get_receipts_by_room_txn,
559+
user_id=user_id,
560+
),
635561
)
636562

637-
# There are rooms with push actions in them but you don't have a read receipt in
638-
# them e.g. rooms you've been invited to, so get push actions for rooms which do
639-
# not have read receipts in them too.
640-
def get_no_receipt(
563+
def get_push_actions_txn(
641564
txn: LoggingTransaction,
642565
) -> List[Tuple[str, str, int, str, bool, int]]:
643-
receipt_types_clause, args = make_in_list_sql_clause(
644-
self.database_engine,
645-
"receipt_type",
646-
(
647-
ReceiptTypes.READ,
648-
ReceiptTypes.READ_PRIVATE,
649-
ReceiptTypes.UNSTABLE_READ_PRIVATE,
650-
),
651-
)
652-
653-
sql = f"""
566+
sql = """
654567
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
655568
ep.highlight, e.received_ts
656569
FROM event_push_actions AS ep
657570
INNER JOIN events AS e USING (room_id, event_id)
658571
WHERE
659-
ep.room_id NOT IN (
660-
SELECT room_id FROM receipts_linearized
661-
WHERE {receipt_types_clause} AND user_id = ?
662-
GROUP BY room_id
663-
)
664-
AND ep.user_id = ?
572+
ep.user_id = ?
665573
AND ep.stream_ordering > ?
666574
AND ep.stream_ordering <= ?
667575
AND ep.notif = 1
668576
ORDER BY ep.stream_ordering DESC LIMIT ?
669577
"""
670-
args.extend(
671-
(user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
672-
)
673-
txn.execute(sql, args)
578+
txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit))
674579
return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())
675580

676-
no_read_receipt = await self.db_pool.runInteraction(
677-
"get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt
581+
push_actions = await self.db_pool.runInteraction(
582+
"get_unread_push_actions_for_user_in_range_email", get_push_actions_txn
678583
)
679584

680585
# Make a list of dicts from the two sets of results.
681586
notifs = [
682587
EmailPushAction(
683-
event_id=row[0],
684-
room_id=row[1],
685-
stream_ordering=row[2],
686-
actions=_deserialize_action(row[3], row[4]),
687-
received_ts=row[5],
588+
event_id=event_id,
589+
room_id=room_id,
590+
stream_ordering=stream_ordering,
591+
actions=_deserialize_action(actions, highlight),
592+
received_ts=received_ts,
688593
)
689-
for row in after_read_receipt + no_read_receipt
594+
for event_id, room_id, stream_ordering, actions, highlight, received_ts in push_actions
595+
# Only include push actions with a stream ordering after any receipt, or without any
596+
# receipt present (invited to but never read rooms).
597+
if stream_ordering > receipts_by_room.get(room_id, 0)
690598
]
691599

692600
# Now sort it so it's ordered correctly, since currently it will

0 commit comments

Comments
 (0)