
Commit d285d76

Recover an appservice if a successful ping occurs. (element-hq#18521)
Fixes element-hq#14240. This scratches an itch that I've had for years. We regularly run into the issue where (especially in development) appservices go down for a period and then come back up. The ping endpoint was introduced some time ago, which means Synapse can determine more or less immediately whether an AS is up, so we might as well use that to schedule transaction redelivery. I believe transaction scheduling logic is largely implementation-specific, so we should be in the clear to do this without any spec changes.
1 parent 919c362 commit d285d76

File tree

4 files changed (+77, -4 lines)

changelog.d/18521.feature

Lines changed: 1 addition & 0 deletions
```diff
@@ -0,0 +1 @@
+Successful requests to `/_matrix/app/v1/ping` will now force Synapse to reattempt delivering transactions to appservices.
```
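
For context on how this entry plays out in practice: per MSC2659, an appservice calls the homeserver's ping endpoint, Synapse pings the AS back at `/_matrix/app/v1/ping`, and with this commit a successful round trip also forces redelivery of any transactions queued while the AS was down. The sketch below is illustrative only and not part of this commit; the homeserver URL, appservice ID, and token are placeholders.

```python
# Illustrative only: an appservice nudging Synapse after a restart.
# The URL, appservice ID, and token below are placeholders.
import requests

HOMESERVER = "http://localhost:8008"  # hypothetical homeserver base URL
APPSERVICE_ID = "my_bridge"           # the `id` from the AS registration file
AS_TOKEN = "secret_as_token"          # the `as_token` from the registration file

resp = requests.post(
    f"{HOMESERVER}/_matrix/client/v1/appservice/{APPSERVICE_ID}/ping",
    headers={"Authorization": f"Bearer {AS_TOKEN}"},
    json={"transaction_id": "restart-ping-1"},  # optional, echoed to the AS
    timeout=30,
)
resp.raise_for_status()
# A 200 response means Synapse reached the AS; with this commit, any queued
# transactions will now be redelivered.
print("ping round trip (ms):", resp.json().get("duration_ms"))
```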

synapse/appservice/scheduler.py

Lines changed: 34 additions & 2 deletions
```diff
@@ -2,7 +2,7 @@
 # This file is licensed under the Affero General Public License (AGPL) version 3.
 #
 # Copyright 2015, 2016 OpenMarket Ltd
-# Copyright (C) 2023 New Vector, Ltd
+# Copyright (C) 2023, 2025 New Vector, Ltd
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as
@@ -70,6 +70,8 @@
     Tuple,
 )

+from twisted.internet.interfaces import IDelayedCall
+
 from synapse.appservice import (
     ApplicationService,
     ApplicationServiceState,
@@ -450,6 +452,20 @@ def start_recoverer(self, service: ApplicationService) -> None:
         recoverer.recover()
         logger.info("Now %i active recoverers", len(self.recoverers))

+    def force_retry(self, service: ApplicationService) -> None:
+        """Forces a Recoverer to attempt delivery of transactions immediately.
+
+        Args:
+            service: The service whose queued transactions should be retried.
+        """
+        recoverer = self.recoverers.get(service.id)
+        if not recoverer:
+            # No need to force a retry on a happy AS.
+            logger.info(f"{service.id} is not in recovery, not forcing retry")
+            return
+
+        recoverer.force_retry()
+
     async def _is_service_up(self, service: ApplicationService) -> bool:
         state = await self.store.get_appservice_state(service)
         return state == ApplicationServiceState.UP or state is None
@@ -482,11 +498,12 @@ def __init__(
         self.service = service
         self.callback = callback
         self.backoff_counter = 1
+        self.scheduled_recovery: Optional[IDelayedCall] = None

     def recover(self) -> None:
         delay = 2**self.backoff_counter
         logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
-        self.clock.call_later(
+        self.scheduled_recovery = self.clock.call_later(
             delay, run_as_background_process, "as-recoverer", self.retry
         )
@@ -496,6 +513,21 @@ def _backoff(self) -> None:
         self.backoff_counter += 1
         self.recover()

+    def force_retry(self) -> None:
+        """Cancels the existing timer and forces an immediate retry in the background.
+
+        Unlike recover(), this skips the backoff delay entirely and retries
+        delivery right away.
+        """
+        # Prevent the existing backoff from occurring.
+        if self.scheduled_recovery:
+            self.clock.cancel_call_later(self.scheduled_recovery)
+        # Run a retry, which will reschedule a recovery if it fails.
+        run_as_background_process(
+            "retry",
+            self.retry,
+        )
+
     async def retry(self) -> None:
         logger.info("Starting retries on %s", self.service.id)
         try:
```
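
The heart of the change is keeping a handle to the pending `IDelayedCall` so the backoff timer can be cancelled when a retry is forced. Here is a minimal standalone sketch of that pattern using Twisted's reactor directly; it is not Synapse code, and `Backoff` and its methods are invented names for illustration:

```python
# A minimal sketch of the cancel-the-timer-and-retry-now pattern used above.
# Not Synapse code: Backoff, attempt, schedule, and retry_now are invented.
from typing import Optional

from twisted.internet import reactor
from twisted.internet.interfaces import IDelayedCall


class Backoff:
    def __init__(self) -> None:
        self.counter = 1
        self.scheduled: Optional[IDelayedCall] = None

    def attempt(self) -> None:
        # The real code would try to send the oldest unsent transaction here
        # and call schedule() again on failure.
        print("attempting delivery...")

    def schedule(self) -> None:
        delay = 2**self.counter
        self.counter += 1
        # Keep a handle to the pending call so it can be cancelled later.
        self.scheduled = reactor.callLater(delay, self.attempt)

    def retry_now(self) -> None:
        # Cancel the pending backoff timer if it has not fired yet...
        if self.scheduled is not None and self.scheduled.active():
            self.scheduled.cancel()
        # ...and attempt delivery immediately instead.
        self.attempt()
```

Checking `active()` before `cancel()` avoids an `AlreadyCalled` error if the timer has already fired; the commit goes through Synapse's `Clock.cancel_call_later` wrapper instead of touching the `IDelayedCall` directly.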

synapse/rest/client/appservice_ping.py

Lines changed: 6 additions & 1 deletion
```diff
@@ -2,7 +2,7 @@
 # This file is licensed under the Affero General Public License (AGPL) version 3.
 #
 # Copyright 2023 Tulir Asokan
-# Copyright (C) 2023 New Vector, Ltd
+# Copyright (C) 2023, 2025 New Vector, Ltd
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as
@@ -53,6 +53,7 @@ class AppservicePingRestServlet(RestServlet):
     def __init__(self, hs: "HomeServer"):
         super().__init__()
         self.as_api = hs.get_application_service_api()
+        self.scheduler = hs.get_application_service_scheduler()
         self.auth = hs.get_auth()

     async def on_POST(
@@ -85,6 +86,10 @@ async def on_POST(
         start = time.monotonic()
         try:
             await self.as_api.ping(requester.app_service, txn_id)
+
+            # We got an OK response, so if the AS needs to be recovered then let's recover it now.
+            # This sets off a task in the background and so is safe to execute and forget.
+            self.scheduler.txn_ctrl.force_retry(requester.app_service)
         except RequestTimedOutError as e:
             raise SynapseError(
                 HTTPStatus.GATEWAY_TIMEOUT,
```
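
The new comment in `on_POST` says `force_retry` is safe to execute and forget because the retry runs as a background process, detached from the HTTP request. As a rough sketch of that general shape (this is not Synapse's `run_as_background_process`; `fire_and_forget` is an invented helper):

```python
# Rough sketch of a fire-and-forget background task in Twisted; not Synapse's
# run_as_background_process, just the general shape of it.
import logging
from typing import Any, Awaitable, Callable

from twisted.internet import defer

logger = logging.getLogger(__name__)


def fire_and_forget(name: str, func: Callable[[], Awaitable[Any]]) -> None:
    """Start `func` without awaiting it; log failures instead of raising."""
    d = defer.ensureDeferred(func())
    # Errors are logged rather than propagated to the caller, so the HTTP
    # response to the ping is never blocked or failed by the retry itself.
    d.addErrback(lambda f: logger.error("background task %s failed: %s", name, f))
```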

tests/appservice/test_scheduler.py

Lines changed: 36 additions & 1 deletion
```diff
@@ -2,7 +2,7 @@
 # This file is licensed under the Affero General Public License (AGPL) version 3.
 #
 # Copyright 2015, 2016 OpenMarket Ltd
-# Copyright (C) 2023 New Vector, Ltd
+# Copyright (C) 2023, 2025 New Vector, Ltd
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as
@@ -234,6 +234,41 @@ def take_txn(
         self.assertEqual(1, txn.complete.call_count)
         self.callback.assert_called_once_with(self.recoverer)

+    def test_recover_force_retry(self) -> None:
+        txn = Mock()
+        txns = [txn, None]
+        pop_txn = False
+
+        def take_txn(
+            *args: object, **kwargs: object
+        ) -> "defer.Deferred[Optional[Mock]]":
+            if pop_txn:
+                return defer.succeed(txns.pop(0))
+            else:
+                return defer.succeed(txn)
+
+        self.store.get_oldest_unsent_txn = Mock(side_effect=take_txn)
+
+        # Start the recovery, and then fail the first attempt.
+        self.recoverer.recover()
+        self.assertEqual(0, self.store.get_oldest_unsent_txn.call_count)
+        txn.send = AsyncMock(return_value=False)
+        txn.complete = AsyncMock(return_value=None)
+        self.clock.advance_time(2)
+        self.assertEqual(1, txn.send.call_count)
+        self.assertEqual(0, txn.complete.call_count)
+        self.assertEqual(0, self.callback.call_count)
+
+        # Now allow the send to succeed, and force a retry.
+        pop_txn = True  # Return the txn the first time, then no more.
+        txn.send = AsyncMock(return_value=True)  # Successfully send the txn.
+        self.recoverer.force_retry()
+        self.assertEqual(1, txn.send.call_count)  # The new mock resets the call count.
+        self.assertEqual(1, txn.complete.call_count)
+
+        # Ensure we call the callback to say we're done!
+        self.callback.assert_called_once_with(self.recoverer)
+

 # Corresponds to synapse.appservice.scheduler._TransactionController.send
 TxnCtrlArgs: TypeAlias = """
```
