Skip to content

Commit 7491162

Browse files
committed
PYTHON-1783 Decode user-facing documents but not internal driver-server
communications.
1 parent 007aa6b commit 7491162

File tree

13 files changed

+484
-53
lines changed

13 files changed

+484
-53
lines changed

bson/__init__.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ def _get_array(data, position, obj_end, opts, element_name):
215215
end -= 1
216216
result = []
217217

218-
# Avoid doing global and attibute lookups in the loop.
218+
# Avoid doing global and attribute lookups in the loop.
219219
append = result.append
220220
index = data.index
221221
getter = _ELEMENT_GETTER
@@ -940,6 +940,59 @@ def decode_all(data, codec_options=DEFAULT_CODEC_OPTIONS):
940940
decode_all = _cbson.decode_all
941941

942942

943+
def _decode_selective(rawdoc, fields, codec_options):
    """Selectively decode a raw document.

    Keys named in ``fields`` are decoded with the user-supplied
    ``codec_options`` (a value of ``list`` decodes each raw element of a
    list, ``dict`` decodes a single raw subdocument, and any other value
    is treated as a nested field map and handled recursively).  Keys not
    named in ``fields`` are copied through untouched.
    """
    doc = codec_options.document_class()
    for key, value in iteritems(rawdoc):
        if key not in fields:
            # Not user-facing: pass the raw value straight through.
            doc[key] = value
            continue
        subspec = fields[key]
        if subspec == list:
            doc[key] = [_bson_to_dict(item.raw, codec_options)
                        for item in value]
        elif subspec == dict:
            doc[key] = _bson_to_dict(value.raw, codec_options)
        else:
            # subspec is a nested field map; recurse into the subdocument.
            doc[key] = _decode_selective(value, subspec, codec_options)
    return doc
956+
957+
958+
def _decode_all_selective(data, codec_options, fields):
    """Decode BSON data to a single document while using user-provided
    custom decoding logic.

    `data` must be a string representing a valid, BSON-encoded document.

    :Parameters:
      - `data`: BSON data
      - `codec_options`: An instance of
        :class:`~bson.codec_options.CodecOptions` with user-specified type
        decoders. If no decoders are found, this method is the same as
        ``decode_all``.
      - `fields`: Map of document namespaces where data that needs
        to be custom decoded lives or None. For example, to custom decode a
        list of objects in 'field1.subfield1', the specified value should be
        ``{'field1': {'subfield1': list}}``. Use ``dict`` instead of ``list``
        if the field contains a single object to custom decode. If ``fields``
        is an empty map or None, this method is the same as ``decode_all``.

    :Returns:
      - `document_list`: Single-member list containing the decoded document.

    .. versionadded:: 3.8
    """
    if not codec_options.type_registry._decoder_map:
        # No custom decoders registered; plain decoding suffices.
        return decode_all(data, codec_options)

    if not fields:
        # Custom decoders exist but nothing here is user-facing, so decode
        # with the type registry stripped out.
        return decode_all(data, codec_options.with_options(type_registry=None))

    # First decode the whole document for internal use (raw, no custom
    # decoding), then selectively re-decode the user-facing fields.
    # Imported here to avoid a circular import at module load time.
    from bson.raw_bson import RawBSONDocument
    internal_opts = codec_options.with_options(
        document_class=RawBSONDocument, type_registry=None)
    raw_doc = _bson_to_dict(data, internal_opts)
    return [_decode_selective(raw_doc, fields, codec_options)]
994+
995+
943996
def decode_iter(data, codec_options=DEFAULT_CODEC_OPTIONS):
944997
"""Decode BSON data to multiple documents as a generator.
945998

gridfs/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
from gridfs.grid_file import (GridIn,
2626
GridOut,
2727
GridOutCursor,
28-
DEFAULT_CHUNK_SIZE)
28+
DEFAULT_CHUNK_SIZE,
29+
_clear_entity_type_registry)
2930
from pymongo import (ASCENDING,
3031
DESCENDING)
3132
from pymongo.common import UNAUTHORIZED_CODES, validate_string
@@ -61,6 +62,8 @@ def __init__(self, database, collection="fs", disable_md5=False):
6162
if not isinstance(database, Database):
6263
raise TypeError("database must be an instance of Database")
6364

65+
database = _clear_entity_type_registry(database)
66+
6467
if not database.write_concern.acknowledged:
6568
raise ConfigurationError('database must use '
6669
'acknowledged write_concern')
@@ -443,6 +446,8 @@ def __init__(self, db, bucket_name="fs",
443446
if not isinstance(db, Database):
444447
raise TypeError("database must be an instance of Database")
445448

449+
db = _clear_entity_type_registry(db)
450+
446451
wtc = write_concern if write_concern is not None else db.write_concern
447452
if not wtc.acknowledged:
448453
raise ConfigurationError('write concern must be acknowledged')

gridfs/grid_file.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ def getter(self):
9898
return property(getter, doc=docstring)
9999

100100

101+
def _clear_entity_type_registry(entity, **kwargs):
102+
"""Clear the given database/collection object's type registry."""
103+
codecopts = entity.codec_options.with_options(type_registry=None)
104+
return entity.with_options(codec_options=codecopts, **kwargs)
105+
106+
101107
class GridIn(object):
102108
"""Class to write data to GridFS.
103109
"""
@@ -168,8 +174,8 @@ def __init__(
168174
if "chunk_size" in kwargs:
169175
kwargs["chunkSize"] = kwargs.pop("chunk_size")
170176

171-
coll = root_collection.with_options(
172-
read_preference=ReadPreference.PRIMARY)
177+
coll = _clear_entity_type_registry(
178+
root_collection, read_preference=ReadPreference.PRIMARY)
173179

174180
if not disable_md5:
175181
kwargs["md5"] = hashlib.md5()
@@ -449,6 +455,8 @@ def __init__(self, root_collection, file_id=None, file_document=None,
449455
raise TypeError("root_collection must be an "
450456
"instance of Collection")
451457

458+
root_collection = _clear_entity_type_registry(root_collection)
459+
452460
self.__chunks = root_collection.chunks
453461
self.__files = root_collection.files
454462
self.__file_id = file_id
@@ -800,6 +808,8 @@ def __init__(self, collection, filter=None, skip=0, limit=0,
800808
801809
.. mongodoc:: cursors
802810
"""
811+
collection = _clear_entity_type_registry(collection)
812+
803813
# Hold on to the base "fs" collection to create GridOut objects later.
804814
self.__root_collection = collection
805815

pymongo/collection.py

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353

5454
_NO_OBJ_ERROR = "No matching object found"
5555
_UJOIN = u"%s.%s"
56+
_FIND_AND_MODIFY_DOC_FIELDS = {'value': dict}
5657

5758

5859
class ReturnDocument(object):
@@ -203,7 +204,8 @@ def _command(self, sock_info, command, slave_ok=False,
203204
write_concern=None,
204205
collation=None,
205206
session=None,
206-
retryable_write=False):
207+
retryable_write=False,
208+
user_fields=None):
207209
"""Internal command helper.
208210
209211
:Parameters:
@@ -242,7 +244,8 @@ def _command(self, sock_info, command, slave_ok=False,
242244
collation=collation,
243245
session=s,
244246
client=self.__database.client,
245-
retryable_write=retryable_write)
247+
retryable_write=retryable_write,
248+
user_fields=user_fields)
246249

247250
def __create(self, options, collation, session):
248251
"""Sends a create command with the given options.
@@ -315,9 +318,8 @@ def database(self):
315318
"""
316319
return self.__database
317320

318-
def with_options(
319-
self, codec_options=None, read_preference=None,
320-
write_concern=None, read_concern=None):
321+
def with_options(self, codec_options=None, read_preference=None,
322+
write_concern=None, read_concern=None):
321323
"""Get a clone of this collection changing the specified settings.
322324
323325
>>> coll1.read_preference
@@ -2310,7 +2312,8 @@ def _aggregate(self, pipeline, cursor_class, first_batch_size, session,
23102312
write_concern=write_concern,
23112313
collation=collation,
23122314
session=session,
2313-
client=self.__database.client)
2315+
client=self.__database.client,
2316+
user_fields={'cursor': {'firstBatch': list}})
23142317

23152318
if "cursor" in result:
23162319
cursor = result["cursor"]
@@ -2571,7 +2574,8 @@ def group(self, key, condition, initial, reduce, finalize=None, **kwargs):
25712574

25722575
with self._socket_for_reads(session=None) as (sock_info, slave_ok):
25732576
return self._command(sock_info, cmd, slave_ok,
2574-
collation=collation)["retval"]
2577+
collation=collation,
2578+
user_fields={'retval': list})["retval"]
25752579

25762580
def rename(self, new_name, session=None, **kwargs):
25772581
"""Rename this collection.
@@ -2675,7 +2679,8 @@ def distinct(self, key, filter=None, session=None, **kwargs):
26752679
with self._socket_for_reads(session) as (sock_info, slave_ok):
26762680
return self._command(sock_info, cmd, slave_ok,
26772681
read_concern=self.read_concern,
2678-
collation=collation, session=session)["values"]
2682+
collation=collation,
2683+
session=session)["values"]
26792684

26802685
def map_reduce(self, map, reduce, out, full_response=False, session=None,
26812686
**kwargs):
@@ -2755,12 +2760,17 @@ def map_reduce(self, map, reduce, out, full_response=False, session=None,
27552760
write_concern = self._write_concern_for(session)
27562761
else:
27572762
write_concern = None
2763+
if inline:
2764+
user_fields = {'results': list}
2765+
else:
2766+
user_fields = None
27582767

27592768
response = self._command(
27602769
sock_info, cmd, slave_ok, read_pref,
27612770
read_concern=read_concern,
27622771
write_concern=write_concern,
2763-
collation=collation, session=session)
2772+
collation=collation, session=session,
2773+
user_fields=user_fields)
27642774

27652775
if full_response or not response.get('result'):
27662776
return response
@@ -2810,16 +2820,19 @@ def inline_map_reduce(self, map, reduce, full_response=False, session=None,
28102820
("map", map),
28112821
("reduce", reduce),
28122822
("out", {"inline": 1})])
2823+
user_fields = {'results': list}
28132824
collation = validate_collation_or_none(kwargs.pop('collation', None))
28142825
cmd.update(kwargs)
28152826
with self._socket_for_reads(session) as (sock_info, slave_ok):
28162827
if sock_info.max_wire_version >= 4 and 'readConcern' not in cmd:
28172828
res = self._command(sock_info, cmd, slave_ok,
28182829
read_concern=self.read_concern,
2819-
collation=collation, session=session)
2830+
collation=collation, session=session,
2831+
user_fields=user_fields)
28202832
else:
28212833
res = self._command(sock_info, cmd, slave_ok,
2822-
collation=collation, session=session)
2834+
collation=collation, session=session,
2835+
user_fields=user_fields)
28232836

28242837
if full_response:
28252838
return res
@@ -2837,6 +2850,7 @@ def __find_and_modify(self, filter, projection, sort, upsert=None,
28372850
return_document=ReturnDocument.BEFORE,
28382851
array_filters=None, session=None, **kwargs):
28392852
"""Internal findAndModify helper."""
2853+
28402854
common.validate_is_mapping("filter", filter)
28412855
if not isinstance(return_document, bool):
28422856
raise ValueError("return_document must be "
@@ -2876,8 +2890,10 @@ def _find_and_modify(session, sock_info, retryable_write):
28762890
write_concern=write_concern,
28772891
allowable_errors=[_NO_OBJ_ERROR],
28782892
collation=collation, session=session,
2879-
retryable_write=retryable_write)
2893+
retryable_write=retryable_write,
2894+
user_fields=_FIND_AND_MODIFY_DOC_FIELDS)
28802895
_check_write_command_response(out)
2896+
28812897
return out.get("value")
28822898

28832899
return self.__database.client._retryable_write(
@@ -3293,7 +3309,8 @@ def _find_and_modify(session, sock_info, retryable_write):
32933309
result = self._command(
32943310
sock_info, cmd, read_preference=ReadPreference.PRIMARY,
32953311
allowable_errors=[_NO_OBJ_ERROR], collation=collation,
3296-
session=session, retryable_write=retryable_write)
3312+
session=session, retryable_write=retryable_write,
3313+
user_fields=_FIND_AND_MODIFY_DOC_FIELDS)
32973314

32983315
_check_write_command_response(result)
32993316
return result

pymongo/command_cursor.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -150,14 +150,18 @@ def duration(): return datetime.datetime.now() - start
150150

151151
try:
152152
with client._reset_on_error(self.__address, self.__session):
153-
docs = self._unpack_response(reply,
154-
self.__id,
155-
self.__collection.codec_options)
153+
user_fields = None
154+
legacy_response = True
155+
if from_command:
156+
user_fields = {'cursor': {'nextBatch': list}}
157+
legacy_response = False
158+
docs = self._unpack_response(
159+
reply, self.__id, self.__collection.codec_options,
160+
legacy_response=legacy_response, user_fields=user_fields)
156161
if from_command:
157162
first = docs[0]
158163
client._process_response(first, self.__session)
159164
helpers._check_command_response(first)
160-
161165
except OperationFailure as exc:
162166
kill()
163167

@@ -208,8 +212,10 @@ def duration(): return datetime.datetime.now() - start
208212
kill()
209213
self.__data = deque(documents)
210214

211-
def _unpack_response(self, response, cursor_id, codec_options):
212-
return response.unpack_response(cursor_id, codec_options)
215+
def _unpack_response(self, response, cursor_id, codec_options,
216+
user_fields=None, legacy_response=False):
217+
return response.unpack_response(cursor_id, codec_options, user_fields,
218+
legacy_response)
213219

214220
def _refresh(self):
215221
"""Refreshes the cursor with more data from the server.
@@ -330,7 +336,8 @@ def __init__(self, collection, cursor_info, address, retrieved=0,
330336
collection, cursor_info, address, retrieved, batch_size,
331337
max_await_time_ms, session, explicit_session)
332338

333-
def _unpack_response(self, response, cursor_id, codec_options):
339+
def _unpack_response(self, response, cursor_id, codec_options,
340+
user_fields=None, legacy_response=False):
334341
return response.raw_response(cursor_id)
335342

336343
def __getitem__(self, index):

pymongo/cursor.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
_RawBatchGetMore,
4141
_Query,
4242
_RawBatchQuery)
43-
from pymongo.read_preferences import ReadPreference
43+
4444

4545
_QUERY_OPTIONS = {
4646
"tailable_cursor": 2,
@@ -50,6 +50,7 @@
5050
"await_data": 32,
5151
"exhaust": 64,
5252
"partial": 128}
53+
_CURSOR_DOC_FIELDS = {'cursor': {'firstBatch': list, 'nextBatch': list}}
5354

5455

5556
class CursorType(object):
@@ -996,9 +997,14 @@ def duration(): return datetime.datetime.now() - start
996997

997998
try:
998999
with client._reset_on_error(self.__address, self.__session):
999-
docs = self._unpack_response(reply,
1000-
self.__id,
1001-
self.__collection.codec_options)
1000+
user_fields = None
1001+
legacy_response = True
1002+
if from_command:
1003+
user_fields = _CURSOR_DOC_FIELDS
1004+
legacy_response = False
1005+
docs = self._unpack_response(
1006+
reply, self.__id, self.__collection.codec_options,
1007+
legacy_response=legacy_response, user_fields=user_fields)
10021008
if from_command:
10031009
first = docs[0]
10041010
client._process_response(first, self.__session)
@@ -1085,8 +1091,10 @@ def duration(): return datetime.datetime.now() - start
10851091
if self.__limit and self.__id and self.__limit <= self.__retrieved:
10861092
self.__die()
10871093

1088-
def _unpack_response(self, response, cursor_id, codec_options):
1089-
return response.unpack_response(cursor_id, codec_options)
1094+
def _unpack_response(self, response, cursor_id, codec_options,
1095+
user_fields=None, legacy_response=False):
1096+
return response.unpack_response(cursor_id, codec_options, user_fields,
1097+
legacy_response)
10901098

10911099
def _read_preference(self):
10921100
if self.__read_preference is None:
@@ -1303,7 +1311,8 @@ def __init__(self, *args, **kwargs):
13031311
raise InvalidOperation(
13041312
"Cannot use RawBatchCursor with manipulate=True")
13051313

1306-
def _unpack_response(self, response, cursor_id, codec_options):
1314+
def _unpack_response(self, response, cursor_id, codec_options,
1315+
user_fields=None, legacy_response=False):
13071316
return response.raw_response(cursor_id)
13081317

13091318
def explain(self):

0 commit comments

Comments
 (0)