diff --git a/changelog.d/19036.misc b/changelog.d/19036.misc index 95b8daab9b0..8790cc2342b 100644 --- a/changelog.d/19036.misc +++ b/changelog.d/19036.misc @@ -1 +1 @@ -Move `start_doing_background_updates()` to `SynapseHomeServer.start_background_tasks()`. +Move `start_doing_background_updates()` to `HomeServer.start_background_tasks()`. diff --git a/changelog.d/19057.misc b/changelog.d/19057.misc new file mode 100644 index 00000000000..8790cc2342b --- /dev/null +++ b/changelog.d/19057.misc @@ -0,0 +1 @@ +Move `start_doing_background_updates()` to `HomeServer.start_background_tasks()`. diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index b9ac86c2fc3..23119c5cc53 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -317,11 +317,6 @@ def start_listening(self) -> None: # during parsing logger.warning("Unrecognized listener type: %s", listener.type) - def start_background_tasks(self) -> None: - super().start_background_tasks() - - self.get_datastores().main.db_pool.updates.start_doing_background_updates() - def load_or_generate_config(argv_options: List[str]) -> HomeServerConfig: """ diff --git a/synapse/server.py b/synapse/server.py index b63a11273a7..35b878d4b11 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -646,6 +646,7 @@ def start_background_tasks(self) -> None: self.get_task_scheduler() self.get_common_usage_metrics_manager().setup() start_phone_stats_home(self) + self.get_datastores().main.db_pool.updates.start_doing_background_updates() def get_reactor(self) -> ISynapseReactor: """ diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 8dd6f12feb3..91174769cd3 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -53,8 +53,8 @@ class TaskScheduler: """ This is a simple task scheduler designed for resumable tasks. Normally, - you'd use `run_in_background` to start a background task or Twisted's - `deferLater` if you want to run it later. + you'd use `run_in_background` to start a background task or `clock.call_later` + if you want to run it later. The issue is that these tasks stop completely and won't resume if Synapse is shut down for any reason. diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index 30b2de26e4a..2885b068883 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -27,7 +27,6 @@ from parameterized import parameterized -from twisted.internet.task import deferLater from twisted.internet.testing import MemoryReactor import synapse.rest.admin @@ -861,7 +860,7 @@ def test_delete_same_room_twice(self) -> None: # Mock PaginationHandler.purge_room to sleep for 100s, so we have time to do a second call # before the purge is over. Note that it doesn't purge anymore, but we don't care. async def purge_room(room_id: str, force: bool) -> None: - await deferLater(self.hs.get_reactor(), 100, lambda: None) + await self.hs.get_clock().sleep(100) self.pagination_handler.purge_room = AsyncMock(side_effect=purge_room) # type: ignore[method-assign] diff --git a/tests/unittest.py b/tests/unittest.py index 9ab052e7c0c..e63a1a6e892 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -73,12 +73,12 @@ from synapse.http.site import SynapseRequest, SynapseSite from synapse.logging.context import ( SENTINEL_CONTEXT, - LoggingContext, current_context, set_current_context, ) from synapse.rest import RegisterServletsFunc from synapse.server import HomeServer +from synapse.storage.background_updates import UpdaterStatus from synapse.storage.keys import FetchKeyResult from synapse.types import ISynapseReactor, JsonDict, Requester, UserID, create_requester from synapse.util.clock import Clock @@ -108,6 +108,11 @@ R = TypeVar("R") S = TypeVar("S") +BACKGROUND_UPDATE_TIMEOUT_SECONDS = 5 +""" +We expect this to be pretty immediate as we're working with an empty database. +""" + class _TypedFailure(Generic[_ExcType], Protocol): """Extension to twisted.Failure, where the 'value' has a certain type.""" @@ -490,7 +495,11 @@ def wait_on_thread(self, deferred: Deferred, timeout: int = 10) -> None: def wait_for_background_updates(self) -> None: """Block until all background database updates have completed.""" - store = self.hs.get_datastores().main + self._wait_for_background_updates(self.hs) + + def _wait_for_background_updates(self, hs: HomeServer) -> None: + """Block until all background database updates have completed.""" + store = hs.get_datastores().main while not self.get_success( store.db_pool.updates.has_completed_background_updates() ): @@ -677,10 +686,6 @@ def setup_test_homeserver( # construct a homeserver with a matching name. server_name = config_obj.server.server_name - async def run_bg_updates() -> None: - with LoggingContext(name="run_bg_updates", server_name=server_name): - self.get_success(stor.db_pool.updates.run_background_updates(False)) - hs = setup_test_homeserver( cleanup_func=self.addCleanup, server_name=server_name, @@ -689,11 +694,48 @@ async def run_bg_updates() -> None: clock=clock, **extra_homeserver_attributes, ) - stor = hs.get_datastores().main - # Run the database background updates, when running against "master". - if hs.__class__.__name__ == "TestHomeServer": - self.get_success(run_bg_updates()) + # Wait for the database background updates to complete. This is important + # because tests assume that the database is using the latest schema. + # + # We could use `self._wait_for_background_updates(hs)` to accomplish the same + # thing but we don't want to start or drive the background updates here. We want + # to ensure the homeserver itself is doing that. + start_time_s = time.time() + store = hs.get_datastores().main + while not self.get_success( + # This check is slightly naive. It only checks if there is anything left in + # the `background_updates` database table so it is possible that the + # homeserver mistakenly never registered any background updates to be run. + # Since `register_background_xxx(...)` is done across the codebase, we can't + # really assert that everything was registered as expected. + store.db_pool.updates.has_completed_background_updates() + ): + # Timeout if it takes too long. This should be pretty immediate as we're + # working with an empty database. + current_time_s = time.time() + if current_time_s - start_time_s > BACKGROUND_UPDATE_TIMEOUT_SECONDS: + background_update_status = store.db_pool.updates.get_status() + + # Add some better context when we give up + extra_message = "" + if background_update_status == UpdaterStatus.NOT_STARTED: + extra_message = ( + "Did you forget to `start_doing_background_updates()`?" + ) + elif background_update_status == UpdaterStatus.RUNNING_UPDATE: + extra_message = "Background updates were still running when we gave up. Are they stuck?" + else: + extra_message = ( + f"Background update status was {background_update_status}." + ) + + raise AssertionError( + f"Timed out waiting for background updates to complete ({BACKGROUND_UPDATE_TIMEOUT_SECONDS}s). " + + extra_message + ) + + self.pump(by=0.1) return hs diff --git a/tests/util/test_task_scheduler.py b/tests/util/test_task_scheduler.py index e97f0ed6111..bebe51a12ba 100644 --- a/tests/util/test_task_scheduler.py +++ b/tests/util/test_task_scheduler.py @@ -20,9 +20,10 @@ # from typing import List, Optional, Tuple -from twisted.internet.task import deferLater +from twisted.internet.defer import Deferred from twisted.internet.testing import MemoryReactor +from synapse.logging.context import make_deferred_yieldable from synapse.server import HomeServer from synapse.types import JsonMapping, ScheduledTask, TaskStatus from synapse.util.clock import Clock @@ -87,7 +88,7 @@ async def _sleeping_task( self, task: ScheduledTask ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: # Sleep for a second - await deferLater(self.reactor, 1, lambda: None) + await self.hs.get_clock().sleep(1) return TaskStatus.COMPLETE, None, None def test_schedule_lot_of_tasks(self) -> None: @@ -170,8 +171,10 @@ async def _resumable_task( return TaskStatus.COMPLETE, {"success": True}, None else: await self.task_scheduler.update_task(task.id, result={"in_progress": True}) + # Create a deferred which we will never complete + incomplete_d: Deferred = Deferred() # Await forever to simulate an aborted task because of a restart - await deferLater(self.reactor, 2**16, lambda: None) + await make_deferred_yieldable(incomplete_d) # This should never been called return TaskStatus.ACTIVE, None, None