1- from django .core .exceptions import ValidationError
1+ import operator
2+
3+ from django .core .exceptions import FieldDoesNotExist , ValidationError
24from django .db import models
5+ from django .db .models import (
6+ Exists ,
7+ ExpressionWrapper ,
8+ F ,
9+ IntegerField ,
10+ Max ,
11+ OuterRef ,
12+ Subquery ,
13+ Sum ,
14+ )
315from django .test import SimpleTestCase , TestCase
416from django .test .utils import isolate_apps
517
1123 Book ,
1224 Data ,
1325 Holder ,
26+ Library ,
1427)
1528
1629
@@ -104,6 +117,85 @@ def test_nested(self):
104117 )
105118 self .assertCountEqual (Book .objects .filter (author__address__city = "NYC" ), [obj ])
106119
120+ def test_nested_not_exists (self ):
121+ msg = "Address.city has no field named 'president'"
122+ with self .assertRaisesMessage (FieldDoesNotExist , msg ):
123+ Book .objects .filter (author__address__city__president = "NYC" )
124+
125+ def test_not_exists_in_embedded (self ):
126+ msg = "Address has no field named 'floor'"
127+ with self .assertRaisesMessage (FieldDoesNotExist , msg ):
128+ Book .objects .filter (author__address__floor = "NYC" )
129+
130+ def test_embedded_with_json_field (self ):
131+ models = []
132+ for i in range (4 ):
133+ m = Holder .objects .create (
134+ data = Data (json_value = {"field1" : i * 5 , "field2" : {"0" : {"value" : list (range (i ))}}})
135+ )
136+ models .append (m )
137+
138+ all_models = Holder .objects .all ()
139+
140+ self .assertCountEqual (
141+ Holder .objects .filter (data__json_value__field2__0__value__0 = 0 ),
142+ models [1 :],
143+ )
144+ self .assertCountEqual (
145+ Holder .objects .filter (data__json_value__field2__0__value__1 = 1 ),
146+ models [2 :],
147+ )
148+ self .assertCountEqual (Holder .objects .filter (data__json_value__field2__0__value__1 = 5 ), [])
149+
150+ self .assertCountEqual (Holder .objects .filter (data__json_value__field1__lt = 100 ), all_models )
151+ self .assertCountEqual (Holder .objects .filter (data__json_value__field1__gt = 100 ), [])
152+ self .assertCountEqual (
153+ Holder .objects .filter (
154+ data__json_value__field1__gte = 5 , data__json_value__field1__lte = 10
155+ ),
156+ models [1 :3 ],
157+ )
158+
159+ def truncate_ms (self , value ):
160+ """Truncate microsends to millisecond precision as supported by MongoDB."""
161+ return value .replace (microsecond = (value .microsecond // 1000 ) * 1000 )
162+
163+ def test_ordering_by_embedded_field (self ):
164+ query = Holder .objects .filter (data__integer__gt = 3 ).order_by ("-data__integer" ).values ("pk" )
165+ expected = [{"pk" : e .pk } for e in list (reversed (self .objs [4 :]))]
166+ self .assertSequenceEqual (query , expected )
167+
168+ def test_ordering_grouping_by_embedded_field (self ):
169+ expected = sorted (
170+ (Holder .objects .create (data = Data (integer = x )) for x in range (6 )),
171+ key = lambda x : x .data .integer ,
172+ )
173+ query = (
174+ Holder .objects .annotate (
175+ group = ExpressionWrapper (F ("data__integer" ) + 5 , output_field = IntegerField ())
176+ )
177+ .values ("group" )
178+ .annotate (max_auto_now = Max ("data__auto_now" ))
179+ .order_by ("data__integer" )
180+ )
181+ query_response = [{** e , "max_auto_now" : self .truncate_ms (e ["max_auto_now" ])} for e in query ]
182+ self .assertSequenceEqual (
183+ query_response ,
184+ [
185+ {"group" : e .data .integer + 5 , "max_auto_now" : self .truncate_ms (e .data .auto_now )}
186+ for e in expected
187+ ],
188+ )
189+
190+ def test_ordering_grouping_by_sum (self ):
191+ [Holder .objects .create (data = Data (integer = x )) for x in range (6 )]
192+ qs = (
193+ Holder .objects .values ("data__integer" )
194+ .annotate (sum = Sum ("data__integer" ))
195+ .order_by ("sum" )
196+ )
197+ self .assertQuerySetEqual (qs , [0 , 2 , 4 , 6 , 8 , 10 ], operator .itemgetter ("sum" ))
198+
107199
108200@isolate_apps ("model_fields_" )
109201class CheckTests (SimpleTestCase ):
@@ -123,3 +215,62 @@ class MyModel(models.Model):
123215 self .assertEqual (
124216 msg , "Embedded models cannot have relational fields (Target.key is a ForeignKey)."
125217 )
218+
219+
220+ class SubqueryExistsTest (TestCase ):
221+ def setUp (self ):
222+ # Create test data
223+ address1 = Address .objects .create (city = "New York" , state = "NY" , zip_code = 10001 )
224+ address2 = Address .objects .create (city = "Boston" , state = "MA" , zip_code = 20002 )
225+ author1 = Author .objects .create (name = "Alice" , age = 30 , address = address1 )
226+ author2 = Author .objects .create (name = "Bob" , age = 40 , address = address2 )
227+ book1 = Book .objects .create (name = "Book A" , author = author1 )
228+ book2 = Book .objects .create (name = "Book B" , author = author2 )
229+ Book .objects .create (name = "Book C" , author = author2 )
230+ Book .objects .create (name = "Book D" , author = author2 )
231+ Book .objects .create (name = "Book E" , author = author1 )
232+
233+ library1 = Library .objects .create (
234+ name = "Central Library" , location = "Downtown" , best_seller = "Book A"
235+ )
236+ library2 = Library .objects .create (
237+ name = "Community Library" , location = "Suburbs" , best_seller = "Book A"
238+ )
239+
240+ # Add books to libraries
241+ library1 .books .add (book1 , book2 )
242+ library2 .books .add (book2 )
243+
244+ def test_exists_subquery (self ):
245+ subquery = Book .objects .filter (
246+ author__name = OuterRef ("name" ), author__address__city = "Boston"
247+ )
248+ queryset = Author .objects .filter (Exists (subquery ))
249+
250+ self .assertEqual (queryset .count (), 1 )
251+
252+ def test_in_subquery (self ):
253+ subquery = Author .objects .filter (age__gt = 35 ).values ("name" )
254+ queryset = Book .objects .filter (author__name__in = Subquery (subquery )).order_by ("name" )
255+
256+ self .assertEqual (queryset .count (), 3 )
257+ self .assertQuerySetEqual (queryset , ["Book B" , "Book C" , "Book D" ], lambda book : book .name )
258+
259+ def test_range_query (self ):
260+ queryset = Author .objects .filter (age__range = (25 , 45 )).order_by ("name" )
261+
262+ self .assertEqual (queryset .count (), 2 )
263+ self .assertQuerySetEqual (queryset , ["Alice" , "Bob" ], lambda author : author .name )
264+
265+ def test_exists_with_foreign_object (self ):
266+ subquery = Library .objects .filter (best_seller = OuterRef ("name" ))
267+ queryset = Book .objects .filter (Exists (subquery ))
268+
269+ self .assertEqual (queryset .count (), 1 )
270+ self .assertEqual (queryset .first ().name , "Book A" )
271+
272+ def test_foreign_field_with_ranges (self ):
273+ queryset = Library .objects .filter (books__author__age__range = (25 , 35 ))
274+
275+ self .assertEqual (queryset .count (), 1 )
276+ self .assertEqual (queryset .first ().name , "Central Library" )
0 commit comments