Skip to content

Commit a9e3419

Browse files
Copilotbambriz
andcommitted
Implement max_concurrency optional and fix database method overloads
- Make max_concurrency Optional[int] in read_items (container.py) - Update default to None to use Python's ThreadPoolExecutor default - Update _read_items_helper.py to accept Optional[int] for max_concurrency - Update _cosmos_client_connection.py to default to None - Fix create_database overloads (sync): remove populate_query_metrics, make offer_throughput keyword-only, add response_hook - Fix create_database_if_not_exists overloads (sync): same changes - Fix create_database overloads (async): add response_hook parameter - Fix create_database_if_not_exists overloads (async): add response_hook parameter - Update implementations to handle response_hook and remove positional offer_throughput/populate_query_metrics Co-authored-by: bambriz <[email protected]>
1 parent 4947d41 commit a9e3419

File tree

5 files changed

+36
-21
lines changed

5 files changed

+36
-21
lines changed

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1081,7 +1081,7 @@ def read_items(
10811081
raise ValueError("Could not find partition key definition for collection.")
10821082

10831083
# Extract and remove max_concurrency from kwargs
1084-
max_concurrency = kwargs.pop('max_concurrency', 10)
1084+
max_concurrency = kwargs.pop('max_concurrency', None)
10851085

10861086
helper = ReadItemsHelperSync(
10871087
client=self,

sdk/cosmos/azure-cosmos/azure/cosmos/_read_items_helper.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def __init__(
4848
partition_key_definition: Dict[str, Any],
4949
*,
5050
executor: Optional[ThreadPoolExecutor] = None,
51-
max_concurrency: int = 10,
51+
max_concurrency: Optional[int] = None,
5252
**kwargs: Any
5353
):
5454
self.client = client
@@ -81,6 +81,7 @@ def read_items(self) -> CosmosList:
8181
if self.executor is not None:
8282
return self._execute_with_executor(self.executor, query_chunks)
8383

84+
# Create a new executor; if max_concurrency is None, use ThreadPoolExecutor's default
8485
with ThreadPoolExecutor(max_workers=self.max_concurrency) as executor:
8586
return self._execute_with_executor(executor, query_chunks)
8687

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ async def create_database(
263263
*,
264264
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
265265
initial_headers: Optional[Dict[str, str]] = None,
266+
response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
266267
throughput_bucket: Optional[int] = None,
267268
return_properties: Literal[False] = False,
268269
**kwargs: Any
@@ -302,6 +303,7 @@ async def create_database(
302303
*,
303304
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
304305
initial_headers: Optional[Dict[str, str]] = None,
306+
response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
305307
throughput_bucket: Optional[int] = None,
306308
return_properties: Literal[True],
307309
**kwargs: Any
@@ -392,11 +394,14 @@ async def create_database( # pylint:disable=docstring-should-be-keyword
392394

393395
offer_throughput = kwargs.pop("offer_throughput", None)
394396
return_properties = kwargs.pop("return_properties", False)
397+
response_hook = kwargs.pop("response_hook", None)
395398

396399
request_options = _build_options(kwargs)
397400
_set_throughput_options(offer=offer_throughput, request_options=request_options)
398401

399402
result = await self.client_connection.CreateDatabase(database={"id": id}, options=request_options, **kwargs)
403+
if response_hook:
404+
response_hook(self.client_connection.last_response_headers)
400405
if not return_properties:
401406
return DatabaseProxy(self.client_connection, id=result["id"], properties=result)
402407
return DatabaseProxy(self.client_connection, id=result["id"], properties=result), result
@@ -408,6 +413,7 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin
408413
*,
409414
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
410415
initial_headers: Optional[Dict[str, str]] = None,
416+
response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
411417
throughput_bucket: Optional[int] = None,
412418
return_properties: Literal[False] = False,
413419
**kwargs: Any
@@ -443,6 +449,7 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin
443449
*,
444450
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
445451
initial_headers: Optional[Dict[str, str]] = None,
452+
response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
446453
throughput_bucket: Optional[int] = None,
447454
return_properties: Literal[True],
448455
**kwargs: Any
@@ -527,10 +534,13 @@ async def create_database_if_not_exists( # pylint:disable=docstring-should-be-ke
527534

528535
offer_throughput = kwargs.pop("offer_throughput", None)
529536
return_properties = kwargs.pop("return_properties", False)
537+
response_hook = kwargs.pop("response_hook", None)
530538

531539
try:
532540
database_proxy = self.get_database_client(id)
533541
result = await database_proxy.read(**kwargs)
542+
if response_hook:
543+
response_hook(self.client_connection.last_response_headers)
534544
if not return_properties:
535545
return database_proxy
536546
return database_proxy, result
@@ -539,6 +549,7 @@ async def create_database_if_not_exists( # pylint:disable=docstring-should-be-ke
539549
id,
540550
offer_throughput=offer_throughput,
541551
return_properties=return_properties,
552+
response_hook=response_hook,
542553
**kwargs
543554
)
544555

sdk/cosmos/azure-cosmos/azure/cosmos/container.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ def read_items(
293293
items: Sequence[Tuple[str, _PartitionKeyType]],
294294
*,
295295
executor: Optional[ThreadPoolExecutor] = None,
296-
max_concurrency: int = 10,
296+
max_concurrency: Optional[int] = None,
297297
consistency_level: Optional[str] = None,
298298
session_token: Optional[str] = None,
299299
initial_headers: Optional[Dict[str, str]] = None,
@@ -312,7 +312,8 @@ def read_items(
312312
:keyword executor: Optional ThreadPoolExecutor for handling concurrent operations.
313313
If not provided, a new executor will be created as needed.
314314
:keyword int max_concurrency: The maximum number of concurrent operations for the
315-
items request. This value is ignored if an executor is provided. Defaults to 10.
315+
items request. This value is ignored if an executor is provided. If not specified,
316+
defaults to Python's ThreadPoolExecutor default of min(32, (os.cpu_count() or 1) + 4).
316317
:keyword str consistency_level: The consistency level to use for the request.
317318
:keyword str session_token: Token for use with Session consistency.
318319
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.

sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -264,13 +264,13 @@ def from_connection_string(
264264
**kwargs
265265
)
266266

267+
@overload
267268
@overload
268269
def create_database( # pylint:disable=docstring-missing-param
269270
self,
270271
id: str,
271-
populate_query_metrics: Optional[bool] = None,
272-
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
273272
*,
273+
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
274274
initial_headers: Optional[Dict[str, str]] = None,
275275
response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
276276
throughput_bucket: Optional[int] = None,
@@ -307,9 +307,8 @@ def create_database( # pylint:disable=docstring-missing-param
307307
def create_database( # pylint:disable=docstring-missing-param
308308
self,
309309
id: str,
310-
populate_query_metrics: Optional[bool] = None,
311-
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
312310
*,
311+
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
313312
initial_headers: Optional[Dict[str, str]] = None,
314313
response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
315314
throughput_bucket: Optional[int] = None,
@@ -394,10 +393,10 @@ def create_database( # pylint:disable=docstring-missing-param, docstring-should
394393
UserWarning)
395394

396395
id = args[0] if args else kwargs.pop("id")
397-
populate_query_metrics = args[1] if len(args) > 1 else kwargs.pop("populate_query_metrics", None)
398-
offer_throughput = args[2] if len(args) > 2 else kwargs.pop("offer_throughput", None)
399-
if len(args) > 3:
400-
raise TypeError(f"Unexpected positional arguments: {args[3:]}")
396+
if len(args) > 1:
397+
raise TypeError(f"Unexpected positional arguments: {args[1:]}")
398+
populate_query_metrics = kwargs.pop("populate_query_metrics", None)
399+
offer_throughput = kwargs.pop("offer_throughput", None)
401400
return_properties = kwargs.pop("return_properties", False)
402401
response_hook = kwargs.pop("response_hook", None)
403402

@@ -421,10 +420,10 @@ def create_database( # pylint:disable=docstring-missing-param, docstring-should
421420
def create_database_if_not_exists( # pylint:disable=docstring-missing-param
422421
self,
423422
id: str,
424-
populate_query_metrics: Optional[bool] = None,
425-
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
426423
*,
424+
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
427425
initial_headers: Optional[Dict[str, str]] = None,
426+
response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
428427
throughput_bucket: Optional[int] = None,
429428
return_properties: Literal[False] = False,
430429
**kwargs: Any
@@ -455,10 +454,10 @@ def create_database_if_not_exists( # pylint:disable=docstring-missing-param
455454
def create_database_if_not_exists( # pylint:disable=docstring-missing-param
456455
self,
457456
id: str,
458-
populate_query_metrics: Optional[bool] = None,
459-
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
460457
*,
458+
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
461459
initial_headers: Optional[Dict[str, str]] = None,
460+
response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
462461
throughput_bucket: Optional[int] = None,
463462
return_properties: Literal[True],
464463
**kwargs: Any
@@ -534,26 +533,29 @@ def create_database_if_not_exists( # pylint:disable=docstring-missing-param, do
534533
UserWarning)
535534

536535
id = args[0] if args else kwargs.pop("id")
537-
populate_query_metrics = args[1] if len(args) > 1 else kwargs.pop("populate_query_metrics", None)
538-
offer_throughput = args[2] if len(args) > 2 else kwargs.pop("offer_throughput", None)
539-
if len(args) > 3:
540-
raise TypeError(f"Unexpected positional arguments: {args[3:]}")
536+
if len(args) > 1:
537+
raise TypeError(f"Unexpected positional arguments: {args[1:]}")
538+
populate_query_metrics = kwargs.pop("populate_query_metrics", None)
539+
offer_throughput = kwargs.pop("offer_throughput", None)
541540
return_properties = kwargs.pop("return_properties", False)
541+
response_hook = kwargs.pop("response_hook", None)
542542
try:
543543
database_proxy = self.get_database_client(id)
544544
result = database_proxy.read(
545545
populate_query_metrics=populate_query_metrics,
546546
**kwargs
547547
)
548+
if response_hook:
549+
response_hook(self.client_connection.last_response_headers)
548550
if not return_properties:
549551
return database_proxy
550552
return database_proxy, result
551553
except CosmosResourceNotFoundError:
552554
return self.create_database(
553555
id,
554-
populate_query_metrics=populate_query_metrics,
555556
offer_throughput=offer_throughput,
556557
return_properties=return_properties,
558+
response_hook=response_hook,
557559
**kwargs
558560
)
559561

0 commit comments

Comments
 (0)