diff --git a/django_mongodb_backend/compiler.py b/django_mongodb_backend/compiler.py index e74f6a85..9d6e1cc1 100644 --- a/django_mongodb_backend/compiler.py +++ b/django_mongodb_backend/compiler.py @@ -9,14 +9,16 @@ from django.db.models.expressions import Case, Col, OrderBy, Ref, Value, When from django.db.models.functions.comparison import Coalesce from django.db.models.functions.math import Power -from django.db.models.lookups import IsNull +from django.db.models.lookups import IsNull, Lookup from django.db.models.sql import compiler from django.db.models.sql.constants import GET_ITERATOR_CHUNK_SIZE, MULTI, SINGLE from django.db.models.sql.datastructures import BaseTable +from django.db.models.sql.where import AND, WhereNode from django.utils.functional import cached_property from pymongo import ASCENDING, DESCENDING from .query import MongoQuery, wrap_database_errors +from .query_utils import is_direct_value class SQLCompiler(compiler.SQLCompiler): @@ -548,10 +550,26 @@ def get_combinator_queries(self): def get_lookup_pipeline(self): result = [] + # To improve join performance, push conditions (filters) from the + # WHERE ($match) clause to the JOIN ($lookup) clause. + where = self.get_where() + pushed_filters = defaultdict(list) + for expr in where.children if where and where.connector == AND else (): + # Push only basic lookups; no subqueries or complex conditions. + # To avoid duplication across subqueries, only use the LHS target + # table. + if ( + isinstance(expr, Lookup) + and isinstance(expr.lhs, Col) + and (is_direct_value(expr.rhs) or isinstance(expr.rhs, Value | Col)) + ): + pushed_filters[expr.lhs.alias].append(expr) for alias in tuple(self.query.alias_map): if not self.query.alias_refcount[alias] or self.collection_name == alias: continue - result += self.query.alias_map[alias].as_mql(self, self.connection) + result += self.query.alias_map[alias].as_mql( + self, self.connection, WhereNode(pushed_filters[alias], connector=AND) + ) return result def _get_aggregate_expressions(self, expr): diff --git a/django_mongodb_backend/query.py b/django_mongodb_backend/query.py index 04977520..25122cc5 100644 --- a/django_mongodb_backend/query.py +++ b/django_mongodb_backend/query.py @@ -123,25 +123,21 @@ def extra_where(self, compiler, connection): # noqa: ARG001 raise NotSupportedError("QuerySet.extra() is not supported on MongoDB.") -def join(self, compiler, connection): - lookup_pipeline = [] - lhs_fields = [] - rhs_fields = [] - # Add a join condition for each pair of joining fields. +def join(self, compiler, connection, pushed_filter_expression=None): + """ + Generate a MongoDB $lookup stage for a join. + + `pushed_filter_expression` is a Where expression involving fields from the + joined collection which can be pushed from the WHERE ($match) clause to the + JOIN ($lookup) clause to improve performance. + """ parent_template = "parent__field__" - for lhs, rhs in self.join_fields: - lhs, rhs = connection.ops.prepare_join_on_clause( - self.parent_alias, lhs, compiler.collection_name, rhs - ) - lhs_fields.append(lhs.as_mql(compiler, connection)) - # In the lookup stage, the reference to this column doesn't include - # the collection name. - rhs_fields.append(rhs.as_mql(compiler, connection)) - # Handle any join conditions besides matching field pairs. 
- extra = self.join_field.get_extra_restriction(self.table_alias, self.parent_alias) - if extra: + + def _get_reroot_replacements(expression): + if not expression: + return None columns = [] - for expr in extra.leaves(): + for expr in expression.leaves(): # Determine whether the column needs to be transformed or rerouted # as part of the subquery. for hand_side in ["lhs", "rhs"]: @@ -151,7 +147,7 @@ def join(self, compiler, connection): # lhs_fields. if hand_side_value.alias != self.table_alias: pos = len(lhs_fields) - lhs_fields.append(expr.lhs.as_mql(compiler, connection)) + lhs_fields.append(hand_side_value.as_mql(compiler, connection)) else: pos = None columns.append((hand_side_value, pos)) @@ -159,7 +155,9 @@ def join(self, compiler, connection): # based on their rerouted positions in the join pipeline. replacements = {} for col, parent_pos in columns: - column_target = Col(compiler.collection_name, expr.output_field.__class__()) + target = col.target.clone() + target.remote_field = col.target.remote_field + column_target = Col(compiler.collection_name, target) if parent_pos is not None: target_col = f"${parent_template}{parent_pos}" column_target.target.db_column = target_col @@ -167,11 +165,43 @@ def join(self, compiler, connection): else: column_target.target = col.target replacements[col] = column_target - # Apply the transformed expressions in the extra condition. - extra_condition = [extra.replace_expressions(replacements).as_mql(compiler, connection)] - else: - extra_condition = [] + return replacements + lookup_pipeline = [] + lhs_fields = [] + rhs_fields = [] + # Add a join condition for each pair of joining fields. + for lhs, rhs in self.join_fields: + lhs, rhs = connection.ops.prepare_join_on_clause( + self.parent_alias, lhs, compiler.collection_name, rhs + ) + lhs_fields.append(lhs.as_mql(compiler, connection)) + # In the lookup stage, the reference to this column doesn't include the + # collection name. + rhs_fields.append(rhs.as_mql(compiler, connection)) + # Handle any join conditions besides matching field pairs. + extra = self.join_field.get_extra_restriction(self.table_alias, self.parent_alias) + extra_conditions = [] + if extra: + replacements = _get_reroot_replacements(extra) + extra_conditions.append( + extra.replace_expressions(replacements).as_mql(compiler, connection) + ) + # pushed_filter_expression is a Where expression from the outer WHERE + # clause that involves fields from the joined (right-hand) table and + # possibly the outer (left-hand) table. If it can be safely evaluated + # within the $lookup pipeline (e.g., field comparisons like + # right.status = left.id), it is "pushed" into the join's $match stage to + # reduce the volume of joined documents. This only applies to INNER JOINs, + # as pushing filters into a LEFT JOIN can change the semantics of the + # result. LEFT JOINs may rely on null checks to detect missing RHS. 
+ if pushed_filter_expression and self.join_type == INNER: + rerooted_replacement = _get_reroot_replacements(pushed_filter_expression) + extra_conditions.append( + pushed_filter_expression.replace_expressions(rerooted_replacement).as_mql( + compiler, connection + ) + ) lookup_pipeline = [ { "$lookup": { @@ -197,7 +227,7 @@ def join(self, compiler, connection): {"$eq": [f"$${parent_template}{i}", field]} for i, field in enumerate(rhs_fields) ] - + extra_condition + + extra_conditions } } } diff --git a/docs/source/releases/5.2.x.rst b/docs/source/releases/5.2.x.rst index 4efdd2ec..83dbd332 100644 --- a/docs/source/releases/5.2.x.rst +++ b/docs/source/releases/5.2.x.rst @@ -32,7 +32,13 @@ Bug fixes databases. - :meth:`QuerySet.explain() ` now :ref:`returns a string that can be parsed as JSON `. + +Performance improvements +------------------------ + - Improved ``QuerySet`` performance by removing low limit on server-side chunking. +- Improved ``QuerySet`` join (``$lookup``) performance by pushing some simple + conditions from the ``WHERE`` (``$match``) clause to the ``$lookup`` stage. 5.2.0 beta 1 ============ diff --git a/tests/queries_/models.py b/tests/queries_/models.py index 01510224..2e56ebc6 100644 --- a/tests/queries_/models.py +++ b/tests/queries_/models.py @@ -53,3 +53,18 @@ class Meta: def __str__(self): return str(self.pk) + + +class Reader(models.Model): + name = models.CharField(max_length=20) + + def __str__(self): + return self.name + + +class Library(models.Model): + name = models.CharField(max_length=20) + readers = models.ManyToManyField(Reader, related_name="libraries") + + def __str__(self): + return self.name diff --git a/tests/queries_/test_mql.py b/tests/queries_/test_mql.py index d61e5839..fed955a9 100644 --- a/tests/queries_/test_mql.py +++ b/tests/queries_/test_mql.py @@ -1,6 +1,23 @@ +import json +import re + +from bson import ObjectId +from django.db import models from django.test import TestCase -from .models import Author, Book +from .models import Author, Book, Library, Order, Tag + + +def uglify_mongo_aggregate(query_str): + """Remove whitespace from a formatted query.""" + # TODO: replace this with a better assertion help, e.g. 
+ # self.assertQuery("collection", [pipeline]) + m = re.match(r"^(.*?)\((.*)\)$", query_str.strip(), re.DOTALL) + if not m: + raise ValueError("String does not match the expected pattern: prefix(...)") + prefix, inside = m.groups() + inside = str(json.loads(inside)) + return f"{prefix}({inside})" class MQLTests(TestCase): @@ -20,7 +37,533 @@ def test_join(self): "{'$lookup': {'from': 'queries__author', " "'let': {'parent__field__0': '$author_id'}, " "'pipeline': [{'$match': {'$expr': " - "{'$and': [{'$eq': ['$$parent__field__0', '$_id']}]}}}], 'as': 'queries__author'}}, " + "{'$and': [{'$eq': ['$$parent__field__0', '$_id']}, " + "{'$eq': ['$name', 'Bob']}]}}}], 'as': 'queries__author'}}, " "{'$unwind': '$queries__author'}, " "{'$match': {'$expr': {'$eq': ['$queries__author.name', 'Bob']}}}])", ) + + +class FKLookupConditionPushdownTests(TestCase): + def test_filter_on_local_and_related_fields(self): + with self.assertNumQueries(1) as ctx: + list(Book.objects.filter(title="Don", author__name="John")) + query = ctx.captured_queries[0]["sql"] + self.assertEqual( + query, + "db.queries__book.aggregate([" + "{'$lookup': {'from': 'queries__author', " + "'let': {'parent__field__0': '$author_id'}, 'pipeline': [" + "{'$match': {'$expr': {'$and': [{'$eq': ['$$parent__field__0', " + "'$_id']}, {'$eq': ['$name', 'John']}]}}}], 'as': " + "'queries__author'}}, {'$unwind': '$queries__author'}, {'$match': " + "{'$expr': {'$and': [{'$eq': ['$queries__author.name', 'John']}, " + "{'$eq': ['$title', 'Don']}]}}}])", + ) + + def test_or_mixing_local_and_related_fields_is_not_pushable(self): + with self.assertNumQueries(1) as ctx: + list(Book.objects.filter(models.Q(title="Don") | models.Q(author__name="John"))) + query = ctx.captured_queries[0]["sql"] + self.assertEqual( + query, + "db.queries__book.aggregate([{'$lookup': {'from': " + "'queries__author', 'let': {'parent__field__0': '$author_id'}, " + "'pipeline': [{'$match': {'$expr': {'$and': [{'$eq': " + "['$$parent__field__0', '$_id']}]}}}], 'as': 'queries__author'}}, " + "{'$unwind': '$queries__author'}, {'$match': {'$expr': {'$or': " + "[{'$eq': ['$title', 'Don']}, {'$eq': ['$queries__author.name', " + "'John']}]}}}])", + ) + + def test_filter_on_self_join_fields(self): + with self.assertNumQueries(1) as ctx: + list( + Tag.objects.filter( + parent__name="parent", parent__group_id=ObjectId("6891ff7822e475eddc20f159") + ) + ) + query = ctx.captured_queries[0]["sql"] + self.assertEqual( + query, + "db.queries__tag.aggregate([{'$lookup': {'from': 'queries__tag', 'let': " + "{'parent__field__0': '$parent_id'}, 'pipeline': [{'$match': {'$expr': " + "{'$and': [{'$eq': ['$$parent__field__0', '$_id']}, {'$and': [{'$eq': " + "['$group_id', ObjectId('6891ff7822e475eddc20f159')]}, {'$eq': ['$name', " + "'parent']}]}]}}}], 'as': 'T2'}}, {'$unwind': '$T2'}, {'$match': {'$expr': " + "{'$and': [{'$eq': ['$T2.group_id', ObjectId('6891ff7822e475eddc20f159')]}, " + "{'$eq': ['$T2.name', 'parent']}]}}}])", + ) + + def test_filter_on_reverse_foreignkey_relation(self): + with self.assertNumQueries(1) as ctx: + list(Order.objects.filter(items__status=ObjectId("6891ff7822e475eddc20f159"))) + query = ctx.captured_queries[0]["sql"] + self.assertEqual( + query, + "db.queries__order.aggregate([{'$lookup': {'from': " + "'queries__orderitem', 'let': {'parent__field__0': '$_id'}, " + "'pipeline': [{'$match': {'$expr': {'$and': [{'$eq': " + "['$$parent__field__0', '$order_id']}, {'$eq': ['$status', " + "ObjectId('6891ff7822e475eddc20f159')]}]}}}], 'as': " + "'queries__orderitem'}}, 
{'$unwind': '$queries__orderitem'}, " + "{'$match': {'$expr': {'$eq': ['$queries__orderitem.status', " + "ObjectId('6891ff7822e475eddc20f159')]}}}, " + "{'$addFields': {'_id': '$_id'}}, {'$sort': SON([('_id', 1)])}])", + ) + + def test_filter_on_local_and_nested_join_fields(self): + with self.assertNumQueries(1) as ctx: + list( + Order.objects.filter( + name="My Order", + items__order__name="My Order", + items__status=ObjectId("6891ff7822e475eddc20f159"), + ) + ) + query = ctx.captured_queries[0]["sql"] + self.assertEqual( + query, + "db.queries__order.aggregate([{'$lookup': {'from': " + "'queries__orderitem', 'let': {'parent__field__0': '$_id'}, " + "'pipeline': [{'$match': {'$expr': {'$and': [{'$eq': " + "['$$parent__field__0', '$order_id']}, {'$eq': ['$status', " + "ObjectId('6891ff7822e475eddc20f159')]}]}}}], 'as': " + "'queries__orderitem'}}, {'$unwind': '$queries__orderitem'}, " + "{'$lookup': {'from': 'queries__order', 'let': " + "{'parent__field__0': '$queries__orderitem.order_id'}, " + "'pipeline': [{'$match': {'$expr': {'$and': [{'$eq': " + "['$$parent__field__0', '$_id']}, {'$eq': ['$name', 'My Order']}]}" + "}}], 'as': 'T3'}}, {'$unwind': '$T3'}, {'$match': {'$expr': " + "{'$and': [{'$eq': ['$T3.name', 'My Order']}, {'$eq': " + "['$queries__orderitem.status', " + "ObjectId('6891ff7822e475eddc20f159')]}, {'$eq': ['$name', " + "'My Order']}]}}}, {'$addFields': {'_id': '$_id'}}, " + "{'$sort': SON([('_id', 1)])}])", + ) + + def test_negated_related_filter_is_not_pushable(self): + with self.assertNumQueries(1) as ctx: + list(Book.objects.filter(~models.Q(author__name="John"))) + query = ctx.captured_queries[0]["sql"] + self.assertEqual( + query, + "db.queries__book.aggregate([{'$lookup': {'from': " + "'queries__author', 'let': {'parent__field__0': '$author_id'}, " + "'pipeline': [{'$match': {'$expr': {'$and': [{'$eq': " + "['$$parent__field__0', '$_id']}]}}}], 'as': 'queries__author'}}, " + "{'$unwind': '$queries__author'}, {'$match': {'$expr': " + "{'$not': {'$eq': ['$queries__author.name', 'John']}}}}])", + ) + + def test_or_on_local_fields_only(self): + with self.assertNumQueries(1) as ctx: + list(Order.objects.filter(models.Q(name="A") | models.Q(name="B"))) + query = ctx.captured_queries[0]["sql"] + self.assertEqual( + query, + "db.queries__order.aggregate([{'$match': {'$expr': {'$or': " + "[{'$eq': ['$name', 'A']}, {'$eq': ['$name', 'B']}]}}}, " + "{'$addFields': {'_id': '$_id'}}, {'$sort': SON([('_id', 1)])}])", + ) + + def test_or_with_mixed_pushable_and_non_pushable_fields(self): + with self.assertNumQueries(1) as ctx: + list(Book.objects.filter(models.Q(author__name="John") | models.Q(title="Don"))) + query = ctx.captured_queries[0]["sql"] + self.assertEqual( + query, + "db.queries__book.aggregate([{'$lookup': {'from': " + "'queries__author', 'let': {'parent__field__0': '$author_id'}, " + "'pipeline': [{'$match': {'$expr': {'$and': [{'$eq': " + "['$$parent__field__0', '$_id']}]}}}], 'as': 'queries__author'}}, " + "{'$unwind': '$queries__author'}, {'$match': {'$expr': {'$or': " + "[{'$eq': ['$queries__author.name', 'John']}, {'$eq': ['$title', " + "'Don']}]}}}])", + ) + + def test_push_equality_between_parent_and_child_fields(self): + with self.assertNumQueries(1) as ctx: + list(Order.objects.filter(items__status=models.F("id"))) + query = ctx.captured_queries[0]["sql"] + self.assertEqual( + query, + "db.queries__order.aggregate([{'$lookup': {'from': 'queries__orderitem', " + "'let': {'parent__field__0': '$_id', 'parent__field__1': '$_id'}, " + "'pipeline': [{'$match': 
{'$expr': {'$and': [{'$eq': " + "['$$parent__field__0', '$order_id']}, {'$eq': ['$status', " + "'$$parent__field__1']}]}}}], 'as': 'queries__orderitem'}}, " + "{'$unwind': '$queries__orderitem'}, {'$match': {'$expr': " + "{'$eq': ['$queries__orderitem.status', '$_id']}}}, " + "{'$addFields': {'_id': '$_id'}}, {'$sort': SON([('_id', 1)])}])", + ) + + +class M2MLookupConditionPushdownTests(TestCase): + def test_simple_related_filter_is_pushed(self): + with self.assertNumQueries(1) as ctx: + list(Library.objects.filter(readers__name="Alice")) + query = ctx.captured_queries[0]["sql"] + expected_query = """ + db.queries__library.aggregate([{ + "$lookup": { + "from": "queries__library_readers", + "let": { + "parent__field__0": "$_id" + }, + "pipeline": [{ + "$match": { + "$expr": { + "$and": [{ + "$eq": [ + "$$parent__field__0", + "$library_id" + ]} + ] + } + } + }], + "as": "queries__library_readers" + } + }, + {"$unwind": "$queries__library_readers"}, + {"$lookup": { + "from": "queries__reader", + "let": { + "parent__field__0": "$queries__library_readers.reader_id" + }, + "pipeline": [ + { + "$match": { + "$expr": { + "$and": [ + {"$eq": ["$$parent__field__0", "$_id"]}, + {"$eq": ["$name", "Alice"]} + ] + } + } + } + ], "as": "queries__reader" + }}, + {"$unwind": "$queries__reader"}, + {"$match": {"$expr": {"$eq": ["$queries__reader.name", "Alice"]}}} + ]) + """ + self.assertEqual(query, uglify_mongo_aggregate(expected_query)) + + def test_subquery_join_is_pushed(self): + with self.assertNumQueries(1) as ctx: + list(Library.objects.filter(~models.Q(readers__name="Alice"))) + query = ctx.captured_queries[0]["sql"] + expected_query = """ +db.queries__library.aggregate([ + { + "$lookup": { + "as": "__subquery0", + "from": "queries__library_readers", + "let": {"parent__field__0": "$_id"}, + "pipeline": [ + { + "$lookup": { + "from": "queries__reader", + "let": {"parent__field__0": "$reader_id"}, + "pipeline": [ + { + "$match": { + "$expr": { + "$and": [ + {"$eq": ["$$parent__field__0", "$_id"]}, + {"$eq": ["$name", "Alice"]} + ] + } + } + } + ], + "as": "U2" + } + }, + {"$unwind": "$U2"}, + { + "$match": { + "$expr": { + "$and": [ + {"$eq": ["$U2.name", "Alice"]}, + {"$eq": ["$library_id","$$parent__field__0"]} + ] + } + } + }, + {"$project": {"a": {"$literal": 1}}}, + {"$limit": 1} + ] + } + }, + { + "$set": { + "__subquery0": { + "$cond": { + "if": { + "$or": [ + {"$eq": [{"$type": "$__subquery0"}, "missing"]}, + {"$eq": [{"$size": "$__subquery0"}, 0]} + ] + }, + "then": {}, + "else": {"$arrayElemAt": ["$__subquery0",0]} + } + } + } + }, + { + "$match": { + "$expr": { + "$not": { + "$eq": [ + { + "$not": { + "$or": [ + {"$eq": [{"$type": "$__subquery0.a"}, "missing"]}, + {"$eq": ["$__subquery0.a", null]} + ] + } + }, + true + ] + } + } + } + } +]) +""" + self.assertEqual(query, uglify_mongo_aggregate(expected_query)) + + def test_filter_on_local_and_related_fields(self): + with self.assertNumQueries(1) as ctx: + list(Library.objects.filter(name="Central", readers__name="Alice")) + query = ctx.captured_queries[0]["sql"] + expected_query = """ + db.queries__library.aggregate( +[ + { + "$lookup": { + "from": "queries__library_readers", + "let": {"parent__field__0": "$_id"}, + "pipeline": [ + { + "$match": { + "$expr": { + "$and": [{"$eq": ["$$parent__field__0", "$library_id"]}] + } + } + } + ], + "as": "queries__library_readers" + } + }, + { + "$unwind": "$queries__library_readers" + }, + { + "$lookup": { + "from": "queries__reader", + "let": {"parent__field__0": 
"$queries__library_readers.reader_id"}, + "pipeline": [ + { + "$match": { + "$expr": { + "$and": [ + {"$eq": ["$$parent__field__0", "$_id"]}, + {"$eq": ["$name", "Alice"]} + ] + } + } + } + ], + "as": "queries__reader" + } + }, + {"$unwind": "$queries__reader"}, + { + "$match": { + "$expr": { + "$and": [ + {"$eq": ["$name", "Central"]}, + {"$eq": ["$queries__reader.name", "Alice"]} + ] + } + } + } +] +) +""" + self.assertEqual(query, uglify_mongo_aggregate(expected_query)) + + def test_or_on_local_fields_only(self): + with self.assertNumQueries(1) as ctx: + list( + Library.objects.annotate(foreing_field=models.F("readers__name")).filter( + name="Ateneo" + ) + ) + query = ctx.captured_queries[0]["sql"] + expected_query = """ + db.queries__library.aggregate( +[ + { + "$lookup": { + "from": "queries__library_readers", + "let": {"parent__field__0": "$_id"}, + "pipeline": [ + { + "$match": { + "$expr": {"$and": [{"$eq": ["$$parent__field__0","$library_id"]}]} + } + } + ], + "as": "queries__library_readers" + } + }, + { + "$set": { + "queries__library_readers": { + "$cond": { + "if": { + "$or": [ + {"$eq": [{"$type": "$queries__library_readers"}, "missing"]}, + {"$eq": [{"$size": "$queries__library_readers"}, 0]} + ] + }, + "then": [{}], + "else": "$queries__library_readers" + } + } + } + }, + {"$unwind": "$queries__library_readers"}, + { + "$lookup": { + "from": "queries__reader", + "let": { + "parent__field__0": "$queries__library_readers.reader_id" + }, + "pipeline": [ + { + "$match": { + "$expr": {"$and": [{"$eq": ["$$parent__field__0", "$_id"]}]} + } + } + ], + "as": "queries__reader" + } + }, + { + "$set": { + "queries__reader": { + "$cond": { + "if": { + "$or": [ + {"$eq": [{"$type": "$queries__reader"}, "missing"]}, + {"$eq": [{"$size": "$queries__reader"}, 0]} + ] + }, + "then": [{}], + "else": "$queries__reader" + } + } + } + }, + {"$unwind": "$queries__reader"}, + {"$match": {"$expr": {"$eq": ["$name", "Ateneo"]}}}, + { + "$project": { + "queries__reader": {"foreing_field": "$queries__reader.name"}, + "_id": 1, + "name": 1 + } + } +]) +""" + self.assertEqual(query, uglify_mongo_aggregate(expected_query)) + + def test_or_with_mixed_pushable_and_non_pushable_fields(self): + with self.assertNumQueries(1) as ctx: + list(Library.objects.filter(models.Q(readers__name="Alice") | models.Q(name="Central"))) + query = ctx.captured_queries[0]["sql"] + expected_query = """ +db.queries__library.aggregate([ + { + "$lookup": { + "from": "queries__library_readers", + "let": {"parent__field__0": "$_id"}, + "pipeline": [ + { + "$match": { + "$expr": {"$and": [{"$eq": ["$$parent__field__0", "$library_id"]}] + } + } + } + ], + "as": "queries__library_readers" + } + }, + { + "$set": { + "queries__library_readers": { + "$cond": { + "if": { + "$or": [ + {"$eq": [{"$type": "$queries__library_readers"}, "missing"]}, + {"$eq": [{"$size": "$queries__library_readers"}, 0]} + ] + }, + "then": [{}], + "else": "$queries__library_readers" + } + } + } + }, + {"$unwind": "$queries__library_readers"}, + { + "$lookup": { + "from": "queries__reader", + "let": {"parent__field__0": "$queries__library_readers.reader_id"}, + "pipeline": [ + { + "$match": { + "$expr": {"$and": [{"$eq": ["$$parent__field__0", "$_id"]}]} + } + } + ], + "as": "queries__reader" + } + }, + { + "$set": { + "queries__reader": { + "$cond": { + "if": { + "$or": [ + {"$eq": [{"$type": "$queries__reader"}, "missing"]}, + {"$eq": [{"$size": "$queries__reader"}, 0]} + ] + }, + "then": [{}], + "else": "$queries__reader" + } + } + } + }, + {"$unwind": 
"$queries__reader"}, + { + "$match": { + "$expr": { + "$or": [ + {"$eq": ["$queries__reader.name", "Alice"]}, + {"$eq": ["$name", "Central"]} + ] + } + } + } +]) +""" + self.assertEqual(query, uglify_mongo_aggregate(expected_query))