Commit a3556c4

PYTHON-2012 Update FLE to support commands larger than 6MiB
Bulk write commands are batched at 2MiB when auto encryption is enabled.
1 parent c65367b commit a3556c4
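
For context, the batching rule this commit introduces can be sketched in a few lines of plain Python. The helper below is illustrative only (the function name is invented and this is not the driver's code); it uses PyMongo's bson.encode to measure document sizes and groups documents into batches of at most 2MiB of BSON, always keeping at least one document per batch:

    import bson

    _MAX_SPLIT_SIZE_ENC = 2097152  # 2MiB: the pre-encryption batch split size

    def split_into_batches(docs, max_split_size=_MAX_SPLIT_SIZE_ENC):
        """Illustrative only: group documents into batches whose combined
        BSON size stays at or under max_split_size, never emitting an
        empty batch."""
        batches, current, size = [], [], 0
        for doc in docs:
            encoded = len(bson.encode(doc))
            if current and size + encoded > max_split_size:
                batches.append(current)
                current, size = [], 0
            current.append(doc)
            size += encoded
        if current:
            batches.append(current)
        return batches

    # Two ~2MiB documents yield two batches, i.e. two insert commands.
    docs = [{'_id': i, 'unencrypted': 'a' * 2097152} for i in range(2)]
    print([len(b) for b in split_into_batches(docs)])  # [1, 1]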

4 files changed: 80 additions, 52 deletions


pymongo/_cmessagemodule.c

Lines changed: 18 additions & 1 deletion
@@ -1441,13 +1441,15 @@ _batched_write_command(
     long max_bson_size;
     long max_cmd_size;
     long max_write_batch_size;
+    long max_split_size;
     int idx = 0;
     int cmd_len_loc;
     int lst_len_loc;
     int position;
     int length;
     PyObject* max_bson_size_obj = NULL;
     PyObject* max_write_batch_size_obj = NULL;
+    PyObject* max_split_size_obj = NULL;
     PyObject* doc = NULL;
     PyObject* iterator = NULL;

@@ -1478,6 +1480,20 @@ _batched_write_command(
         return 0;
     }

+    // max_split_size is the size at which to perform a batch split.
+    // Normally this this value is equal to max_bson_size (16MiB). However,
+    // when auto encryption is enabled max_split_size is reduced to 2MiB.
+    max_split_size_obj = PyObject_GetAttrString(ctx, "max_split_size");
+#if PY_MAJOR_VERSION >= 3
+    max_split_size = PyLong_AsLong(max_split_size_obj);
+#else
+    max_split_size = PyInt_AsLong(max_split_size_obj);
+#endif
+    Py_XDECREF(max_split_size_obj);
+    if (max_split_size == -1) {
+        return 0;
+    }
+
     if (!buffer_write_bytes(buffer,
                             "\x00\x00\x00\x00", /* flags */
                             4) ||

@@ -1570,7 +1586,6 @@ _batched_write_command(
          * max_cmd_size accounts for the two trailing null bytes.
          */
         cur_size = buffer_get_position(buffer) - cur_doc_begin;
-        enough_data = (buffer_get_position(buffer) > max_cmd_size);
         /* This single document is too large for the command. */
         if (cur_size > max_cmd_size) {
             if (op == _INSERT) {

@@ -1591,6 +1606,8 @@ _batched_write_command(
             }
             goto fail;
         }
+        enough_data = (idx >= 1 &&
+                       (buffer_get_position(buffer) > max_split_size));
         if (enough_data) {
             /*
              * Roll the existing buffer back to the beginning
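
The behavioral core of the C change above is the relocated split test: instead of comparing the buffer position against max_cmd_size before the oversized-document check, the loop now splits only after at least one document has been written and the buffer has grown past max_split_size. A minimal Python rendering of that condition (a sketch of the rule, not the driver's code; the function name is invented):

    def should_split(idx, buffer_position, max_split_size):
        """Mirror of the new C condition: split only when the current batch
        already holds at least one document and the buffer has grown past
        max_split_size."""
        return idx >= 1 and buffer_position > max_split_size

    # A single 10MiB document never triggers a split (idx is 0), so it is
    # sent as one command and checked only against the 16MiB max_cmd_size.
    print(should_split(0, 10 * 1024 * 1024, 2097152))        # False
    # Once one document is in the batch and the buffer exceeds 2MiB, split.
    print(should_split(1, 2 * 1024 * 1024 + 500, 2097152))   # True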

pymongo/encryption.py

Lines changed: 0 additions & 7 deletions
@@ -50,9 +50,6 @@
                              EncryptionError,
                              InvalidOperation,
                              ServerSelectionTimeoutError)
-from pymongo.message import (_COMMAND_OVERHEAD,
-                             _MAX_ENC_BSON_SIZE,
-                             _raise_document_too_large)
 from pymongo.mongo_client import MongoClient
 from pymongo.pool import _configured_socket, PoolOptions
 from pymongo.read_concern import ReadConcern

@@ -277,10 +274,6 @@ def encrypt(self, database, cmd, check_keys, codec_options):
         # check_keys.
         cluster_time = check_keys and cmd.pop('$clusterTime', None)
         encoded_cmd = _dict_to_bson(cmd, check_keys, codec_options)
-        max_cmd_size = _MAX_ENC_BSON_SIZE + _COMMAND_OVERHEAD
-        if len(encoded_cmd) > max_cmd_size:
-            raise _raise_document_too_large(
-                next(iter(cmd)), len(encoded_cmd), max_cmd_size)
         with _wrap_encryption_errors():
             encrypted_cmd = self._auto_encrypter.encrypt(database, encoded_cmd)
         # TODO: PYTHON-1922 avoid decoding the encrypted_cmd.

pymongo/message.py

Lines changed: 17 additions & 14 deletions
@@ -929,6 +929,11 @@ def max_write_batch_size(self):
         """A proxy for SockInfo.max_write_batch_size."""
         return self.sock_info.max_write_batch_size

+    @property
+    def max_split_size(self):
+        """The maximum size of a BSON command before batch splitting."""
+        return self.max_bson_size
+
     def legacy_bulk_insert(
             self, request_id, msg, max_doc_size, acknowledged, docs, compress):
         if compress:

@@ -1011,10 +1016,11 @@ def _fail(self, request_id, failure, duration):
             request_id, self.sock_info.address, self.op_id)


-# 2MiB
-_MAX_ENC_BSON_SIZE = 2 * (1024 * 1024)
-# 6MB
-_MAX_ENC_MESSAGE_SIZE = 6 * (1000 * 1000)
+# From the Client Side Encryption spec:
+# Because automatic encryption increases the size of commands, the driver
+# MUST split bulk writes at a reduced size limit before undergoing automatic
+# encryption. The write payload MUST be split at 2MiB (2097152).
+_MAX_SPLIT_SIZE_ENC = 2097152


 class _EncryptedBulkWriteContext(_BulkWriteContext):

@@ -1049,14 +1055,9 @@ def execute_unack(self, docs, client):
         return to_send

     @property
-    def max_bson_size(self):
-        """A proxy for SockInfo.max_bson_size."""
-        return min(self.sock_info.max_bson_size, _MAX_ENC_BSON_SIZE)
-
-    @property
-    def max_message_size(self):
-        """A proxy for SockInfo.max_message_size."""
-        return min(self.sock_info.max_message_size, _MAX_ENC_MESSAGE_SIZE)
+    def max_split_size(self):
+        """Reduce the batch splitting size."""
+        return _MAX_SPLIT_SIZE_ENC


 def _raise_document_too_large(operation, doc_size, max_size):

@@ -1388,6 +1389,7 @@ def _batched_write_command_impl(
     # Max BSON object size + 16k - 2 bytes for ending NUL bytes.
     # Server guarantees there is enough room: SERVER-10643.
     max_cmd_size = max_bson_size + _COMMAND_OVERHEAD
+    max_split_size = ctx.max_split_size

     # No options
     buf.write(_ZERO_32)

@@ -1424,12 +1426,13 @@ def _batched_write_command_impl(
         # Is there enough room to add this document? max_cmd_size accounts for
         # the two trailing null bytes.
         doc_too_large = len(value) > max_cmd_size
-        enough_data = (buf.tell() + len(key) + len(value)) >= max_cmd_size
-        enough_documents = (idx >= max_write_batch_size)
         if doc_too_large:
             write_op = list(_FIELD_MAP.keys())[operation]
             _raise_document_too_large(
                 write_op, len(value), max_bson_size)
+        enough_data = (idx >= 1 and
+                       (buf.tell() + len(key) + len(value)) >= max_split_size)
+        enough_documents = (idx >= max_write_batch_size)
         if enough_data or enough_documents:
             break
         buf.write(_BSONOBJ)
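
The design choice in pymongo/message.py is worth calling out: rather than clamping max_bson_size and max_message_size to 2MiB and 6MB as before, the encrypted context now overrides only the new max_split_size property, so the server's 16MiB document limit still applies and only the batch-split threshold shrinks. A reduced sketch of that override pattern (stand-in class names, not the driver's internals):

    _MAX_SPLIT_SIZE_ENC = 2097152

    class WriteContextSketch(object):
        """Stand-in for _BulkWriteContext: split at the server's max BSON size."""
        def __init__(self, max_bson_size=16 * 1024 * 1024):
            self.max_bson_size = max_bson_size

        @property
        def max_split_size(self):
            return self.max_bson_size

    class EncryptedWriteContextSketch(WriteContextSketch):
        """Stand-in for _EncryptedBulkWriteContext: only the split threshold
        shrinks; max_bson_size is untouched, so documents between 2MiB and
        16MiB are still accepted as single-document batches."""
        @property
        def max_split_size(self):
            return _MAX_SPLIT_SIZE_ENC

    print(WriteContextSketch().max_split_size)           # 16777216
    print(EncryptedWriteContextSketch().max_split_size)  # 2097152
    print(EncryptedWriteContextSketch().max_bson_size)   # 16777216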

test/test_encryption.py

Lines changed: 45 additions & 30 deletions
@@ -35,15 +35,15 @@
 from bson.son import SON

 from pymongo.cursor import CursorType
-from pymongo.errors import (ConfigurationError,
-                            EncryptionError,
-                            InvalidOperation,
-                            OperationFailure)
 from pymongo.encryption import (Algorithm,
                                 ClientEncryption)
-from pymongo.errors import ConfigurationError, DocumentTooLarge
 from pymongo.encryption_options import AutoEncryptionOpts, _HAVE_PYMONGOCRYPT
-from pymongo.message import _COMMAND_OVERHEAD
+from pymongo.errors import (BulkWriteError,
+                            ConfigurationError,
+                            EncryptionError,
+                            InvalidOperation,
+                            OperationFailure,
+                            WriteError)
 from pymongo.mongo_client import MongoClient
 from pymongo.operations import InsertOne
 from pymongo.write_concern import WriteConcern

@@ -918,6 +918,10 @@ def test_corpus_local_schema(self):
         self._test_corpus(opts)


+_2_MiB = 2097152
+_16_MiB = 16777216
+
+
 class TestBsonSizeBatches(EncryptionIntegrationTest):
     """Prose tests for BSON size limits and batch splitting."""

@@ -955,27 +959,14 @@ def tearDownClass(cls):
         super(TestBsonSizeBatches, cls).tearDownClass()

     def test_01_insert_succeeds_under_2MiB(self):
-        doc = {'_id': 'no_encryption_under_2mib',
-               'unencrypted': 'a' * ((2**21) - 1000)}
+        doc = {'_id': 'over_2mib_under_16mib', 'unencrypted': 'a' * _2_MiB}
         self.coll_encrypted.insert_one(doc)

         # Same with bulk_write.
-        doc = {'_id': 'no_encryption_under_2mib_bulk',
-               'unencrypted': 'a' * ((2**21) - 1000)}
+        doc['_id'] = 'over_2mib_under_16mib_bulk'
         self.coll_encrypted.bulk_write([InsertOne(doc)])

-    def test_02_insert_fails_over_2MiB(self):
-        doc = {'_id': 'no_encryption_over_2mib',
-               'unencrypted': 'a' * (2**21 + _COMMAND_OVERHEAD)}
-
-        with self.assertRaises(DocumentTooLarge):
-            self.coll_encrypted.insert_one(doc)
-        with self.assertRaises(DocumentTooLarge):
-            self.coll_encrypted.insert_many([doc])
-        with self.assertRaises(DocumentTooLarge):
-            self.coll_encrypted.bulk_write([InsertOne(doc)])
-
-    def test_03_insert_succeeds_over_2MiB_post_encryption(self):
+    def test_02_insert_succeeds_over_2MiB_post_encryption(self):
         doc = {'_id': 'encryption_exceeds_2mib',
                'unencrypted': 'a' * ((2**21) - 2000)}
         doc.update(json_data('limits', 'limits-doc.json'))

@@ -985,29 +976,53 @@ def test_03_insert_succeeds_over_2MiB_post_encryption(self):
         doc['_id'] = 'encryption_exceeds_2mib_bulk'
         self.coll_encrypted.bulk_write([InsertOne(doc)])

-    def test_04_bulk_batch_split(self):
-        doc1 = {'_id': 'no_encryption_under_2mib_1',
-                'unencrypted': 'a' * ((2**21) - 1000)}
-        doc2 = {'_id': 'no_encryption_under_2mib_2',
-                'unencrypted': 'a' * ((2**21) - 1000)}
+    def test_03_bulk_batch_split(self):
+        doc1 = {'_id': 'over_2mib_1', 'unencrypted': 'a' * _2_MiB}
+        doc2 = {'_id': 'over_2mib_2', 'unencrypted': 'a' * _2_MiB}
         self.listener.reset()
         self.coll_encrypted.bulk_write([InsertOne(doc1), InsertOne(doc2)])
         self.assertEqual(
             self.listener.started_command_names(), ['insert', 'insert'])

-    def test_05_bulk_batch_split(self):
+    def test_04_bulk_batch_split(self):
         limits_doc = json_data('limits', 'limits-doc.json')
         doc1 = {'_id': 'encryption_exceeds_2mib_1',
-                'unencrypted': 'a' * ((2**21) - 2000)}
+                'unencrypted': 'a' * (_2_MiB - 2000)}
         doc1.update(limits_doc)
         doc2 = {'_id': 'encryption_exceeds_2mib_2',
-                'unencrypted': 'a' * ((2**21) - 2000)}
+                'unencrypted': 'a' * (_2_MiB - 2000)}
         doc2.update(limits_doc)
         self.listener.reset()
         self.coll_encrypted.bulk_write([InsertOne(doc1), InsertOne(doc2)])
         self.assertEqual(
             self.listener.started_command_names(), ['insert', 'insert'])

+    def test_05_insert_succeeds_just_under_16MiB(self):
+        doc = {'_id': 'under_16mib', 'unencrypted': 'a' * (_16_MiB - 2000)}
+        self.coll_encrypted.insert_one(doc)
+
+        # Same with bulk_write.
+        doc['_id'] = 'under_16mib_bulk'
+        self.coll_encrypted.bulk_write([InsertOne(doc)])
+
+    def test_06_insert_fails_over_16MiB(self):
+        limits_doc = json_data('limits', 'limits-doc.json')
+        doc = {'_id': 'encryption_exceeds_16mib',
+               'unencrypted': 'a' * (_16_MiB - 2000)}
+        doc.update(limits_doc)
+
+        with self.assertRaisesRegex(WriteError, 'object to insert too large'):
+            self.coll_encrypted.insert_one(doc)
+
+        # Same with bulk_write.
+        doc['_id'] = 'encryption_exceeds_16mib_bulk'
+        with self.assertRaises(BulkWriteError) as ctx:
+            self.coll_encrypted.bulk_write([InsertOne(doc)])
+        err = ctx.exception.details['writeErrors'][0]
+        self.assertEqual(2, err['code'])
+        self.assertIn('object to insert too large', err['errmsg'])
+

 class TestCustomEndpoint(EncryptionIntegrationTest):
     """Prose tests for creating data keys with a custom endpoint."""
