110 changes: 43 additions & 67 deletions s3_storage_provider.py
@@ -24,20 +24,14 @@
 import botocore
 from botocore.config import Config

-from twisted.internet import defer, reactor, threads
+from twisted.internet import defer, reactor
 from twisted.python.failure import Failure
-from twisted.python.threadpool import ThreadPool

-from synapse.logging.context import LoggingContext, make_deferred_yieldable
+from synapse.logging.context import make_deferred_yieldable
+from synapse.module_api import ModuleApi
 from synapse.rest.media.v1._base import Responder
 from synapse.rest.media.v1.storage_provider import StorageProvider

-# Synapse 1.13.0 moved current_context to a module-level function.
-try:
-    from synapse.logging.context import current_context
-except ImportError:
-    current_context = LoggingContext.current_context
-
 logger = logging.getLogger("synapse.s3")


@@ -61,6 +55,7 @@ class S3StorageProviderBackend(StorageProvider):
     """

     def __init__(self, hs, config):
+        self._module_api: ModuleApi = hs.get_module_api()
         self.cache_directory = hs.config.media.media_store_path
         self.bucket = config["bucket"]
         self.prefix = config["prefix"]
@@ -94,15 +89,6 @@ def __init__(self, hs, config):
         self._s3_client_lock = threading.Lock()

-        threadpool_size = config.get("threadpool_size", 40)
-        self._s3_pool = ThreadPool(name="s3-pool", maxthreads=threadpool_size)
-        self._s3_pool.start()
-
-        # Manually stop the thread pool on shutdown. If we don't do this then
-        # stopping Synapse takes an extra ~30s as Python waits for the threads
-        # to exit.
-        reactor.addSystemEventTrigger(
-            "during", "shutdown", self._s3_pool.stop,
-        )
-
     def _get_s3_client(self):
         # this method is designed to be thread-safe, so that we can share a
@@ -124,37 +110,31 @@ def _get_s3_client(self):
             self._s3_client = s3 = b3_session.client("s3", **self.api_kwargs)
         return s3

-    def store_file(self, path, file_info):
+    async def store_file(self, path, file_info):
         """See StorageProvider.store_file"""

-        parent_logcontext = current_context()
-
-        def _store_file():
-            with LoggingContext(parent_context=parent_logcontext):
-                self._get_s3_client().upload_file(
-                    Filename=os.path.join(self.cache_directory, path),
-                    Bucket=self.bucket,
-                    Key=self.prefix + path,
-                    ExtraArgs=self.extra_args,
-                )
-
-        return make_deferred_yieldable(
-            threads.deferToThreadPool(reactor, self._s3_pool, _store_file)
+        return await self._module_api.defer_to_thread(
+            self._get_s3_client().upload_file,
+            Filename=os.path.join(self.cache_directory, path),
+            Bucket=self.bucket,
+            Key=self.prefix + path,
+            ExtraArgs=self.extra_args,
         )

-    def fetch(self, path, file_info):
+    async def fetch(self, path, file_info):
         """See StorageProvider.fetch"""
-        logcontext = current_context()
-
         d = defer.Deferred()

-        def _get_file():
-            s3_download_task(
-                self._get_s3_client(), self.bucket, self.prefix + path, self.extra_args, d, logcontext
-            )
+        await self._module_api.defer_to_thread(

A reviewer commented:

> This module previously used its own threadpool (self._s3_pool), and even had an option to configure the size (threadpool_size, default 40). In previous issues, we've suggested users configure boto3's connection count to match the configured threadpool size: #117
>
> The default threadpool size in Twisted is 10. I worry that switching to Synapse's default threadpool will hurt the performance of this module, with no way for users to configure it.
>
> -- @anoadragon453, #134 (comment)

Can we recommend people increase the size of the default Twisted threadpool?

Is there any difference in having a separate threadpool?
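
For context, the "connection count" suggestion from #117 presumably maps to botocore's max_pool_connections setting. A minimal sketch; the value 40 mirrors this module's old threadpool_size default and is an assumption, not a requirement:

```python
import boto3
from botocore.config import Config

# Match boto3's HTTP connection pool to the threadpool size, per the
# suggestion in #117. 40 is this module's old default threadpool size.
s3 = boto3.session.Session().client(
    "s3", config=Config(max_pool_connections=40)
)
```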

@MadLittleMods commented (Oct 2, 2025):

Discussed in #synapse-dev:matrix.org. Here is why a separate threadpool is important:

> I'd be worried about s3 monopolising the thread pool and blocking other operations (like DNS lookups)
>
> -- @richvdh

As @anoadragon453 points out, DNS lookups have their own threadpool already, but the point still stands generally.

@anoadragon453 (author) commented:

> Can we recommend people increase the size of the default Twisted threadpool?

We don't currently have such an option in Synapse, but it would be better than nothing.

Perhaps we should try deploying the module with the default threadpool on matrix.org and see if performance suffers? I'm just worried that if we go ahead without adding any way to configure the threadpool size, yet require people to upgrade this module to use the latest Synapse, they could be stuck between a rock and a hard place.

Otherwise, I think this:

> Perhaps we should additionally expose defer_to_threadpool to modules, allowing modules to specify a threadpool? Though this would break compatibility with older versions of Synapse. We could check if the method existed on ModuleApi and use the default threadpool if not.

may be the way to go.
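
A minimal sketch of that compatibility check, assuming a hypothetical defer_to_threadpool method on ModuleApi (the name and signature are guesses, not a released API; defer_to_thread is the method this PR already uses):

```python
from synapse.module_api import ModuleApi
from twisted.python.threadpool import ThreadPool


async def run_in_pool(module_api: ModuleApi, pool: ThreadPool, f, *args, **kwargs):
    """Run `f` in the module's own threadpool when the hypothetical
    defer_to_threadpool method exists, else fall back to the default
    reactor threadpool via the existing defer_to_thread."""
    if hasattr(module_api, "defer_to_threadpool"):
        # Assumed signature: defer_to_threadpool(threadpool, f, *args, **kwargs)
        return await module_api.defer_to_threadpool(pool, f, *args, **kwargs)
    return await module_api.defer_to_thread(f, *args, **kwargs)
```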

@anoadragon453 (author) commented:

> Can we recommend people increase the size of the default Twisted threadpool?

The size of the default Twisted threadpool can only be increased through code, i.e. via Twisted's reactor API; we don't provide a configuration option which tweaks this currently, so users are unable to increase the size of the default threadpool.
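
As a sketch, the Twisted call in question would look like this (the size 40 mirrors the module's old default and is an arbitrary choice):

```python
from twisted.internet import reactor

# Grow the default reactor threadpool in code; Synapse exposes no
# config option for this. 40 mirrors this module's old threadpool_size
# default and is an assumption.
reactor.suggestThreadPoolSize(40)
```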


Above I suggested exposing defer_to_threadpool to modules. Another alternative is to add an argument to the already-exposed defer_to_thread ModuleApi method to allow specifying a threadpool, defaulting to the default Twisted threadpool if not provided. It would then use defer_to_threadpool under the hood instead of defer_to_thread.

A reviewer replied:

Sounds workable 👍

@anoadragon453 (author) commented (Oct 7, 2025):

I've tried the latter approach in element-hq/synapse#19032.
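
A sketch of what that latter approach might look like, assuming defer_to_thread grows an optional threadpool parameter (the parameter name is a guess; element-hq/synapse#19032 has the actual change, and real Synapse code would also handle logcontexts):

```python
from typing import Any, Callable, Optional

from twisted.internet import reactor
from twisted.internet.threads import deferToThreadPool
from twisted.python.threadpool import ThreadPool


async def defer_to_thread(
    f: Callable[..., Any],
    *args: Any,
    threadpool: Optional[ThreadPool] = None,
    **kwargs: Any,
) -> Any:
    # Fall back to the default reactor threadpool when none is given,
    # preserving the existing defer_to_thread behaviour.
    if threadpool is None:
        threadpool = reactor.getThreadPool()
    return await deferToThreadPool(reactor, threadpool, f, *args, **kwargs)
```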

+            s3_download_task,
+            self._get_s3_client(),
+            self.bucket,
+            self.prefix + path,
+            self.extra_args,
+            d,
+        )

-        self._s3_pool.callInThread(_get_file)
-        return make_deferred_yieldable(d)
+        return await make_deferred_yieldable(d)

     @staticmethod
     def parse_config(config):
@@ -202,7 +182,7 @@ def parse_config(config):
         return result


-def s3_download_task(s3_client, bucket, key, extra_args, deferred, parent_logcontext):
+def s3_download_task(s3_client, bucket, key, extra_args, deferred):
@anoadragon453 (author) commented:

The only real changes here are:

1. Removing parent_logcontext.
2. Removing with LoggingContext ... and de-indenting all of the code underneath it.

A reviewer replied:

I think these changes make sense. s3_download_task will use whatever the caller's logcontext is.

And I think we maintain the logcontext when calling s3_download_task ✅ (at least with the suggested patterns).
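
A self-contained sketch of the hand-off pattern under discussion (worker_task and caller are illustrative names; per the thread above, the caller's logcontext is maintained across defer_to_thread):

```python
from twisted.internet import defer, reactor

from synapse.logging.context import make_deferred_yieldable


def worker_task(deferred):
    # Runs on a threadpool thread: hand the result back to the reactor
    # thread with callFromThread, exactly as s3_download_task does.
    result = "payload"
    reactor.callFromThread(deferred.callback, result)


async def caller(module_api):
    d = defer.Deferred()
    # The worker fires `d` from the reactor thread once it has a result;
    # the caller then awaits it with make_deferred_yieldable.
    await module_api.defer_to_thread(worker_task, d)
    return await make_deferred_yieldable(d)
```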

"""Attempts to download a file from S3.

Args:
Expand All @@ -212,35 +192,31 @@ def s3_download_task(s3_client, bucket, key, extra_args, deferred, parent_logcon
deferred (Deferred[_S3Responder|None]): If file exists
resolved with an _S3Responder instance, if it doesn't
exist then resolves with None.
parent_logcontext (LoggingContext): the logcontext to report logs and metrics
against.
"""
-    with LoggingContext(parent_context=parent_logcontext):
-        logger.info("Fetching %s from S3", key)
-
-        try:
-            if "SSECustomerKey" in extra_args and "SSECustomerAlgorithm" in extra_args:
-                resp = s3_client.get_object(
-                    Bucket=bucket,
-                    Key=key,
-                    SSECustomerKey=extra_args["SSECustomerKey"],
-                    SSECustomerAlgorithm=extra_args["SSECustomerAlgorithm"],
-                )
-            else:
-                resp = s3_client.get_object(Bucket=bucket, Key=key)
-
-        except botocore.exceptions.ClientError as e:
-            if e.response["Error"]["Code"] in ("404", "NoSuchKey",):
-                logger.info("Media %s not found in S3", key)
-                reactor.callFromThread(deferred.callback, None)
-                return
-
-            reactor.callFromThread(deferred.errback, Failure())
-            return
-
-        producer = _S3Responder()
-        reactor.callFromThread(deferred.callback, producer)
-        _stream_to_producer(reactor, producer, resp["Body"], timeout=90.0)
+    logger.info("Fetching %s from S3", key)
+
+    try:
+        if "SSECustomerKey" in extra_args and "SSECustomerAlgorithm" in extra_args:
+            resp = s3_client.get_object(
+                Bucket=bucket,
+                Key=key,
+                SSECustomerKey=extra_args["SSECustomerKey"],
+                SSECustomerAlgorithm=extra_args["SSECustomerAlgorithm"],
+            )
+        else:
+            resp = s3_client.get_object(Bucket=bucket, Key=key)
+
+    except botocore.exceptions.ClientError as e:
+        if e.response["Error"]["Code"] in ("404", "NoSuchKey",):
+            logger.info("Media %s not found in S3", key)
+            reactor.callFromThread(deferred.callback, None)
+            return
+
+        reactor.callFromThread(deferred.errback, Failure())
+        return
+
+    producer = _S3Responder()
+    reactor.callFromThread(deferred.callback, producer)
+    _stream_to_producer(reactor, producer, resp["Body"], timeout=90.0)


 def _stream_to_producer(reactor, producer, body, status=None, timeout=None):