Skip to content

Commit 7f4c504

Browse files
committed
PYTHON-1674 Refactor Cursor/CommandCursor.__send_message
Move exhaust getMore out of Cursor. Move cursor command response decoding into Server so that all command listener events can be published in the same location.
1 parent 05a3825 commit 7f4c504

File tree

7 files changed

+199
-241
lines changed

7 files changed

+199
-241
lines changed

pymongo/command_cursor.py

Lines changed: 19 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
from bson.py3compat import integer_types
2222
from pymongo import helpers
23-
from pymongo.errors import (AutoReconnect,
23+
from pymongo.errors import (ConnectionFailure,
2424
InvalidOperation,
2525
NotMasterError,
2626
OperationFailure)
@@ -127,87 +127,42 @@ def kill():
127127
self.__end_session(True)
128128

129129
client = self.__collection.database.client
130-
listeners = client._event_listeners
131-
publish = listeners.enabled_for_commands
132-
start = datetime.datetime.now()
133-
134-
def duration(): return datetime.datetime.now() - start
135-
136130
try:
137131
response = client._send_message_with_response(
138-
operation, address=self.__address)
139-
except AutoReconnect:
132+
operation, address=self.__address,
133+
unpack_res=self._unpack_response)
134+
except OperationFailure:
135+
kill()
136+
raise
137+
except NotMasterError:
138+
# Don't send kill cursors to another server after a "not master"
139+
# error. It's completely pointless.
140+
kill()
141+
raise
142+
except ConnectionFailure:
140143
# Don't try to send kill cursors on another socket
141144
# or to another server. It can cause a _pinValue
142145
# assertion on some server releases if we get here
143146
# due to a socket timeout.
144147
kill()
145148
raise
149+
except Exception:
150+
# Close the cursor
151+
self.__die()
152+
raise
146153

147-
rqst_id = response.request_id
148154
from_command = response.from_command
149155
reply = response.data
150-
151-
try:
152-
with client._reset_on_error(self.__address, self.__session):
153-
user_fields = None
154-
legacy_response = True
155-
if from_command:
156-
user_fields = {'cursor': {'nextBatch': 1}}
157-
legacy_response = False
158-
docs = self._unpack_response(
159-
reply, self.__id, self.__collection.codec_options,
160-
legacy_response=legacy_response, user_fields=user_fields)
161-
if from_command:
162-
first = docs[0]
163-
client._process_response(first, self.__session)
164-
helpers._check_command_response(first)
165-
except OperationFailure as exc:
166-
kill()
167-
168-
if publish:
169-
listeners.publish_command_failure(
170-
duration(), exc.details, "getMore", rqst_id, self.__address)
171-
172-
raise
173-
except NotMasterError as exc:
174-
# Don't send kill cursors to another server after a "not master"
175-
# error. It's completely pointless.
176-
kill()
177-
178-
if publish:
179-
listeners.publish_command_failure(
180-
duration(), exc.details, "getMore", rqst_id, self.__address)
181-
182-
raise
183-
except Exception as exc:
184-
if publish:
185-
listeners.publish_command_failure(
186-
duration(), _convert_exception(exc), "getMore", rqst_id,
187-
self.__address)
188-
raise
156+
docs = response.docs
189157

190158
if from_command:
191159
cursor = docs[0]['cursor']
192160
documents = cursor['nextBatch']
193161
self.__id = cursor['id']
194-
if publish:
195-
listeners.publish_command_success(
196-
duration(), docs[0], "getMore", rqst_id,
197-
self.__address)
198162
else:
199163
documents = docs
200164
self.__id = reply.cursor_id
201165

202-
if publish:
203-
# Must publish in getMore command response format.
204-
res = {"cursor": {"id": self.__id,
205-
"ns": self.__collection.full_name,
206-
"nextBatch": documents},
207-
"ok": 1}
208-
listeners.publish_command_success(
209-
duration(), res, "getMore", rqst_id, self.__address)
210-
211166
if self.__id == 0:
212167
kill()
213168
self.__data = deque(documents)
@@ -239,7 +194,8 @@ def _refresh(self):
239194
read_pref,
240195
self.__session,
241196
self.__collection.database.client,
242-
self.__max_await_time_ms))
197+
self.__max_await_time_ms,
198+
False))
243199
else: # Cursor id is zero nothing else to return
244200
self.__killed = True
245201
self.__end_session(True)

pymongo/cursor.py

Lines changed: 39 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
"""Cursor class to iterate over Mongo query results."""
1616

1717
import copy
18-
import datetime
1918
import warnings
2019

2120
from collections import deque
@@ -29,13 +28,11 @@
2928
from pymongo import helpers
3029
from pymongo.common import validate_boolean, validate_is_mapping
3130
from pymongo.collation import validate_collation_or_none
32-
from pymongo.errors import (AutoReconnect,
33-
ConnectionFailure,
31+
from pymongo.errors import (ConnectionFailure,
3432
InvalidOperation,
3533
NotMasterError,
3634
OperationFailure)
37-
from pymongo.message import (_convert_exception,
38-
_CursorAddress,
35+
from pymongo.message import (_CursorAddress,
3936
_GetMore,
4037
_RawBatchGetMore,
4138
_Query,
@@ -50,7 +47,6 @@
5047
"await_data": 32,
5148
"exhaust": 64,
5249
"partial": 128}
53-
_CURSOR_DOC_FIELDS = {'cursor': {'firstBatch': 1, 'nextBatch': 1}}
5450

5551

5652
class CursorType(object):
@@ -941,129 +937,54 @@ def __send_message(self, operation):
941937
Can raise ConnectionFailure.
942938
"""
943939
client = self.__collection.database.client
944-
listeners = client._event_listeners
945-
publish = listeners.enabled_for_commands
946-
from_command = False
947-
start = datetime.datetime.now()
948-
949-
def duration(): return datetime.datetime.now() - start
950-
951-
if operation:
952-
try:
953-
response = client._send_message_with_response(
954-
operation, exhaust=self.__exhaust, address=self.__address)
955-
self.__address = response.address
956-
if self.__exhaust:
957-
# 'response' is an ExhaustResponse.
958-
self.__exhaust_mgr = _SocketManager(response.socket_info,
959-
response.pool)
960-
961-
cmd_name = operation.name
962-
reply = response.data
963-
rqst_id = response.request_id
964-
from_command = response.from_command
965-
except AutoReconnect:
966-
# Don't try to send kill cursors on another socket
967-
# or to another server. It can cause a _pinValue
968-
# assertion on some server releases if we get here
969-
# due to a socket timeout.
970-
self.__killed = True
971-
self.__die()
972-
raise
973-
else:
974-
# Exhaust cursor - no getMore message.
975-
rqst_id = 0
976-
cmd_name = 'getMore'
977-
if publish:
978-
# Fake a getMore command.
979-
cmd = SON([('getMore', self.__id),
980-
('collection', self.__collection.name)])
981-
if self.__batch_size:
982-
cmd['batchSize'] = self.__batch_size
983-
if self.__max_time_ms:
984-
cmd['maxTimeMS'] = self.__max_time_ms
985-
listeners.publish_command_start(
986-
cmd, self.__collection.database.name, 0, self.__address)
987-
try:
988-
reply = self.__exhaust_mgr.sock.receive_message(None)
989-
except Exception as exc:
990-
if publish:
991-
listeners.publish_command_failure(
992-
duration(), _convert_exception(exc), cmd_name, rqst_id,
993-
self.__address)
994-
if isinstance(exc, ConnectionFailure):
995-
self.__die()
996-
raise
997-
998940
try:
999-
with client._reset_on_error(self.__address, self.__session):
1000-
user_fields = None
1001-
legacy_response = True
1002-
if from_command:
1003-
user_fields = _CURSOR_DOC_FIELDS
1004-
legacy_response = False
1005-
docs = self._unpack_response(
1006-
reply, self.__id, self.__collection.codec_options,
1007-
legacy_response=legacy_response, user_fields=user_fields)
1008-
if from_command:
1009-
first = docs[0]
1010-
client._process_response(first, self.__session)
1011-
helpers._check_command_response(first)
1012-
except OperationFailure as exc:
941+
response = client._send_message_with_response(
942+
operation, exhaust=self.__exhaust, address=self.__address,
943+
unpack_res=self._unpack_response)
944+
except OperationFailure:
1013945
self.__killed = True
1014946

1015947
# Make sure exhaust socket is returned immediately, if necessary.
1016948
self.__die()
1017949

1018-
if publish:
1019-
listeners.publish_command_failure(
1020-
duration(), exc.details, cmd_name, rqst_id, self.__address)
1021-
1022950
# If this is a tailable cursor the error is likely
1023951
# due to capped collection roll over. Setting
1024952
# self.__killed to True ensures Cursor.alive will be
1025953
# False. No need to re-raise.
1026954
if self.__query_flags & _QUERY_OPTIONS["tailable_cursor"]:
1027955
return
1028956
raise
1029-
except NotMasterError as exc:
957+
except NotMasterError:
1030958
# Don't send kill cursors to another server after a "not master"
1031959
# error. It's completely pointless.
1032960
self.__killed = True
1033961

1034962
# Make sure exhaust socket is returned immediately, if necessary.
1035963
self.__die()
1036964

1037-
if publish:
1038-
listeners.publish_command_failure(
1039-
duration(), exc.details, cmd_name, rqst_id, self.__address)
1040-
1041965
raise
1042-
except Exception as exc:
1043-
if publish:
1044-
listeners.publish_command_failure(
1045-
duration(), _convert_exception(exc), cmd_name, rqst_id,
1046-
self.__address)
966+
except ConnectionFailure:
967+
# Don't try to send kill cursors on another socket
968+
# or to another server. It can cause a _pinValue
969+
# assertion on some server releases if we get here
970+
# due to a socket timeout.
971+
self.__killed = True
972+
self.__die()
973+
raise
974+
except Exception:
975+
# Close the cursor
976+
self.__die()
1047977
raise
1048978

1049-
if publish:
1050-
# Must publish in find / getMore / explain command response format.
1051-
if from_command:
1052-
res = docs[0]
1053-
elif cmd_name == "explain":
1054-
res = docs[0] if docs else {}
1055-
else:
1056-
res = {"cursor": {"id": reply.cursor_id,
1057-
"ns": self.__collection.full_name},
1058-
"ok": 1}
1059-
if cmd_name == "find":
1060-
res["cursor"]["firstBatch"] = docs
1061-
else:
1062-
res["cursor"]["nextBatch"] = docs
1063-
listeners.publish_command_success(
1064-
duration(), res, cmd_name, rqst_id, self.__address)
979+
self.__address = response.address
980+
if self.__exhaust and not self.__exhaust_mgr:
981+
# 'response' is an ExhaustResponse.
982+
self.__exhaust_mgr = _SocketManager(response.socket_info,
983+
response.pool)
1065984

1066-
if from_command:
985+
cmd_name = operation.name
986+
docs = response.docs
987+
if response.from_command:
1067988
if cmd_name != "explain":
1068989
cursor = docs[0]['cursor']
1069990
self.__id = cursor['id']
@@ -1078,9 +999,9 @@ def duration(): return datetime.datetime.now() - start
1078999
self.__data = deque(docs)
10791000
self.__retrieved += len(docs)
10801001
else:
1081-
self.__id = reply.cursor_id
1002+
self.__id = response.data.cursor_id
10821003
self.__data = deque(docs)
1083-
self.__retrieved += reply.number_returned
1004+
self.__retrieved += response.data.number_returned
10841005

10851006
if self.__id == 0:
10861007
self.__killed = True
@@ -1147,19 +1068,17 @@ def _refresh(self):
11471068
limit = self.__batch_size
11481069

11491070
# Exhaust cursors don't send getMore messages.
1150-
if self.__exhaust:
1151-
self.__send_message(None)
1152-
else:
1153-
g = self._getmore_class(self.__collection.database.name,
1154-
self.__collection.name,
1155-
limit,
1156-
self.__id,
1157-
self.__codec_options,
1158-
self._read_preference(),
1159-
self.__session,
1160-
self.__collection.database.client,
1161-
self.__max_await_time_ms)
1162-
self.__send_message(g)
1071+
g = self._getmore_class(self.__collection.database.name,
1072+
self.__collection.name,
1073+
limit,
1074+
self.__id,
1075+
self.__codec_options,
1076+
self._read_preference(),
1077+
self.__session,
1078+
self.__collection.database.client,
1079+
self.__max_await_time_ms,
1080+
self.__exhaust_mgr)
1081+
self.__send_message(g)
11631082

11641083
return len(self.__data)
11651084

0 commit comments

Comments
 (0)