Skip to content
Open
4 changes: 0 additions & 4 deletions google/cloud/firestore_v1/base_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,8 @@ def _build_pipeline(self, source: "PipelineSource"):
"""
Convert this query into a Pipeline

Queries containing a `cursor` or `limit_to_last` are not currently supported

Args:
source: the PipelineSource to build the pipeline off of
Raises:
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
Returns:
a Pipeline representing the query
"""
Expand Down
4 changes: 0 additions & 4 deletions google/cloud/firestore_v1/base_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,12 +608,8 @@ def _build_pipeline(self, source: "PipelineSource"):
"""
Convert this query into a Pipeline

Queries containing a `cursor` or `limit_to_last` are not currently supported

Args:
source: the PipelineSource to build the pipeline off of
Raises:
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
Returns:
a Pipeline representing the query
"""
Expand Down
169 changes: 136 additions & 33 deletions google/cloud/firestore_v1/base_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1134,12 +1134,8 @@ def _build_pipeline(self, source: "PipelineSource"):
"""
Convert this query into a Pipeline

Queries containing a `cursor` or `limit_to_last` are not currently supported

Args:
source: the PipelineSource to build the pipeline off of
Raises:
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
Returns:
a Pipeline representing the query
"""
Expand All @@ -1161,39 +1157,61 @@ def _build_pipeline(self, source: "PipelineSource"):
ppl = ppl.select(*[field.field_path for field in self._projection.fields])

# Orders
orders = self._normalize_orders()
if orders:
exists = []
orderings = []
for order in orders:
field = pipeline_expressions.Field.of(order.field.field_path)
exists.append(field.exists())
direction = (
"ascending"
if order.direction == StructuredQuery.Direction.ASCENDING
else "descending"
)
orderings.append(pipeline_expressions.Ordering(field, direction))

# Add exists filters to match Query's implicit orderby semantics.
if len(exists) == 1:
ppl = ppl.where(exists[0])
else:
ppl = ppl.where(pipeline_expressions.And(*exists))
# "explicit_orders" are only those explicitly added by the user via order_by().
# We only generate existence filters for these fields.
if self._orders:
exists = [
pipeline_expressions.Field.of(o.field.field_path).exists()
for o in self._orders
]
ppl = ppl.where(
pipeline_expressions.And(*exists) if len(exists) > 1 else exists[0]
)

# "normalized_orders" includes both user-specified orders and implicit orders
# (e.g. __name__ or inequality fields) required by Firestore semantics.
normalized_orders = self._normalize_orders()
orderings = [
pipeline_expressions.Ordering(
pipeline_expressions.Field.of(o.field.field_path),
"ascending"
if o.direction == StructuredQuery.Direction.ASCENDING
else "descending",
)
for o in normalized_orders
]

# Apply cursors as filters.
if orderings:
for cursor, is_start in [(self._start_at, True), (self._end_at, False)]:
cursor = self._normalize_cursor(cursor, normalized_orders)
if cursor:
ppl = ppl.where(
_where_conditions_from_cursor(cursor, orderings, is_start)
)

# Handle sort and limit, including limit_to_last semantics.
is_limit_to_last = self._limit_to_last and bool(orderings)

# Add sort orderings
if is_limit_to_last:
# If limit_to_last is set, we need to reverse the orderings to find the
# "last" N documents (which effectively become the "first" N in reverse order).
ppl = ppl.sort(*_reverse_orderings(orderings))
elif orderings:
ppl = ppl.sort(*orderings)

# Cursors, Limit and Offset
if self._start_at or self._end_at or self._limit_to_last:
raise NotImplementedError(
"Query to Pipeline conversion: cursors and limit_to_last is not supported yet."
)
else: # Limit & Offset without cursors
if self._offset:
ppl = ppl.offset(self._offset)
if self._limit:
ppl = ppl.limit(self._limit)
if self._limit is not None and (not self._limit_to_last or orderings):
ppl = ppl.limit(self._limit)

if is_limit_to_last:
# If we reversed the orderings for limit_to_last, we must now re-sort
# using the original orderings to return the results in the user-requested order.
ppl = ppl.sort(*orderings)

# Offset
if self._offset:
ppl = ppl.offset(self._offset)

return ppl

Expand Down Expand Up @@ -1366,6 +1384,91 @@ def _cursor_pb(cursor_pair: Optional[Tuple[list, bool]]) -> Optional[Cursor]:
return None


def _get_cursor_exclusive_condition(
    is_start_cursor: bool,
    ordering: pipeline_expressions.Ordering,
    value: pipeline_expressions.Constant,
) -> pipeline_expressions.BooleanExpression:
    """
    Build the strict (exclusive) comparison for a single cursor bound.

    A start cursor on an ascending ordering, or an end cursor on a descending
    ordering, yields `greater_than`; every other combination yields
    `less_than`.

    Args:
        is_start_cursor: True for a start cursor, False for an end cursor.
        ordering: the ordering whose expression and direction drive the comparison.
        value: the cursor value the ordering's expression is compared against.
    Returns:
        The result of calling `greater_than(value)` or `less_than(value)` on
        the ordering's expression.
    """
    target = ordering.expr
    directions = pipeline_expressions.Ordering.Direction

    # Pick the direction that means "greater than" for this kind of cursor.
    if is_start_cursor:
        wants_greater = ordering.order_dir == directions.ASCENDING
    else:
        wants_greater = ordering.order_dir == directions.DESCENDING

    if wants_greater:
        return target.greater_than(value)
    return target.less_than(value)


def _where_conditions_from_cursor(
    cursor: Tuple[List, bool],
    orderings: List[pipeline_expressions.Ordering],
    is_start_cursor: bool,
) -> pipeline_expressions.BooleanExpression:
    """
    Translate a query cursor into an equivalent pipeline filter expression.

    The expression is assembled from the last cursor value back to the first.
    The last bounded field gets a strict comparison (optionally OR-ed with
    equality when the bound is inclusive). Each earlier field then wraps the
    running expression as
    `strict(field) OR (field == value AND <expression so far>)`.

    Args:
        cursor: a `(values, before)` pair; `values[i]` is the bound applied to
            `orderings[i]`.
        orderings: the ordering expressions used in the query.
        is_start_cursor: True for a start_at/start_after cursor, False for an
            end_at/end_before cursor.
    Returns:
        A BooleanExpression representing the cursor condition.
    """
    bounds, before = cursor
    last = len(bounds) - 1

    # Innermost term: strict comparison on the last field that has a bound.
    tail_ordering = orderings[last]
    tail_field = tail_ordering.expr
    tail_value = pipeline_expressions.Constant(bounds[last])
    result = _get_cursor_exclusive_condition(
        is_start_cursor, tail_ordering, tail_value
    )

    # A start cursor is inclusive when `before` is set; an end cursor is
    # inclusive when it is not. Inclusive bounds also admit equality.
    inclusive = before if is_start_cursor else not before
    if inclusive:
        result = pipeline_expressions.Or(result, tail_field.equal(tail_value))

    # Wrap the running expression with each preceding field, last to first.
    for index in reversed(range(last)):
        current = orderings[index]
        current_field = current.expr
        current_value = pipeline_expressions.Constant(bounds[index])

        strictly_past = _get_cursor_exclusive_condition(
            is_start_cursor, current, current_value
        )
        tied_then_rest = pipeline_expressions.And(
            current_field.equal(current_value), result
        )
        result = pipeline_expressions.Or(strictly_past, tied_then_rest)

    return result


def _reverse_orderings(
    orderings: List[pipeline_expressions.Ordering],
) -> List[pipeline_expressions.Ordering]:
    """
    Return new orderings over the same expressions with every direction flipped.

    An ordering whose direction equals ASCENDING becomes "descending"; any
    other ordering becomes "ascending". The input list is not modified.

    Args:
        orderings: the orderings to invert.
    Returns:
        A new list of Ordering objects, in the same sequence as the input.
    """
    return [
        pipeline_expressions.Ordering(
            item.expr,
            "descending"
            if item.order_dir == pipeline_expressions.Ordering.Direction.ASCENDING
            else "ascending",
        )
        for item in orderings
    ]


def _query_response_to_snapshot(
response_pb: RunQueryResponse, collection, expected_prefix: str
) -> Optional[document.DocumentSnapshot]:
Expand Down
10 changes: 8 additions & 2 deletions google/cloud/firestore_v1/pipeline_expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1833,15 +1833,21 @@ def _from_query_filter_pb(filter_pb, client):
elif filter_pb.op == Query_pb.FieldFilter.Operator.EQUAL:
return And(field.exists(), field.equal(value))
elif filter_pb.op == Query_pb.FieldFilter.Operator.NOT_EQUAL:
return And(field.exists(), field.not_equal(value))
# In Enterprise DBs NOT_EQUAL will match a field that does not exist,
# therefore we do not want an existence filter for the NOT_EQUAL conversion
# so the Query and Pipeline behavior are consistent in Enterprise.
return field.not_equal(value)
if filter_pb.op == Query_pb.FieldFilter.Operator.ARRAY_CONTAINS:
return And(field.exists(), field.array_contains(value))
elif filter_pb.op == Query_pb.FieldFilter.Operator.ARRAY_CONTAINS_ANY:
return And(field.exists(), field.array_contains_any(value))
elif filter_pb.op == Query_pb.FieldFilter.Operator.IN:
return And(field.exists(), field.equal_any(value))
elif filter_pb.op == Query_pb.FieldFilter.Operator.NOT_IN:
return And(field.exists(), field.not_equal_any(value))
# In Enterprise DBs NOT_IN will match a field that does not exist,
# therefore we do not want an existence filter for the NOT_IN conversion
# so the Query and Pipeline behavior are consistent in Enterprise.
return field.not_equal_any(value)
else:
raise TypeError(f"Unexpected FieldFilter operator type: {filter_pb.op}")
elif isinstance(filter_pb, Query_pb.Filter):
Expand Down
4 changes: 0 additions & 4 deletions google/cloud/firestore_v1/pipeline_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,8 @@ def create_from(
"""
Create a pipeline from an existing query

Queries containing a `cursor` or `limit_to_last` are not currently supported

Args:
query: the query to build the pipeline off of
Raises:
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
Returns:
a new pipeline instance representing the query
"""
Expand Down
Loading
Loading