Merged

31 commits
3f9b65f
chore: updated gapic layer for execute_query
daniel-sanche May 8, 2025
dc808b5
Merge branch 'main' into pipeline_queries_approved
daniel-sanche Jun 9, 2025
17e71b9
feat: add pipelines structure (#1046)
daniel-sanche Jun 17, 2025
646c32f
Merge branch 'main' into pipeline_queries_approved
daniel-sanche Jun 23, 2025
6e06336
feat: add primary pipeline stages (#1048)
daniel-sanche Jul 16, 2025
5fde194
chore: Merge branch 'main' into pipeline_queries_approved
daniel-sanche Oct 17, 2025
cd578c1
chore: update and refactor pipeline expressions (#1111)
daniel-sanche Oct 22, 2025
fed7af2
feat: query to pipeline conversion (#1071)
daniel-sanche Oct 23, 2025
643f014
feat: Additional Pipeline Expressions (#1115)
daniel-sanche Oct 29, 2025
c62f3d9
chore: improve pipelines tests (#1116)
daniel-sanche Oct 30, 2025
b5b0bd7
chore: Pipeline queries cleanup (#1118)
daniel-sanche Oct 30, 2025
9a35dfe
feat: replace_with pipeline stage (#1121)
daniel-sanche Oct 31, 2025
8e83a40
chore: remove is_nan and is_null (#1123)
daniel-sanche Nov 6, 2025
b77002d
feat: pipelines create_from() (#1124)
daniel-sanche Nov 7, 2025
aef4391
feat: pipeline read time (#1125)
daniel-sanche Nov 7, 2025
2d3ed73
feat: improve pipeline expressions (#1126)
daniel-sanche Nov 11, 2025
4848fbe
feat: pipeline explain stats and index mode (#1128)
daniel-sanche Nov 11, 2025
18dfc6a
chore: update docstring for pipelines array (#1129)
daniel-sanche Dec 16, 2025
8026ced
chore: import main back into pipeline_preview branch
daniel-sanche Jan 9, 2026
631bda8
updated filtered warnings
daniel-sanche Jan 9, 2026
4ee909a
removed duplicate _BaseExecutePipeline
daniel-sanche Jan 10, 2026
bbe8f45
chore(tests): re-enable pipeline system tests on kokoro (#1153)
daniel-sanche Jan 13, 2026
1e0ec96
chore: rename Function to FunctionExpression (#1155)
daniel-sanche Jan 14, 2026
b3f53fa
Merge branch 'main' into pipeline-preview-public
daniel-sanche Jan 14, 2026
88e6dfe
chore: revert generated files
daniel-sanche Jan 14, 2026
987c923
chore: fix typing for 3.8
daniel-sanche Jan 14, 2026
27af228
chore(tests): updated generated tests
daniel-sanche Jan 14, 2026
f112a85
chore(tests): improve client mocking in unit tests
daniel-sanche Jan 14, 2026
4c33ae8
chore: fix mypy check
daniel-sanche Jan 14, 2026
61ae07a
chore(tests): disable pipeline tests in emulator
daniel-sanche Jan 14, 2026
71baf58
chore(tests): disable pipeline read time test in emulator
daniel-sanche Jan 14, 2026
3 changes: 3 additions & 0 deletions google/cloud/firestore_v1/_helpers.py
@@ -120,6 +120,9 @@ def __ne__(self, other):
else:
return not equality_val

def __repr__(self):
return f"{type(self).__name__}(latitude={self.latitude}, longitude={self.longitude})"


def verify_path(path, is_collection) -> None:
"""Verifies that a ``path`` has the correct form.
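The new `__repr__` makes `GeoPoint` values readable in logs and test failures. A minimal sketch of the output, assuming the two-argument `GeoPoint(latitude, longitude)` constructor defined earlier in this module:

```python
from google.cloud.firestore_v1._helpers import GeoPoint

point = GeoPoint(37.4220, -122.0841)
# The f-string repr added above echoes the constructor arguments:
print(repr(point))  # GeoPoint(latitude=37.422, longitude=-122.0841)
```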
9 changes: 9 additions & 0 deletions google/cloud/firestore_v1/async_client.py
@@ -56,6 +56,8 @@
from google.cloud.firestore_v1.services.firestore.transports import (
grpc_asyncio as firestore_grpc_transport,
)
from google.cloud.firestore_v1.async_pipeline import AsyncPipeline
from google.cloud.firestore_v1.pipeline_source import PipelineSource

if TYPE_CHECKING: # pragma: NO COVER
import datetime
@@ -438,3 +440,10 @@ def transaction(
A transaction attached to this client.
"""
return AsyncTransaction(self, max_attempts=max_attempts, read_only=read_only)

@property
def _pipeline_cls(self):
return AsyncPipeline

def pipeline(self) -> PipelineSource:
return PipelineSource(self)
134 changes: 134 additions & 0 deletions google/cloud/firestore_v1/async_pipeline.py
@@ -0,0 +1,134 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
.. warning::
**Preview API**: Firestore Pipelines is currently in preview and is
    subject to potential breaking changes in future releases.
"""

from __future__ import annotations
from typing import TYPE_CHECKING
from google.cloud.firestore_v1 import pipeline_stages as stages
from google.cloud.firestore_v1.base_pipeline import _BasePipeline
from google.cloud.firestore_v1.pipeline_result import AsyncPipelineStream
from google.cloud.firestore_v1.pipeline_result import PipelineSnapshot
from google.cloud.firestore_v1.pipeline_result import PipelineResult

if TYPE_CHECKING: # pragma: NO COVER
import datetime
from google.cloud.firestore_v1.async_client import AsyncClient
from google.cloud.firestore_v1.async_transaction import AsyncTransaction
from google.cloud.firestore_v1.pipeline_expressions import Constant
from google.cloud.firestore_v1.types.document import Value
from google.cloud.firestore_v1.query_profile import PipelineExplainOptions


class AsyncPipeline(_BasePipeline):
"""
Pipelines allow for complex data transformations and queries involving
multiple stages like filtering, projection, aggregation, and vector search.

This class extends `_BasePipeline` and provides methods to execute the
defined pipeline stages using an asynchronous `AsyncClient`.

Usage Example:
>>> from google.cloud.firestore_v1.pipeline_expressions import Field
>>>
    >>> async def run_pipeline():
    ...     client = AsyncClient(...)
    ...     pipeline = (
    ...         client.pipeline()
    ...         .collection("books")
    ...         .where(Field.of("published").gt(1980))
    ...         .select("title", "author")
    ...     )
    ...     async for result in pipeline.stream():
    ...         print(result)

Use `client.pipeline()` to create instances of this class.

.. warning::
**Preview API**: Firestore Pipelines is currently in preview and is
        subject to potential breaking changes in future releases.
"""

def __init__(self, client: AsyncClient, *stages: stages.Stage):
"""
Initializes an asynchronous Pipeline.

Args:
client: The asynchronous `AsyncClient` instance to use for execution.
*stages: Initial stages for the pipeline.
"""
super().__init__(client, *stages)

async def execute(
self,
*,
transaction: "AsyncTransaction" | None = None,
read_time: datetime.datetime | None = None,
explain_options: PipelineExplainOptions | None = None,
additional_options: dict[str, Value | Constant] = {},
) -> PipelineSnapshot[PipelineResult]:
"""
        Executes this pipeline and returns the results as a list.

Args:
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
If a ``transaction`` is used and it already has write operations
added, this method cannot be used (i.e. read-after-write is not
allowed).
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a microsecond precision timestamp within the past one hour, or
if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
within the past 7 days. For the most accurate results, use UTC timezone.
explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned list.
additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
                These options take precedence over method arguments if there is a conflict (e.g. ``explain_options``).
"""
kwargs = {k: v for k, v in locals().items() if k != "self"}
stream = AsyncPipelineStream(PipelineResult, self, **kwargs)
results = [result async for result in stream]
return PipelineSnapshot(results, stream)

def stream(
self,
*,
read_time: datetime.datetime | None = None,
transaction: "AsyncTransaction" | None = None,
explain_options: PipelineExplainOptions | None = None,
additional_options: dict[str, Value | Constant] = {},
) -> AsyncPipelineStream[PipelineResult]:
"""
Process this pipeline as a stream, providing results through an AsyncIterable

Args:
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
If a ``transaction`` is used and it already has write operations
added, this method cannot be used (i.e. read-after-write is not
allowed).
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a microsecond precision timestamp within the past one hour, or
if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
within the past 7 days. For the most accurate results, use UTC timezone.
explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
                These options take precedence over method arguments if there is a conflict (e.g. ``explain_options``).
"""
kwargs = {k: v for k, v in locals().items() if k != "self"}
return AsyncPipelineStream(PipelineResult, self, **kwargs)
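For context, here is a minimal sketch contrasting the two execution paths, assuming an `AsyncClient` with default credentials and an existing `books` collection; everything else uses the methods shown in this file:

```python
import asyncio

from google.cloud.firestore_v1.async_client import AsyncClient
from google.cloud.firestore_v1.pipeline_expressions import Field


async def main():
    client = AsyncClient()
    pipeline = (
        client.pipeline()
        .collection("books")
        .where(Field.of("published").gt(1980))
        .select("title", "author")
    )

    # stream(): results arrive incrementally through an AsyncIterable.
    async for result in pipeline.stream():
        print(result)

    # execute(): buffers everything and returns a list-like PipelineSnapshot.
    snapshot = await pipeline.execute()
    print(len(snapshot))


asyncio.run(main())
```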
59 changes: 58 additions & 1 deletion google/cloud/firestore_v1/base_aggregation.py
@@ -21,9 +21,10 @@
from __future__ import annotations

import abc
import itertools

from abc import ABC
from typing import TYPE_CHECKING, Any, Coroutine, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Coroutine, List, Optional, Tuple, Union, Iterable

from google.api_core import gapic_v1
from google.api_core import retry as retries
@@ -33,6 +34,10 @@
from google.cloud.firestore_v1.types import (
StructuredAggregationQuery,
)
from google.cloud.firestore_v1.pipeline_expressions import AggregateFunction
from google.cloud.firestore_v1.pipeline_expressions import Count
from google.cloud.firestore_v1.pipeline_expressions import AliasedExpression
from google.cloud.firestore_v1.pipeline_expressions import Field

# Types needed only for Type Hints
if TYPE_CHECKING: # pragma: NO COVER
@@ -43,6 +48,7 @@
from google.cloud.firestore_v1.stream_generator import (
StreamGenerator,
)
from google.cloud.firestore_v1.pipeline_source import PipelineSource

import datetime

@@ -66,6 +72,9 @@ def __init__(self, alias: str, value: float, read_time=None):
def __repr__(self):
return f"<Aggregation alias={self.alias}, value={self.value}, readtime={self.read_time}>"

def _to_dict(self):
return {self.alias: self.value}


class BaseAggregation(ABC):
def __init__(self, alias: str | None = None):
@@ -75,6 +84,27 @@ def __init__(self, alias: str | None = None):
def _to_protobuf(self):
"""Convert this instance to the protobuf representation"""

@abc.abstractmethod
def _to_pipeline_expr(
self, autoindexer: Iterable[int]
) -> AliasedExpression[AggregateFunction]:
"""
Convert this instance to a pipeline expression for use with pipeline.aggregate()

Args:
            autoindexer: If an alias isn't supplied, one should be created with the format ``field_n``.
                The autoindexer is an iterator that provides the ``n`` value to use for each expression.
"""

def _pipeline_alias(self, autoindexer):
"""
Helper to build the alias for the pipeline expression
"""
if self.alias is not None:
return self.alias
else:
return f"field_{next(autoindexer)}"


class CountAggregation(BaseAggregation):
def __init__(self, alias: str | None = None):
@@ -88,6 +118,9 @@ def _to_protobuf(self):
aggregation_pb.count = StructuredAggregationQuery.Aggregation.Count()
return aggregation_pb

def _to_pipeline_expr(self, autoindexer: Iterable[int]):
return Count().as_(self._pipeline_alias(autoindexer))


class SumAggregation(BaseAggregation):
def __init__(self, field_ref: str | FieldPath, alias: str | None = None):
@@ -107,6 +140,9 @@ def _to_protobuf(self):
aggregation_pb.sum.field.field_path = self.field_ref
return aggregation_pb

def _to_pipeline_expr(self, autoindexer: Iterable[int]):
return Field.of(self.field_ref).sum().as_(self._pipeline_alias(autoindexer))


class AvgAggregation(BaseAggregation):
def __init__(self, field_ref: str | FieldPath, alias: str | None = None):
@@ -126,6 +162,9 @@ def _to_protobuf(self):
aggregation_pb.avg.field.field_path = self.field_ref
return aggregation_pb

def _to_pipeline_expr(self, autoindexer: Iterable[int]):
return Field.of(self.field_ref).average().as_(self._pipeline_alias(autoindexer))


def _query_response_to_result(
response_pb,
@@ -317,3 +356,21 @@ def stream(
StreamGenerator[List[AggregationResult]] | AsyncStreamGenerator[List[AggregationResult]]:
A generator of the query results.
"""

def _build_pipeline(self, source: "PipelineSource"):
"""
Convert this query into a Pipeline

Queries containing a `cursor` or `limit_to_last` are not currently supported

Args:
source: the PipelineSource to build the pipeline off of
Raises:
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
Returns:
a Pipeline representing the query
"""
# use autoindexer to keep track of which field number to use for un-aliased fields
autoindexer = itertools.count(start=1)
exprs = [a._to_pipeline_expr(autoindexer) for a in self._aggregations]
return self._nested_query._build_pipeline(source).aggregate(*exprs)
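The `field_n` fallback is easiest to see in isolation. A short sketch using the classes from this module (`_to_pipeline_expr` is internal API): explicitly aliased aggregations skip the counter, so only un-aliased ones consume `n` values:

```python
import itertools

from google.cloud.firestore_v1.base_aggregation import (
    CountAggregation,
    SumAggregation,
)

# Same setup as _build_pipeline() above: one shared counter per query.
autoindexer = itertools.count(start=1)
aggregations = [
    CountAggregation(),                            # no alias -> "field_1"
    SumAggregation("pages", alias="total_pages"),  # explicit alias is kept
    CountAggregation(),                            # no alias -> "field_2"
]
exprs = [a._to_pipeline_expr(autoindexer) for a in aggregations]
```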
17 changes: 17 additions & 0 deletions google/cloud/firestore_v1/base_client.py
@@ -37,6 +37,7 @@
Optional,
Tuple,
Union,
Type,
)

import google.api_core.client_options
@@ -61,6 +62,8 @@
from google.cloud.firestore_v1.bulk_writer import BulkWriter, BulkWriterOptions
from google.cloud.firestore_v1.field_path import render_field_path
from google.cloud.firestore_v1.services.firestore import client as firestore_client
from google.cloud.firestore_v1.pipeline_source import PipelineSource
from google.cloud.firestore_v1.base_pipeline import _BasePipeline

DEFAULT_DATABASE = "(default)"
"""str: The default database used in a :class:`~google.cloud.firestore_v1.client.Client`."""
@@ -502,6 +505,20 @@ def transaction(
) -> BaseTransaction:
raise NotImplementedError

def pipeline(self) -> PipelineSource:
"""
Start a pipeline with this client.

Returns:
:class:`~google.cloud.firestore_v1.pipeline_source.PipelineSource`:
                A pipeline that uses this client.
"""
raise NotImplementedError

@property
def _pipeline_cls(self) -> Type["_BasePipeline"]:
raise NotImplementedError


def _reference_info(references: list) -> Tuple[list, dict]:
"""Get information about document references.
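`pipeline()` and `_pipeline_cls` form a small template-method pair: `BaseClient` declares them, and each concrete client supplies its own flavor, as `AsyncClient` does above with `AsyncPipeline`. A hedged sketch of the hooks in use, assuming the synchronous `Client` overrides them the same way (its side of the diff is not shown here) and that default credentials are available:

```python
from google.cloud.firestore_v1.client import Client

client = Client()

# pipeline() is the public entry point; it hands back a PipelineSource
# bound to this client.
source = client.pipeline()

# _pipeline_cls tells PipelineSource which concrete Pipeline subclass
# to build for this client's sync/async flavor.
print(client._pipeline_cls)
```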
16 changes: 16 additions & 0 deletions google/cloud/firestore_v1/base_collection.py
@@ -49,6 +49,7 @@
from google.cloud.firestore_v1.async_document import AsyncDocumentReference
from google.cloud.firestore_v1.document import DocumentReference
from google.cloud.firestore_v1.field_path import FieldPath
from google.cloud.firestore_v1.pipeline_source import PipelineSource
from google.cloud.firestore_v1.query_profile import ExplainOptions
from google.cloud.firestore_v1.query_results import QueryResultsList
from google.cloud.firestore_v1.stream_generator import StreamGenerator
@@ -603,6 +604,21 @@ def find_nearest(
distance_threshold=distance_threshold,
)

def _build_pipeline(self, source: "PipelineSource"):
"""
Convert this query into a Pipeline

Queries containing a `cursor` or `limit_to_last` are not currently supported

Args:
            source: the PipelineSource to build the pipeline off of
Raises:
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
Returns:
a Pipeline representing the query
"""
return self._query()._build_pipeline(source)


def _auto_id() -> str:
"""Generate a "random" automatically generated ID.
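A short sketch of the delegation above: converting a collection through `_build_pipeline` lands on the same pipeline you would get from the public pipeline API. `_build_pipeline` is internal, and the snippet assumes a `client` with a `books` collection:

```python
source = client.pipeline()

# Internal conversion path: collection -> underlying query -> pipeline.
converted = client.collection("books")._build_pipeline(source)

# Public equivalent, as used in the AsyncPipeline docstring above.
direct = client.pipeline().collection("books")
```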