1
1
import operator
2
- from datetime import timedelta
3
2
from decimal import Decimal
4
3
5
4
from django .core .exceptions import FieldDoesNotExist , ValidationError
6
- from django .db .models import ExpressionWrapper , F , IntegerField , Max , Model , Sum
5
+ from django .db .models import (
6
+ Exists ,
7
+ ExpressionWrapper ,
8
+ F ,
9
+ IntegerField ,
10
+ Max ,
11
+ Model ,
12
+ OuterRef ,
13
+ Subquery ,
14
+ Sum ,
15
+ )
7
16
from django .test import SimpleTestCase , TestCase
8
17
9
18
from django_mongodb .fields import EmbeddedModelField
16
25
DecimalParent ,
17
26
EmbeddedModel ,
18
27
EmbeddedModelFieldModel ,
28
+ Library ,
19
29
)
20
30
21
31
@@ -186,9 +196,9 @@ def test_embedded_with_json_field(self):
186
196
models [1 :3 ],
187
197
)
188
198
189
- @ staticmethod
190
- def _truncate_ms ( time ):
191
- return time - timedelta ( microseconds = time .microsecond )
199
+ def truncate_ms ( self , value ):
200
+ """Truncate microsends to millisecond precision as supported by MongoDB."""
201
+ return value . replace ( microsecond = ( value .microsecond // 1000 ) * 1000 )
192
202
193
203
################
194
204
def test_ordering_by_embedded_field (self ):
@@ -201,23 +211,26 @@ def test_ordering_by_embedded_field(self):
201
211
self .assertSequenceEqual (query , expected )
202
212
203
213
def test_ordering_grouping_by_embedded_field (self ):
214
+ expected = sorted (
215
+ (
216
+ EmbeddedModelFieldModel .objects .create (simple = EmbeddedModel (someint = x ))
217
+ for x in range (6 )
218
+ ),
219
+ key = lambda x : x .simple .someint ,
220
+ )
204
221
query = (
205
222
EmbeddedModelFieldModel .objects .annotate (
206
223
group = ExpressionWrapper (F ("simple__someint" ) + 5 , output_field = IntegerField ())
207
224
)
208
225
.values ("group" )
209
- .annotate (max_pk = Max ("simple__auto_now" ))
226
+ .annotate (max_auto_now = Max ("simple__auto_now" ))
210
227
.order_by ("simple__someint" )
211
228
)
212
- query = [{** e , "max_pk" : self ._truncate_ms (e ["max_pk" ])} for e in query ]
213
- expected = [
214
- EmbeddedModelFieldModel .objects .create (simple = EmbeddedModel (someint = x ))
215
- for x in range (6 )
216
- ]
229
+ query_response = [{** e , "max_auto_now" : self .truncate_ms (e ["max_auto_now" ])} for e in query ]
217
230
self .assertSequenceEqual (
218
- query ,
231
+ query_response ,
219
232
[
220
- {"group" : e .simple .someint + 5 , "max_pk " : self ._truncate_ms (e .simple .auto_now )}
233
+ {"group" : e .simple .someint + 5 , "max_auto_now " : self .truncate_ms (e .simple .auto_now )}
221
234
for e in expected
222
235
],
223
236
)
@@ -230,3 +243,62 @@ def test_ordering_grouping_by_sum(self):
230
243
.order_by ("sum" )
231
244
)
232
245
self .assertQuerySetEqual (qs , [0 , 2 , 4 , 6 , 8 , 10 ], operator .itemgetter ("sum" ))
246
+
247
+
248
+ class SubqueryExistsTestCase (TestCase ):
249
+ def setUp (self ):
250
+ # Create test data
251
+ address1 = Address .objects .create (city = "New York" , state = "NY" , zip_code = 10001 )
252
+ address2 = Address .objects .create (city = "Boston" , state = "MA" , zip_code = 20002 )
253
+ author1 = Author .objects .create (name = "Alice" , age = 30 , address = address1 )
254
+ author2 = Author .objects .create (name = "Bob" , age = 40 , address = address2 )
255
+ book1 = Book .objects .create (name = "Book A" , author = author1 )
256
+ book2 = Book .objects .create (name = "Book B" , author = author2 )
257
+ Book .objects .create (name = "Book C" , author = author2 )
258
+ Book .objects .create (name = "Book D" , author = author2 )
259
+ Book .objects .create (name = "Book E" , author = author1 )
260
+
261
+ library1 = Library .objects .create (
262
+ name = "Central Library" , location = "Downtown" , best_seller = "Book A"
263
+ )
264
+ library2 = Library .objects .create (
265
+ name = "Community Library" , location = "Suburbs" , best_seller = "Book A"
266
+ )
267
+
268
+ # Add books to libraries
269
+ library1 .books .add (book1 , book2 )
270
+ library2 .books .add (book2 )
271
+
272
+ def test_exists_subquery (self ):
273
+ subquery = Book .objects .filter (
274
+ author__name = OuterRef ("name" ), author__address__city = "Boston"
275
+ )
276
+ queryset = Author .objects .filter (Exists (subquery ))
277
+
278
+ self .assertEqual (queryset .count (), 1 )
279
+
280
+ def test_in_subquery (self ):
281
+ subquery = Author .objects .filter (age__gt = 35 ).values ("name" )
282
+ queryset = Book .objects .filter (author__name__in = Subquery (subquery )).order_by ("name" )
283
+
284
+ self .assertEqual (queryset .count (), 3 )
285
+ self .assertQuerySetEqual (queryset , ["Book B" , "Book C" , "Book D" ], lambda book : book .name )
286
+
287
+ def test_range_query (self ):
288
+ queryset = Author .objects .filter (age__range = (25 , 45 )).order_by ("name" )
289
+
290
+ self .assertEqual (queryset .count (), 2 )
291
+ self .assertQuerySetEqual (queryset , ["Alice" , "Bob" ], lambda author : author .name )
292
+
293
+ def test_exists_with_foreign_object (self ):
294
+ subquery = Library .objects .filter (best_seller = OuterRef ("name" ))
295
+ queryset = Book .objects .filter (Exists (subquery ))
296
+
297
+ self .assertEqual (queryset .count (), 1 )
298
+ self .assertEqual (queryset .first ().name , "Book A" )
299
+
300
+ def test_foreign_field_with_ranges (self ):
301
+ queryset = Library .objects .filter (books__author__age__range = (25 , 35 ))
302
+
303
+ self .assertEqual (queryset .count (), 1 )
304
+ self .assertEqual (queryset .first ().name , "Central Library" )
0 commit comments