Skip to content

Commit e18d002

Browse files
authored
fix: fixing 410 SDK refresh for feed ranges (#44425)
* fix: fixing 410 for feed ranges * fix: adding sync support for handling 410 bug * fix: fixing Pylint errors * fix: addressing comments * fix: adding tests * fix: addressing comments * fix: fixing pylint warnings * fix: updating version date * fix: addressing MyPy errors * fix: combining tests to avoid timeout pipeline errors * fix: fixing pipeline timeout issue * fix: fixing infinite loop bug
1 parent 8ac638c commit e18d002

File tree

8 files changed

+649
-37
lines changed

8 files changed

+649
-37
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
## Release History
22

3-
### 4.15.0b2 (Unreleased)
4-
5-
#### Features Added
6-
7-
#### Breaking Changes
3+
### 4.15.0b2 (2025-12-16)
84

95
#### Bugs Fixed
10-
11-
#### Other Changes
6+
* Fixed bug where sdk was not properly retrying requests in some edge cases after partition splits.See [PR 44425](https://github.com/Azure/azure-sdk-for-python/pull/44425)
127

138
### 4.15.0b1 (2025-11-26)
149

sdk/cosmos/azure-cosmos/azure/cosmos/_base.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,115 @@ def build_options(kwargs: dict[str, Any]) -> dict[str, Any]:
129129
options['accessCondition'] = {'type': 'IfNoneMatch', 'condition': if_none_match}
130130
return options
131131

132+
def _merge_query_results(
133+
results: dict[str, Any],
134+
partial_result: dict[str, Any],
135+
query: Optional[Union[str, dict[str, Any]]]
136+
) -> dict[str, Any]:
137+
"""Merges partial query results from different partitions.
138+
139+
This method is required for queries that are manually fanned out to multiple
140+
partitions or ranges within the SDK, such as prefix partition key queries.
141+
For non-aggregated queries, results from each partition are simply concatenated.
142+
However, for aggregate queries (COUNT, SUM, MIN, MAX, AVG), each partition
143+
returns a partial aggregate. This method merges these partial results to compute
144+
the final, correct aggregate value.
145+
146+
TODO:This client-side aggregation is a temporary workaround. Ideally, this logic
147+
should be integrated into the core pipeline as aggregate queries are handled by DefaultExecutionContext,
148+
not MultiAggregatorExecutionContext, which is not split proof until the logic is moved to the core pipeline.
149+
This method handles the aggregation of results when a query spans multiple
150+
partitions. It specifically handles:
151+
1. Standard queries: Appends documents from partial_result to results.
152+
2. Aggregate queries that return a JSON object (e.g., `SELECT COUNT(1) FROM c`, `SELECT MIN(c.field) FROM c`).
153+
3. VALUE queries with aggregation that return a scalar value (e.g., `SELECT VALUE COUNT(1) FROM c`).
154+
155+
:param dict[str, Any] results: The accumulated result's dictionary.
156+
:param dict[str, Any] partial_result: The new partial result dictionary to merge.
157+
:param query: The query being executed.
158+
:type query: str or dict[str, Any]
159+
:return: The merged result's dictionary.
160+
:rtype: dict[str, Any]
161+
"""
162+
if not results:
163+
return partial_result
164+
165+
partial_docs = partial_result.get("Documents")
166+
if not partial_docs:
167+
return results
168+
169+
results_docs = results.get("Documents")
170+
171+
# Check if both results are aggregate queries
172+
is_partial_agg = (
173+
isinstance(partial_docs, list)
174+
and len(partial_docs) == 1
175+
and isinstance(partial_docs[0], dict)
176+
and partial_docs[0].get("_aggregate") is not None
177+
)
178+
is_results_agg = (
179+
results_docs
180+
and isinstance(results_docs, list)
181+
and len(results_docs) == 1
182+
and isinstance(results_docs[0], dict)
183+
and results_docs[0].get("_aggregate") is not None
184+
)
185+
186+
if is_partial_agg and is_results_agg:
187+
agg_results = results_docs[0]["_aggregate"] # type: ignore[index]
188+
agg_partial = partial_docs[0]["_aggregate"]
189+
for key in agg_partial:
190+
if key not in agg_results:
191+
agg_results[key] = agg_partial[key]
192+
elif isinstance(agg_partial.get(key), dict) and "count" in agg_partial[key]: # AVG
193+
if isinstance(agg_results.get(key), dict):
194+
agg_results[key]["sum"] += agg_partial[key]["sum"]
195+
agg_results[key]["count"] += agg_partial[key]["count"]
196+
elif key.lower().startswith("min"):
197+
agg_results[key] = min(agg_results[key], agg_partial[key])
198+
elif key.lower().startswith("max"):
199+
agg_results[key] = max(agg_results[key], agg_partial[key])
200+
else: # COUNT, SUM
201+
agg_results[key] += agg_partial[key]
202+
return results
203+
204+
# Check if both are VALUE aggregate queries
205+
is_partial_value_agg = (
206+
isinstance(partial_docs, list)
207+
and len(partial_docs) == 1
208+
and isinstance(partial_docs[0], (int, float))
209+
)
210+
is_results_value_agg = (
211+
results_docs
212+
and isinstance(results_docs, list)
213+
and len(results_docs) == 1
214+
and isinstance(results_docs[0], (int, float))
215+
)
216+
217+
if is_partial_value_agg and is_results_value_agg:
218+
query_text = query.get("query") if isinstance(query, dict) else query
219+
if query_text:
220+
query_upper = query_text.upper()
221+
# For MIN/MAX, we find the min/max of the partial results.
222+
# For COUNT/SUM, we sum the partial results.
223+
# Without robust query parsing, we can't distinguish them reliably.
224+
# Defaulting to sum for COUNT/SUM. MIN/MAX VALUE queries are not fully supported client-side.
225+
if " SELECT VALUE MIN" in query_upper:
226+
results_docs[0] = min(results_docs[0], partial_docs[0]) # type: ignore[index]
227+
elif " SELECT VALUE MAX" in query_upper:
228+
results_docs[0] = max(results_docs[0], partial_docs[0]) # type: ignore[index]
229+
else: # For COUNT/SUM, we sum the partial results
230+
results_docs[0] += partial_docs[0] # type: ignore[index]
231+
return results
232+
233+
# Standard query, append documents
234+
if results_docs is None:
235+
results["Documents"] = partial_docs
236+
elif isinstance(results_docs, list) and isinstance(partial_docs, list):
237+
results_docs.extend(partial_docs)
238+
results["_count"] = len(results["Documents"])
239+
return results
240+
132241

133242
def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
134243
cosmos_client_connection: Union["CosmosClientConnection", "AsyncClientConnection"],

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3353,11 +3353,16 @@ def __GetBodiesFromQueryResult(result: dict[str, Any]) -> list[dict[str, Any]]:
33533353
)
33543354
self.last_response_headers = last_response_headers
33553355
self._UpdateSessionIfRequired(req_headers, partial_result, last_response_headers)
3356-
if results:
3357-
# add up all the query results from all over lapping ranges
3358-
results["Documents"].extend(partial_result["Documents"])
3359-
else:
3360-
results = partial_result
3356+
# Introducing a temporary complex function into a critical path to handle aggregated queries
3357+
# during splits, as a precaution falling back to the original logic if anything goes wrong
3358+
try:
3359+
results = base._merge_query_results(results, partial_result, query)
3360+
except Exception: # pylint: disable=broad-exception-caught
3361+
# If the new merge logic fails, fall back to the original logic.
3362+
if results:
3363+
results["Documents"].extend(partial_result["Documents"])
3364+
else:
3365+
results = partial_result
33613366
if response_hook:
33623367
response_hook(last_response_headers, partial_result)
33633368
# if the prefix partition query has results lets return it

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
import copy
2828

2929
from ...aio import _retry_utility_async
30-
from ... import http_constants
30+
from ... import http_constants, exceptions
31+
3132

3233
# pylint: disable=protected-access
3334

@@ -136,12 +137,30 @@ async def _fetch_items_helper_with_retries(self, fetch_function):
136137
# ExecuteAsync passes retry context parameters (timeout, operation start time, logger, etc.)
137138
# The callback need to accept these parameters even if unused
138139
# Removing **kwargs results in a TypeError when ExecuteAsync tries to pass these parameters
139-
async def callback(**kwargs): # pylint: disable=unused-argument
140-
return await self._fetch_items_helper_no_retries(fetch_function)
141-
142-
return await _retry_utility_async.ExecuteAsync(
143-
self._client, self._client._global_endpoint_manager, callback, **self._options
144-
)
140+
async def execute_fetch():
141+
async def callback(**kwargs): # pylint: disable=unused-argument
142+
return await self._fetch_items_helper_no_retries(fetch_function)
143+
144+
return await _retry_utility_async.ExecuteAsync(
145+
self._client, self._client._global_endpoint_manager, callback, **self._options
146+
)
147+
148+
max_retries = 3
149+
attempt = 0
150+
while attempt <= max_retries:
151+
try:
152+
return await execute_fetch()
153+
except exceptions.CosmosHttpResponseError as e:
154+
if exceptions._partition_range_is_gone(e):
155+
attempt += 1
156+
if attempt > max_retries:
157+
raise # Exhausted retries, propagate error
158+
159+
# Refresh routing map to get new partition key ranges
160+
self._client.refresh_routing_map_provider()
161+
# Retry immediately (no backoff needed for partition splits)
162+
continue
163+
raise # Not a partition split error, propagate immediately
145164

146165

147166
class _DefaultQueryExecutionContext(_QueryExecutionContextBase):

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525

2626
from collections import deque
2727
import copy
28-
from .. import _retry_utility, http_constants
28+
from .. import _retry_utility, http_constants, exceptions
29+
2930

3031
# pylint: disable=protected-access
3132

@@ -131,14 +132,34 @@ def _fetch_items_helper_no_retries(self, fetch_function):
131132
def _fetch_items_helper_with_retries(self, fetch_function):
132133
# TODO: Properly propagate kwargs from retry utility to fetch function
133134
# the callback keep the **kwargs parameter to maintain compatibility with the retry utility's execution pattern.
134-
# ExecuteAsync passes retry context parameters (timeout, operation start time, logger, etc.)
135+
# Execute passes retry context parameters (timeout, operation start time, logger, etc.)
135136
# The callback need to accept these parameters even if unused
136-
# Removing **kwargs results in a TypeError when ExecuteAsync tries to pass these parameters
137-
def callback(**kwargs): # pylint: disable=unused-argument
138-
return self._fetch_items_helper_no_retries(fetch_function)
139-
140-
return _retry_utility.Execute(self._client, self._client._global_endpoint_manager, callback, **self._options)
141-
137+
# Removing **kwargs results in a TypeError when Execute tries to pass these parameters
138+
def execute_fetch():
139+
def callback(**kwargs): # pylint: disable=unused-argument
140+
return self._fetch_items_helper_no_retries(fetch_function)
141+
142+
return _retry_utility.Execute(
143+
self._client, self._client._global_endpoint_manager, callback, **self._options
144+
)
145+
146+
max_retries = 3
147+
attempt = 0
148+
149+
while attempt <= max_retries:
150+
try:
151+
return execute_fetch()
152+
except exceptions.CosmosHttpResponseError as e:
153+
if exceptions._partition_range_is_gone(e):
154+
attempt += 1
155+
if attempt > max_retries:
156+
raise # Exhausted retries, propagate error
157+
158+
# Refresh routing map to get new partition key ranges
159+
self._client.refresh_routing_map_provider()
160+
# Retry immediately (no backoff needed for partition splits)
161+
continue
162+
raise # Not a partition split error, propagate immediately
142163
next = __next__ # Python 2 compatibility.
143164

144165

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3149,11 +3149,18 @@ def __GetBodiesFromQueryResult(result: dict[str, Any]) -> list[dict[str, Any]]:
31493149
)
31503150
self.last_response_headers = last_response_headers
31513151
self._UpdateSessionIfRequired(req_headers, partial_result, last_response_headers)
3152-
if results:
3153-
# add up all the query results from all over lapping ranges
3154-
results["Documents"].extend(partial_result["Documents"])
3155-
else:
3156-
results = partial_result
3152+
3153+
# Introducing a temporary complex function into a critical path to handle aggregated queries,
3154+
# during splits as a precaution falling back to the original logic if anything goes wrong
3155+
try:
3156+
results = base._merge_query_results(results, partial_result, query)
3157+
except Exception: # pylint: disable=broad-exception-caught
3158+
# If the new merge logic fails, fall back to the original logic.
3159+
if results:
3160+
results["Documents"].extend(partial_result["Documents"])
3161+
else:
3162+
results = partial_result
3163+
31573164
if response_hook:
31583165
response_hook(self.last_response_headers, partial_result)
31593166
# if the prefix partition query has results lets return it

0 commit comments

Comments
 (0)