4
4
from django .db import DatabaseError , IntegrityError , NotSupportedError
5
5
from django .db .models import Count , Expression
6
6
from django .db .models .aggregates import Aggregate
7
- from django .db .models .expressions import OrderBy , Value
7
+ from django .db .models .expressions import Col , OrderBy , Value
8
8
from django .db .models .sql import compiler
9
9
from django .db .models .sql .constants import GET_ITERATOR_CHUNK_SIZE , MULTI , ORDER_DIR , SINGLE
10
10
from django .utils .functional import cached_property
@@ -18,6 +18,62 @@ class SQLCompiler(compiler.SQLCompiler):
18
18
19
19
query_class = MongoQuery
20
20
21
+ def pre_sql_setup (self ):
22
+ super ().pre_sql_setup ()
23
+ self .annotations = {}
24
+ group = {}
25
+ group_expressions = set ()
26
+ aggregation_idx = 1
27
+ for target , expr in self .query .annotation_select .items ():
28
+ if not expr .contains_aggregate :
29
+ result_expr = expr
30
+ else :
31
+ replacements = {}
32
+ for sub_expr in self ._get_aggregate_expressions (expr ):
33
+ alias = f"__aggregation{ aggregation_idx } "
34
+ group [alias ] = sub_expr .as_mql (self , self .connection )
35
+ aggregation_idx += 1
36
+ column_target = expr .output_field .__class__ ()
37
+ column_target .set_attributes_from_name (alias )
38
+ replacements [sub_expr ] = Col (self .collection_name , column_target )
39
+ result_expr = expr .replace_expressions (replacements )
40
+
41
+ self .annotations [target ] = result_expr
42
+ if group :
43
+ """
44
+ order_by = self.get_order_by()
45
+ for expr, (_, _, is_ref) in order_by:
46
+ # Skip references to the SELECT clause, as all expressions in
47
+ # the SELECT clause are already part of the GROUP BY.
48
+ if not is_ref:
49
+ group_expressions |= set(expr.get_group_by_cols())
50
+ having_group_by = self.having.get_group_by_cols() if self.having else ()
51
+ for expr in having_group_by:
52
+ group_expressions.add(expr)
53
+ """
54
+
55
+ ids = (
56
+ None
57
+ if not group_expressions
58
+ else {
59
+ col .target .column : col .as_mql (self , self .connection )
60
+ for col in group_expressions
61
+ }
62
+ )
63
+ group ["_id" ] = ids
64
+
65
+ pipeline = [{"$group" : group }]
66
+ if ids :
67
+ pipeline .append (
68
+ {"$addFields" : {key : f"$_id.{ value [1 :]} " for key , value in ids .items ()}}
69
+ )
70
+ if "_id" not in ids :
71
+ pipeline .append ({"$unSet" : "$_id" })
72
+
73
+ self ._group_pipeline = pipeline
74
+ else :
75
+ self ._group_pipeline = None
76
+
21
77
def execute_sql (
22
78
self , result_type = MULTI , chunked_fetch = False , chunk_size = GET_ITERATOR_CHUNK_SIZE
23
79
):
@@ -85,7 +141,7 @@ def results_iter(
85
141
return rows
86
142
87
143
def has_results (self ):
88
- return bool (self .get_count ( check_exists = True ))
144
+ return bool (self .execute_sql ( SINGLE ))
89
145
90
146
def _make_result (self , entity , columns ):
91
147
"""
@@ -174,9 +230,9 @@ def build_query(self, columns=None):
174
230
"""Check if the query is supported and prepare a MongoQuery."""
175
231
self .check_query ()
176
232
query = self .query_class (self )
177
- query .project_fields = self .get_project_fields (columns )
178
- query .lookup_pipeline = self .get_lookup_pipeline ()
179
233
query .aggregation_stage = self .get_aggregation_pipeline ()
234
+ query .lookup_pipeline = self .get_lookup_pipeline ()
235
+ query .project_fields = self .get_project_fields (columns )
180
236
try :
181
237
query .mongo_query = {"$expr" : self .query .where .as_mql (self , self .connection )}
182
238
except FullResultSet :
@@ -216,7 +272,7 @@ def project_field(column):
216
272
217
273
return (
218
274
tuple (map (project_field , columns ))
219
- + tuple (self .query . annotation_select .items ())
275
+ + tuple (self .annotations .items ())
220
276
+ tuple (map (project_field , related_columns ))
221
277
)
222
278
@@ -281,52 +337,34 @@ def get_lookup_pipeline(self):
281
337
result += self .query .alias_map [alias ].as_mql (self , self .connection )
282
338
return result
283
339
284
- def get_aggregation_pipeline (self ):
285
- pipeline = None
286
- if any (isinstance (a , Aggregate ) for a in self .query .annotations .values ()):
287
- result = {}
288
- # self.get_group_by(self.select, [])
289
- for alias , annotation in self .query .annotation_select .items ():
290
- value = annotation .as_mql (self , self .connection )
291
- if isinstance (value , list ):
292
- value = value [0 ]
293
- result [alias ] = value
294
-
295
- expressions = set ()
296
- for expr , * _ in self .select :
297
- expressions |= set (expr .get_group_by_cols ())
298
- order_by = self .get_order_by ()
299
- for expr , (_ , _ , is_ref ) in order_by :
300
- # Skip references to the SELECT clause, as all expressions in
301
- # the SELECT clause are already part of the GROUP BY.
302
- if not is_ref :
303
- expressions |= set (expr .get_group_by_cols ())
304
- having_group_by = self .having .get_group_by_cols () if self .having else ()
305
- for expr in having_group_by :
306
- expressions .add (expr )
307
-
308
- ids = (
309
- None
310
- if not expressions
311
- else {col .target .column : col .as_mql (self , self .connection ) for col in expressions }
312
- )
313
- result ["_id" ] = ids
314
-
315
- pipeline = [{"$group" : result }]
316
- if ids :
317
- pipeline .append (
318
- {"$addFields" : {key : f"$_id.{ value [1 :]} " for key , value in ids .items ()}}
340
+ def _get_aggregate_expressions2 (self , expr ):
341
+ stack = [(None , expr )]
342
+ while stack :
343
+ parent , expr = stack .pop ()
344
+ if isinstance (expr , Aggregate ):
345
+ yield parent
346
+ elif hasattr (expr , "get_source_expressions" ):
347
+ stack .extend (
348
+ [((expr , idx ), se ) for idx , se in enumerate (expr .get_source_expressions ())]
319
349
)
320
- if "_id" not in ids :
321
- pipeline .append ({"$unSet" : "$_id" })
322
350
323
- return pipeline
351
+ def _get_aggregate_expressions (self , expr ):
352
+ stack = [expr ]
353
+ while stack :
354
+ expr = stack .pop ()
355
+ if isinstance (expr , Aggregate ):
356
+ yield expr
357
+ elif hasattr (expr , "get_source_expressions" ):
358
+ stack .extend (expr .get_source_expressions ())
359
+
360
+ def get_aggregation_pipeline (self ):
361
+ return self ._group_pipeline
324
362
325
363
def get_project_fields (self , columns = None ):
326
364
fields = {}
327
365
for name , expr in columns or []:
328
366
try :
329
- column = name if isinstance ( expr , Aggregate ) else expr .target .column
367
+ column = expr .target .column
330
368
except AttributeError :
331
369
# Generate the MQL for an annotation.
332
370
try :
0 commit comments