Skip to content

Commit 39dcb1e

Browse files
committed
debugging
1 parent fc010ee commit 39dcb1e

File tree

3 files changed

+51
-163
lines changed

3 files changed

+51
-163
lines changed

pymongo/asynchronous/network.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import datetime
1919
import logging
20+
import statistics
2021
import time
2122
from asyncio import streams, StreamReader
2223
from typing import (
@@ -61,6 +62,11 @@
6162

6263
_IS_SYNC = False
6364

65+
TOTAL = []
66+
TOTAL_WRITE = []
67+
TOTAL_READ = []
68+
# print(f"TOTALS: {TOTAL, TOTAL_WRITE, TOTAL_READ}")
69+
6470

6571
async def command_stream(
6672
conn: AsyncConnectionStream,
@@ -113,7 +119,6 @@ async def command_stream(
113119
bson._decode_all_selective.
114120
:param exhaust_allowed: True if we should enable OP_MSG exhaustAllowed.
115121
"""
116-
# print("Running stream command!")
117122
name = next(iter(spec))
118123
ns = dbname + ".$cmd"
119124
speculative_hello = False
@@ -194,13 +199,24 @@ async def command_stream(
194199
)
195200

196201
try:
202+
write_start = time.monotonic()
197203
await async_sendall_stream(conn.conn[1], msg)
204+
write_elapsed = time.monotonic() - write_start
198205
if use_op_msg and unacknowledged:
199206
# Unacknowledged, fake a successful command response.
200207
reply = None
201208
response_doc: _DocumentOut = {"ok": 1}
202209
else:
210+
read_start = time.monotonic()
203211
reply = await receive_message_stream(conn.conn[0], request_id)
212+
read_elapsed = time.monotonic() - read_start
213+
# if name == "insert":
214+
# TOTAL.append(write_elapsed + read_elapsed)
215+
# TOTAL_READ.append(read_elapsed)
216+
# TOTAL_WRITE.append(write_elapsed)
217+
# if name == "endSessions":
218+
# print(
219+
# f"AVERAGE READ: {statistics.mean(TOTAL_READ)}, AVERAGE WRITE: {statistics.mean(TOTAL_WRITE)}, AVERAGE ELAPSED: {statistics.mean(TOTAL)}")
204220
conn.more_to_come = reply.more_to_come
205221
unpacked_docs = reply.unpack_response(
206222
codec_options=codec_options, user_fields=user_fields
@@ -313,7 +329,10 @@ async def receive_message_stream(
313329
# deadline = None
314330
deadline = None
315331
# Ignore the response's request id.
332+
read_start = time.monotonic()
316333
length, _, response_to, op_code = _UNPACK_HEADER(await async_receive_data_stream(conn, 16, deadline))
334+
read_elapsed = time.monotonic() - read_start
335+
# print(f"Read header in {read_elapsed}")
317336
# No request_id for exhaust cursor "getMore".
318337
if request_id is not None:
319338
if request_id != response_to:
@@ -333,7 +352,10 @@ async def receive_message_stream(
333352
)
334353
data = decompress(await async_receive_data_stream(conn, length - 25, deadline), compressor_id)
335354
else:
355+
read_start = time.monotonic()
336356
data = await async_receive_data_stream(conn, length - 16, deadline)
357+
read_elapsed = time.monotonic() - read_start
358+
# print(f"Read body in {read_elapsed}")
337359

338360
try:
339361
unpack_reply = _UNPACK_REPLY[op_code]

pymongo/asynchronous/server.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
from __future__ import annotations
1717

1818
import logging
19+
import statistics
20+
import time
1921
from datetime import datetime
2022
from typing import (
2123
TYPE_CHECKING,
@@ -58,6 +60,12 @@
5860
_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}}
5961

6062

63+
TOTAL = []
64+
TOTAL_WRITE = []
65+
TOTAL_READ = []
66+
# print(f"TOTALS: {TOTAL, TOTAL_WRITE, TOTAL_READ}")
67+
68+
6169
class Server:
6270
def __init__(
6371
self,
@@ -204,8 +212,19 @@ async def run_operation(
204212
if more_to_come:
205213
reply = await conn.receive_message(None)
206214
else:
215+
write_start = time.monotonic()
207216
await conn.send_message(data, max_doc_size)
217+
write_elapsed = time.monotonic() - write_start
218+
219+
read_start = time.monotonic()
208220
reply = await conn.receive_message(request_id)
221+
read_elapsed = time.monotonic() - read_start
222+
223+
# TOTAL.append(write_elapsed + read_elapsed)
224+
# TOTAL_READ.append(read_elapsed)
225+
# TOTAL_WRITE.append(write_elapsed)
226+
# print(
227+
# f"AVERAGE READ: {statistics.mean(TOTAL_READ)}, AVERAGE WRITE: {statistics.mean(TOTAL_WRITE)}, AVERAGE ELAPSED: {statistics.mean(TOTAL)}")
209228

210229
# Unpack and check for command errors.
211230
if use_cmd:

pymongo/network_layer.py

Lines changed: 9 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import asyncio
1919
import errno
2020
import socket
21+
import statistics
2122
import struct
2223
import sys
2324
import time
@@ -68,6 +69,7 @@
6869
# Errors raised by sockets (and TLS sockets) when in non-blocking mode.
6970
BLOCKING_IO_ERRORS = (BlockingIOError, BLOCKING_IO_LOOKUP_ERROR, *ssl_support.BLOCKING_IO_ERRORS)
7071

72+
7173
async def async_sendall_stream(stream: asyncio.StreamWriter, buf: bytes) -> None:
7274
try:
7375
stream.write(buf)
@@ -77,161 +79,14 @@ async def async_sendall_stream(stream: asyncio.StreamWriter, buf: bytes) -> None
7779
raise socket.timeout("timed out") from exc
7880

7981

80-
if sys.platform != "win32":
81-
82-
async def _async_sendall_ssl(
83-
sock: Union[socket.socket, _sslConn], buf: bytes, loop: AbstractEventLoop
84-
) -> None:
85-
view = memoryview(buf)
86-
sent = 0
87-
88-
def _is_ready(fut: Future) -> None:
89-
if fut.done():
90-
return
91-
fut.set_result(None)
92-
93-
while sent < len(buf):
94-
try:
95-
sent += sock.send(view[sent:])
96-
except BLOCKING_IO_ERRORS as exc:
97-
fd = sock.fileno()
98-
# Check for closed socket.
99-
if fd == -1:
100-
raise SSLError("Underlying socket has been closed") from None
101-
if isinstance(exc, BLOCKING_IO_READ_ERROR):
102-
fut = loop.create_future()
103-
loop.add_reader(fd, _is_ready, fut)
104-
try:
105-
await fut
106-
finally:
107-
loop.remove_reader(fd)
108-
if isinstance(exc, BLOCKING_IO_WRITE_ERROR):
109-
fut = loop.create_future()
110-
loop.add_writer(fd, _is_ready, fut)
111-
try:
112-
await fut
113-
finally:
114-
loop.remove_writer(fd)
115-
if _HAVE_PYOPENSSL and isinstance(exc, BLOCKING_IO_LOOKUP_ERROR):
116-
fut = loop.create_future()
117-
loop.add_reader(fd, _is_ready, fut)
118-
try:
119-
loop.add_writer(fd, _is_ready, fut)
120-
await fut
121-
finally:
122-
loop.remove_reader(fd)
123-
loop.remove_writer(fd)
124-
125-
async def _async_receive_ssl(
126-
conn: _sslConn, length: int, loop: AbstractEventLoop, once: Optional[bool] = False
127-
) -> memoryview:
128-
mv = memoryview(bytearray(length))
129-
total_read = 0
130-
131-
def _is_ready(fut: Future) -> None:
132-
if fut.done():
133-
return
134-
fut.set_result(None)
135-
136-
while total_read < length:
137-
try:
138-
read = conn.recv_into(mv[total_read:])
139-
if read == 0:
140-
raise OSError("connection closed")
141-
# KMS responses update their expected size after the first batch, stop reading after one loop
142-
if once:
143-
return mv[:read]
144-
total_read += read
145-
except BLOCKING_IO_ERRORS as exc:
146-
fd = conn.fileno()
147-
# Check for closed socket.
148-
if fd == -1:
149-
raise SSLError("Underlying socket has been closed") from None
150-
if isinstance(exc, BLOCKING_IO_READ_ERROR):
151-
fut = loop.create_future()
152-
loop.add_reader(fd, _is_ready, fut)
153-
try:
154-
await fut
155-
finally:
156-
loop.remove_reader(fd)
157-
if isinstance(exc, BLOCKING_IO_WRITE_ERROR):
158-
fut = loop.create_future()
159-
loop.add_writer(fd, _is_ready, fut)
160-
try:
161-
await fut
162-
finally:
163-
loop.remove_writer(fd)
164-
if _HAVE_PYOPENSSL and isinstance(exc, BLOCKING_IO_LOOKUP_ERROR):
165-
fut = loop.create_future()
166-
loop.add_reader(fd, _is_ready, fut)
167-
try:
168-
loop.add_writer(fd, _is_ready, fut)
169-
await fut
170-
finally:
171-
loop.remove_reader(fd)
172-
loop.remove_writer(fd)
173-
return mv
174-
175-
else:
176-
# The default Windows asyncio event loop does not support loop.add_reader/add_writer:
177-
# https://docs.python.org/3/library/asyncio-platforms.html#asyncio-platform-support
178-
# Note: In PYTHON-4493 we plan to replace this code with asyncio streams.
179-
async def _async_sendall_ssl(
180-
sock: Union[socket.socket, _sslConn], buf: bytes, dummy: AbstractEventLoop
181-
) -> None:
182-
view = memoryview(buf)
183-
total_length = len(buf)
184-
total_sent = 0
185-
# Backoff starts at 1ms, doubles on timeout up to 512ms, and halves on success
186-
# down to 1ms.
187-
backoff = 0.001
188-
while total_sent < total_length:
189-
try:
190-
sent = sock.send(view[total_sent:])
191-
except BLOCKING_IO_ERRORS:
192-
await asyncio.sleep(backoff)
193-
sent = 0
194-
if sent > 0:
195-
backoff = max(backoff / 2, 0.001)
196-
else:
197-
backoff = min(backoff * 2, 0.512)
198-
total_sent += sent
199-
200-
async def _async_receive_ssl(
201-
conn: _sslConn, length: int, dummy: AbstractEventLoop, once: Optional[bool] = False
202-
) -> memoryview:
203-
mv = memoryview(bytearray(length))
204-
total_read = 0
205-
# Backoff starts at 1ms, doubles on timeout up to 512ms, and halves on success
206-
# down to 1ms.
207-
backoff = 0.001
208-
while total_read < length:
209-
try:
210-
read = conn.recv_into(mv[total_read:])
211-
if read == 0:
212-
raise OSError("connection closed")
213-
# KMS responses update their expected size after the first batch, stop reading after one loop
214-
if once:
215-
return mv[:read]
216-
except BLOCKING_IO_ERRORS:
217-
await asyncio.sleep(backoff)
218-
read = 0
219-
if read > 0:
220-
backoff = max(backoff / 2, 0.001)
221-
else:
222-
backoff = min(backoff * 2, 0.512)
223-
total_read += read
224-
return mv
225-
226-
22782
def sendall(sock: Union[socket.socket, _sslConn], buf: bytes) -> None:
22883
sock.sendall(buf)
22984

23085

23186
async def _poll_cancellation(conn: AsyncConnection) -> None:
232-
# while True:
233-
# if conn.cancel_context.cancelled:
234-
# return
87+
while True:
88+
if conn.cancel_context.cancelled:
89+
return
23590

23691
await asyncio.sleep(_POLL_TIMEOUT)
23792

@@ -295,19 +150,11 @@ async def async_receive_data_socket(
295150
sock.settimeout(sock_timeout)
296151

297152

298-
299153
async def _async_receive_stream(reader: asyncio.StreamReader, length: int) -> memoryview:
300-
mv = bytearray(length)
301-
total_read = 0
302-
303-
while total_read < length:
304-
bytes = await reader.read(length)
305-
chunk_length = len(bytes)
306-
if chunk_length == 0:
307-
raise OSError("connection closed")
308-
mv[total_read:] = bytes
309-
total_read += chunk_length
310-
return memoryview(mv)
154+
try:
155+
return memoryview(await reader.readexactly(length))
156+
except asyncio.IncompleteReadError:
157+
raise OSError("connection closed")
311158

312159
def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> memoryview:
313160
buf = bytearray(length)

0 commit comments

Comments
 (0)