Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit f056218

Browse files
authored
skip some dict munging in event persistence (#11560)
Create a new dict helper method `simple_insert_many_values_txn`, which takes raw row values, rather than {key=>value} dicts. This saves us a bunch of dict munging, and makes it easier to use generators rather than creating intermediate lists and dicts.
1 parent 86e7a6d commit f056218

File tree

3 files changed

+114
-60
lines changed

3 files changed

+114
-60
lines changed

changelog.d/11560.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Minor efficiency improvements in event persistence.

synapse/storage/database.py

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -896,6 +896,9 @@ async def simple_insert_many(
896896
) -> None:
897897
"""Executes an INSERT query on the named table.
898898
899+
The input is given as a list of dicts, with one dict per row.
900+
Generally simple_insert_many_values should be preferred for new code.
901+
899902
Args:
900903
table: string giving the table name
901904
values: list of dicts, one per row, mapping column names to their values
@@ -909,6 +912,9 @@ def simple_insert_many_txn(
909912
) -> None:
910913
"""Executes an INSERT query on the named table.
911914
915+
The input is given as a list of dicts, with one dict per row.
916+
Generally simple_insert_many_values_txn should be preferred for new code.
917+
912918
Args:
913919
txn: The transaction to use.
914920
table: string giving the table name
@@ -933,23 +939,66 @@ def simple_insert_many_txn(
933939
if k != keys[0]:
934940
raise RuntimeError("All items must have the same keys")
935941

942+
return DatabasePool.simple_insert_many_values_txn(txn, table, keys[0], vals)
943+
944+
async def simple_insert_many_values(
945+
self,
946+
table: str,
947+
keys: Collection[str],
948+
values: Iterable[Iterable[Any]],
949+
desc: str,
950+
) -> None:
951+
"""Executes an INSERT query on the named table.
952+
953+
The input is given as a list of rows, where each row is a list of values.
954+
(Actually any iterable is fine.)
955+
956+
Args:
957+
table: string giving the table name
958+
keys: list of column names
959+
values: for each row, a list of values in the same order as `keys`
960+
desc: description of the transaction, for logging and metrics
961+
"""
962+
await self.runInteraction(
963+
desc, self.simple_insert_many_values_txn, table, keys, values
964+
)
965+
966+
@staticmethod
967+
def simple_insert_many_values_txn(
968+
txn: LoggingTransaction,
969+
table: str,
970+
keys: Collection[str],
971+
values: Iterable[Iterable[Any]],
972+
) -> None:
973+
"""Executes an INSERT query on the named table.
974+
975+
The input is given as a list of rows, where each row is a list of values.
976+
(Actually any iterable is fine.)
977+
978+
Args:
979+
txn: The transaction to use.
980+
table: string giving the table name
981+
keys: list of column names
982+
values: for each row, a list of values in the same order as `keys`
983+
"""
984+
936985
if isinstance(txn.database_engine, PostgresEngine):
937986
# We use `execute_values` as it can be a lot faster than `execute_batch`,
938987
# but it's only available on postgres.
939988
sql = "INSERT INTO %s (%s) VALUES ?" % (
940989
table,
941-
", ".join(k for k in keys[0]),
990+
", ".join(k for k in keys),
942991
)
943992

944-
txn.execute_values(sql, vals, fetch=False)
993+
txn.execute_values(sql, values, fetch=False)
945994
else:
946995
sql = "INSERT INTO %s (%s) VALUES(%s)" % (
947996
table,
948-
", ".join(k for k in keys[0]),
949-
", ".join("?" for _ in keys[0]),
997+
", ".join(k for k in keys),
998+
", ".join("?" for _ in keys),
950999
)
9511000

952-
txn.execute_batch(sql, vals)
1001+
txn.execute_batch(sql, values)
9531002

9541003
async def simple_upsert(
9551004
self,

synapse/storage/databases/main/events.py

Lines changed: 59 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from typing import (
2020
TYPE_CHECKING,
2121
Any,
22+
Collection,
2223
Dict,
2324
Generator,
2425
Iterable,
@@ -1319,14 +1320,13 @@ def _update_outliers_txn(self, txn, events_and_contexts):
13191320

13201321
return [ec for ec in events_and_contexts if ec[0] not in to_remove]
13211322

1322-
def _store_event_txn(self, txn, events_and_contexts):
1323+
def _store_event_txn(
1324+
self,
1325+
txn: LoggingTransaction,
1326+
events_and_contexts: Collection[Tuple[EventBase, EventContext]],
1327+
) -> None:
13231328
"""Insert new events into the events, event_json, redaction and
13241329
state_events tables.
1325-
1326-
Args:
1327-
txn (twisted.enterprise.adbapi.Connection): db connection
1328-
events_and_contexts (list[(EventBase, EventContext)]): events
1329-
we are persisting
13301330
"""
13311331

13321332
if not events_and_contexts:
@@ -1339,46 +1339,58 @@ def event_dict(event):
13391339
d.pop("redacted_because", None)
13401340
return d
13411341

1342-
self.db_pool.simple_insert_many_txn(
1342+
self.db_pool.simple_insert_many_values_txn(
13431343
txn,
13441344
table="event_json",
1345-
values=[
1346-
{
1347-
"event_id": event.event_id,
1348-
"room_id": event.room_id,
1349-
"internal_metadata": json_encoder.encode(
1350-
event.internal_metadata.get_dict()
1351-
),
1352-
"json": json_encoder.encode(event_dict(event)),
1353-
"format_version": event.format_version,
1354-
}
1345+
keys=("event_id", "room_id", "internal_metadata", "json", "format_version"),
1346+
values=(
1347+
(
1348+
event.event_id,
1349+
event.room_id,
1350+
json_encoder.encode(event.internal_metadata.get_dict()),
1351+
json_encoder.encode(event_dict(event)),
1352+
event.format_version,
1353+
)
13551354
for event, _ in events_and_contexts
1356-
],
1355+
),
13571356
)
13581357

1359-
self.db_pool.simple_insert_many_txn(
1358+
self.db_pool.simple_insert_many_values_txn(
13601359
txn,
13611360
table="events",
1362-
values=[
1363-
{
1364-
"instance_name": self._instance_name,
1365-
"stream_ordering": event.internal_metadata.stream_ordering,
1366-
"topological_ordering": event.depth,
1367-
"depth": event.depth,
1368-
"event_id": event.event_id,
1369-
"room_id": event.room_id,
1370-
"type": event.type,
1371-
"processed": True,
1372-
"outlier": event.internal_metadata.is_outlier(),
1373-
"origin_server_ts": int(event.origin_server_ts),
1374-
"received_ts": self._clock.time_msec(),
1375-
"sender": event.sender,
1376-
"contains_url": (
1377-
"url" in event.content and isinstance(event.content["url"], str)
1378-
),
1379-
}
1361+
keys=(
1362+
"instance_name",
1363+
"stream_ordering",
1364+
"topological_ordering",
1365+
"depth",
1366+
"event_id",
1367+
"room_id",
1368+
"type",
1369+
"processed",
1370+
"outlier",
1371+
"origin_server_ts",
1372+
"received_ts",
1373+
"sender",
1374+
"contains_url",
1375+
),
1376+
values=(
1377+
(
1378+
self._instance_name,
1379+
event.internal_metadata.stream_ordering,
1380+
event.depth, # topological_ordering
1381+
event.depth, # depth
1382+
event.event_id,
1383+
event.room_id,
1384+
event.type,
1385+
True, # processed
1386+
event.internal_metadata.is_outlier(),
1387+
int(event.origin_server_ts),
1388+
self._clock.time_msec(),
1389+
event.sender,
1390+
"url" in event.content and isinstance(event.content["url"], str),
1391+
)
13801392
for event, _ in events_and_contexts
1381-
],
1393+
),
13821394
)
13831395

13841396
# If we're persisting an unredacted event we go and ensure
@@ -1397,23 +1409,15 @@ def event_dict(event):
13971409
)
13981410
txn.execute(sql + clause, [False] + args)
13991411

1400-
state_events_and_contexts = [
1401-
ec for ec in events_and_contexts if ec[0].is_state()
1402-
]
1403-
1404-
state_values = []
1405-
for event, _ in state_events_and_contexts:
1406-
vals = {
1407-
"event_id": event.event_id,
1408-
"room_id": event.room_id,
1409-
"type": event.type,
1410-
"state_key": event.state_key,
1411-
}
1412-
1413-
state_values.append(vals)
1414-
1415-
self.db_pool.simple_insert_many_txn(
1416-
txn, table="state_events", values=state_values
1412+
self.db_pool.simple_insert_many_values_txn(
1413+
txn,
1414+
table="state_events",
1415+
keys=("event_id", "room_id", "type", "state_key"),
1416+
values=(
1417+
(event.event_id, event.room_id, event.type, event.state_key)
1418+
for event, _ in events_and_contexts
1419+
if event.is_state()
1420+
),
14171421
)
14181422

14191423
def _store_rejected_events_txn(self, txn, events_and_contexts):

0 commit comments

Comments
 (0)