
Commit 98c8fc6

Handle federation inbound instances being killed more gracefully (#11262)
* Make lock better handle the process being killed

  If the process gets killed and restarted (so that it didn't get a chance to drop its locks gracefully), there may still be locks in the DB for the same instance that haven't yet timed out but are safe to delete. We handle this case by (a) checking whether the current instance has already taken out the lock, and (b) if not, ignoring existing locks that belong to the same instance. (A stand-alone sketch of rule (b) follows below.)

* Periodically check for old staged events

  This protects against other instances dying and their locks timing out.
1 parent 9799c56 commit 98c8fc6
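
As a stand-alone illustration of rule (b) above, here is a minimal, hypothetical sketch (not Synapse code; the dict-based "table", the timeout value and all names are invented) of why a row left behind by our own killed process is safe to overwrite while a live row held by another instance is not:

# Minimal, stand-alone sketch (not Synapse code) of the same-instance reclaim
# rule described above. The dict-based "table" and all names are invented.
import time
from typing import Dict, Optional, Tuple

LOCK_TIMEOUT_MS = 2 * 60 * 1000  # illustrative timeout

# (lock_name, lock_key) -> (instance_name, token, last_renewed_ts)
db_locks: Dict[Tuple[str, str], Tuple[str, str, int]] = {}

def try_acquire(lock_name: str, lock_key: str, instance_name: str, token: str) -> Optional[str]:
    now = int(time.time() * 1000)
    row = db_locks.get((lock_name, lock_key))
    if row is not None:
        holder, _old_token, last_renewed = row
        # An existing row only blocks us if it has not timed out AND belongs to
        # a *different* instance; our own stale row (left by a killed process
        # that never released it) is safe to overwrite.
        if last_renewed >= now - LOCK_TIMEOUT_MS and holder != instance_name:
            return None
    db_locks[(lock_name, lock_key)] = (instance_name, token, now)
    return token

print(try_acquire("fed_inbound", "!room:example.org", "worker1", "tok-a"))  # tok-a
print(try_acquire("fed_inbound", "!room:example.org", "worker2", "tok-b"))  # None: held, not timed out
print(try_acquire("fed_inbound", "!room:example.org", "worker1", "tok-c"))  # tok-c: same instance reclaims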

File tree

3 files changed (+27, -10 lines)

changelog.d/11262.bugfix

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Fix a bug where if a remote event is being processed by a worker when it gets killed then it won't get processed on restart. Introduced in v1.37.1.

synapse/federation/federation_server.py

Lines changed: 5 additions & 0 deletions
@@ -213,6 +213,11 @@ async def on_incoming_transaction(
             self._started_handling_of_staged_events = True
             self._handle_old_staged_events()
 
+            # Start a periodic check for old staged events. This is to handle
+            # the case where locks time out, e.g. if another process gets killed
+            # without dropping its locks.
+            self._clock.looping_call(self._handle_old_staged_events, 60 * 1000)
+
         # keep this as early as possible to make the calculated origin ts as
         # accurate as possible.
         request_time = self._clock.time_msec()
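
The periodic check added above goes through Synapse's clock helper, which wraps Twisted's LoopingCall. A rough, self-contained sketch of that pattern (the handler body and its name here are stand-ins, not the real _handle_old_staged_events):

# Sketch of the periodic-check pattern using Twisted directly; Synapse's
# Clock.looping_call helper wraps this. The handler below is a stand-in.
from twisted.internet import reactor, task

def check_for_old_staged_events() -> None:
    # In the real code this re-runs _handle_old_staged_events so that events
    # staged by a now-dead worker (whose lock eventually times out) still get
    # processed.
    print("checking for old staged events...")

loop = task.LoopingCall(check_for_old_staged_events)
loop.start(60.0, now=False)  # the 60 * 1000 ms in the diff is a 60 second interval

reactor.run()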

synapse/storage/databases/main/lock.py

Lines changed: 21 additions & 10 deletions
@@ -14,6 +14,7 @@
 import logging
 from types import TracebackType
 from typing import TYPE_CHECKING, Dict, Optional, Tuple, Type
+from weakref import WeakValueDictionary
 
 from twisted.internet.interfaces import IReactorCore
 
@@ -61,7 +62,7 @@ def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"
 
         # A map from `(lock_name, lock_key)` to the token of any locks that we
         # think we currently hold.
-        self._live_tokens: Dict[Tuple[str, str], str] = {}
+        self._live_tokens: Dict[Tuple[str, str], Lock] = WeakValueDictionary()
 
         # When we shut down we want to remove the locks. Technically this can
         # lead to a race, as we may drop the lock while we are still processing.
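
The switch to a WeakValueDictionary holding Lock objects (rather than plain token strings) means an entry disappears on its own once nothing else references the Lock, so the in-memory map cannot accumulate stale entries for locks that were never explicitly released. A small stand-alone demonstration of that behaviour (the Lock class here is a stub, not Synapse's):

# Stand-alone demonstration of WeakValueDictionary behaviour; the Lock class
# here is a stub, not Synapse's.
import gc
from typing import Tuple
from weakref import WeakValueDictionary

class Lock:
    def __init__(self, token: str) -> None:
        self.token = token

live_locks: "WeakValueDictionary[Tuple[str, str], Lock]" = WeakValueDictionary()

lock = Lock("abc123")
live_locks[("fed_inbound", "!room:example.org")] = lock
print(len(live_locks))  # 1

del lock      # drop the last strong reference to the Lock
gc.collect()  # make collection deterministic for the example
print(len(live_locks))  # 0 -- the entry vanished with the object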
@@ -80,10 +81,10 @@ async def _on_shutdown(self) -> None:
 
         # We need to take a copy of the tokens dict as dropping the locks will
         # cause the dictionary to change.
-        tokens = dict(self._live_tokens)
+        locks = dict(self._live_tokens)
 
-        for (lock_name, lock_key), token in tokens.items():
-            await self._drop_lock(lock_name, lock_key, token)
+        for lock in locks.values():
+            await lock.release()
 
         logger.info("Dropped locks due to shutdown")
 
@@ -93,14 +94,21 @@ async def try_acquire_lock(self, lock_name: str, lock_key: str) -> Optional["Loc
         used (otherwise the lock will leak).
         """
 
+        # Check if this process has taken out a lock and if it's still valid.
+        lock = self._live_tokens.get((lock_name, lock_key))
+        if lock and await lock.is_still_valid():
+            return None
+
         now = self._clock.time_msec()
         token = random_string(6)
 
         if self.db_pool.engine.can_native_upsert:
 
             def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
                 # We take out the lock if either a) there is no row for the lock
-                # already or b) the existing row has timed out.
+                # already, b) the existing row has timed out, or c) the row is
+                # for this instance (which means the process got killed and
+                # restarted)
                 sql = """
                     INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
                     VALUES (?, ?, ?, ?, ?)
@@ -112,6 +120,7 @@ def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
                             last_renewed_ts = EXCLUDED.last_renewed_ts
                         WHERE
                             worker_locks.last_renewed_ts < ?
+                            OR worker_locks.instance_name = EXCLUDED.instance_name
                 """
                 txn.execute(
                     sql,
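
Piecing the two native-upsert hunks together, the complete statement plausibly reads as below; the ON CONFLICT target and the SET lines that fall outside the shown diff context are reconstructed and should be treated as assumptions:

# Reconstruction of the full native-upsert statement from the fragments above.
# The ON CONFLICT target and the SET lines not shown in the diff are
# assumptions.
sql = """
    INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT (lock_name, lock_key)
    DO UPDATE
        SET
            token = EXCLUDED.token,
            instance_name = EXCLUDED.instance_name,
            last_renewed_ts = EXCLUDED.last_renewed_ts
        WHERE
            worker_locks.last_renewed_ts < ?
            OR worker_locks.instance_name = EXCLUDED.instance_name
"""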
@@ -148,11 +157,11 @@ def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
                     WHERE
                         lock_name = ?
                         AND lock_key = ?
-                        AND last_renewed_ts < ?
+                        AND (last_renewed_ts < ? OR instance_name = ?)
                 """
                 txn.execute(
                     sql,
-                    (lock_name, lock_key, now - _LOCK_TIMEOUT_MS),
+                    (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
                 )
 
                 inserted = self.db_pool.simple_upsert_txn_emulated(
@@ -179,9 +188,7 @@ def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
         if not did_lock:
             return None
 
-        self._live_tokens[(lock_name, lock_key)] = token
-
-        return Lock(
+        lock = Lock(
             self._reactor,
             self._clock,
             self,
@@ -190,6 +197,10 @@ def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
             token=token,
         )
 
+        self._live_tokens[(lock_name, lock_key)] = lock
+
+        return lock
+
     async def _is_lock_still_valid(
         self, lock_name: str, lock_key: str, token: str
     ) -> bool:
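
Finally, a hedged usage sketch of how a caller might drive these methods; only try_acquire_lock() and release() are taken from the diff above, while the handler, its arguments and the lock name are invented for illustration:

# Hedged usage sketch; the handler, its arguments and the lock name are
# invented. Only try_acquire_lock() and release() come from the diff above.
async def process_staged_events_for_room(store, room_id: str) -> None:
    lock = await store.try_acquire_lock("fed_inbound", room_id)
    if lock is None:
        # A live instance (or our own still-valid in-process lock) holds it;
        # the periodic check added in federation_server.py will retry later.
        return

    try:
        ...  # handle the staged events for this room
    finally:
        # Release explicitly; with the WeakValueDictionary the in-memory map
        # also forgets the Lock once this object is gone.
        await lock.release()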
