Commit 5cf39d9 (parent: fb22e1e)

[PR #9780/eac8fb84 backport][3.11] Replace get_event_loop with get_running_loop in the compressor (#9789)
Co-authored-by: J. Nick Koston <[email protected]>

File tree: 2 files changed (+41, -4 lines)
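
For background on the change in the title: inside a coroutine, asyncio.get_running_loop() returns the loop that is currently driving it and raises RuntimeError if no loop is running, whereas asyncio.get_event_loop() carries legacy fallback behavior (it may look up or create a loop through the policy when none is running) and is deprecated for that use on recent Python versions. A minimal standalone sketch of the difference, not part of this commit:

    import asyncio

    async def show_running_loop() -> None:
        # Inside a coroutine, get_running_loop() is the recommended call;
        # it always returns the loop that is driving this coroutine.
        loop = asyncio.get_running_loop()
        print("running loop:", type(loop).__name__)

    def outside_any_loop() -> None:
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running here; get_event_loop() would instead fall
            # back to policy behavior (and warns on newer Python versions).
            print("no running event loop")

    outside_any_loop()
    asyncio.run(show_running_loop())

Inside aiohttp's compress()/decompress() coroutines a loop is always running, so get_running_loop() is the more explicit choice.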

aiohttp/compression_utils.py (18 additions, 4 deletions)

@@ -70,6 +70,14 @@ def compress_sync(self, data: bytes) -> bytes:
         return self._compressor.compress(data)
 
     async def compress(self, data: bytes) -> bytes:
+        """Compress the data and returned the compressed bytes.
+
+        Note that flush() must be called after the last call to compress()
+
+        If the data size is large than the max_sync_chunk_size, the compression
+        will be done in the executor. Otherwise, the compression will be done
+        in the event loop.
+        """
         async with self._compress_lock:
             # To ensure the stream is consistent in the event
             # there are multiple writers, we need to lock
@@ -79,8 +87,8 @@ async def compress(self, data: bytes) -> bytes:
                 self._max_sync_chunk_size is not None
                 and len(data) > self._max_sync_chunk_size
             ):
-                return await asyncio.get_event_loop().run_in_executor(
-                    self._executor, self.compress_sync, data
+                return await asyncio.get_running_loop().run_in_executor(
+                    self._executor, self._compressor.compress, data
                 )
             return self.compress_sync(data)
 
@@ -107,12 +115,18 @@ def decompress_sync(self, data: bytes, max_length: int = 0) -> bytes:
         return self._decompressor.decompress(data, max_length)
 
     async def decompress(self, data: bytes, max_length: int = 0) -> bytes:
+        """Decompress the data and return the decompressed bytes.
+
+        If the data size is large than the max_sync_chunk_size, the decompression
+        will be done in the executor. Otherwise, the decompression will be done
+        in the event loop.
+        """
         if (
             self._max_sync_chunk_size is not None
             and len(data) > self._max_sync_chunk_size
         ):
-            return await asyncio.get_event_loop().run_in_executor(
-                self._executor, self.decompress_sync, data, max_length
+            return await asyncio.get_running_loop().run_in_executor(
+                self._executor, self._decompressor.decompress, data, max_length
             )
         return self.decompress_sync(data, max_length)
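
The behavior described in the new docstrings is a size-based offload: payloads larger than max_sync_chunk_size are pushed to an executor thread via run_in_executor so the event loop is not blocked by CPU-bound zlib work, while smaller payloads are handled inline. A rough standalone sketch of that pattern using zlib directly; the helper name and threshold value are illustrative, not aiohttp API:

    import asyncio
    import zlib
    from typing import Optional

    # Illustrative threshold; aiohttp exposes a similar max_sync_chunk_size
    # parameter, but this helper itself is a hypothetical sketch.
    MAX_SYNC_CHUNK_SIZE = 1024

    async def compress_adaptive(
        compressobj,
        data: bytes,
        max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE,
    ) -> bytes:
        """Compress small payloads inline and large ones in a worker thread."""
        if max_sync_chunk_size is not None and len(data) > max_sync_chunk_size:
            # CPU-bound work for large payloads goes to the default executor
            # so the running event loop stays responsive.
            return await asyncio.get_running_loop().run_in_executor(
                None, compressobj.compress, data
            )
        # Small payloads are cheaper to compress inline than to hop threads.
        return compressobj.compress(data)

    async def main() -> None:
        obj = zlib.compressobj()
        compressed = await compress_adaptive(obj, b"x" * 10_000) + obj.flush()
        print(len(compressed), "compressed bytes")

    asyncio.run(main())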

tests/test_compression_utils.py (new file: 23 additions, 0 deletions)

@@ -0,0 +1,23 @@
+"""Tests for compression utils."""
+
+from aiohttp.compression_utils import ZLibCompressor, ZLibDecompressor
+
+
+async def test_compression_round_trip_in_executor() -> None:
+    """Ensure that compression and decompression work correctly in the executor."""
+    compressor = ZLibCompressor(max_sync_chunk_size=1)
+    decompressor = ZLibDecompressor(max_sync_chunk_size=1)
+    data = b"Hi" * 100
+    compressed_data = await compressor.compress(data) + compressor.flush()
+    decompressed_data = await decompressor.decompress(compressed_data)
+    assert data == decompressed_data
+
+
+async def test_compression_round_trip_in_event_loop() -> None:
+    """Ensure that compression and decompression work correctly in the event loop."""
+    compressor = ZLibCompressor(max_sync_chunk_size=10000)
+    decompressor = ZLibDecompressor(max_sync_chunk_size=10000)
+    data = b"Hi" * 100
+    compressed_data = await compressor.compress(data) + compressor.flush()
+    decompressed_data = await decompressor.decompress(compressed_data)
+    assert data == decompressed_data
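
For a quick manual check outside the test suite, the same round trip can be driven with asyncio.run(); the sketch below mirrors the tests above (compression_utils is an internal aiohttp module, and the chunk-size values are only illustrative):

    import asyncio

    from aiohttp.compression_utils import ZLibCompressor, ZLibDecompressor

    async def round_trip(payload: bytes, chunk_size: int) -> bytes:
        compressor = ZLibCompressor(max_sync_chunk_size=chunk_size)
        decompressor = ZLibDecompressor(max_sync_chunk_size=chunk_size)
        # flush() finalizes the zlib stream after the last compress() call.
        compressed = await compressor.compress(payload) + compressor.flush()
        return await decompressor.decompress(compressed)

    async def main() -> None:
        data = b"Hi" * 100
        # chunk_size=1 forces the executor path; a large value keeps it inline.
        assert await round_trip(data, chunk_size=1) == data
        assert await round_trip(data, chunk_size=10_000) == data
        print("round trips OK")

    asyncio.run(main())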
