|
28 | 28 | Union, |
29 | 29 | ) |
30 | 30 |
|
31 | | -from pymongo import _csot, ssl_support |
| 31 | +from pymongo import ssl_support |
32 | 32 | from pymongo._asyncio_task import create_task |
33 | 33 | from pymongo.errors import _OperationCancelled |
34 | 34 | from pymongo.socket_checker import _errno_from_exception |
@@ -316,62 +316,47 @@ async def _async_receive(conn: socket.socket, length: int, loop: AbstractEventLo |
316 | 316 | return mv |
317 | 317 |
|
318 | 318 |
|
319 | | -# Sync version: |
320 | | -def wait_for_read(conn: Connection, deadline: Optional[float]) -> None: |
321 | | - """Block until at least one byte is read, or a timeout, or a cancel.""" |
322 | | - sock = conn.conn |
323 | | - timed_out = False |
324 | | - # Check if the connection's socket has been manually closed |
325 | | - if sock.fileno() == -1: |
326 | | - return |
327 | | - while True: |
328 | | - # SSLSocket can have buffered data which won't be caught by select. |
329 | | - if hasattr(sock, "pending") and sock.pending() > 0: |
330 | | - readable = True |
331 | | - else: |
332 | | - # Wait up to 500ms for the socket to become readable and then |
333 | | - # check for cancellation. |
334 | | - if deadline: |
335 | | - remaining = deadline - time.monotonic() |
336 | | - # When the timeout has expired perform one final check to |
337 | | - # see if the socket is readable. This helps avoid spurious |
338 | | - # timeouts on AWS Lambda and other FaaS environments. |
339 | | - if remaining <= 0: |
340 | | - timed_out = True |
341 | | - timeout = max(min(remaining, _POLL_TIMEOUT), 0) |
342 | | - else: |
343 | | - timeout = _POLL_TIMEOUT |
344 | | - readable = conn.socket_checker.select(sock, read=True, timeout=timeout) |
345 | | - if conn.cancel_context.cancelled: |
346 | | - raise _OperationCancelled("operation cancelled") |
347 | | - if readable: |
348 | | - return |
349 | | - if timed_out: |
350 | | - raise socket.timeout("timed out") |
351 | | - |
352 | | - |
353 | 319 | def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> memoryview: |
354 | 320 | buf = bytearray(length) |
355 | 321 | mv = memoryview(buf) |
356 | 322 | bytes_read = 0 |
357 | | - while bytes_read < length: |
358 | | - try: |
359 | | - wait_for_read(conn, deadline) |
360 | | - # CSOT: Update timeout. When the timeout has expired perform one |
361 | | - # final non-blocking recv. This helps avoid spurious timeouts when |
362 | | - # the response is actually already buffered on the client. |
363 | | - if _csot.get_timeout() and deadline is not None: |
364 | | - conn.set_conn_timeout(max(deadline - time.monotonic(), 0)) |
365 | | - chunk_length = conn.conn.recv_into(mv[bytes_read:]) |
366 | | - except BLOCKING_IO_ERRORS: |
367 | | - raise socket.timeout("timed out") from None |
368 | | - except OSError as exc: |
369 | | - if _errno_from_exception(exc) == errno.EINTR: |
| 323 | + # To support cancelling a network read, we shorten the socket timeout and |
| 324 | + # check for the cancellation signal after each timeout. Alternatively we |
| 325 | + # could close the socket but that does not reliably cancel recv() calls |
| 326 | + # on all OSes. |
| 327 | + orig_timeout = conn.conn.gettimeout() |
| 328 | + try: |
| 329 | + while bytes_read < length: |
| 330 | + if deadline is not None: |
| 331 | + # CSOT: Update timeout. When the timeout has expired perform one |
| 332 | + # final non-blocking recv. This helps avoid spurious timeouts when |
| 333 | + # the response is actually already buffered on the client. |
| 334 | + short_timeout = min(max(deadline - time.monotonic(), 0), _POLL_TIMEOUT) |
| 335 | + else: |
| 336 | + short_timeout = _POLL_TIMEOUT |
| 337 | + conn.set_conn_timeout(short_timeout) |
| 338 | + try: |
| 339 | + chunk_length = conn.conn.recv_into(mv[bytes_read:]) |
| 340 | + except BLOCKING_IO_ERRORS: |
| 341 | + if conn.cancel_context.cancelled: |
| 342 | + raise _OperationCancelled("operation cancelled") from None |
| 343 | + # We reached the true deadline. |
| 344 | + raise socket.timeout("timed out") from None |
| 345 | + except socket.timeout: |
| 346 | + if conn.cancel_context.cancelled: |
| 347 | + raise _OperationCancelled("operation cancelled") from None |
370 | 348 | continue |
371 | | - raise |
372 | | - if chunk_length == 0: |
373 | | - raise OSError("connection closed") |
374 | | - |
375 | | - bytes_read += chunk_length |
| 349 | + except OSError as exc: |
| 350 | + if conn.cancel_context.cancelled: |
| 351 | + raise _OperationCancelled("operation cancelled") from None |
| 352 | + if _errno_from_exception(exc) == errno.EINTR: |
| 353 | + continue |
| 354 | + raise |
| 355 | + if chunk_length == 0: |
| 356 | + raise OSError("connection closed") |
| 357 | + |
| 358 | + bytes_read += chunk_length |
| 359 | + finally: |
| 360 | + conn.set_conn_timeout(orig_timeout) |
376 | 361 |
|
377 | 362 | return mv |
0 commit comments