Skip to content

Commit 933af51

Browse files
committed
Add utility for reading aiohttp response into bytearray
1 parent 51c4e4b commit 933af51

File tree

5 files changed

+100
-33
lines changed

5 files changed

+100
-33
lines changed

mautrix/api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
from mautrix import __optional_imports__, __version__ as mautrix_version
2424
from mautrix.errors import MatrixConnectionError, MatrixRequestError, make_request_error
25-
from mautrix.util.async_iter_bytes import AsyncBody, async_iter_bytes
25+
from mautrix.util.async_body import AsyncBody, async_iter_bytes
2626
from mautrix.util.logging import TraceLogger
2727
from mautrix.util.opt_prometheus import Counter
2828

mautrix/client/api/modules/media_repository.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
MXOpenGraph,
2121
SerializerError,
2222
)
23-
from mautrix.util.async_iter_bytes import async_iter_bytes
23+
from mautrix.util.async_body import async_iter_bytes
2424
from mautrix.util.opt_prometheus import Histogram
2525

2626
from ..base import BaseClientAPI

mautrix/util/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
__all__ = [
2+
# Directory modules
23
"async_db",
34
"config",
45
"db",
56
"formatter",
67
"logging",
8+
# File modules
9+
"async_body",
710
"async_getter_lock",
8-
"async_iter_bytes",
911
"bridge_state",
1012
"color_log",
1113
"ffmpeg",

mautrix/util/async_body.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
# Copyright (c) 2023 Tulir Asokan
2+
#
3+
# This Source Code Form is subject to the terms of the Mozilla Public
4+
# License, v. 2.0. If a copy of the MPL was not distributed with this
5+
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
6+
from __future__ import annotations
7+
8+
from typing import AsyncGenerator, Union
9+
import logging
10+
11+
import aiohttp
12+
13+
# Type alias: an async generator that yields bytes-like chunks (bytes, bytearray or
# memoryview) and does not accept sent values.
AsyncBody = AsyncGenerator[Union[bytes, bytearray, memoryview], None]
14+
15+
16+
async def async_iter_bytes(data: bytearray | bytes, chunk_size: int = 1024**2) -> AsyncBody:
    """
    Yield consecutive zero-copy slices of a bytes-like object.

    Each yielded item is a memoryview slice over the original buffer, so the payload itself is
    never duplicated. This is used to prevent aiohttp from copying the entire request body.

    Args:
        data: The underlying data to iterate through.
        chunk_size: How big each returned chunk should be.

    Returns:
        An async generator that yields the given data in chunks.
    """
    with memoryview(data) as view:
        offsets = range(0, len(data), chunk_size)
        for start in offsets:
            stop = start + chunk_size
            # Slicing clamps at the end of the buffer, so the last chunk may be shorter.
            yield view[start:stop]
31+
32+
33+
class FileTooLargeError(Exception):
    """
    Raised when a download is larger than the size limit given by the caller.

    Attributes:
        max_size: The limit, in bytes, that was exceeded.
    """

    def __init__(self, max_size: int) -> None:
        super().__init__(f"File size larger than maximum ({max_size / 1024 / 1024} MiB)")
        self.max_size = max_size


_default_dl_log = logging.getLogger("mau.util.download")


async def read_response_chunks(
    resp: aiohttp.ClientResponse, max_size: int, log: logging.Logger = _default_dl_log
) -> bytearray:
    """
    Read the body from an aiohttp response in chunks into a mutable bytearray.

    When the response has a Content-Length header, a buffer of that size is allocated up front
    and filled in place. If the body turns out to be longer than announced, the buffer is grown;
    if it turns out to be shorter, the unused tail is dropped before returning.

    Args:
        resp: The aiohttp response object to read the body from.
        max_size: The maximum size to read. FileTooLargeError will be raised if the Content-Length
            is higher than this, or if the body exceeds this size during reading. Note that a
            value of zero or less only skips the Content-Length check; the check performed while
            reading still rejects any non-empty body.
        log: A logger for logging download status.

    Returns:
        The body data as a byte array.

    Raises:
        FileTooLargeError: if the body is larger than the provided max_size.
    """
    content_length = int(resp.headers.get("Content-Length", "0"))
    if 0 < max_size < content_length:
        raise FileTooLargeError(max_size)
    size_str = "unknown length" if content_length == 0 else f"{content_length} bytes"
    log.info(f"Reading file download response with {size_str} (max: {max_size})")
    data = bytearray(content_length)
    # A bytearray can't be resized while a memoryview is exported from it, so the view is only
    # kept for as long as writes fit inside the preallocated buffer.
    mv = memoryview(data) if content_length > 0 else None
    read_size = 0
    try:
        while True:
            block = await resp.content.readany()
            if not block:
                break
            block_end = read_size + len(block)
            # Compare against the untouched limit so the error reports the real maximum.
            if block_end > max_size:
                raise FileTooLargeError(max_size)
            if len(data) >= block_end:
                # The whole block fits into the preallocated buffer.
                mv[read_size:block_end] = block
            elif len(data) > read_size:
                # The block straddles the end of the preallocated buffer: fill what is left,
                # then fall back to appending.
                log.warning("File being downloaded is bigger than expected")
                fits = len(data) - read_size
                mv[read_size:] = block[:fits]
                mv.release()
                mv = None
                data.extend(block[fits:])
            else:
                # No preallocated space left (or none was allocated): just append.
                if mv is not None:
                    mv.release()
                    mv = None
                data.extend(block)
            read_size = block_end
    finally:
        # Always drop the exported view, including when reading fails or the limit is hit.
        if mv is not None:
            mv.release()
    if read_size < len(data):
        # The body was shorter than the Content-Length; don't return zero padding.
        log.warning("File being downloaded is smaller than expected")
        del data[read_size:]
    log.info(f"Successfully read {read_size} bytes of file download response")
    return data
93+
94+
95+
# Every name listed here must exist in this module, otherwise `from ... import *` fails.
__all__ = ["AsyncBody", "FileTooLargeError", "async_iter_bytes", "read_response_chunks"]

mautrix/util/async_iter_bytes.py

Lines changed: 0 additions & 30 deletions
This file was deleted.

0 commit comments

Comments
 (0)