1
1
import itertools
2
+ import sys
2
3
from collections import defaultdict
3
4
4
5
from bson import SON , json_util
11
12
from django .db .models .functions .math import Power
12
13
from django .db .models .lookups import IsNull
13
14
from django .db .models .sql import compiler
14
- from django .db .models .sql .constants import GET_ITERATOR_CHUNK_SIZE , MULTI , SINGLE
15
+ from django .db .models .sql .constants import MULTI , SINGLE
15
16
from django .db .models .sql .datastructures import BaseTable
16
17
from django .utils .functional import cached_property
17
18
from pymongo import ASCENDING , DESCENDING
18
19
19
20
from .query import MongoQuery , wrap_database_errors
20
21
22
# Maximum size of a single MongoDB cursor batch response, in BYTES (16 MiB).
# NOTE(review): the "_MB" suffix is misleading — the value is a byte count,
# not megabytes; consider renaming to MAX_BATCH_SIZE_BYTES.
MAX_BATCH_SIZE_MB = 1024 * 1024 * 16
21
25
22
26
class SQLCompiler (compiler .SQLCompiler ):
23
27
"""Base class for all Mongo compilers."""
@@ -235,9 +239,7 @@ def pre_sql_setup(self, with_col_aliases=False):
235
239
self .order_by_objs = [expr .replace_expressions (all_replacements ) for expr , _ in order_by ]
236
240
return extra_select , order_by , group_by
237
241
238
- def execute_sql (
239
- self , result_type = MULTI , chunked_fetch = False , chunk_size = GET_ITERATOR_CHUNK_SIZE
240
- ):
242
+ def execute_sql (self , result_type = MULTI , chunked_fetch = False , chunk_size = None ):
241
243
self .pre_sql_setup ()
242
244
try :
243
245
query = self .build_query (
@@ -258,7 +260,8 @@ def execute_sql(
258
260
else :
259
261
return self ._make_result (obj , self .columns )
260
262
# result_type is MULTI
261
- cursor .batch_size (chunk_size )
263
+ # if chunk_size is not None:
264
+ # cursor.batch_size(chunk_size)
262
265
result = self .cursor_iter (cursor , chunk_size , self .columns )
263
266
if not chunked_fetch :
264
267
# If using non-chunked reads, read data into memory.
@@ -270,7 +273,7 @@ def results_iter(
270
273
results = None ,
271
274
tuple_expected = False ,
272
275
chunked_fetch = False ,
273
- chunk_size = GET_ITERATOR_CHUNK_SIZE ,
276
+ chunk_size = None ,
274
277
):
275
278
"""
276
279
Return an iterator over the results from executing query given
@@ -318,12 +321,25 @@ def _make_result(self, entity, columns):
318
321
result .append (obj .get (name ))
319
322
return result
320
323
321
def cursor_iter(self, cursor, _, columns):
    """
    Yield chunks of results from ``cursor``.

    The third positional parameter (``chunk_size`` in the base class
    signature) is accepted for interface compatibility but ignored:
    MongoDB controls the server-side cursor batch size itself, so this
    method only groups already-fetched documents into Python-level chunks.
    Read more here: https://www.mongodb.com/docs/manual/core/cursors/#cursor-batches
    """
    chunk = []
    chunk_size = 101  # MongoDB's default initial batch size.

    for i, row in enumerate(cursor):
        chunk.append(self._make_result(row, columns))

        if len(chunk) == chunk_size:
            if i == chunk_size - 1:  # First chunk complete.
                # Using the current row as a representative sample,
                # approximate how many rows can fit in a 16MB payload
                # (MongoDB's batch_size max) and set that as the new
                # chunk size. NOTE: sys.getsizeof() is shallow — it does
                # not include the size of nested values — so this is
                # only a rough heuristic. Clamp to at least 1 so a
                # pathologically large row can never produce a zero
                # chunk size, which would silently disable chunking.
                chunk_size = max(1, MAX_BATCH_SIZE_MB // sys.getsizeof(row))
            yield chunk
            chunk = []
    yield chunk
0 commit comments