Skip to content

Commit 060a3ef

Browse files
authored
feat: query profiling part 2: asynchronous (#961)
* feat: support query profiling * collection * fix unit tests * unit tests * vector get and stream, unit tests * aggregation get and stream, unit tests * docstring * query profile unit tests * update base classes' method signature * documentsnapshotlist unit tests * func signatures * undo client.py change * transaction.get() * lint * system test * fix shim test * fix sys test * fix sys test * system test * another system test * skip system test in emulator * stream generator unit tests * coverage * add system tests * small fixes * undo document change * add system tests * vector query system tests * format * fix system test * comments * add system tests * improve stream generator * type checking * adding stars * delete comment * remove coverage requirements for type checking part * add explain_options to StreamGenerator * yield tuple instead * raise exception when explain_metrics is absent * refactor documentsnapshotlist into queryresultslist * add comment * improve type hint * lint * move QueryResultsList to stream_generator.py * aggregation related type annotation * transaction return type hint * refactor QueryResultsList * change stream generator to return ExplainMetrics instead of yield * update aggregation query to use the new generator * update query to use the new generator * update vector query to use the new generator * lint * type annotations * fix type annotation to be python 3.9 compatible * fix type hint for python 3.8 * fix system test * add test coverage * use class method get_explain_metrics() instead of property explain_metrics * feat: add explain_metrics to async generator * async support for query * system tests for query * query profile for async vector query * vector query system test * async transaction * async transaction system test * async collection * fix system test * test coverage * test coverage * collection system test * async aggregation * lint * cover * lint * aggregation system tests * cover and fix system test * delete type ignore * improve type annotation * mypy * mypy * address comments * delete comments * address comments
1 parent 1d2a494 commit 060a3ef

25 files changed

+2189
-276
lines changed

google/cloud/firestore_v1/async_aggregation.py

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,23 @@
2020
"""
2121
from __future__ import annotations
2222

23-
from typing import TYPE_CHECKING, AsyncGenerator, List, Optional, Union
23+
from typing import TYPE_CHECKING, Any, AsyncGenerator, List, Optional, Union
2424

2525
from google.api_core import gapic_v1
2626
from google.api_core import retry_async as retries
2727

2828
from google.cloud.firestore_v1 import transaction
2929
from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator
3030
from google.cloud.firestore_v1.base_aggregation import (
31-
AggregationResult,
3231
BaseAggregationQuery,
3332
_query_response_to_result,
3433
)
34+
from google.cloud.firestore_v1.query_results import QueryResultsList
3535

3636
if TYPE_CHECKING: # pragma: NO COVER
37-
from google.cloud.firestore_v1.base_document import DocumentSnapshot
37+
from google.cloud.firestore_v1.base_aggregation import AggregationResult
38+
from google.cloud.firestore_v1.query_profile import ExplainMetrics, ExplainOptions
39+
import google.cloud.firestore_v1.types.query_profile as query_profile_pb
3840

3941

4042
class AsyncAggregationQuery(BaseAggregationQuery):
@@ -53,7 +55,9 @@ async def get(
5355
retries.AsyncRetry, None, gapic_v1.method._MethodDefault
5456
] = gapic_v1.method.DEFAULT,
5557
timeout: float | None = None,
56-
) -> List[List[AggregationResult]]:
58+
*,
59+
explain_options: Optional[ExplainOptions] = None,
60+
) -> QueryResultsList[List[AggregationResult]]:
5761
"""Runs the aggregation query.
5862
5963
This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages.
@@ -69,23 +73,39 @@ async def get(
6973
should be retried. Defaults to a system-specified policy.
7074
timeout (float): The timeout for this request. Defaults to a
7175
system-specified value.
76+
explain_options
77+
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
78+
Options to enable query profiling for this query. When set,
79+
explain_metrics will be available on the returned generator.
7280
7381
Returns:
74-
List[List[AggregationResult]]: The aggregation query results
82+
QueryResultsList[List[AggregationResult]]: The aggregation query results.
7583
7684
"""
85+
explain_metrics: ExplainMetrics | None = None
86+
7787
stream_result = self.stream(
78-
transaction=transaction, retry=retry, timeout=timeout
88+
transaction=transaction,
89+
retry=retry,
90+
timeout=timeout,
91+
explain_options=explain_options,
7992
)
8093
result = [aggregation async for aggregation in stream_result]
81-
return result # type: ignore
94+
95+
if explain_options is None:
96+
explain_metrics = None
97+
else:
98+
explain_metrics = await stream_result.get_explain_metrics()
99+
100+
return QueryResultsList(result, explain_options, explain_metrics)
82101

83102
async def _make_stream(
84103
self,
85104
transaction: Optional[transaction.Transaction] = None,
86105
retry: Optional[retries.AsyncRetry] = gapic_v1.method.DEFAULT,
87106
timeout: Optional[float] = None,
88-
) -> Union[AsyncGenerator[List[AggregationResult], None]]:
107+
explain_options: Optional[ExplainOptions] = None,
108+
) -> AsyncGenerator[List[AggregationResult] | query_profile_pb.ExplainMetrics, Any]:
89109
"""Internal method for stream(). Runs the aggregation query.
90110
91111
This sends a ``RunAggregationQuery`` RPC and then returns a generator which
@@ -105,15 +125,23 @@ async def _make_stream(
105125
system-specified policy.
106126
timeout (Optional[float]): The timeout for this request. Defaults
107127
to a system-specified value.
128+
explain_options
129+
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
130+
Options to enable query profiling for this query. When set,
131+
explain_metrics will be available on the returned generator.
108132
109133
Yields:
110-
:class:`~google.cloud.firestore_v1.base_aggregation.AggregationResult`:
111-
The result of aggregations of this query
134+
List[AggregationResult] | query_profile_pb.ExplainMetrics:
135+
The result of aggregations of this query. Query results will be
136+
yielded as `List[AggregationResult]`. When the result contains
137+
returned explain metrics, yield `query_profile_pb.ExplainMetrics`
138+
individually.
112139
"""
113140
request, kwargs = self._prep_stream(
114141
transaction,
115142
retry,
116143
timeout,
144+
explain_options,
117145
)
118146

119147
response_iterator = await self._client._firestore_api.run_aggregation_query(
@@ -124,14 +152,21 @@ async def _make_stream(
124152

125153
async for response in response_iterator:
126154
result = _query_response_to_result(response)
127-
yield result
155+
if result:
156+
yield result
157+
158+
if response.explain_metrics:
159+
metrics = response.explain_metrics
160+
yield metrics
128161

129162
def stream(
130163
self,
131164
transaction: Optional[transaction.Transaction] = None,
132165
retry: Optional[retries.AsyncRetry] = gapic_v1.method.DEFAULT,
133166
timeout: Optional[float] = None,
134-
) -> "AsyncStreamGenerator[DocumentSnapshot]":
167+
*,
168+
explain_options: Optional[ExplainOptions] = None,
169+
) -> AsyncStreamGenerator[List[AggregationResult]]:
135170
"""Runs the aggregation query.
136171
137172
This sends a ``RunAggregationQuery`` RPC and then returns a generator
@@ -150,15 +185,20 @@ def stream(
150185
system-specified policy.
151186
timeout (Optional[float]): The timeout for this request. Defaults
152187
to a system-specified value.
188+
explain_options
189+
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
190+
Options to enable query profiling for this query. When set,
191+
explain_metrics will be available on the returned generator.
153192
154193
Returns:
155-
`AsyncStreamGenerator[DocumentSnapshot]`:
194+
`AsyncStreamGenerator[List[AggregationResult]]`:
156195
A generator of the query results.
157196
"""
158197

159198
inner_generator = self._make_stream(
160199
transaction=transaction,
161200
retry=retry,
162201
timeout=timeout,
202+
explain_options=explain_options,
163203
)
164-
return AsyncStreamGenerator(inner_generator)
204+
return AsyncStreamGenerator(inner_generator, explain_options)

google/cloud/firestore_v1/async_collection.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
"""Classes for representing collections for the Google Cloud Firestore API."""
16+
from __future__ import annotations
1617

1718
from typing import TYPE_CHECKING, Any, AsyncGenerator, Optional, Tuple
1819

@@ -35,6 +36,8 @@
3536
if TYPE_CHECKING: # pragma: NO COVER
3637
from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator
3738
from google.cloud.firestore_v1.base_document import DocumentSnapshot
39+
from google.cloud.firestore_v1.query_profile import ExplainOptions
40+
from google.cloud.firestore_v1.query_results import QueryResultsList
3841

3942

4043
class AsyncCollectionReference(BaseCollectionReference[async_query.AsyncQuery]):
@@ -192,7 +195,9 @@ async def get(
192195
transaction: Optional[transaction.Transaction] = None,
193196
retry: Optional[retries.AsyncRetry] = gapic_v1.method.DEFAULT,
194197
timeout: Optional[float] = None,
195-
) -> list:
198+
*,
199+
explain_options: Optional[ExplainOptions] = None,
200+
) -> QueryResultsList[DocumentSnapshot]:
196201
"""Read the documents in this collection.
197202
198203
This sends a ``RunQuery`` RPC and returns a list of documents
@@ -207,14 +212,21 @@ async def get(
207212
system-specified policy.
208213
timeout (Otional[float]): The timeout for this request. Defaults
209214
to a system-specified value.
215+
explain_options
216+
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
217+
Options to enable query profiling for this query. When set,
218+
explain_metrics will be available on the returned generator.
210219
211220
If a ``transaction`` is used and it already has write operations added,
212221
this method cannot be used (i.e. read-after-write is not allowed).
213222
214223
Returns:
215-
list: The documents in this collection that match the query.
224+
QueryResultsList[DocumentSnapshot]:
225+
The documents in this collection that match the query.
216226
"""
217227
query, kwargs = self._prep_get_or_stream(retry, timeout)
228+
if explain_options is not None:
229+
kwargs["explain_options"] = explain_options
218230

219231
return await query.get(transaction=transaction, **kwargs)
220232

@@ -223,7 +235,9 @@ def stream(
223235
transaction: Optional[transaction.Transaction] = None,
224236
retry: Optional[retries.AsyncRetry] = gapic_v1.method.DEFAULT,
225237
timeout: Optional[float] = None,
226-
) -> "AsyncStreamGenerator[DocumentSnapshot]":
238+
*,
239+
explain_options: Optional[ExplainOptions] = None,
240+
) -> AsyncStreamGenerator[DocumentSnapshot]:
227241
"""Read the documents in this collection.
228242
229243
This sends a ``RunQuery`` RPC and then returns a generator which
@@ -250,11 +264,17 @@ def stream(
250264
system-specified policy.
251265
timeout (Optional[float]): The timeout for this request. Defaults
252266
to a system-specified value.
267+
explain_options
268+
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
269+
Options to enable query profiling for this query. When set,
270+
explain_metrics will be available on the returned generator.
253271
254272
Returns:
255273
`AsyncStreamGenerator[DocumentSnapshot]`: A generator of the query
256274
results.
257275
"""
258276
query, kwargs = self._prep_get_or_stream(retry, timeout)
277+
if explain_options:
278+
kwargs["explain_options"] = explain_options
259279

260280
return query.stream(transaction=transaction, **kwargs)

0 commit comments

Comments
 (0)