-
Notifications
You must be signed in to change notification settings - Fork 424
Implement synapse issue #16751: Treat local_media_directory as optional storage provider #19204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from 5 commits
2e78aa8
7cc0aeb
8eed314
b6ffef6
ccc047d
5d272af
646a051
94c4358
f63c91a
99eb251
af958d3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Made the local media directory optional by treating it as a storage provider. This allows off-site media storage without a local cache: media is stored directly to the remote providers only, and temporary files are used for thumbnail generation when needed. Contributed by Patrice Brend'amour @dr.allgood. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| import logging | ||
| import os | ||
| import shutil | ||
| import tempfile | ||
| from contextlib import closing | ||
| from io import BytesIO | ||
| from types import TracebackType | ||
|
|
@@ -49,16 +50,16 @@ | |
| from synapse.api.errors import NotFoundError | ||
| from synapse.logging.context import defer_to_thread, run_in_background | ||
| from synapse.logging.opentracing import start_active_span, trace, trace_with_opname | ||
| from synapse.media._base import ThreadedFileSender | ||
| from synapse.media.storage_provider import FileStorageProviderBackend | ||
| from synapse.util.clock import Clock | ||
| from synapse.util.file_consumer import BackgroundFileConsumer | ||
|
|
||
| from ..types import JsonDict | ||
| from ._base import FileInfo, Responder | ||
| from ._base import FileInfo, Responder, ThreadedFileSender | ||
| from .filepath import MediaFilePaths | ||
|
|
||
| if TYPE_CHECKING: | ||
| from synapse.media.storage_provider import StorageProvider | ||
| from synapse.media.storage_provider import StorageProviderWrapper | ||
| from synapse.server import HomeServer | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
@@ -149,27 +150,31 @@ def __getattr__(self, attr_name: str) -> Any: | |
|
|
||
|
|
||
| class MediaStorage: | ||
| """Responsible for storing/fetching files from local sources. | ||
| """Responsible for storing/fetching files from storage providers. | ||
|
|
||
| Args: | ||
| hs | ||
| local_media_directory: Base path where we store media on disk | ||
| filepaths | ||
| storage_providers: List of StorageProvider that are used to fetch and store files. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| hs: "HomeServer", | ||
| local_media_directory: str, | ||
| filepaths: MediaFilePaths, | ||
| storage_providers: Sequence["StorageProvider"], | ||
| storage_providers: Sequence["StorageProviderWrapper"], | ||
drallgood marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ): | ||
| self.hs = hs | ||
| self.reactor = hs.get_reactor() | ||
| self.local_media_directory = local_media_directory | ||
| self.filepaths = filepaths | ||
| self.storage_providers = storage_providers | ||
| self.storage_providers = list(storage_providers) | ||
drallgood marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| self.local_provider = None | ||
| self.local_media_directory = None | ||
| for provider in self.storage_providers: | ||
| if isinstance(provider.backend, FileStorageProviderBackend): | ||
| self.local_provider = provider | ||
| self.local_media_directory = provider.backend.base_directory | ||
| break | ||
drallgood marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker | ||
| self.clock = hs.get_clock() | ||
|
|
||
|
|
@@ -221,53 +226,94 @@ async def store_into_file( | |
| """ | ||
|
|
||
| path = self._file_info_to_path(file_info) | ||
| fname = os.path.join(self.local_media_directory, path) | ||
|
|
||
| dirname = os.path.dirname(fname) | ||
| os.makedirs(dirname, exist_ok=True) | ||
|
|
||
| try: | ||
| with start_active_span("writing to main media repo"): | ||
| with open(fname, "wb") as f: | ||
| yield f, fname | ||
| if self.local_provider: | ||
| fname = os.path.join(self.local_media_directory, path) # type: ignore[arg-type] | ||
| dirname = os.path.dirname(fname) | ||
drallgood marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| os.makedirs(dirname, exist_ok=True) | ||
|
|
||
| with start_active_span("writing to other storage providers"): | ||
| spam_check = ( | ||
| await self._spam_checker_module_callbacks.check_media_file_for_spam( | ||
| try: | ||
| with start_active_span("writing to main media repo"): | ||
| with open(fname, "wb") as f: | ||
| yield f, fname | ||
|
|
||
| with start_active_span( | ||
| "spam checking and writing to other storage providers" | ||
| ): | ||
| spam_check = await self._spam_checker_module_callbacks.check_media_file_for_spam( | ||
| ReadableFileWrapper(self.clock, fname), file_info | ||
| ) | ||
| ) | ||
| if spam_check != self._spam_checker_module_callbacks.NOT_SPAM: | ||
| logger.info("Blocking media due to spam checker") | ||
| # Note that we'll delete the stored media, due to the | ||
| # try/except below. The media also won't be stored in | ||
| # the DB. | ||
| # We currently ignore any additional field returned by | ||
| # the spam-check API. | ||
| raise SpamMediaException(errcode=spam_check[0]) | ||
|
|
||
| for provider in self.storage_providers: | ||
| with start_active_span(str(provider)): | ||
| await provider.store_file(path, file_info) | ||
|
|
||
| except Exception as e: | ||
| if spam_check != self._spam_checker_module_callbacks.NOT_SPAM: | ||
| logger.info("Blocking media due to spam checker") | ||
| # Note that we'll delete the stored media, due to the | ||
| # try/except below. The media also won't be stored in | ||
| # the DB. | ||
| # We currently ignore any additional field returned by | ||
| # the spam-check API. | ||
| raise SpamMediaException(errcode=spam_check[0]) | ||
|
|
||
| for provider in self.storage_providers: | ||
| if provider is not self.local_provider: | ||
| with start_active_span(str(provider)): | ||
| await provider.store_file(path, file_info) | ||
|
|
||
| except Exception as e: | ||
| try: | ||
| os.remove(fname) | ||
| except Exception: | ||
| pass | ||
|
|
||
| raise e from None | ||
| else: | ||
| # No local provider, write to temp file | ||
| with tempfile.NamedTemporaryFile(delete=False) as f: | ||
| fname = f.name | ||
| yield cast(BinaryIO, f), fname | ||
|
|
||
| try: | ||
| os.remove(fname) | ||
| except Exception: | ||
| pass | ||
| with start_active_span( | ||
| "spam checking and writing to storage providers" | ||
| ): | ||
| spam_check = await self._spam_checker_module_callbacks.check_media_file_for_spam( | ||
| ReadableFileWrapper(self.clock, fname), file_info | ||
| ) | ||
| if spam_check != self._spam_checker_module_callbacks.NOT_SPAM: | ||
| logger.info("Blocking media due to spam checker") | ||
| raise SpamMediaException(errcode=spam_check[0]) | ||
|
|
||
| for provider in self.storage_providers: | ||
| with start_active_span(str(provider)): | ||
| await provider.store_file(path, file_info) | ||
|
|
||
| raise e from None | ||
| except Exception as e: | ||
| try: | ||
| os.remove(fname) | ||
| except Exception: | ||
| pass | ||
|
|
||
| raise e from None | ||
|
|
||
| async def fetch_media(self, file_info: FileInfo) -> Responder | None: | ||
| """Attempts to fetch media described by file_info from the local cache | ||
| and configured storage providers. | ||
| """Attempts to fetch media described by file_info from the configured storage providers. | ||
|
|
||
| Args: | ||
| file_info: Metadata about the media file | ||
|
|
||
| Returns: | ||
| Returns a Responder if the file was found, otherwise None. | ||
| """ | ||
| # URL cache files are stored locally and should not go through storage providers | ||
| if file_info.url_cache: | ||
| path = self._file_info_to_path(file_info) | ||
| if self.local_provider: | ||
| local_path = os.path.join(self.local_media_directory, path) # type: ignore[arg-type] | ||
| if os.path.isfile(local_path): | ||
| # Import here to avoid circular import | ||
| from .media_storage import FileResponder | ||
|
|
||
| return FileResponder(self.hs, open(local_path, "rb")) | ||
| return None | ||
|
|
||
| paths = [self._file_info_to_path(file_info)] | ||
|
|
||
| # fallback for remote thumbnails with no method in the filename | ||
|
|
@@ -282,13 +328,6 @@ async def fetch_media(self, file_info: FileInfo) -> Responder | None: | |
| ) | ||
| ) | ||
|
|
||
| for path in paths: | ||
| local_path = os.path.join(self.local_media_directory, path) | ||
| if os.path.exists(local_path): | ||
| logger.debug("responding with local file %s", local_path) | ||
| return FileResponder(self.hs, open(local_path, "rb")) | ||
| logger.debug("local file %s did not exist", local_path) | ||
|
|
||
| for provider in self.storage_providers: | ||
| for path in paths: | ||
| res: Any = await provider.fetch(path, file_info) | ||
|
|
@@ -311,39 +350,61 @@ async def ensure_media_is_in_local_cache(self, file_info: FileInfo) -> str: | |
| Full path to local file | ||
| """ | ||
| path = self._file_info_to_path(file_info) | ||
| local_path = os.path.join(self.local_media_directory, path) | ||
| if os.path.exists(local_path): | ||
| return local_path | ||
|
|
||
| # Fallback for paths without method names | ||
| # Should be removed in the future | ||
| if file_info.thumbnail and file_info.server_name: | ||
| legacy_path = self.filepaths.remote_media_thumbnail_rel_legacy( | ||
| server_name=file_info.server_name, | ||
| file_id=file_info.file_id, | ||
| width=file_info.thumbnail.width, | ||
| height=file_info.thumbnail.height, | ||
| content_type=file_info.thumbnail.type, | ||
| ) | ||
| legacy_local_path = os.path.join(self.local_media_directory, legacy_path) | ||
| if os.path.exists(legacy_local_path): | ||
| return legacy_local_path | ||
|
|
||
| dirname = os.path.dirname(local_path) | ||
| os.makedirs(dirname, exist_ok=True) | ||
|
|
||
| for provider in self.storage_providers: | ||
| res: Any = await provider.fetch(path, file_info) | ||
| if res: | ||
| with res: | ||
| consumer = BackgroundFileConsumer( | ||
| open(local_path, "wb"), self.reactor | ||
| ) | ||
| await res.write_to_consumer(consumer) | ||
| await consumer.wait() | ||
| if self.local_provider: | ||
| local_path = os.path.join(self.local_media_directory, path) # type: ignore[arg-type] | ||
| if os.path.exists(local_path): | ||
| return local_path | ||
|
|
||
| raise NotFoundError() | ||
| # Fallback for paths without method names | ||
| # Should be removed in the future | ||
| if file_info.thumbnail and file_info.server_name: | ||
| legacy_path = self.filepaths.remote_media_thumbnail_rel_legacy( | ||
| server_name=file_info.server_name, | ||
| file_id=file_info.file_id, | ||
| width=file_info.thumbnail.width, | ||
| height=file_info.thumbnail.height, | ||
| content_type=file_info.thumbnail.type, | ||
| ) | ||
| legacy_local_path = os.path.join( | ||
| self.local_media_directory, # type: ignore[arg-type] | ||
| legacy_path, | ||
| ) | ||
| if os.path.exists(legacy_local_path): | ||
| return legacy_local_path | ||
|
|
||
| dirname = os.path.dirname(local_path) | ||
drallgood marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| os.makedirs(dirname, exist_ok=True) | ||
|
|
||
| for provider in self.storage_providers: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think we could make use of the
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not really. They have different purposes: `ensure_media_is_in_local_cache` ensures a local copy exists so that a thumbnail can be generated, while `fetch_media` returns a Responder (to stream the file). |
||
| if provider is self.local_provider: | ||
| continue | ||
| remote_res: Any = await provider.fetch(path, file_info) | ||
| if remote_res: | ||
| with remote_res: | ||
| consumer = BackgroundFileConsumer( | ||
| open(local_path, "wb"), self.reactor | ||
| ) | ||
| await remote_res.write_to_consumer(consumer) | ||
| await consumer.wait() | ||
| return local_path | ||
|
|
||
| raise NotFoundError() | ||
| else: | ||
| # No local provider, download to temp | ||
| for provider in self.storage_providers: | ||
| res: Any = await provider.fetch(path, file_info) | ||
| if res: | ||
| temp_dir = tempfile.gettempdir() | ||
| temp_path = os.path.join(temp_dir, os.path.basename(path)) | ||
| with res: | ||
| consumer = BackgroundFileConsumer( | ||
| open(temp_path, "wb"), self.reactor | ||
| ) | ||
| await res.write_to_consumer(consumer) | ||
| await consumer.wait() | ||
| return temp_path | ||
|
|
||
| raise NotFoundError() | ||
|
|
||
| @trace | ||
| def _file_info_to_path(self, file_info: FileInfo) -> str: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.