Commit 7e59645

Copilot, bambriz, allenkim0129, and simorenoh authored
Fix Issue #43211: Make max_concurrency optional (sync only) and fix create_database overloads (#43269)
* Initial plan

* Implement max_concurrency optional and fix database method overloads

  - Make max_concurrency Optional[int] in read_items (container.py)
  - Update default to None to use Python's ThreadPoolExecutor default
  - Update _read_items_helper.py to accept Optional[int] for max_concurrency
  - Update _cosmos_client_connection.py to default to None
  - Fix create_database overloads (sync): remove populate_query_metrics, make offer_throughput keyword-only, add response_hook
  - Fix create_database_if_not_exists overloads (sync): same changes
  - Fix create_database overloads (async): add response_hook parameter
  - Fix create_database_if_not_exists overloads (async): add response_hook parameter
  - Update implementations to handle response_hook and remove positional offer_throughput/populate_query_metrics

  Co-authored-by: bambriz <[email protected]>

* Add unit tests for max_concurrency optional behavior

  Co-authored-by: bambriz <[email protected]>

* Updated docstrings for create_database

  Updated docstrings for create_database in sync and async versions.

* Address review comments: update docstrings, add tests, fix async version

  - Add max_concurrency to docstring in _cosmos_client_connection.py
  - Update docstring to use generic language about ThreadPoolExecutor default
  - Add tests for max_concurrency ignored with executor and Python default behavior
  - Update async version of read_items to also use Optional[int] for max_concurrency
  - Update async _cosmos_client_connection_async.py to use None as default

  Co-authored-by: allenkim0129 <[email protected]>

* Revert async changes - only sync path should be modified

  - Revert aio/_container.py to keep max_concurrency: int = 10
  - Revert aio/_cosmos_client_connection_async.py to keep default of 10
  - Async uses asyncio.Semaphore instead of ThreadPoolExecutor
  - Only sync path uses ThreadPoolExecutor and benefits from Optional behavior

  Co-authored-by: simorenoh <[email protected]>

* update changes

* max concurrency update

  Updated the default of async max concurrency to be None in the public-facing API; internally we set it to 5.

* pylint and test fixes

* Update _container.py

  fix docstring indentation

* update quick fixes

---------

Co-authored-by: copilot-swe-agent[bot] <[email protected]>
Co-authored-by: bambriz <[email protected]>
Co-authored-by: bambriz <[email protected]>
Co-authored-by: allenkim0129 <[email protected]>
Co-authored-by: simorenoh <[email protected]>
Co-authored-by: bambriz <[email protected]>
1 parent 7dfe33b commit 7e59645

8 files changed (+73, -67 lines)


sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 1 addition & 1 deletion
@@ -1078,7 +1078,7 @@ def read_items(
             raise ValueError("Could not find partition key definition for collection.")
 
         # Extract and remove max_concurrency from kwargs
-        max_concurrency = kwargs.pop('max_concurrency', 10)
+        max_concurrency = kwargs.pop('max_concurrency', None)
 
         helper = ReadItemsHelperSync(
             client=self,

sdk/cosmos/azure-cosmos/azure/cosmos/_read_items_helper.py

Lines changed: 2 additions & 1 deletion
@@ -48,7 +48,7 @@ def __init__(
         partition_key_definition: dict[str, Any],
         *,
         executor: Optional[ThreadPoolExecutor] = None,
-        max_concurrency: int = 10,
+        max_concurrency: Optional[int] = None,
         **kwargs: Any
     ):
         self.client = client
@@ -81,6 +81,7 @@ def read_items(self) -> CosmosList:
         if self.executor is not None:
             return self._execute_with_executor(self.executor, query_chunks)
 
+        # Create a new executor; if max_concurrency is None, use ThreadPoolExecutor's default
         with ThreadPoolExecutor(max_workers=self.max_concurrency) as executor:
             return self._execute_with_executor(executor, query_chunks)
 
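
Note on the sync change above: the following is a minimal sketch of the dispatch pattern in this hunk, not the SDK's code. `run_chunks`, `double`, and the sample inputs are hypothetical names introduced for illustration. It shows that passing `max_workers=None` is valid and leaves the pool size to `ThreadPoolExecutor`, whose default is derived from the machine's CPU count and varies by Python version.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, Optional


def run_chunks(
    chunks: Iterable[int],
    work: Callable[[int], int],
    executor: Optional[ThreadPoolExecutor] = None,
    max_concurrency: Optional[int] = None,
) -> List[int]:
    """Hypothetical stand-in for the helper's dispatch logic."""
    if executor is not None:
        # A caller-supplied executor wins; max_concurrency is ignored.
        return list(executor.map(work, chunks))
    # max_workers=None lets ThreadPoolExecutor choose its own default pool size.
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        return list(pool.map(work, chunks))


def double(x: int) -> int:
    return x * 2


results_default = run_chunks(range(5), double)                    # pool size chosen by Python
results_capped = run_chunks(range(5), double, max_concurrency=2)  # at most two worker threads
```

Because `executor.map` preserves input order, both calls return the same list; only the degree of parallelism differs.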

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py

Lines changed: 4 additions & 3 deletions
@@ -433,7 +433,7 @@ async def read_items(
         self,
         items: Sequence[Tuple[str, PartitionKeyType]],
         *,
-        max_concurrency: int = 10,
+        max_concurrency: Optional[int] = None,
         consistency_level: Optional[str] = None,
         session_token: Optional[str] = None,
         initial_headers: Optional[dict[str, str]] = None,
@@ -449,8 +449,9 @@ async def read_items(
 
         :param items: A list of tuples, where each tuple contains an item's ID and partition key.
         :type items: Sequence[Tuple[str, PartitionKeyType]]
-        :keyword int max_concurrency: The maximum number of concurrent operations for the read_items
-            request. Defaults to 10.
+        :keyword int max_concurrency: Specifies the maximum number of concurrent operations for the
+            `read_items` request. If not provided or set to None, the internal default value will be
+            used when passed to `asyncio.Semaphore`.
         :keyword str consistency_level: The consistency level to use for the request.
         :keyword str session_token: Token for use with Session consistency.
         :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py

Lines changed: 31 additions & 24 deletions
@@ -263,6 +263,7 @@ async def create_database(
         *,
         offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
         initial_headers: Optional[dict[str, str]] = None,
+        response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
         throughput_bucket: Optional[int] = None,
         return_properties: Literal[False] = False,
         **kwargs: Any
@@ -271,12 +272,12 @@ async def create_database(
         Create a new database with the given ID (name).
 
         :param str id: ID (name) of the database to create.
-        :keyword offer_throughput: The provisioned throughput for this offer.
-        :paramtype offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties]
+        :keyword Union[int, ~azure.cosmos.ThroughputProperties] offer_throughput: The provisioned throughput
+            for this database.
         :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
-        :keyword response_hook: A callable invoked with the response metadata.
+        :keyword Callable[[dict[str, str], dict[str, Any]], None] response_hook: A callable invoked with
+            the response metadata.
         :keyword int throughput_bucket: The desired throughput bucket for the client
-        :paramtype response_hook: Callable[[dict[str, str], dict[str, Any]], None]
         :keyword bool return_properties: Specifies whether to return either a DatabaseProxy
             or a Tuple containing a DatabaseProxy and the associated database properties.
         :raises ~azure.cosmos.exceptions.CosmosResourceExistsError: Database with the given ID already exists.
@@ -302,6 +303,7 @@ async def create_database(
         *,
         offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
         initial_headers: Optional[dict[str, str]] = None,
+        response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
         throughput_bucket: Optional[int] = None,
         return_properties: Literal[True],
         **kwargs: Any
@@ -310,12 +312,12 @@ async def create_database(
         Create a new database with the given ID (name).
 
         :param str id: ID (name) of the database to create.
-        :keyword offer_throughput: The provisioned throughput for this offer.
-        :paramtype offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties]
+        :keyword Union[int, ~azure.cosmos.ThroughputProperties] offer_throughput: The provisioned throughput
+            for this database.
         :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
-        :keyword response_hook: A callable invoked with the response metadata.
+        :keyword Callable[[dict[str, str], dict[str, Any]], None] response_hook: A callable invoked with
+            the response metadata.
         :keyword int throughput_bucket: The desired throughput bucket for the client
-        :paramtype response_hook: Callable[[dict[str, str], dict[str, Any]], None]
         :keyword bool return_properties: Specifies whether to return either a DatabaseProxy
             or a Tuple containing a DatabaseProxy and the associated database properties.
         :raises ~azure.cosmos.exceptions.CosmosResourceExistsError: Database with the given ID already exists.
@@ -345,12 +347,12 @@ async def create_database(  # pylint:disable=docstring-should-be-keyword
 
         :param Any args: args
         :param str id: ID (name) of the database to read or create.
-        :keyword offer_throughput: The provisioned throughput for this offer.
-        :paramtype offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties]
+        :keyword Union[int, ~azure.cosmos.ThroughputProperties] offer_throughput: The provisioned throughput
+            for this database.
         :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
-        :keyword response_hook: A callable invoked with the response metadata.
+        :keyword Callable[[dict[str, str], dict[str, Any]], None] response_hook: A callable invoked with
+            the response metadata.
         :keyword int throughput_bucket: The desired throughput bucket for the client
-        :paramtype response_hook: Callable[[dict[str, str], dict[str, Any]], None]
         :keyword bool return_properties: Specifies whether to return either a DatabaseProxy
             or a Tuple containing a DatabaseProxy and the associated database properties.
         :raises ~azure.cosmos.exceptions.CosmosResourceExistsError: Database with the given ID already exists.
@@ -392,11 +394,14 @@ async def create_database(  # pylint:disable=docstring-should-be-keyword
 
         offer_throughput = kwargs.pop("offer_throughput", None)
         return_properties = kwargs.pop("return_properties", False)
+        response_hook = kwargs.pop("response_hook", None)
 
         request_options = _build_options(kwargs)
         _set_throughput_options(offer=offer_throughput, request_options=request_options)
 
         result = await self.client_connection.CreateDatabase(database={"id": id}, options=request_options, **kwargs)
+        if response_hook:
+            response_hook(self.client_connection.last_response_headers)
         if not return_properties:
             return DatabaseProxy(self.client_connection, id=result["id"], properties=result)
         return DatabaseProxy(self.client_connection, id=result["id"], properties=result), result
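
Note on the `response_hook` handling above: a small self-contained sketch of the pop-then-invoke pattern may help reviewers. `FakeConnection` and the module-level `create_database` are hypothetical stand-ins, not SDK classes, and the header value is illustrative. As in the hunk, the hook is called with a single argument: the last response headers.

```python
import asyncio
from typing import Any, Callable, Mapping, Optional


class FakeConnection:
    """Hypothetical stand-in for the client connection; not the SDK class."""

    def __init__(self) -> None:
        self.last_response_headers: dict = {}

    async def CreateDatabase(self, database: dict) -> dict:
        # Pretend the service answered and record illustrative response headers.
        self.last_response_headers = {"x-ms-request-charge": "4.95"}
        return dict(database)


async def create_database(conn: FakeConnection, id: str, **kwargs: Any) -> dict:
    # Pop the hook first so it is not forwarded to the underlying request.
    response_hook: Optional[Callable[[Mapping[str, Any]], None]] = kwargs.pop("response_hook", None)
    result = await conn.CreateDatabase(database={"id": id})
    if response_hook:
        response_hook(conn.last_response_headers)
    return result


captured: list = []
result = asyncio.run(create_database(FakeConnection(), "mydb", response_hook=captured.append))
```

After the call, `captured` holds one entry, the headers mapping recorded by the fake connection.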
@@ -408,6 +413,7 @@ async def create_database_if_not_exists(  # pylint: disable=redefined-builtin
         *,
         offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
         initial_headers: Optional[dict[str, str]] = None,
+        response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
         throughput_bucket: Optional[int] = None,
         return_properties: Literal[False] = False,
         **kwargs: Any
@@ -422,12 +428,12 @@ async def create_database_if_not_exists(  # pylint: disable=redefined-builtin
             offer throughput if they differ from what is passed in.
 
         :param str id: ID (name) of the database to read or create.
-        :keyword offer_throughput: The provisioned throughput for this offer.
-        :paramtype offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties]
+        :keyword Union[int, ~azure.cosmos.ThroughputProperties] offer_throughput: The provisioned throughput
+            for this database.
         :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
-        :keyword response_hook: A callable invoked with the response metadata.
+        :keyword Callable[[dict[str, str], dict[str, Any]], None] response_hook: A callable invoked with
+            the response metadata.
         :keyword int throughput_bucket: The desired throughput bucket for the client
-        :paramtype response_hook: Callable[[dict[str, str], dict[str, Any]], None]
         :keyword bool return_properties: Specifies whether to return either a DatabaseProxy
             or a Tuple containing a DatabaseProxy and the associated database properties.
         :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The database read or creation failed.
@@ -443,6 +449,7 @@ async def create_database_if_not_exists(  # pylint: disable=redefined-builtin
         *,
         offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
         initial_headers: Optional[dict[str, str]] = None,
+        response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
         throughput_bucket: Optional[int] = None,
         return_properties: Literal[True],
         **kwargs: Any
@@ -457,12 +464,12 @@ async def create_database_if_not_exists(  # pylint: disable=redefined-builtin
             offer throughput if they differ from what is passed in.
 
         :param str id: ID (name) of the database to read or create.
-        :keyword offer_throughput: The provisioned throughput for this offer.
-        :paramtype offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties]
+        :keyword Union[int, ~azure.cosmos.ThroughputProperties] offer_throughput: The provisioned throughput
+            for this database.
         :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
-        :keyword response_hook: A callable invoked with the response metadata.
+        :keyword Callable[[dict[str, str], dict[str, Any]], None] response_hook: A callable invoked with
+            the response metadata.
         :keyword int throughput_bucket: The desired throughput bucket for the client
-        :paramtype response_hook: Callable[[dict[str, str], dict[str, Any]], None]
         :keyword bool return_properties: Specifies whether to return either a DatabaseProxy
             or a Tuple containing a DatabaseProxy and the associated database properties.
         :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The database read or creation failed.
@@ -488,12 +495,12 @@ async def create_database_if_not_exists(  # pylint:disable=docstring-should-be-ke
 
         :param Any args: args
         :param str id: ID (name) of the database to read or create.
-        :keyword offer_throughput: The provisioned throughput for this offer.
-        :paramtype offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties]
+        :keyword Union[int, ~azure.cosmos.ThroughputProperties] offer_throughput: The provisioned throughput
+            for this database.
         :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
-        :keyword response_hook: A callable invoked with the response metadata.
+        :keyword Callable[[dict[str, str], dict[str, Any]], None] response_hook: A callable invoked with
+            the response metadata.
         :keyword int throughput_bucket: The desired throughput bucket for the client
-        :paramtype response_hook: Callable[[dict[str, str], dict[str, Any]], None]
         :keyword bool return_properties: Specifies whether to return either a DatabaseProxy
             or a Tuple containing a DatabaseProxy and the associated database properties.
         :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The database read or creation failed.
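
Note on the overloads in this file: the `Literal[False]` / `Literal[True]` pairs on `return_properties` let a type checker pick the return type from the argument. The sketch below illustrates the general `typing.overload` pattern under simplified assumptions; this `DatabaseProxy` and `create_database` are hypothetical, synchronous placeholders, not the SDK definitions.

```python
from typing import Any, Dict, Literal, Tuple, Union, overload


class DatabaseProxy:
    """Hypothetical placeholder; not the SDK class."""

    def __init__(self, id: str) -> None:
        self.id = id


@overload
def create_database(id: str, *, return_properties: Literal[False] = False) -> DatabaseProxy: ...


@overload
def create_database(id: str, *, return_properties: Literal[True]) -> Tuple[DatabaseProxy, Dict[str, Any]]: ...


def create_database(
    id: str, *, return_properties: bool = False
) -> Union[DatabaseProxy, Tuple[DatabaseProxy, Dict[str, Any]]]:
    # Single runtime implementation; the overloads above only guide type checkers.
    properties: Dict[str, Any] = {"id": id}
    proxy = DatabaseProxy(id)
    if not return_properties:
        return proxy
    return proxy, properties


proxy_only = create_database("db1")
proxy_and_props = create_database("db2", return_properties=True)
```

With these signatures a checker infers `DatabaseProxy` for the first call and a tuple for the second, so callers do not need casts.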

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py

Lines changed: 1 addition & 1 deletion
@@ -2284,7 +2284,7 @@ async def read_items(
             raise ValueError("Could not find partition key definition for collection.")
 
         # Extract and remove max_concurrency from kwargs
-        max_concurrency = kwargs.pop('max_concurrency', 10)
+        max_concurrency = kwargs.pop('max_concurrency', None)
         helper = ReadItemsHelperAsync(
             client=self,
             collection_link=collection_link,

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_read_items_helper_async.py

Lines changed: 2 additions & 2 deletions
@@ -48,7 +48,7 @@ def __init__(
         items: Sequence[Tuple[str, PartitionKeyType]],
         options: Optional[Mapping[str, Any]],
         partition_key_definition: dict[str, Any],
-        max_concurrency: int = 10,
+        max_concurrency: int = 5,
         **kwargs: Any
     ):
         self.client = client
@@ -57,7 +57,7 @@ def __init__(
         self.options = dict(options) if options is not None else {}
         self.partition_key_definition = partition_key_definition
         self.kwargs = kwargs
-        self.max_concurrency = max_concurrency
+        self.max_concurrency = max_concurrency if max_concurrency and max_concurrency > 0 else 5
         self.max_items_per_query = 1000
 
     async def read_items(self) -> 'CosmosList':
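
Note on the async fallback above: unlike `ThreadPoolExecutor`, `asyncio.Semaphore` needs a concrete integer, which is presumably why `None`, zero, and negative values are mapped to 5 here. The following is a rough sketch of that normalization combined with a semaphore-bounded gather. `resolve_concurrency`, `run_bounded`, and `fake_query` are hypothetical names, and the real helper may be structured differently.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Sequence

DEFAULT_CONCURRENCY = 5  # mirrors the fallback value used in the diff


def resolve_concurrency(max_concurrency: Optional[int]) -> int:
    # None, 0, and negative values all fall back to the internal default.
    return max_concurrency if max_concurrency and max_concurrency > 0 else DEFAULT_CONCURRENCY


async def run_bounded(
    chunks: Sequence[int],
    work: Callable[[int], Awaitable[int]],
    max_concurrency: Optional[int] = None,
) -> List[int]:
    semaphore = asyncio.Semaphore(resolve_concurrency(max_concurrency))

    async def guarded(chunk: int) -> int:
        # At most resolve_concurrency(...) coroutines are inside this block at once.
        async with semaphore:
            return await work(chunk)

    return list(await asyncio.gather(*(guarded(c) for c in chunks)))


async def fake_query(chunk: int) -> int:
    await asyncio.sleep(0)  # stand-in for a network call
    return chunk * 10


results = asyncio.run(run_bounded([1, 2, 3], fake_query))
```

`asyncio.gather` returns results in input order, so the bound only affects how many queries are in flight at once, not the ordering of the output.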

sdk/cosmos/azure-cosmos/azure/cosmos/container.py

Lines changed: 3 additions & 2 deletions
@@ -292,7 +292,7 @@ def read_items(
         items: Sequence[Tuple[str, PartitionKeyType]],
         *,
         executor: Optional[ThreadPoolExecutor] = None,
-        max_concurrency: int = 10,
+        max_concurrency: Optional[int] = None,
         consistency_level: Optional[str] = None,
         session_token: Optional[str] = None,
         initial_headers: Optional[dict[str, str]] = None,
@@ -311,7 +311,8 @@ def read_items(
         :keyword executor: Optional ThreadPoolExecutor for handling concurrent operations.
             If not provided, a new executor will be created as needed.
         :keyword int max_concurrency: The maximum number of concurrent operations for the
-            items request. This value is ignored if an executor is provided. Defaults to 10.
+            items request. This value is ignored if an executor is provided. If not specified,
+            the default max_concurrency defined by Python's ThreadPoolExecutor will be applied.
         :keyword str consistency_level: The consistency level to use for the request.
         :keyword str session_token: Token for use with Session consistency.
         :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
