Skip to content

Commit a5e8559

Browse files
PYTHON-1636 Support exhaust cursors in OP_MSG (#629) (#632)
(cherry picked from commit d26bf93)
1 parent a317778 commit a5e8559

File tree

5 files changed

+94
-68
lines changed

5 files changed

+94
-68
lines changed

pymongo/cursor.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,21 @@ class CursorType(object):
8181

8282
# This has to be an old style class due to
8383
# http://bugs.jython.org/issue1057
84-
class _SocketManager:
84+
class _ExhaustManager:
8585
"""Used with exhaust cursors to ensure the socket is returned.
8686
"""
87-
def __init__(self, sock, pool):
87+
def __init__(self, sock, pool, more_to_come):
8888
self.sock = sock
8989
self.pool = pool
90+
self.more_to_come = more_to_come
9091
self.__closed = False
9192

9293
def __del__(self):
9394
self.close()
9495

96+
def update_exhaust(self, more_to_come):
97+
self.more_to_come = more_to_come
98+
9599
def close(self):
96100
"""Return this instance's socket to the connection pool.
97101
"""
@@ -1043,10 +1047,14 @@ def __send_message(self, operation):
10431047
raise
10441048

10451049
self.__address = response.address
1046-
if self.__exhaust and not self.__exhaust_mgr:
1050+
if self.__exhaust:
10471051
# 'response' is an ExhaustResponse.
1048-
self.__exhaust_mgr = _SocketManager(response.socket_info,
1049-
response.pool)
1052+
if not self.__exhaust_mgr:
1053+
self.__exhaust_mgr = _ExhaustManager(response.socket_info,
1054+
response.pool,
1055+
response.more_to_come)
1056+
else:
1057+
self.__exhaust_mgr.update_exhaust(response.more_to_come)
10501058

10511059
cmd_name = operation.name
10521060
docs = response.docs

pymongo/message.py

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -269,9 +269,11 @@ def namespace(self):
269269

270270
def use_command(self, sock_info, exhaust):
271271
use_find_cmd = False
272-
if sock_info.max_wire_version >= 4:
273-
if not exhaust:
274-
use_find_cmd = True
272+
if sock_info.max_wire_version >= 4 and not exhaust:
273+
use_find_cmd = True
274+
elif sock_info.max_wire_version >= 8:
275+
# OP_MSG supports exhaust on MongoDB 4.2+
276+
use_find_cmd = True
275277
elif not self.read_concern.ok_for_legacy:
276278
raise ConfigurationError(
277279
'read concern level of %s is not valid '
@@ -398,8 +400,15 @@ def namespace(self):
398400
return _UJOIN % (self.db, self.coll)
399401

400402
def use_command(self, sock_info, exhaust):
403+
use_cmd = False
404+
if sock_info.max_wire_version >= 4 and not exhaust:
405+
use_cmd = True
406+
elif sock_info.max_wire_version >= 8:
407+
# OP_MSG supports exhaust on MongoDB 4.2+
408+
use_cmd = True
409+
401410
sock_info.validate_session(self.client, self.session)
402-
return sock_info.max_wire_version >= 4 and not exhaust
411+
return use_cmd
403412

404413
def as_command(self, sock_info):
405414
"""Return a getMore command document for this query."""
@@ -433,8 +442,12 @@ def get_message(self, dummy0, sock_info, use_cmd=False):
433442
if use_cmd:
434443
spec = self.as_command(sock_info)[0]
435444
if sock_info.op_msg_enabled:
445+
if self.exhaust_mgr:
446+
flags = _OpMsg.EXHAUST_ALLOWED
447+
else:
448+
flags = 0
436449
request_id, msg, size, _ = _op_msg(
437-
0, spec, self.db, None,
450+
flags, spec, self.db, None,
438451
False, False, self.codec_options,
439452
ctx=sock_info.compression_context)
440453
return request_id, msg, size
@@ -448,27 +461,23 @@ class _RawBatchQuery(_Query):
448461
def use_command(self, socket_info, exhaust):
449462
# Compatibility checks.
450463
super(_RawBatchQuery, self).use_command(socket_info, exhaust)
451-
# Use OP_MSG when available.
452-
if socket_info.op_msg_enabled and not exhaust:
464+
if socket_info.max_wire_version >= 8:
465+
# MongoDB 4.2+ supports exhaust over OP_MSG
466+
return True
467+
elif socket_info.op_msg_enabled and not exhaust:
453468
return True
454469
return False
455470

456-
def get_message(self, set_slave_ok, sock_info, use_cmd=False):
457-
return super(_RawBatchQuery, self).get_message(
458-
set_slave_ok, sock_info, use_cmd)
459-
460471

461472
class _RawBatchGetMore(_GetMore):
462473
def use_command(self, socket_info, exhaust):
463-
# Use OP_MSG when available.
464-
if socket_info.op_msg_enabled and not exhaust:
474+
if socket_info.max_wire_version >= 8:
475+
# MongoDB 4.2+ supports exhaust over OP_MSG
476+
return True
477+
elif socket_info.op_msg_enabled and not exhaust:
465478
return True
466479
return False
467480

468-
def get_message(self, set_slave_ok, sock_info, use_cmd=False):
469-
return super(_RawBatchGetMore, self).get_message(
470-
set_slave_ok, sock_info, use_cmd)
471-
472481

473482
class _CursorAddress(tuple):
474483
"""The server address (host, port) of a cursor, with namespace property."""

pymongo/response.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,12 @@ def docs(self):
6767
"""The decoded document(s)."""
6868
return self._docs
6969

70+
7071
class ExhaustResponse(Response):
71-
__slots__ = ('_socket_info', '_pool')
72+
__slots__ = ('_socket_info', '_pool', '_more_to_come')
7273

7374
def __init__(self, data, address, socket_info, pool, request_id, duration,
74-
from_command, docs):
75+
from_command, docs, more_to_come):
7576
"""Represent a response to an exhaust cursor's initial query.
7677
7778
:Parameters:
@@ -82,6 +83,9 @@ def __init__(self, data, address, socket_info, pool, request_id, duration,
8283
- `request_id`: The request id of this operation.
8384
- `duration`: The duration of the operation.
8485
- `from_command`: If the response is the result of a db command.
86+
- `docs`: List of documents.
87+
- `more_to_come`: Bool indicating whether cursor is ready to be
88+
exhausted.
8589
"""
8690
super(ExhaustResponse, self).__init__(data,
8791
address,
@@ -90,6 +94,7 @@ def __init__(self, data, address, socket_info, pool, request_id, duration,
9094
from_command, docs)
9195
self._socket_info = socket_info
9296
self._pool = pool
97+
self._more_to_come = more_to_come
9398

9499
@property
95100
def socket_info(self):
@@ -105,3 +110,9 @@ def socket_info(self):
105110
def pool(self):
106111
"""The Pool from which the SocketInfo came."""
107112
return self._pool
113+
114+
@property
115+
def more_to_come(self):
116+
"""If true, server is ready to send batches on the socket until the
117+
result set is exhausted or there is an error."""
118+
return self._more_to_come

pymongo/server.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
from pymongo.errors import NotMasterError, OperationFailure
2222
from pymongo.helpers import _check_command_response
23-
from pymongo.message import _convert_exception
23+
from pymongo.message import _convert_exception, _OpMsg
2424
from pymongo.response import Response, ExhaustResponse
2525
from pymongo.server_type import SERVER_TYPE
2626

@@ -95,16 +95,15 @@ def run_operation_with_response(
9595
if publish:
9696
start = datetime.now()
9797

98-
send_message = not operation.exhaust_mgr
99-
100-
if send_message:
101-
use_cmd = operation.use_command(sock_info, exhaust)
98+
use_cmd = operation.use_command(sock_info, exhaust)
99+
more_to_come = (operation.exhaust_mgr
100+
and operation.exhaust_mgr.more_to_come)
101+
if more_to_come:
102+
request_id = 0
103+
else:
102104
message = operation.get_message(
103105
set_slave_okay, sock_info, use_cmd)
104106
request_id, data, max_doc_size = self._split_message(message)
105-
else:
106-
use_cmd = False
107-
request_id = 0
108107

109108
if publish:
110109
cmd, dbn = operation.as_command(sock_info)
@@ -113,11 +112,11 @@ def run_operation_with_response(
113112
start = datetime.now()
114113

115114
try:
116-
if send_message:
115+
if more_to_come:
116+
reply = sock_info.receive_message(None)
117+
else:
117118
sock_info.send_message(data, max_doc_size)
118119
reply = sock_info.receive_message(request_id)
119-
else:
120-
reply = sock_info.receive_message(None)
121120

122121
# Unpack and check for command errors.
123122
if use_cmd:
@@ -176,6 +175,13 @@ def run_operation_with_response(
176175
decrypted, operation.codec_options, user_fields)
177176

178177
if exhaust:
178+
if isinstance(reply, _OpMsg):
179+
# In OP_MSG, the server keeps sending only if the
180+
# more_to_come flag is set.
181+
more_to_come = reply.more_to_come
182+
else:
183+
# In OP_REPLY, the server keeps sending until cursor_id is 0.
184+
more_to_come = bool(reply.cursor_id)
179185
response = ExhaustResponse(
180186
data=reply,
181187
address=self._description.address,
@@ -184,7 +190,8 @@ def run_operation_with_response(
184190
duration=duration,
185191
request_id=request_id,
186192
from_command=use_cmd,
187-
docs=docs)
193+
docs=docs,
194+
more_to_come=more_to_come)
188195
else:
189196
response = Response(
190197
data=reply,

test/test_monitoring.py

Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ def test_not_master_error(self):
458458
@client_context.require_no_mongos
459459
def test_exhaust(self):
460460
self.client.pymongo_test.test.drop()
461-
self.client.pymongo_test.test.insert_many([{} for _ in range(10)])
461+
self.client.pymongo_test.test.insert_many([{} for _ in range(11)])
462462
self.listener.results.clear()
463463
cursor = self.client.pymongo_test.test.find(
464464
projection={'_id': False},
@@ -472,12 +472,10 @@ def test_exhaust(self):
472472
self.assertEqual(0, len(results['failed']))
473473
self.assertTrue(
474474
isinstance(started, monitoring.CommandStartedEvent))
475-
self.assertEqualCommand(
476-
SON([('find', 'test'),
477-
('filter', {}),
478-
('projection', {'_id': False}),
479-
('batchSize', 5)]),
480-
started.command)
475+
self.assertEqualCommand(SON([('find', 'test'),
476+
('filter', {}),
477+
('projection', {'_id': False}),
478+
('batchSize', 5)]), started.command)
481479
self.assertEqual('find', started.command_name)
482480
self.assertEqual(cursor.address, started.connection_id)
483481
self.assertEqual('pymongo_test', started.database_name)
@@ -498,32 +496,25 @@ def test_exhaust(self):
498496
self.listener.results.clear()
499497
tuple(cursor)
500498
results = self.listener.results
501-
started = results['started'][0]
502-
succeeded = results['succeeded'][0]
503499
self.assertEqual(0, len(results['failed']))
504-
self.assertTrue(
505-
isinstance(started, monitoring.CommandStartedEvent))
506-
self.assertEqualCommand(
507-
SON([('getMore', cursor_id),
508-
('collection', 'test'),
509-
('batchSize', 5)]),
510-
started.command)
511-
self.assertEqual('getMore', started.command_name)
512-
self.assertEqual(cursor.address, started.connection_id)
513-
self.assertEqual('pymongo_test', started.database_name)
514-
self.assertTrue(isinstance(started.request_id, int))
515-
self.assertTrue(
516-
isinstance(succeeded, monitoring.CommandSucceededEvent))
517-
self.assertTrue(isinstance(succeeded.duration_micros, int))
518-
self.assertEqual('getMore', succeeded.command_name)
519-
self.assertTrue(isinstance(succeeded.request_id, int))
520-
self.assertEqual(cursor.address, succeeded.connection_id)
521-
expected_result = {
522-
'cursor': {'id': 0,
523-
'ns': 'pymongo_test.test',
524-
'nextBatch': [{} for _ in range(5)]},
525-
'ok': 1}
526-
self.assertEqualReply(expected_result, succeeded.reply)
500+
for event in results['started']:
501+
self.assertTrue(isinstance(event, monitoring.CommandStartedEvent))
502+
self.assertEqualCommand(SON([('getMore', cursor_id),
503+
('collection', 'test'),
504+
('batchSize', 5)]), event.command)
505+
self.assertEqual('getMore', event.command_name)
506+
self.assertEqual(cursor.address, event.connection_id)
507+
self.assertEqual('pymongo_test', event.database_name)
508+
self.assertTrue(isinstance(event.request_id, int))
509+
for event in results['succeeded']:
510+
self.assertTrue(
511+
isinstance(event, monitoring.CommandSucceededEvent))
512+
self.assertTrue(isinstance(event.duration_micros, int))
513+
self.assertEqual('getMore', event.command_name)
514+
self.assertTrue(isinstance(event.request_id, int))
515+
self.assertEqual(cursor.address, event.connection_id)
516+
# Last getMore receives a response with cursor id 0.
517+
self.assertEqual(0, results['succeeded'][-1].reply['cursor']['id'])
527518

528519
def test_kill_cursors(self):
529520
with client_knobs(kill_cursor_frequency=0.01):

0 commit comments

Comments
 (0)