Skip to content

Commit 5a370b6

Browse files
committed
fix: dereference & aggregation issues
- Add async_select_related() method for async QuerySets - Fix AsyncDeReference to handle AsyncCursor and AsyncReferenceProxy - Make async_aggregate() return AsyncAggregationIterator for direct async for usage - Add async_in_bulk() method for efficient bulk fetching - Fix variable reuse bug in _attach_objects method
1 parent e247d17 commit 5a370b6

File tree

5 files changed

+683
-93
lines changed

5 files changed

+683
-93
lines changed

mongoengine/async_iterators.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
"""Async iterator wrappers for MongoDB operations."""
2+
3+
4+
class AsyncAggregationIterator:
5+
"""Wrapper to make async_aggregate work with async for directly."""
6+
7+
def __init__(self, queryset, pipeline, kwargs):
8+
self.queryset = queryset
9+
self.pipeline = pipeline
10+
self.kwargs = kwargs
11+
self._cursor = None
12+
13+
def __aiter__(self):
14+
"""Return self as async iterator."""
15+
return self
16+
17+
async def __anext__(self):
18+
"""Get next item from the aggregation cursor."""
19+
if self._cursor is None:
20+
# Lazy initialization - execute the aggregation on first iteration
21+
self._cursor = await self._execute_aggregation()
22+
23+
return await self._cursor.__anext__()
24+
25+
async def _execute_aggregation(self):
26+
"""Execute the actual aggregation and return the cursor."""
27+
from mongoengine.async_utils import (
28+
_get_async_session,
29+
ensure_async_connection,
30+
)
31+
from mongoengine.connection import DEFAULT_CONNECTION_NAME
32+
33+
alias = self.queryset._document._meta.get("db_alias", DEFAULT_CONNECTION_NAME)
34+
ensure_async_connection(alias)
35+
36+
if not isinstance(self.pipeline, (tuple, list)):
37+
raise TypeError(
38+
f"Starting from 1.0 release pipeline must be a list/tuple, received: {type(self.pipeline)}"
39+
)
40+
41+
initial_pipeline = []
42+
if self.queryset._none or self.queryset._empty:
43+
initial_pipeline.append({"$limit": 1})
44+
initial_pipeline.append({"$match": {"$expr": False}})
45+
46+
if self.queryset._query:
47+
initial_pipeline.append({"$match": self.queryset._query})
48+
49+
if self.queryset._ordering:
50+
initial_pipeline.append({"$sort": dict(self.queryset._ordering)})
51+
52+
if self.queryset._limit is not None:
53+
initial_pipeline.append(
54+
{"$limit": self.queryset._limit + (self.queryset._skip or 0)}
55+
)
56+
57+
if self.queryset._skip is not None:
58+
initial_pipeline.append({"$skip": self.queryset._skip})
59+
60+
# geoNear and collStats must be the first stages in the pipeline if present
61+
first_step = []
62+
new_user_pipeline = []
63+
for step in self.pipeline:
64+
if "$geoNear" in step:
65+
first_step.append(step)
66+
elif "$collStats" in step:
67+
first_step.append(step)
68+
else:
69+
new_user_pipeline.append(step)
70+
71+
final_pipeline = first_step + initial_pipeline + new_user_pipeline
72+
73+
collection = await self.queryset._async_get_collection()
74+
if (
75+
self.queryset._read_preference is not None
76+
or self.queryset._read_concern is not None
77+
):
78+
collection = collection.with_options(
79+
read_preference=self.queryset._read_preference,
80+
read_concern=self.queryset._read_concern,
81+
)
82+
83+
if self.queryset._hint not in (-1, None):
84+
self.kwargs.setdefault("hint", self.queryset._hint)
85+
if self.queryset._collation:
86+
self.kwargs.setdefault("collation", self.queryset._collation)
87+
if self.queryset._comment:
88+
self.kwargs.setdefault("comment", self.queryset._comment)
89+
90+
# Get async session if available
91+
session = await _get_async_session()
92+
if session:
93+
self.kwargs["session"] = session
94+
95+
return await collection.aggregate(
96+
final_pipeline,
97+
cursor={},
98+
**self.kwargs,
99+
)

0 commit comments

Comments
 (0)