|
18 | 18 | import asyncio
|
19 | 19 | import socket
|
20 | 20 | import struct
|
| 21 | +import sys |
| 22 | +from asyncio import AbstractEventLoop, Future |
21 | 23 | from typing import (
|
22 |
| - TYPE_CHECKING, |
23 | 24 | Union,
|
24 | 25 | )
|
25 | 26 |
|
26 | 27 | from pymongo import ssl_support
|
27 | 28 |
|
28 |
| -if TYPE_CHECKING: |
29 |
| - from pymongo.pyopenssl_context import _sslConn |
| 29 | +try: |
| 30 | + from ssl import SSLError, SSLSocket |
| 31 | + |
| 32 | + _HAVE_SSL = True |
| 33 | +except ImportError: |
| 34 | + _HAVE_SSL = False |
| 35 | + |
| 36 | +try: |
| 37 | + from pymongo.pyopenssl_context import ( |
| 38 | + BLOCKING_IO_LOOKUP_ERROR, |
| 39 | + BLOCKING_IO_READ_ERROR, |
| 40 | + BLOCKING_IO_WRITE_ERROR, |
| 41 | + _sslConn, |
| 42 | + ) |
| 43 | + |
| 44 | + _HAVE_PYOPENSSL = True |
| 45 | +except ImportError: |
| 46 | + _HAVE_PYOPENSSL = False |
| 47 | + _sslConn = SSLSocket # type: ignore |
| 48 | + from pymongo.ssl_support import ( # type: ignore[assignment] |
| 49 | + BLOCKING_IO_LOOKUP_ERROR, |
| 50 | + BLOCKING_IO_READ_ERROR, |
| 51 | + BLOCKING_IO_WRITE_ERROR, |
| 52 | + ) |
30 | 53 |
|
31 | 54 | _UNPACK_HEADER = struct.Struct("<iiii").unpack
|
32 | 55 | _UNPACK_COMPRESSION_HEADER = struct.Struct("<iiB").unpack
|
33 | 56 | _POLL_TIMEOUT = 0.5
|
34 | 57 | # Errors raised by sockets (and TLS sockets) when in non-blocking mode.
|
35 |
| -BLOCKING_IO_ERRORS = (BlockingIOError, *ssl_support.BLOCKING_IO_ERRORS) |
| 58 | +BLOCKING_IO_ERRORS = (BlockingIOError, BLOCKING_IO_LOOKUP_ERROR, *ssl_support.BLOCKING_IO_ERRORS) |
36 | 59 |
|
37 | 60 |
|
38 |
| -async def async_sendall(socket: Union[socket.socket, _sslConn], buf: bytes) -> None: |
39 |
| - timeout = socket.gettimeout() |
40 |
| - socket.settimeout(0.0) |
| 61 | +async def async_sendall(sock: Union[socket.socket, _sslConn], buf: bytes) -> None: |
| 62 | + timeout = sock.gettimeout() |
| 63 | + sock.settimeout(0.0) |
41 | 64 | loop = asyncio.get_event_loop()
|
42 | 65 | try:
|
43 |
| - await asyncio.wait_for(loop.sock_sendall(socket, buf), timeout=timeout) # type: ignore[arg-type] |
| 66 | + if _HAVE_SSL and isinstance(sock, (SSLSocket, _sslConn)): |
| 67 | + if sys.platform == "win32": |
| 68 | + await asyncio.wait_for(_async_sendall_ssl_windows(sock, buf), timeout=timeout) |
| 69 | + else: |
| 70 | + await asyncio.wait_for(_async_sendall_ssl(sock, buf, loop), timeout=timeout) |
| 71 | + else: |
| 72 | + await asyncio.wait_for(loop.sock_sendall(sock, buf), timeout=timeout) # type: ignore[arg-type] |
44 | 73 | finally:
|
45 |
| - socket.settimeout(timeout) |
| 74 | + sock.settimeout(timeout) |
| 75 | + |
| 76 | + |
| 77 | +async def _async_sendall_ssl( |
| 78 | + sock: Union[socket.socket, _sslConn], buf: bytes, loop: AbstractEventLoop |
| 79 | +) -> None: |
| 80 | + fd = sock.fileno() |
| 81 | + sent = 0 |
| 82 | + |
| 83 | + def _is_ready(fut: Future) -> None: |
| 84 | + loop.remove_writer(fd) |
| 85 | + loop.remove_reader(fd) |
| 86 | + if fut.done(): |
| 87 | + return |
| 88 | + fut.set_result(None) |
| 89 | + |
| 90 | + while sent < len(buf): |
| 91 | + try: |
| 92 | + sent += sock.send(buf) |
| 93 | + except BLOCKING_IO_ERRORS as exc: |
| 94 | + fd = sock.fileno() |
| 95 | + # Check for closed socket. |
| 96 | + if fd == -1: |
| 97 | + raise SSLError("Underlying socket has been closed") from None |
| 98 | + if isinstance(exc, BLOCKING_IO_READ_ERROR): |
| 99 | + fut = loop.create_future() |
| 100 | + loop.add_reader(fd, _is_ready, fut) |
| 101 | + await fut |
| 102 | + if isinstance(exc, BLOCKING_IO_WRITE_ERROR): |
| 103 | + fut = loop.create_future() |
| 104 | + loop.add_writer(fd, _is_ready, fut) |
| 105 | + await fut |
| 106 | + if _HAVE_PYOPENSSL and isinstance(exc, BLOCKING_IO_LOOKUP_ERROR): |
| 107 | + fut = loop.create_future() |
| 108 | + loop.add_reader(fd, _is_ready, fut) |
| 109 | + loop.add_writer(fd, _is_ready, fut) |
| 110 | + await fut |
| 111 | + |
| 112 | + |
| 113 | +# The default Windows asyncio event loop does not support loop.add_reader/add_writer: https://docs.python.org/3/library/asyncio-platforms.html#asyncio-platform-support |
| 114 | +async def _async_sendall_ssl_windows(sock: Union[socket.socket, _sslConn], buf: bytes) -> None: |
| 115 | + view = memoryview(buf) |
| 116 | + total_length = len(buf) |
| 117 | + total_sent = 0 |
| 118 | + while total_sent < total_length: |
| 119 | + try: |
| 120 | + sent = sock.send(view[total_sent:]) |
| 121 | + except BLOCKING_IO_ERRORS: |
| 122 | + await asyncio.sleep(0.5) |
| 123 | + sent = 0 |
| 124 | + total_sent += sent |
46 | 125 |
|
47 | 126 |
|
48 |
| -def sendall(socket: Union[socket.socket, _sslConn], buf: bytes) -> None: |
49 |
| - socket.sendall(buf) |
| 127 | +def sendall(sock: Union[socket.socket, _sslConn], buf: bytes) -> None: |
| 128 | + sock.sendall(buf) |
0 commit comments