Skip to content

Commit a298356

Browse files
dibahlfi and Copilot
authored
ReadManyItems API (#42167)
* feature:read_many_api first iteration * read_many_items - adding logic for chunking/semaphores * read_many_items - code refactor * read_many_items - created a new helper class for chunking/concurrency * fix: adding test cases * read_many_items - refactoring * read_many_items - refactoring * read_many_items - clean up * Update sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py Co-authored-by: Copilot <[email protected]> * Update sdk/cosmos/azure-cosmos/tests/test_read_many_items_partition_split.py Co-authored-by: Copilot <[email protected]> * read_many_items - adding code for the sync flow. * fix: addressing comments * fix: add support for aggregated request charges in the header * fix: fixing typos * fix: fixing split tests * fix: fixing linting issues * fix: addressing comments * fix: linting errors * fix: linting errors * fix: adding order * fix: cleaning up * fix: cleaning up * fix: cleaning up * fix: bug fixing in the chunking logic * fix: addressing comments * fixing pylint comments * fixing pylint errors * fix: adding samples * fix: fixing tests --------- Co-authored-by: Copilot <[email protected]>
1 parent d285b26 commit a298356

17 files changed

+2606
-4
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
### 4.14.0b3 (Unreleased)
44

55
#### Features Added
6+
* Added read_items API to provide an efficient method for retrieving multiple items in a single request. See [PR 42167](https://github.com/Azure/azure-sdk-for-python/pull/42167).
67

78
#### Breaking Changes
89

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
import os
2727
import urllib.parse
2828
import uuid
29+
from concurrent.futures.thread import ThreadPoolExecutor
2930
from typing import Callable, Dict, Any, Iterable, List, Mapping, Optional, Sequence, Tuple, Union, cast
31+
3032
from typing_extensions import TypedDict
3133
from urllib3.util.retry import Retry
3234

@@ -65,6 +67,7 @@
6567
from ._cosmos_http_logging_policy import CosmosHttpLoggingPolicy
6668
from ._cosmos_responses import CosmosDict, CosmosList
6769
from ._range_partition_resolver import RangePartitionResolver
70+
from ._read_items_helper import ReadItemsHelperSync
6871
from ._request_object import RequestObject
6972
from ._retry_utility import ConnectionRetryPolicy
7073
from ._routing import routing_map_provider, routing_range
@@ -1039,6 +1042,49 @@ def DeletePermission(
10391042
self.DeleteResource(path, http_constants.ResourceType.Permission, permission_id, None, options,
10401043
**kwargs)
10411044

1045+
def read_items(
    self,
    collection_link: str,
    items: Sequence[Tuple[str, _PartitionKeyType]],
    options: Optional[Mapping[str, Any]] = None,
    *,
    executor: Optional[ThreadPoolExecutor] = None,
    **kwargs: Any
) -> CosmosList:
    """Read multiple items from a collection by (id, partition key) pairs.

    The work is delegated to ``ReadItemsHelperSync``; this method validates
    the inputs, resolves the partition key definition of the collection and
    forwards everything to the helper.

    :param str collection_link: The link to the document collection.
    :param items: The items to read, each given as a tuple of item ID and partition key.
    :type items: Sequence[Tuple[str, _PartitionKeyType]]
    :param dict options: The request options for the request.
    :keyword executor: Optional ThreadPoolExecutor handed to the helper.
    :paramtype executor: Optional[ThreadPoolExecutor]
    :keyword int max_concurrency: Removed from ``kwargs`` and passed to the helper
        explicitly; defaults to 10 when not supplied.
    :return: The list of read items.
    :rtype: CosmosList
    :raises ValueError: If no partition key definition is found for the collection.
    """
    request_options = {} if options is None else options

    # Nothing requested: answer with an empty result, without any lookup.
    if not items:
        return CosmosList([], response_headers=CaseInsensitiveDict())

    pk_definition = self._get_partition_key_definition(collection_link, request_options)
    if not pk_definition:
        raise ValueError("Could not find partition key definition for collection.")

    # max_concurrency must be taken out of kwargs so that it is not passed twice below.
    concurrency_limit = kwargs.pop('max_concurrency', 10)

    return ReadItemsHelperSync(
        client=self,
        collection_link=collection_link,
        items=items,
        options=request_options,
        partition_key_definition=pk_definition,
        executor=executor,
        max_concurrency=concurrency_limit,
        **kwargs
    ).read_items()
1087+
10421088
def ReadItems(
10431089
self,
10441090
collection_link: str,

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,12 @@ def _fetch_items_helper_no_retries(self, fetch_function):
114114
fetched_items = []
115115
new_options = copy.deepcopy(self._options)
116116
while self._continuation or not self._has_started:
117-
if not self._has_started:
118-
self._has_started = True
119117
new_options["continuation"] = self._continuation
120118

121119
response_headers = {}
122120
(fetched_items, response_headers) = fetch_function(new_options)
121+
if not self._has_started:
122+
self._has_started = True
123123

124124
continuation_key = http_constants.HttpHeaders.Continuation
125125
self._continuation = response_headers.get(continuation_key)
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
# The MIT License (MIT)
2+
# Copyright (c) 2014 Microsoft Corporation
3+
4+
# Permission is hereby granted, free of charge, to any person obtaining a copy
5+
# of this software and associated documentation files (the "Software"), to deal
6+
# in the Software without restriction, including without limitation the rights
7+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
# copies of the Software, and to permit persons to whom the Software is
9+
# furnished to do so, subject to the following conditions:
10+
11+
# The above copyright notice and this permission notice shall be included in all
12+
# copies or substantial portions of the Software.
13+
14+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
# SOFTWARE.
21+
22+
"""Internal query builder for multi-item operations."""
23+
24+
from typing import Dict, Tuple, Any, TYPE_CHECKING, Sequence
25+
26+
from azure.cosmos.partition_key import _Undefined, _Empty, NonePartitionKeyValue
27+
if TYPE_CHECKING:
28+
from azure.cosmos._cosmos_client_connection import _PartitionKeyType
29+
30+
31+
class _QueryBuilder:
    """Internal class for building optimized queries for multi-item operations.

    All methods are static and return plain dictionaries of the form
    ``{"query": <text>, "parameters": [{"name": ..., "value": ...}, ...]}``.
    """

    @staticmethod
    def _get_field_expression(path: str) -> str:
        """Converts a partition key path string into a query field expression.

        Examples: ``/pk`` -> ``c.pk``, ``/my-pk`` -> ``c["my-pk"]``,
        ``/a/b`` -> ``c["a"]["b"]``.

        :param str path: The path string to convert.
        :return: The query field expression.
        :rtype: str
        """
        field_name = path.lstrip("/")
        if "/" in field_name:
            # Nested paths always use bracket notation for every segment.
            return "c" + "".join(f'["{part}"]' for part in field_name.split("/"))
        # Dot notation is only used when the name is a plain identifier.
        return f"c.{field_name}" if field_name.isidentifier() else f'c["{field_name}"]'

    @staticmethod
    def is_id_partition_key_query(
            items: Sequence[Tuple[str, "_PartitionKeyType"]],
            partition_key_definition: Dict[str, Any]
    ) -> bool:
        """Check if we can use the optimized ID IN query.

        This is the case when the container is partitioned on ``/id`` alone and
        every item's partition key value equals its ID.

        :param Sequence[tuple[str, any]] items: The list of items to check.
        :param dict[str, any] partition_key_definition: The partition key definition of the container.
        :return: True if the optimized ID IN query can be used, False otherwise.
        :rtype: bool
        """
        partition_key_paths = partition_key_definition.get("paths", [])
        if len(partition_key_paths) != 1 or partition_key_paths[0] != "/id":
            return False

        for item_id, partition_key_value in items:
            # A list value is unwrapped to its first component. An empty list is
            # left as-is so it compares unequal instead of raising IndexError.
            if isinstance(partition_key_value, list) and partition_key_value:
                pk_val = partition_key_value[0]
            else:
                pk_val = partition_key_value
            if pk_val != item_id:
                return False
        return True

    @staticmethod
    def is_single_logical_partition_query(
            items: Sequence[Tuple[str, "_PartitionKeyType"]]
    ) -> bool:
        """Check if all items in a chunk belong to the same logical partition.

        This is used to determine if an optimized query with an IN clause can be used.
        Chunks with zero or one item never qualify.

        :param Sequence[tuple[str, any]] items: The list of items to check.
        :return: True if all items belong to the same logical partition, False otherwise.
        :rtype: bool
        """
        if len(items) <= 1:
            return False
        first_pk = items[0][1]
        return all(item[1] == first_pk for item in items)

    @staticmethod
    def build_pk_and_id_in_query(
            items: Sequence[Tuple[str, "_PartitionKeyType"]],
            partition_key_definition: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Build a query for items in a single logical partition using an IN clause for IDs.

        e.g., SELECT * FROM c WHERE c.pk = @pk AND c.id IN (@id0, @id1)

        Only the first path of the partition key definition and the partition key
        value of the first item are used; ``items`` must not be empty.

        :param Sequence[tuple[str, any]] items: The list of items to build the query for.
        :param dict[str, any] partition_key_definition: The partition key definition of the container.
        :return: A dictionary containing the query text and parameters.
        :rtype: dict[str, any]
        """
        # Go through the shared helper so nested paths ("/a/b") and names that are
        # not plain identifiers ("/my-pk") produce a valid field expression.
        field_expr = _QueryBuilder._get_field_expression(partition_key_definition['paths'][0])
        partition_key_value = items[0][1]

        id_params = {f"@id{i}": item[0] for i, item in enumerate(items)}
        id_param_names = ", ".join(id_params)

        query_text = f"SELECT * FROM c WHERE {field_expr} = @pk AND c.id IN ({id_param_names})"

        parameters = [{"name": "@pk", "value": partition_key_value}]
        parameters.extend({"name": name, "value": value} for name, value in id_params.items())

        return {"query": query_text, "parameters": parameters}

    @staticmethod
    def build_id_in_query(items: Sequence[Tuple[str, "_PartitionKeyType"]]) -> Dict[str, Any]:
        """Build optimized query using ID IN clause when ID equals partition key.

        :param Sequence[tuple[str, any]] items: The list of items to build the query for.
        :return: A dictionary containing the query text and parameters.
        :rtype: dict[str, any]
        """
        id_params = {f"@param_id{i}": item_id for i, (item_id, _) in enumerate(items)}
        param_names = ", ".join(id_params)
        parameters = [{"name": name, "value": value} for name, value in id_params.items()]

        query_string = f"SELECT * FROM c WHERE c.id IN ( {param_names} )"

        return {"query": query_string, "parameters": parameters}

    @staticmethod
    def build_parameterized_query_for_items(
            items_by_partition: Dict[str, Sequence[Tuple[str, "_PartitionKeyType"]]],
            partition_key_definition: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Builds a parameterized SQL query for reading multiple items.

        Each item contributes one ``( c.id = @param_idN AND <pk conditions> )`` group;
        the groups are combined with OR.

        :param dict[str, Sequence[tuple[str, any]]] items_by_partition: A dictionary of items grouped by partition key.
        :param dict[str, any] partition_key_definition: The partition key definition of the container.
        :return: A dictionary containing the query text and parameters.
        :rtype: dict[str, any]
        :raises ValueError: If a partition key value has a different number of components
            than the partition key definition has paths.
        """
        all_items = [item for partition_items in items_by_partition.values() for item in partition_items]

        if not all_items:
            return {"query": "SELECT * FROM c WHERE false", "parameters": []}

        partition_key_paths = partition_key_definition.get("paths", [])
        # The field expressions depend only on the definition, so compute them once.
        field_expressions = [_QueryBuilder._get_field_expression(path) for path in partition_key_paths]
        query_parts = []
        parameters = []

        for i, (item_id, partition_key_value) in enumerate(all_items):
            id_param_name = f"@param_id{i}"
            parameters.append({"name": id_param_name, "value": item_id})
            condition_parts = [f"c.id = {id_param_name}"]

            # A None / NonePartitionKeyValue key leaves pk_values empty, so every
            # path below is matched with IS_DEFINED(...) = false.
            pk_values = []
            if partition_key_value is not None and not isinstance(partition_key_value, type(NonePartitionKeyValue)):
                pk_values = partition_key_value if isinstance(partition_key_value, list) else [partition_key_value]
                if len(pk_values) != len(partition_key_paths):
                    raise ValueError(
                        f"Number of components in partition key value ({len(pk_values)}) "
                        f"does not match definition ({len(partition_key_paths)})"
                    )

            for j, field_expr in enumerate(field_expressions):
                pk_value = pk_values[j] if j < len(pk_values) else None

                if pk_value is None or isinstance(pk_value, (_Undefined, _Empty)):
                    condition_parts.append(f"IS_DEFINED({field_expr}) = false")
                else:
                    pk_param_name = f"@param_pk{i}{j}"
                    parameters.append({"name": pk_param_name, "value": pk_value})
                    condition_parts.append(f"{field_expr} = {pk_param_name}")

            query_parts.append(f"( {' AND '.join(condition_parts)} )")

        query_string = f"SELECT * FROM c WHERE ( {' OR '.join(query_parts)} )"
        return {"query": query_string, "parameters": parameters}

0 commit comments

Comments
 (0)