-
Notifications
You must be signed in to change notification settings - Fork 1.1k
PYTHON-4636 - Avoid blocking I/O calls in async code paths #1870
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
adb1670
3d399da
7f71430
d69b5f6
45b4045
0e03581
b241f6d
43a6a54
838852c
33a927c
5dc1fd2
d9c9612
4ddb553
1e9a134
bd9c003
2422125
5b37776
94eb693
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -16,15 +16,21 @@ | |||
from __future__ import annotations | ||||
|
||||
import asyncio | ||||
import errno | ||||
import socket | ||||
import struct | ||||
import sys | ||||
import time | ||||
from asyncio import AbstractEventLoop, Future | ||||
from typing import ( | ||||
TYPE_CHECKING, | ||||
Optional, | ||||
Union, | ||||
) | ||||
|
||||
from pymongo import ssl_support | ||||
from pymongo import _csot, ssl_support | ||||
from pymongo.errors import _OperationCancelled | ||||
from pymongo.socket_checker import _errno_from_exception | ||||
|
||||
try: | ||||
from ssl import SSLError, SSLSocket | ||||
|
@@ -51,6 +57,10 @@ | |||
BLOCKING_IO_WRITE_ERROR, | ||||
) | ||||
|
||||
if TYPE_CHECKING: | ||||
from pymongo.asynchronous.pool import AsyncConnection | ||||
from pymongo.synchronous.pool import Connection | ||||
|
||||
_UNPACK_HEADER = struct.Struct("<iiii").unpack | ||||
_UNPACK_COMPRESSION_HEADER = struct.Struct("<iiB").unpack | ||||
_POLL_TIMEOUT = 0.5 | ||||
|
@@ -111,6 +121,46 @@ def _is_ready(fut: Future) -> None: | |||
loop.add_reader(fd, _is_ready, fut) | ||||
loop.add_writer(fd, _is_ready, fut) | ||||
await fut | ||||
|
||||
async def _async_receive_ssl( | ||||
conn: _sslConn, length: int, loop: AbstractEventLoop | ||||
) -> memoryview: | ||||
mv = memoryview(bytearray(length)) | ||||
fd = conn.fileno() | ||||
read = 0 | ||||
|
||||
def _is_ready(fut: Future) -> None: | ||||
loop.remove_writer(fd) | ||||
loop.remove_reader(fd) | ||||
if fut.done(): | ||||
return | ||||
fut.set_result(None) | ||||
|
||||
while read < length: | ||||
try: | ||||
read += conn.recv_into(mv[read:]) | ||||
if read == 0: | ||||
raise OSError("connection closed") | ||||
except BLOCKING_IO_ERRORS as exc: | ||||
fd = conn.fileno() | ||||
# Check for closed socket. | ||||
if fd == -1: | ||||
raise SSLError("Underlying socket has been closed") from None | ||||
if isinstance(exc, BLOCKING_IO_READ_ERROR): | ||||
fut = loop.create_future() | ||||
loop.add_reader(fd, _is_ready, fut) | ||||
await fut | ||||
if isinstance(exc, BLOCKING_IO_WRITE_ERROR): | ||||
fut = loop.create_future() | ||||
loop.add_writer(fd, _is_ready, fut) | ||||
await fut | ||||
if _HAVE_PYOPENSSL and isinstance(exc, BLOCKING_IO_LOOKUP_ERROR): | ||||
fut = loop.create_future() | ||||
loop.add_reader(fd, _is_ready, fut) | ||||
loop.add_writer(fd, _is_ready, fut) | ||||
await fut | ||||
return mv | ||||
|
||||
else: | ||||
# The default Windows asyncio event loop does not support loop.add_reader/add_writer: | ||||
# https://docs.python.org/3/library/asyncio-platforms.html#asyncio-platform-support | ||||
|
@@ -128,6 +178,136 @@ async def _async_sendall_ssl( | |||
sent = 0 | ||||
total_sent += sent | ||||
|
||||
async def _async_receive_ssl( | ||||
conn: _sslConn, length: int, dummy: AbstractEventLoop | ||||
) -> memoryview: | ||||
mv = memoryview(bytearray(length)) | ||||
total_read = 0 | ||||
while total_read < length: | ||||
try: | ||||
read = conn.recv_into(mv[total_read:]) | ||||
except BLOCKING_IO_ERRORS: | ||||
await asyncio.sleep(0.5) | ||||
|
await asyncio.sleep(0.5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right but the existing code was only for send() whereas this new code is for recv so the perf impact could be different.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, I'll schedule some Windows tasks.
ShaneHarvey marked this conversation as resolved.
Show resolved
Hide resolved
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we used read_task.result() would that avoid the type error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be something like: