14 changes: 7 additions & 7 deletions .ci/scripts/calculate_jobs.py
@@ -99,24 +99,24 @@ def set_output(key: str, value: str):
 
 # First calculate the various sytest jobs.
 #
-# For each type of test we only run on bullseye on PRs
+# For each type of test we only run on bookworm on PRs
 
 
 sytest_tests = [
     {
-        "sytest-tag": "bullseye",
+        "sytest-tag": "bookworm",
     },
     {
-        "sytest-tag": "bullseye",
+        "sytest-tag": "bookworm",
         "postgres": "postgres",
     },
     {
-        "sytest-tag": "bullseye",
+        "sytest-tag": "bookworm",
         "postgres": "multi-postgres",
         "workers": "workers",
     },
     {
-        "sytest-tag": "bullseye",
+        "sytest-tag": "bookworm",
         "postgres": "multi-postgres",
         "workers": "workers",
         "reactor": "asyncio",
@@ -127,11 +127,11 @@ def set_output(key: str, value: str):
     sytest_tests.extend(
         [
             {
-                "sytest-tag": "bullseye",
+                "sytest-tag": "bookworm",
                 "reactor": "asyncio",
             },
             {
-                "sytest-tag": "bullseye",
+                "sytest-tag": "bookworm",
                 "postgres": "postgres",
                 "reactor": "asyncio",
             },
4 changes: 2 additions & 2 deletions .github/workflows/latest_deps.yml
@@ -139,9 +139,9 @@ jobs:
       fail-fast: false
       matrix:
         include:
-          - sytest-tag: bullseye
+          - sytest-tag: bookworm
 
-          - sytest-tag: bullseye
+          - sytest-tag: bookworm
             postgres: postgres
             workers: workers
             redis: redis
6 changes: 4 additions & 2 deletions .github/workflows/twisted_trunk.yml
@@ -108,11 +108,13 @@ jobs:
     if: needs.check_repo.outputs.should_run_workflow == 'true'
     runs-on: ubuntu-latest
     container:
-      # We're using debian:bullseye because it uses Python 3.9 which is our minimum supported Python version.
+      # We're using bookworm because that's what Debian oldstable is at the time of writing.
+      # It uses Python 3.11, but sytest-synapse:bookworm is built with Python 3.9,
+      # which is our minimum supported Python version.
       # This job is a canary to warn us about unreleased twisted changes that would cause problems for us if
       # they were to be released immediately. For simplicity's sake (and to save CI runners) we use the oldest
       # version, assuming that any incompatibilities on newer versions would also be present on the oldest.
-      image: matrixdotorg/sytest-synapse:bullseye
+      image: matrixdotorg/sytest-synapse:bookworm
       volumes:
         - ${{ github.workspace }}:/src
1 change: 1 addition & 0 deletions changelog.d/19047.doc
@@ -0,0 +1 @@
+Update the link to the Debian oldstable package for SQLite.
1 change: 1 addition & 0 deletions changelog.d/19047.feature
@@ -0,0 +1 @@
+Always treat `RETURNING` as supported by SQL engines, now that the minimum-supported versions of both SQLite and Postgres support it.
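
For context, `RETURNING` lets a write statement hand back values from the affected rows in the same round trip. A minimal standalone sketch (toy table name; assumes SQLite 3.35.0 or newer, the release that introduced the clause, while Postgres has supported it since 8.2):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tokens (id INTEGER PRIMARY KEY, token TEXT)")

# Insert and read back the generated id in one statement: no separate
# "SELECT last_insert_rowid()" round trip is needed.
row = conn.execute(
    "INSERT INTO tokens (token) VALUES (?) RETURNING id, token",
    ("s3cret",),
).fetchone()
print(row)  # (1, 's3cret')
```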
2 changes: 1 addition & 1 deletion docs/deprecation_policy.md
@@ -21,7 +21,7 @@ people building from source should ensure they can fetch recent versions of Rust
 (e.g. by using [rustup](https://rustup.rs/)).
 
 The oldest supported version of SQLite is the version
-[provided](https://packages.debian.org/bullseye/libsqlite3-0) by
+[provided](https://packages.debian.org/oldstable/libsqlite3-0) by
 [Debian oldstable](https://wiki.debian.org/DebianOldStable).
 
2 changes: 1 addition & 1 deletion docs/development/contributing_guide.md
@@ -320,7 +320,7 @@ The following command will let you run the integration test with the most common
 configuration:
 
 ```sh
-$ docker run --rm -it -v /path/where/you/have/cloned/the/repository\:/src:ro -v /path/to/where/you/want/logs\:/logs matrixdotorg/sytest-synapse:bullseye
+$ docker run --rm -it -v /path/where/you/have/cloned/the/repository\:/src:ro -v /path/to/where/you/want/logs\:/logs matrixdotorg/sytest-synapse:bookworm
 ```
 (Note that the paths must be full paths! You could also write `$(realpath relative/path)` if needed.)
39 changes: 10 additions & 29 deletions synapse/storage/database.py
@@ -1165,36 +1165,17 @@ def simple_insert_returning_txn(
             SQLite versions that don't support it).
         """
 
-        if txn.database_engine.supports_returning:
-            sql = "INSERT INTO %s (%s) VALUES(%s) RETURNING %s" % (
-                table,
-                ", ".join(k for k in values.keys()),
-                ", ".join("?" for _ in values.keys()),
-                ", ".join(k for k in returning),
-            )
-
-            txn.execute(sql, list(values.values()))
-            row = txn.fetchone()
-            assert row is not None
-            return row
-        else:
-            # For old versions of SQLite we do a standard insert and then can
-            # use `last_insert_rowid` to get at the row we just inserted
-            DatabasePool.simple_insert_txn(
-                txn,
-                table=table,
-                values=values,
-            )
-            txn.execute("SELECT last_insert_rowid()")
-            row = txn.fetchone()
-            assert row is not None
-            (rowid,) = row
-
-            row = DatabasePool.simple_select_one_txn(
-                txn, table=table, keyvalues={"rowid": rowid}, retcols=returning
-            )
-            assert row is not None
-            return row
+        sql = "INSERT INTO %s (%s) VALUES(%s) RETURNING %s" % (
+            table,
+            ", ".join(k for k in values.keys()),
+            ", ".join("?" for _ in values.keys()),
+            ", ".join(k for k in returning),
+        )
+
+        txn.execute(sql, list(values.values()))
+        row = txn.fetchone()
+        assert row is not None
+        return row
 
     async def simple_insert_many(
         self,
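
As a concrete illustration, the `%`-formatting in the surviving branch expands to a parameterised statement like the following (the argument values here are hypothetical):

```python
# Hypothetical inputs, to show the SQL the %-formatting above produces.
table = "access_tokens"
values = {"user_id": "@alice:example.com", "token": "t0ken"}
returning = ("id",)

sql = "INSERT INTO %s (%s) VALUES(%s) RETURNING %s" % (
    table,
    ", ".join(values.keys()),
    ", ".join("?" for _ in values),
    ", ".join(returning),
)
print(sql)
# INSERT INTO access_tokens (user_id, token) VALUES(?, ?) RETURNING id
```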
35 changes: 15 additions & 20 deletions synapse/storage/databases/main/delayed_events.py
@@ -347,33 +347,28 @@ def process_target_delayed_event_txn(
             EventDetails,
             Optional[Timestamp],
         ]:
-            sql_cols = ", ".join(
-                (
-                    "room_id",
-                    "event_type",
-                    "state_key",
-                    "origin_server_ts",
-                    "content",
-                    "device_id",
-                )
-            )
-            sql_update = "UPDATE delayed_events SET is_processed = TRUE"
-            sql_where = "WHERE delay_id = ? AND user_localpart = ? AND NOT is_processed"
-            sql_args = (delay_id, user_localpart)
             txn.execute(
+                """
+                UPDATE delayed_events
+                SET is_processed = TRUE
+                WHERE delay_id = ? AND user_localpart = ?
+                    AND NOT is_processed
+                RETURNING
+                    room_id,
+                    event_type,
+                    state_key,
+                    origin_server_ts,
+                    content,
+                    device_id
+                """,
                 (
-                    f"{sql_update} {sql_where} RETURNING {sql_cols}"
-                    if self.database_engine.supports_returning
-                    else f"SELECT {sql_cols} FROM delayed_events {sql_where}"
+                    delay_id,
+                    user_localpart,
                 ),
-                sql_args,
             )
             row = txn.fetchone()
             if row is None:
                 raise NotFoundError("Delayed event not found")
-            elif not self.database_engine.supports_returning:
-                txn.execute(f"{sql_update} {sql_where}", sql_args)
-                assert txn.rowcount == 1
 
             event = EventDetails(
                 RoomID.from_string(row[0]),
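
The same single-statement pattern, reduced to a toy schema (table and column names here are illustrative only; SQLite 3.35+ assumed): claiming the row and reading its payload happen atomically, so a second caller racing on the same `delay_id` simply gets no row back.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE delayed (delay_id TEXT, is_processed BOOLEAN, content TEXT)"
)
conn.execute("INSERT INTO delayed VALUES ('d1', FALSE, 'hello')")

# Mark the row processed and fetch its content in one statement.
row = conn.execute(
    """
    UPDATE delayed SET is_processed = TRUE
    WHERE delay_id = ? AND NOT is_processed
    RETURNING content
    """,
    ("d1",),
).fetchone()
print(row)  # ('hello',); running it again prints None
```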
68 changes: 18 additions & 50 deletions synapse/storage/databases/main/event_federation.py
@@ -2045,61 +2045,29 @@ async def remove_received_event_from_staging(
         Returns:
             The received_ts of the row that was deleted, if any.
         """
-        if self.db_pool.engine.supports_returning:
 
-            def _remove_received_event_from_staging_txn(
-                txn: LoggingTransaction,
-            ) -> Optional[int]:
-                sql = """
-                    DELETE FROM federation_inbound_events_staging
-                    WHERE origin = ? AND event_id = ?
-                    RETURNING received_ts
-                """
-
-                txn.execute(sql, (origin, event_id))
-                row = cast(Optional[Tuple[int]], txn.fetchone())
-
-                if row is None:
-                    return None
-
-                return row[0]
-
-            return await self.db_pool.runInteraction(
-                "remove_received_event_from_staging",
-                _remove_received_event_from_staging_txn,
-                db_autocommit=True,
-            )
+        def _remove_received_event_from_staging_txn(
+            txn: LoggingTransaction,
+        ) -> Optional[int]:
+            sql = """
+                DELETE FROM federation_inbound_events_staging
+                WHERE origin = ? AND event_id = ?
+                RETURNING received_ts
+            """
 
-        else:
+            txn.execute(sql, (origin, event_id))
+            row = cast(Optional[Tuple[int]], txn.fetchone())
 
-            def _remove_received_event_from_staging_txn(
-                txn: LoggingTransaction,
-            ) -> Optional[int]:
-                received_ts = self.db_pool.simple_select_one_onecol_txn(
-                    txn,
-                    table="federation_inbound_events_staging",
-                    keyvalues={
-                        "origin": origin,
-                        "event_id": event_id,
-                    },
-                    retcol="received_ts",
-                    allow_none=True,
-                )
-                self.db_pool.simple_delete_txn(
-                    txn,
-                    table="federation_inbound_events_staging",
-                    keyvalues={
-                        "origin": origin,
-                        "event_id": event_id,
-                    },
-                )
+            if row is None:
+                return None
 
-                return received_ts
+            return row[0]
 
-            return await self.db_pool.runInteraction(
-                "remove_received_event_from_staging",
-                _remove_received_event_from_staging_txn,
-            )
+        return await self.db_pool.runInteraction(
+            "remove_received_event_from_staging",
+            _remove_received_event_from_staging_txn,
+            db_autocommit=True,
+        )
 
     async def get_next_staged_event_id_for_room(
         self,
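
Stripped of the Synapse plumbing, the pattern the surviving branch relies on looks like this (toy data; SQLite 3.35+ assumed). The delete and the read of `received_ts` are one statement, so they cannot disagree the way a separate SELECT-then-DELETE could:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE staging (origin TEXT, event_id TEXT, received_ts INTEGER)"
)
conn.execute("INSERT INTO staging VALUES ('hs1', '$ev1', 12345)")

# Delete the row and recover its received_ts in the same statement.
row = conn.execute(
    "DELETE FROM staging WHERE origin = ? AND event_id = ? RETURNING received_ts",
    ("hs1", "$ev1"),
).fetchone()
received_ts = row[0] if row is not None else None
print(received_ts)  # 12345
```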
32 changes: 7 additions & 25 deletions synapse/storage/databases/main/registration.py
@@ -2544,31 +2544,13 @@ def user_delete_access_tokens_for_devices_txn(
             )
             args.append(user_id)
 
-            if self.database_engine.supports_returning:
-                sql = f"""
-                    DELETE FROM access_tokens
-                    WHERE {clause} AND user_id = ?
-                    RETURNING token, id, device_id
-                """
-                txn.execute(sql, args)
-                tokens_and_devices = txn.fetchall()
-            else:
-                tokens_and_devices = self.db_pool.simple_select_many_txn(
-                    txn,
-                    table="access_tokens",
-                    column="device_id",
-                    iterable=batch_device_ids,
-                    keyvalues={"user_id": user_id},
-                    retcols=("token", "id", "device_id"),
-                )
-
-                self.db_pool.simple_delete_many_txn(
-                    txn,
-                    table="access_tokens",
-                    keyvalues={"user_id": user_id},
-                    column="device_id",
-                    values=batch_device_ids,
-                )
+            sql = f"""
+                DELETE FROM access_tokens
+                WHERE {clause} AND user_id = ?
+                RETURNING token, id, device_id
+            """
+            txn.execute(sql, args)
+            tokens_and_devices = txn.fetchall()
 
             self._invalidate_cache_and_stream_bulk(
                 txn,
43 changes: 12 additions & 31 deletions synapse/storage/databases/main/user_directory.py
@@ -356,27 +356,19 @@ async def _populate_user_directory_process_users(
         def _populate_user_directory_process_users_txn(
             txn: LoggingTransaction,
         ) -> Optional[int]:
-            if self.database_engine.supports_returning:
-                # Note: we use an ORDER BY in the SELECT to force usage of an
-                # index. Otherwise, postgres does a sequential scan that is
-                # surprisingly slow (I think due to the fact it will read/skip
-                # over lots of already deleted rows).
-                sql = f"""
-                    DELETE FROM {TEMP_TABLE + "_users"}
-                    WHERE user_id IN (
-                        SELECT user_id FROM {TEMP_TABLE + "_users"} ORDER BY user_id LIMIT ?
-                    )
-                    RETURNING user_id
-                """
-                txn.execute(sql, (batch_size,))
-                user_result = cast(List[Tuple[str]], txn.fetchall())
-            else:
-                sql = "SELECT user_id FROM %s ORDER BY user_id LIMIT %s" % (
-                    TEMP_TABLE + "_users",
-                    str(batch_size),
-                )
-                txn.execute(sql)
-                user_result = cast(List[Tuple[str]], txn.fetchall())
+            # Note: we use an ORDER BY in the SELECT to force usage of an
+            # index. Otherwise, postgres does a sequential scan that is
+            # surprisingly slow (I think due to the fact it will read/skip
+            # over lots of already deleted rows).
+            sql = f"""
+                DELETE FROM {TEMP_TABLE + "_users"}
+                WHERE user_id IN (
+                    SELECT user_id FROM {TEMP_TABLE + "_users"} ORDER BY user_id LIMIT ?
+                )
+                RETURNING user_id
+            """
+            txn.execute(sql, (batch_size,))
+            user_result = cast(List[Tuple[str]], txn.fetchall())
 
             if not user_result:
                 return None
@@ -435,17 +427,6 @@ def _populate_user_directory_process_users_txn(
             # Actually insert the users with their profiles into the directory.
             self._update_profiles_in_user_dir_txn(txn, profiles_to_insert)
 
-            # We've finished processing the users. Delete it from the table, if
-            # we haven't already.
-            if not self.database_engine.supports_returning:
-                self.db_pool.simple_delete_many_txn(
-                    txn,
-                    table=TEMP_TABLE + "_users",
-                    column="user_id",
-                    values=users_to_work_on,
-                    keyvalues={},
-                )
-
             # Update the remaining counter.
             progress["remaining"] -= len(users_to_work_on)
             self.db_pool.updates._background_update_progress_txn(
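
Reduced to a self-contained sketch (hypothetical table; SQLite 3.35+ assumed), this is the "pop a batch" idiom the new code uses: delete up to `batch_size` rows and learn which ones went, with no separate SELECT and no second DELETE pass.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tmp_users (user_id TEXT)")
conn.executemany(
    "INSERT INTO tmp_users VALUES (?)", [("@a:hs",), ("@b:hs",), ("@c:hs",)]
)

batch_size = 2
popped = conn.execute(
    """
    DELETE FROM tmp_users
    WHERE user_id IN (
        SELECT user_id FROM tmp_users ORDER BY user_id LIMIT ?
    )
    RETURNING user_id
    """,
    (batch_size,),
).fetchall()
# Two rows deleted and returned; RETURNING row order is unspecified.
print(sorted(popped))  # [('@a:hs',), ('@b:hs',)]
```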
6 changes: 0 additions & 6 deletions synapse/storage/engines/_base.py
@@ -63,12 +63,6 @@ def supports_using_any_list(self) -> bool:
         """
         ...
 
-    @property
-    @abc.abstractmethod
-    def supports_returning(self) -> bool:
-        """Do we support the `RETURNING` clause in insert/update/delete?"""
-        ...
-
     @abc.abstractmethod
     def check_database(
         self, db_conn: ConnectionType, allow_outdated_version: bool = False
5 changes: 0 additions & 5 deletions synapse/storage/engines/postgres.py
@@ -193,11 +193,6 @@ def supports_using_any_list(self) -> bool:
         """Do we support using `a = ANY(?)` and passing a list"""
         return True
 
-    @property
-    def supports_returning(self) -> bool:
-        """Do we support the `RETURNING` clause in insert/update/delete?"""
-        return True
-
     def is_deadlock(self, error: Exception) -> bool:
         if isinstance(error, psycopg2.DatabaseError):
             # https://www.postgresql.org/docs/current/static/errcodes-appendix.html