Skip to content

Commit 449cb8f

Browse files
authored
PYTHON-2722 Improve performance of find/aggregate_raw_batches (#1047)
1 parent 0143881 commit 449cb8f

File tree

4 files changed

+245
-38
lines changed

4 files changed

+245
-38
lines changed

bson/__init__.py

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,6 @@
128128
from array import array
129129
from mmap import mmap
130130

131-
132131
try:
133132
from bson import _cbson # type: ignore[attr-defined]
134133

@@ -520,19 +519,32 @@ def _get_decimal128(
520519
if _USE_C:
521520

522521
def _element_to_dict(
523-
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions
522+
data: Any,
523+
view: Any,
524+
position: int,
525+
obj_end: int,
526+
opts: CodecOptions,
527+
raw_array: bool = False,
524528
) -> Any:
525-
return _cbson._element_to_dict(data, position, obj_end, opts)
529+
return _cbson._element_to_dict(data, position, obj_end, opts, raw_array)
526530

527531
else:
528532

529533
def _element_to_dict(
530-
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions
534+
data: Any,
535+
view: Any,
536+
position: int,
537+
obj_end: int,
538+
opts: CodecOptions,
539+
raw_array: bool = False,
531540
) -> Any:
532541
"""Decode a single key, value pair."""
533542
element_type = data[position]
534543
position += 1
535544
element_name, position = _get_c_string(data, view, position, opts)
545+
if raw_array and element_type == ord(BSONARR):
546+
_, end = _get_object_size(data, position, len(data))
547+
return element_name, view[position : end + 1], end + 1
536548
try:
537549
value, position = _ELEMENT_GETTER[element_type](
538550
data, view, position, obj_end, opts, element_name
@@ -551,20 +563,30 @@ def _element_to_dict(
551563
_T = TypeVar("_T", bound=MutableMapping[Any, Any])
552564

553565

554-
def _raw_to_dict(data: Any, position: int, obj_end: int, opts: CodecOptions, result: _T) -> _T:
566+
def _raw_to_dict(
567+
data: Any, position: int, obj_end: int, opts: CodecOptions, result: _T, raw_array: bool = False
568+
) -> _T:
555569
data, view = get_data_and_view(data)
556-
return _elements_to_dict(data, view, position, obj_end, opts, result)
570+
return _elements_to_dict(data, view, position, obj_end, opts, result, raw_array=raw_array)
557571

558572

559573
def _elements_to_dict(
560-
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions, result: Any = None
574+
data: Any,
575+
view: Any,
576+
position: int,
577+
obj_end: int,
578+
opts: CodecOptions,
579+
result: Any = None,
580+
raw_array: bool = False,
561581
) -> Any:
562582
"""Decode a BSON document into result."""
563583
if result is None:
564584
result = opts.document_class()
565585
end = obj_end - 1
566586
while position < end:
567-
key, value, position = _element_to_dict(data, view, position, obj_end, opts)
587+
key, value, position = _element_to_dict(
588+
data, view, position, obj_end, opts, raw_array=raw_array
589+
)
568590
result[key] = value
569591
if position != obj_end:
570592
raise InvalidBSON("bad object or element length")
@@ -1119,14 +1141,44 @@ def _decode_selective(rawdoc: Any, fields: Any, codec_options: Any) -> Mapping[A
11191141
return doc
11201142

11211143

1144+
def _array_of_documents_to_buffer(view: memoryview) -> bytes:
1145+
# Extract the raw bytes of each document.
1146+
position = 0
1147+
_, end = _get_object_size(view, position, len(view))
1148+
position += 4
1149+
buffers: List[memoryview] = []
1150+
append = buffers.append
1151+
while position < end - 1:
1152+
# Just skip the keys.
1153+
while view[position] != 0:
1154+
position += 1
1155+
position += 1
1156+
obj_size, _ = _get_object_size(view, position, end)
1157+
append(view[position : position + obj_size])
1158+
position += obj_size
1159+
if position != end:
1160+
raise InvalidBSON("bad object or element length")
1161+
return b"".join(buffers)
1162+
1163+
1164+
if _USE_C:
1165+
_array_of_documents_to_buffer = _cbson._array_of_documents_to_buffer # noqa: F811
1166+
1167+
11221168
def _convert_raw_document_lists_to_streams(document: Any) -> None:
1169+
"""Convert raw array of documents to a stream of BSON documents."""
11231170
cursor = document.get("cursor")
1124-
if cursor:
1125-
for key in ("firstBatch", "nextBatch"):
1126-
batch = cursor.get(key)
1127-
if batch:
1128-
stream = b"".join(doc.raw for doc in batch)
1129-
cursor[key] = [stream]
1171+
if not cursor:
1172+
return
1173+
for key in ("firstBatch", "nextBatch"):
1174+
batch = cursor.get(key)
1175+
if not batch:
1176+
continue
1177+
data = _array_of_documents_to_buffer(batch)
1178+
if data:
1179+
cursor[key] = [data]
1180+
else:
1181+
cursor[key] = []
11301182

11311183

11321184
def _decode_all_selective(data: Any, codec_options: CodecOptions, fields: Any) -> List[Any]:

bson/_cbsonmodule.c

Lines changed: 138 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1615,7 +1615,7 @@ static PyObject *_dbref_hook(PyObject* self, PyObject* value) {
16151615

16161616
static PyObject* get_value(PyObject* self, PyObject* name, const char* buffer,
16171617
unsigned* position, unsigned char type,
1618-
unsigned max, const codec_options_t* options) {
1618+
unsigned max, const codec_options_t* options, int raw_array) {
16191619
struct module_state *state = GETSTATE(self);
16201620
PyObject* value = NULL;
16211621
switch (type) {
@@ -1712,11 +1712,20 @@ static PyObject* get_value(PyObject* self, PyObject* name, const char* buffer,
17121712
if (size < BSON_MIN_SIZE || max < size) {
17131713
goto invalid;
17141714
}
1715+
17151716
end = *position + size - 1;
17161717
/* Check for bad eoo */
17171718
if (buffer[end]) {
17181719
goto invalid;
17191720
}
1721+
1722+
if (raw_array != 0) {
1723+
// Treat it as a binary buffer.
1724+
value = PyBytes_FromStringAndSize(buffer + *position, size);
1725+
*position += size;
1726+
break;
1727+
}
1728+
17201729
*position += 4;
17211730

17221731
value = PyList_New(0);
@@ -1740,7 +1749,7 @@ static PyObject* get_value(PyObject* self, PyObject* name, const char* buffer,
17401749
goto invalid;
17411750
}
17421751
to_append = get_value(self, name, buffer, position, bson_type,
1743-
max - (unsigned)key_size, options);
1752+
max - (unsigned)key_size, options, raw_array);
17441753
Py_LeaveRecursiveCall();
17451754
if (!to_append) {
17461755
Py_DECREF(value);
@@ -2464,6 +2473,7 @@ static PyObject* get_value(PyObject* self, PyObject* name, const char* buffer,
24642473
static int _element_to_dict(PyObject* self, const char* string,
24652474
unsigned position, unsigned max,
24662475
const codec_options_t* options,
2476+
int raw_array,
24672477
PyObject** name, PyObject** value) {
24682478
unsigned char type = (unsigned char)string[position++];
24692479
size_t name_length = strlen(string + position);
@@ -2504,7 +2514,7 @@ static int _element_to_dict(PyObject* self, const char* string,
25042514
}
25052515
position += (unsigned)name_length + 1;
25062516
*value = get_value(self, *name, string, &position, type,
2507-
max - position, options);
2517+
max - position, options, raw_array);
25082518
if (!*value) {
25092519
Py_DECREF(*name);
25102520
return -1;
@@ -2520,12 +2530,13 @@ static PyObject* _cbson_element_to_dict(PyObject* self, PyObject* args) {
25202530
unsigned position;
25212531
unsigned max;
25222532
int new_position;
2533+
int raw_array = 0;
25232534
PyObject* name;
25242535
PyObject* value;
25252536
PyObject* result_tuple;
25262537

2527-
if (!PyArg_ParseTuple(args, "OIIO&", &bson, &position, &max,
2528-
convert_codec_options, &options)) {
2538+
if (!PyArg_ParseTuple(args, "OIIO&p", &bson, &position, &max,
2539+
convert_codec_options, &options, &raw_array)) {
25292540
return NULL;
25302541
}
25312542

@@ -2535,8 +2546,7 @@ static PyObject* _cbson_element_to_dict(PyObject* self, PyObject* args) {
25352546
}
25362547
string = PyBytes_AS_STRING(bson);
25372548

2538-
new_position = _element_to_dict(self, string, position, max, &options,
2539-
&name, &value);
2549+
new_position = _element_to_dict(self, string, position, max, &options, raw_array, &name, &value);
25402550
if (new_position < 0) {
25412551
return NULL;
25422552
}
@@ -2560,13 +2570,14 @@ static PyObject* _elements_to_dict(PyObject* self, const char* string,
25602570
if (!dict) {
25612571
return NULL;
25622572
}
2573+
int raw_array = 0;
25632574
while (position < max) {
25642575
PyObject* name = NULL;
25652576
PyObject* value = NULL;
25662577
int new_position;
25672578

25682579
new_position = _element_to_dict(
2569-
self, string, position, max, options, &name, &value);
2580+
self, string, position, max, options, raw_array, &name, &value);
25702581
if (new_position < 0) {
25712582
Py_DECREF(dict);
25722583
return NULL;
@@ -2649,7 +2660,6 @@ static PyObject* _cbson_bson_to_dict(PyObject* self, PyObject* args) {
26492660
}
26502661

26512662
string = (char*)view.buf;
2652-
26532663
memcpy(&size, string, 4);
26542664
size = (int32_t)BSON_UINT32_FROM_LE(size);
26552665
if (size < BSON_MIN_SIZE) {
@@ -2797,6 +2807,124 @@ static PyObject* _cbson_decode_all(PyObject* self, PyObject* args) {
27972807
return result;
27982808
}
27992809

2810+
2811+
static PyObject* _cbson_array_of_documents_to_buffer(PyObject* self, PyObject* args) {
2812+
uint32_t size;
2813+
uint32_t value_length;
2814+
uint32_t position = 0;
2815+
buffer_t buffer;
2816+
const char* string;
2817+
PyObject* arr;
2818+
PyObject* result = NULL;
2819+
Py_buffer view = {0};
2820+
2821+
if (!PyArg_ParseTuple(args, "O", &arr)) {
2822+
return NULL;
2823+
}
2824+
2825+
if (!_get_buffer(arr, &view)) {
2826+
return NULL;
2827+
}
2828+
2829+
buffer = pymongo_buffer_new();
2830+
if (!buffer) {
2831+
PyBuffer_Release(&view);
2832+
return NULL;
2833+
}
2834+
2835+
string = (char*)view.buf;
2836+
2837+
if (view.len < BSON_MIN_SIZE) {
2838+
PyObject* InvalidBSON = _error("InvalidBSON");
2839+
if (InvalidBSON) {
2840+
PyErr_SetString(InvalidBSON,
2841+
"not enough data for a BSON document");
2842+
Py_DECREF(InvalidBSON);
2843+
}
2844+
goto done;
2845+
}
2846+
2847+
memcpy(&size, string, 4);
2848+
size = BSON_UINT32_FROM_LE(size);
2849+
/* save space for length */
2850+
if (pymongo_buffer_save_space(buffer, size) == -1) {
2851+
goto fail;
2852+
}
2853+
pymongo_buffer_update_position(buffer, 0);
2854+
2855+
position += 4;
2856+
while (position < size - 1) {
2857+
// Verify the value is an object.
2858+
unsigned char type = (unsigned char)string[position];
2859+
if (type != 3) {
2860+
PyObject* InvalidBSON = _error("InvalidBSON");
2861+
if (InvalidBSON) {
2862+
PyErr_SetString(InvalidBSON, "array element was not an object");
2863+
Py_DECREF(InvalidBSON);
2864+
}
2865+
goto fail;
2866+
}
2867+
2868+
// Just skip the keys.
2869+
position = position + strlen(string + position) + 1;
2870+
2871+
if (position >= size || (size - position) < BSON_MIN_SIZE) {
2872+
PyObject* InvalidBSON = _error("InvalidBSON");
2873+
if (InvalidBSON) {
2874+
PyErr_SetString(InvalidBSON, "invalid array content");
2875+
Py_DECREF(InvalidBSON);
2876+
}
2877+
goto fail;
2878+
}
2879+
2880+
memcpy(&value_length, string + position, 4);
2881+
value_length = BSON_UINT32_FROM_LE(value_length);
2882+
if (value_length < BSON_MIN_SIZE) {
2883+
PyObject* InvalidBSON = _error("InvalidBSON");
2884+
if (InvalidBSON) {
2885+
PyErr_SetString(InvalidBSON, "invalid message size");
2886+
Py_DECREF(InvalidBSON);
2887+
}
2888+
goto fail;
2889+
}
2890+
2891+
if (view.len < size) {
2892+
PyObject* InvalidBSON = _error("InvalidBSON");
2893+
if (InvalidBSON) {
2894+
PyErr_SetString(InvalidBSON, "objsize too large");
2895+
Py_DECREF(InvalidBSON);
2896+
}
2897+
goto fail;
2898+
}
2899+
2900+
if (string[size - 1]) {
2901+
PyObject* InvalidBSON = _error("InvalidBSON");
2902+
if (InvalidBSON) {
2903+
PyErr_SetString(InvalidBSON, "bad eoo");
2904+
Py_DECREF(InvalidBSON);
2905+
}
2906+
goto fail;
2907+
}
2908+
2909+
if (pymongo_buffer_write(buffer, string + position, value_length) == 1) {
2910+
goto fail;
2911+
}
2912+
position += value_length;
2913+
}
2914+
2915+
/* objectify buffer */
2916+
result = Py_BuildValue("y#", pymongo_buffer_get_buffer(buffer),
2917+
(Py_ssize_t)pymongo_buffer_get_position(buffer));
2918+
goto done;
2919+
fail:
2920+
result = NULL;
2921+
done:
2922+
PyBuffer_Release(&view);
2923+
pymongo_buffer_free(buffer);
2924+
return result;
2925+
}
2926+
2927+
28002928
static PyMethodDef _CBSONMethods[] = {
28012929
{"_dict_to_bson", _cbson_dict_to_bson, METH_VARARGS,
28022930
"convert a dictionary to a string containing its BSON representation."},
@@ -2806,6 +2934,7 @@ static PyMethodDef _CBSONMethods[] = {
28062934
"convert binary data to a sequence of documents."},
28072935
{"_element_to_dict", _cbson_element_to_dict, METH_VARARGS,
28082936
"Decode a single key, value pair."},
2937+
{"_array_of_documents_to_buffer", _cbson_array_of_documents_to_buffer, METH_VARARGS, "Convert raw array of documents to a stream of BSON documents"},
28092938
{NULL, NULL, 0, NULL}
28102939
};
28112940

0 commit comments

Comments
 (0)