Skip to content

Commit a1f5a5b

Browse files
committed
Add byte iter memory optimization to async media uploads
1 parent bee00cf commit a1f5a5b

File tree

4 files changed

+54
-23
lines changed

4 files changed

+54
-23
lines changed

mautrix/api.py

Lines changed: 2 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@
2222

2323
from mautrix import __optional_imports__, __version__ as mautrix_version
2424
from mautrix.errors import MatrixConnectionError, MatrixRequestError, make_request_error
25+
from mautrix.util.async_iter_bytes import AsyncBody, async_iter_bytes
2526
from mautrix.util.logging import TraceLogger
2627
from mautrix.util.opt_prometheus import Counter
2728

@@ -155,7 +156,6 @@ def replace(self, find: str, replace: str) -> PathBuilder:
155156
"""
156157

157158
_req_id = 0
158-
AsyncBody = AsyncGenerator[Union[bytes, bytearray, memoryview], None]
159159

160160

161161
def _next_global_req_id() -> int:
@@ -164,12 +164,6 @@ def _next_global_req_id() -> int:
164164
return _req_id
165165

166166

167-
async def _async_iter_bytes(data: bytearray | bytes, chunk_size: int = 1024**2) -> AsyncBody:
168-
with memoryview(data) as mv:
169-
for i in range(0, len(data), chunk_size):
170-
yield mv[i : i + chunk_size]
171-
172-
173167
class HTTPAPI:
174168
"""HTTPAPI is a simple asyncio Matrix API request sender."""
175169

@@ -395,7 +389,7 @@ async def request(
395389
method, log_url, content, orig_content, query_params, headers, req_id, sensitive
396390
)
397391
API_CALLS.labels(method=metrics_method).inc()
398-
req_content = _async_iter_bytes(content) if do_fake_iter else content
392+
req_content = async_iter_bytes(content) if do_fake_iter else content
399393
start = time.monotonic()
400394
try:
401395
resp_data, resp = await self._send(

mautrix/client/api/modules/media_repository.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
MXOpenGraph,
2121
SerializerError,
2222
)
23+
from mautrix.util.async_iter_bytes import async_iter_bytes
2324
from mautrix.util.opt_prometheus import Histogram
2425

2526
from ..base import BaseClientAPI
@@ -286,15 +287,20 @@ async def _upload_to_url(
286287
headers: dict[str, str],
287288
data: bytes | bytearray | AsyncIterable[bytes],
288289
post_upload_query: dict[str, str],
290+
min_iter_size: int = 25 * 1024 * 1024,
289291
) -> None:
290292
retry_count = self.api.default_retry_count
291293
backoff = 4
294+
do_fake_iter = data and hasattr(data, "__len__") and len(data) > min_iter_size
295+
if do_fake_iter:
296+
headers["Content-Length"] = str(len(data))
292297
while True:
293298
self.log.debug("Uploading media to external URL %s", upload_url)
294299
upload_response = None
295300
try:
301+
req_data = async_iter_bytes(data) if do_fake_iter else data
296302
upload_response = await self.api.session.put(
297-
upload_url, data=data, headers=headers
303+
upload_url, data=req_data, headers=headers
298304
)
299305
upload_response.raise_for_status()
300306
except Exception as e:

mautrix/util/__init__.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,25 @@
11
__all__ = [
2+
"async_db",
3+
"config",
4+
"db",
25
"formatter",
36
"logging",
4-
"config",
5-
"signed_token",
6-
"simple_template",
7-
"manhole",
8-
"markdown",
9-
"simple_lock",
7+
"async_getter_lock",
8+
"async_iter_bytes",
9+
"bridge_state",
10+
"color_log",
11+
"ffmpeg",
1012
"file_store",
11-
"program",
12-
"async_db",
13-
"db",
14-
"opt_prometheus",
13+
"format_duration",
1514
"magic",
16-
"bridge_state",
15+
"manhole",
16+
"markdown",
1717
"message_send_checkpoint",
18-
"variation_selector",
19-
"format_duration",
20-
"ffmpeg",
18+
"opt_prometheus",
19+
"program",
20+
"signed_token",
21+
"simple_lock",
22+
"simple_template",
2123
"utf16_surrogate",
24+
"variation_selector",
2225
]

mautrix/util/async_iter_bytes.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Copyright (c) 2023 Tulir Asokan
2+
#
3+
# This Source Code Form is subject to the terms of the Mozilla Public
4+
# License, v. 2.0. If a copy of the MPL was not distributed with this
5+
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
6+
from typing import AsyncGenerator, Union
7+
8+
# Type of the async generators produced by async_iter_bytes: each item is a
# bytes-like chunk and nothing is sent back into the generator.
AsyncBody = AsyncGenerator[Union[bytes, bytearray, memoryview], None]


async def async_iter_bytes(
    data: Union[bytearray, bytes], chunk_size: int = 1024**2
) -> AsyncBody:
    """
    Yield memory views into a byte array in chunks. This is used to prevent aiohttp from copying
    the entire request body.

    Args:
        data: The underlying data to iterate through.
        chunk_size: How big each returned chunk should be (at most; the last chunk may be
            shorter). Must be a positive integer.

    Yields:
        Consecutive ``memoryview`` slices of ``data``, in order, that together cover the whole
        input. Nothing is yielded for empty input.

    Raises:
        ValueError: If ``chunk_size`` is zero or negative. As this is an async generator, the
            error surfaces when iteration starts rather than when the function is called.
    """
    # A negative step would make range() empty and silently drop the whole body,
    # and a zero step would fail with an unrelated-looking range() error.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")
    # The context manager releases the parent view once iteration finishes or
    # the generator is closed.
    with memoryview(data) as mv:
        for start in range(0, len(mv), chunk_size):
            # Slicing a memoryview creates another view instead of copying bytes.
            yield mv[start : start + chunk_size]
26+
27+
28+
__all__ = ["AsyncBody", "async_iter_bytes"]

0 commit comments

Comments
 (0)