From 5bdb5d930eed0bcd441235c461898519626914f7 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 1 Oct 2025 17:00:30 +0100 Subject: [PATCH 1/7] Replace manual LoggingContext usage with `ModuleApi.run_in_background` Attempt to replace manual usage of LoggingContext with the provided module API's `run_in_background` method. --- s3_storage_provider.py | 85 +++++++++++++++++------------------------- 1 file changed, 35 insertions(+), 50 deletions(-) diff --git a/s3_storage_provider.py b/s3_storage_provider.py index e3ce7f4..2413148 100644 --- a/s3_storage_provider.py +++ b/s3_storage_provider.py @@ -27,16 +27,10 @@ from twisted.python.failure import Failure from twisted.python.threadpool import ThreadPool -from synapse.logging.context import LoggingContext, make_deferred_yieldable +from synapse.module_api import run_in_background from synapse.rest.media.v1._base import Responder from synapse.rest.media.v1.storage_provider import StorageProvider -# Synapse 1.13.0 moved current_context to a module-level function. -try: - from synapse.logging.context import current_context -except ImportError: - current_context = LoggingContext.current_context - logger = logging.getLogger("synapse.s3") @@ -121,34 +115,29 @@ def _get_s3_client(self): def store_file(self, path, file_info): """See StorageProvider.store_file""" - parent_logcontext = current_context() - def _store_file(): - with LoggingContext(parent_context=parent_logcontext): - self._get_s3_client().upload_file( - Filename=os.path.join(self.cache_directory, path), - Bucket=self.bucket, - Key=self.prefix + path, - ExtraArgs=self.extra_args, - ) - - return make_deferred_yieldable( - threads.deferToThreadPool(reactor, self._s3_pool, _store_file) + self._get_s3_client().upload_file( + Filename=os.path.join(self.cache_directory, path), + Bucket=self.bucket, + Key=self.prefix + path, + ExtraArgs=self.extra_args, + ) + + return run_in_background( + threads.deferToThreadPool, reactor, self._s3_pool, _store_file ) def fetch(self, path, file_info): """See StorageProvider.fetch""" - logcontext = current_context() - d = defer.Deferred() def _get_file(): s3_download_task( - self._get_s3_client(), self.bucket, self.prefix + path, self.extra_args, d, logcontext + self._get_s3_client(), self.bucket, self.prefix + path, self.extra_args, d ) - self._s3_pool.callInThread(_get_file) - return make_deferred_yieldable(d) + run_in_background(self._s3_pool.callInThread, _get_file) + return d @staticmethod def parse_config(config): @@ -196,7 +185,7 @@ def parse_config(config): return result -def s3_download_task(s3_client, bucket, key, extra_args, deferred, parent_logcontext): +def s3_download_task(s3_client, bucket, key, extra_args, deferred): """Attempts to download a file from S3. Args: @@ -206,35 +195,31 @@ def s3_download_task(s3_client, bucket, key, extra_args, deferred, parent_logcon deferred (Deferred[_S3Responder|None]): If file exists resolved with an _S3Responder instance, if it doesn't exist then resolves with None. - parent_logcontext (LoggingContext): the logcontext to report logs and metrics - against. 
""" - with LoggingContext(parent_context=parent_logcontext): - logger.info("Fetching %s from S3", key) - - try: - if "SSECustomerKey" in extra_args and "SSECustomerAlgorithm" in extra_args: - resp = s3_client.get_object( - Bucket=bucket, - Key=key, - SSECustomerKey=extra_args["SSECustomerKey"], - SSECustomerAlgorithm=extra_args["SSECustomerAlgorithm"], - ) - else: - resp = s3_client.get_object(Bucket=bucket, Key=key) - - except botocore.exceptions.ClientError as e: - if e.response["Error"]["Code"] in ("404", "NoSuchKey",): - logger.info("Media %s not found in S3", key) - reactor.callFromThread(deferred.callback, None) - return + logger.info("Fetching %s from S3", key) - reactor.callFromThread(deferred.errback, Failure()) + try: + if "SSECustomerKey" in extra_args and "SSECustomerAlgorithm" in extra_args: + resp = s3_client.get_object( + Bucket=bucket, + Key=key, + SSECustomerKey=extra_args["SSECustomerKey"], + SSECustomerAlgorithm=extra_args["SSECustomerAlgorithm"], + ) + else: + resp = s3_client.get_object(Bucket=bucket, Key=key) + + except botocore.exceptions.ClientError as e: + if e.response["Error"]["Code"] in ("404", "NoSuchKey",): + logger.info("Media %s not found in S3", key) return - producer = _S3Responder() - reactor.callFromThread(deferred.callback, producer) - _stream_to_producer(reactor, producer, resp["Body"], timeout=90.0) + reactor.callFromThread(deferred.errback, Failure()) + return + + producer = _S3Responder() + reactor.callFromThread(deferred.callback, producer) + _stream_to_producer(reactor, producer, resp["Body"], timeout=90.0) def _stream_to_producer(reactor, producer, body, status=None, timeout=None): From 190686c44ec602a762b49d570f275a2f657f9b01 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 2 Oct 2025 11:01:46 +0100 Subject: [PATCH 2/7] Source `ModuleApi` from `hs` and use `defer_to_thread` We also make `store_file` and `fetch` async, as they are async in the base class. This also simplifies the implementation. We could go through and convert the whole module from deferreds to async, but that should be done separately. --- s3_storage_provider.py | 51 +++++++++++++++++------------------------- 1 file changed, 21 insertions(+), 30 deletions(-) diff --git a/s3_storage_provider.py b/s3_storage_provider.py index 2413148..73d3e66 100644 --- a/s3_storage_provider.py +++ b/s3_storage_provider.py @@ -23,11 +23,11 @@ import boto3 import botocore -from twisted.internet import defer, reactor, threads +from twisted.internet import defer, reactor from twisted.python.failure import Failure -from twisted.python.threadpool import ThreadPool -from synapse.module_api import run_in_background +from synapse.logging.context import make_deferred_yieldable +from synapse.module_api import ModuleApi from synapse.rest.media.v1._base import Responder from synapse.rest.media.v1.storage_provider import StorageProvider @@ -54,6 +54,7 @@ class S3StorageProviderBackend(StorageProvider): """ def __init__(self, hs, config): + self._module_api: ModuleApi = hs.get_module_api() self.cache_directory = hs.config.media.media_store_path self.bucket = config["bucket"] self.prefix = config["prefix"] @@ -82,15 +83,6 @@ def __init__(self, hs, config): self._s3_client_lock = threading.Lock() threadpool_size = config.get("threadpool_size", 40) - self._s3_pool = ThreadPool(name="s3-pool", maxthreads=threadpool_size) - self._s3_pool.start() - - # Manually stop the thread pool on shutdown. If we don't do this then - # stopping Synapse takes an extra ~30s as Python waits for the threads - # to exit. 
- reactor.addSystemEventTrigger( - "during", "shutdown", self._s3_pool.stop, - ) def _get_s3_client(self): # this method is designed to be thread-safe, so that we can share a @@ -112,32 +104,31 @@ def _get_s3_client(self): self._s3_client = s3 = b3_session.client("s3", **self.api_kwargs) return s3 - def store_file(self, path, file_info): + async def store_file(self, path, file_info): """See StorageProvider.store_file""" - def _store_file(): - self._get_s3_client().upload_file( - Filename=os.path.join(self.cache_directory, path), - Bucket=self.bucket, - Key=self.prefix + path, - ExtraArgs=self.extra_args, - ) - - return run_in_background( - threads.deferToThreadPool, reactor, self._s3_pool, _store_file + return await self._module_api.defer_to_thread( + self._get_s3_client().upload_file, + Filename=os.path.join(self.cache_directory, path), + Bucket=self.bucket, + Key=self.prefix + path, + ExtraArgs=self.extra_args, ) - def fetch(self, path, file_info): + async def fetch(self, path, file_info): """See StorageProvider.fetch""" d = defer.Deferred() - def _get_file(): - s3_download_task( - self._get_s3_client(), self.bucket, self.prefix + path, self.extra_args, d - ) + await self._module_api.defer_to_thread( + s3_download_task, + self._get_s3_client(), + self.bucket, + self.prefix + path, + self.extra_args, + d, + ) - run_in_background(self._s3_pool.callInThread, _get_file) - return d + return await make_deferred_yieldable(d) @staticmethod def parse_config(config): From 18b385b8b58c3956f3e4f73d790151de3544f010 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 8 Oct 2025 14:17:19 +0100 Subject: [PATCH 3/7] Use `defer_to_threadpool` As introduced by https://github.com/element-hq/synapse/pull/19032. --- s3_storage_provider.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/s3_storage_provider.py b/s3_storage_provider.py index 8061c8a..5719b53 100644 --- a/s3_storage_provider.py +++ b/s3_storage_provider.py @@ -26,6 +26,7 @@ from twisted.internet import defer, reactor from twisted.python.failure import Failure +from twisted.python.threadpool import ThreadPool from synapse.logging.context import make_deferred_yieldable from synapse.module_api import ModuleApi @@ -89,6 +90,15 @@ def __init__(self, hs, config): self._s3_client_lock = threading.Lock() threadpool_size = config.get("threadpool_size", 40) + self._s3_pool = ThreadPool(name="s3-pool", maxthreads=threadpool_size) + self._s3_pool.start() + + # Manually stop the thread pool on shutdown. If we don't do this then + # stopping Synapse takes an extra ~30s as Python waits for the threads + # to exit. 
+        reactor.addSystemEventTrigger(
+            "during", "shutdown", self._s3_pool.stop,
+        )

     def _get_s3_client(self):
         # this method is designed to be thread-safe, so that we can share a
@@ -113,7 +123,8 @@ def _get_s3_client(self):
     async def store_file(self, path, file_info):
         """See StorageProvider.store_file"""

-        return await self._module_api.defer_to_thread(
+        return await self._module_api.defer_to_threadpool(
+            self._s3_pool,
             self._get_s3_client().upload_file,
             Filename=os.path.join(self.cache_directory, path),
             Bucket=self.bucket,
@@ -125,7 +136,8 @@ async def fetch(self, path, file_info):
         """See StorageProvider.fetch"""
         d = defer.Deferred()

-        await self._module_api.defer_to_thread(
+        await self._module_api.defer_to_threadpool(
+            self._s3_pool,
             s3_download_task,
             self._get_s3_client(),
             self.bucket,

From e2abe39d4de426ee8068847836b49769ddb8c186 Mon Sep 17 00:00:00 2001
From: Andrew Morgan
Date: Thu, 9 Oct 2025 12:12:32 +0100
Subject: [PATCH 4/7] Don't `await` deferred before passing it back to Synapse

Discovered by @erikj:

> the thread is waiting for synapse to use the S3Responder, but the responder
> isn't returned to Synapse until the thread is finished

---
 s3_storage_provider.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/s3_storage_provider.py b/s3_storage_provider.py
index 5719b53..5a985a3 100644
--- a/s3_storage_provider.py
+++ b/s3_storage_provider.py
@@ -146,7 +146,7 @@ async def fetch(self, path, file_info):
             d,
         )

-        return await make_deferred_yieldable(d)
+        return make_deferred_yieldable(d)

     @staticmethod
     def parse_config(config):

From e9aacbf6705f2d30c6033c4846563d7057c8ea52 Mon Sep 17 00:00:00 2001
From: Andrew Morgan
Date: Thu, 9 Oct 2025 12:20:32 +0100
Subject: [PATCH 5/7] `await` on the correct thing

> the `d` we pass in to `s3_download_task` gets resolved once we connect to
> S3, and the thread is concluded only once we finish the download.

---
 s3_storage_provider.py | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/s3_storage_provider.py b/s3_storage_provider.py
index 5a985a3..912b7f7 100644
--- a/s3_storage_provider.py
+++ b/s3_storage_provider.py
@@ -136,7 +136,10 @@ async def fetch(self, path, file_info):
         """See StorageProvider.fetch"""
         d = defer.Deferred()

-        await self._module_api.defer_to_threadpool(
+        # Don't await this directly, as it will resolve only one the streaming
+        # download from S3 is concluded. Before that happens, we want to pass
+        # execution back to Synapse to stream the file's chunks.
+        self._module_api.defer_to_threadpool(
             self._s3_pool,
             s3_download_task,
             self._get_s3_client(),
@@ -146,7 +149,10 @@ async def fetch(self, path, file_info):
             d,
         )

-        return make_deferred_yieldable(d)
+        # DO await on `d`, as it will resolve once a connection to S3 has been
+        # opened. We only want to return to Synapse once we can start streaming
+        # chunks.
+        return await make_deferred_yieldable(d)

     @staticmethod
     def parse_config(config):
@@ -204,6 +210,10 @@ def s3_download_task(s3_client, bucket, key, extra_args, deferred):
         deferred (Deferred[_S3Responder|None]): If file exists
             resolved with an _S3Responder instance, if it doesn't
             exist then resolves with None.
+
+    Returns:
+        None. The passed-in `deferred` resolves to an _S3Responder if the
+        file exists; otherwise the deferred fails.
""" logger.info("Fetching %s from S3", key) From 76431656e493905e6ba8f26d20700a95ba56e9d0 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 9 Oct 2025 12:33:39 +0100 Subject: [PATCH 6/7] Use `run_in_background` on `defer_to_threadpool` --- s3_storage_provider.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/s3_storage_provider.py b/s3_storage_provider.py index 912b7f7..6ed0bb6 100644 --- a/s3_storage_provider.py +++ b/s3_storage_provider.py @@ -136,17 +136,22 @@ async def fetch(self, path, file_info): """See StorageProvider.fetch""" d = defer.Deferred() - # Don't await this directly, as it will resolve only one the streaming + # Don't await this directly, as it will resolve only once the streaming # download from S3 is concluded. Before that happens, we want to pass # execution back to Synapse to stream the file's chunks. - self._module_api.defer_to_threadpool( - self._s3_pool, - s3_download_task, - self._get_s3_client(), - self.bucket, - self.prefix + path, - self.extra_args, - d, + # + # We do, however, need to wrap in `run_in_background` to ensure that + # `s3_download_task` follows the Synapse logcontext rules. + self._module_api.run_in_background( + self._module_api.defer_to_threadpool( + self._s3_pool, + s3_download_task, + self._get_s3_client(), + self.bucket, + self.prefix + path, + self.extra_args, + d, + ) ) # DO await on `d`, as it will resolve once a connection to S3 has been From c8823065783b3c7601cbcb6c3e568c1a6b34ffc9 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 9 Oct 2025 12:39:07 +0100 Subject: [PATCH 7/7] Update documentation to mention dangling coroutines --- s3_storage_provider.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/s3_storage_provider.py b/s3_storage_provider.py index 6ed0bb6..128b810 100644 --- a/s3_storage_provider.py +++ b/s3_storage_provider.py @@ -140,8 +140,9 @@ async def fetch(self, path, file_info): # download from S3 is concluded. Before that happens, we want to pass # execution back to Synapse to stream the file's chunks. # - # We do, however, need to wrap in `run_in_background` to ensure that - # `s3_download_task` follows the Synapse logcontext rules. + # We do, however, need to wrap in `run_in_background` to ensure that the + # coroutine returned by `defer_to_threadpool` is used, and therefore + # actually run. self._module_api.run_in_background( self._module_api.defer_to_threadpool( self._s3_pool,