Skip to content

Commit 2cf408d

Browse files
Copilot and allenkim0129 committed
Address review comments: update docstrings, add tests, fix async version
- Add max_concurrency to docstring in _cosmos_client_connection.py
- Update docstring to use generic language about ThreadPoolExecutor default
- Add tests for max_concurrency ignored with executor and Python default behavior
- Update async version of read_items to also use Optional[int] for max_concurrency
- Update async _cosmos_client_connection_async.py to use None as default

Co-authored-by: allenkim0129 <[email protected]>
1 parent 58fa2d9 commit 2cf408d

File tree

5 files changed

+72
-5
lines changed

5 files changed

+72
-5
lines changed

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1068,6 +1068,9 @@ def read_items(
10681068
:param dict options: The request options for the request.
10691069
:keyword executor: Optional ThreadPoolExecutor for thread management
10701070
:paramtype executor: Optional[ThreadPoolExecutor]
1071+
:keyword int max_concurrency: The maximum number of concurrent operations for the items request.
1072+
This value is ignored if an executor is provided. If not specified, the default max_concurrency
1073+
defined by Python's ThreadPoolExecutor will be applied.
10711074
:return: The list of read items.
10721075
:rtype: CosmosList
10731076
"""

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ async def read_items(
437437
self,
438438
items: Sequence[Tuple[str, PartitionKeyType]],
439439
*,
440-
max_concurrency: int = 10,
440+
max_concurrency: Optional[int] = None,
441441
consistency_level: Optional[str] = None,
442442
session_token: Optional[str] = None,
443443
initial_headers: Optional[Dict[str, str]] = None,
@@ -454,7 +454,8 @@ async def read_items(
454454
:param items: A list of tuples, where each tuple contains an item's ID and partition key.
455455
:type items: Sequence[Tuple[str, PartitionKeyType]]
456456
:keyword int max_concurrency: The maximum number of concurrent operations for the read_items
457-
request. Defaults to 10.
457+
request. If not specified, the default max_concurrency defined by Python's ThreadPoolExecutor
458+
will be applied.
458459
:keyword str consistency_level: The consistency level to use for the request.
459460
:keyword str session_token: Token for use with Session consistency.
460461
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2279,6 +2279,8 @@ async def read_items(
22792279
:param items: A list of tuples, where each tuple contains an item's ID and partition key.
22802280
:type items: Sequence[Tuple[str, _PartitionKeyType]]
22812281
:param dict options: The request options for the request.
2282+
:keyword int max_concurrency: The maximum number of concurrent operations for the items request.
2283+
If not specified, the default max_concurrency defined by Python's ThreadPoolExecutor will be applied.
22822284
:return: The list of read items.
22832285
:rtype: CosmosList
22842286
"""
@@ -2292,7 +2294,7 @@ async def read_items(
22922294
raise ValueError("Could not find partition key definition for collection.")
22932295

22942296
# Extract and remove max_concurrency from kwargs
2295-
max_concurrency = kwargs.pop('max_concurrency', 10)
2297+
max_concurrency = kwargs.pop('max_concurrency', None)
22962298
helper = ReadItemsHelperAsync(
22972299
client=self,
22982300
collection_link=collection_link,

sdk/cosmos/azure-cosmos/azure/cosmos/container.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ def read_items(
313313
If not provided, a new executor will be created as needed.
314314
:keyword int max_concurrency: The maximum number of concurrent operations for the
315315
items request. This value is ignored if an executor is provided. If not specified,
316-
defaults to Python's ThreadPoolExecutor default of min(32, (os.cpu_count() or 1) + 4).
316+
the default max_concurrency defined by Python's ThreadPoolExecutor will be applied.
317317
:keyword str consistency_level: The consistency level to use for the request.
318318
:keyword str session_token: Token for use with Session consistency.
319319
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.

sdk/cosmos/azure-cosmos/tests/test_max_concurrency.py

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
# -*- coding: utf-8 -*-
22
# The MIT License (MIT)
33
# Copyright (c) Microsoft Corporation. All rights reserved.
4+
import os
45
import unittest
56
from concurrent.futures import ThreadPoolExecutor
6-
from unittest.mock import Mock
7+
from unittest.mock import Mock, patch
78
from azure.cosmos._read_items_helper import ReadItemsHelperSync
89

910

@@ -49,6 +50,66 @@ def test_max_concurrency_value_is_preserved(self):
4950
# Verify that max_concurrency is set correctly
5051
self.assertEqual(helper.max_concurrency, 5)
5152

53+
def test_max_concurrency_ignored_with_executor(self):
54+
"""Test that max_concurrency is ignored if an executor is provided."""
55+
# Mock client connection
56+
mock_client = Mock()
57+
mock_client._routing_map_provider = Mock()
58+
mock_client._routing_map_provider.get_overlapping_ranges = Mock(return_value=[])
59+
60+
# Create a mock executor
61+
mock_executor = Mock(spec=ThreadPoolExecutor)
62+
63+
# Create helper with both executor and max_concurrency
64+
helper = ReadItemsHelperSync(
65+
client=mock_client,
66+
collection_link="/dbs/test/colls/test",
67+
items=[],
68+
options={},
69+
partition_key_definition={"paths": ["/id"], "kind": "Hash"},
70+
executor=mock_executor,
71+
max_concurrency=5
72+
)
73+
74+
# Verify that executor is set and max_concurrency is preserved
75+
self.assertEqual(helper.executor, mock_executor)
76+
self.assertEqual(helper.max_concurrency, 5)
77+
78+
# When read_items is called with empty items, it returns early
79+
# so this call only confirms a non-None result is returned; the stored executor was already checked above
80+
result = helper.read_items()
81+
self.assertIsNotNone(result)
82+
83+
def test_default_max_concurrency_uses_python_default(self):
84+
"""Test that if max_concurrency not specified, defaults to Python's ThreadPoolExecutor default."""
85+
# Get Python's default calculation (formula for Python 3.8-3.12; 3.13+ bases it on os.process_cpu_count())
86+
cpu_count = os.cpu_count() or 1
87+
expected_default = min(32, cpu_count + 4)
88+
89+
# Create a ThreadPoolExecutor with None to see what Python uses
90+
with ThreadPoolExecutor(max_workers=None) as executor:
91+
actual_workers = executor._max_workers
92+
93+
# Verify the expected default matches Python's calculation
94+
self.assertEqual(actual_workers, expected_default)
95+
96+
# Now verify that our helper stores max_concurrency=None unchanged
97+
mock_client = Mock()
98+
mock_client._routing_map_provider = Mock()
99+
mock_client._routing_map_provider.get_overlapping_ranges = Mock(return_value=[])
100+
101+
helper = ReadItemsHelperSync(
102+
client=mock_client,
103+
collection_link="/dbs/test/colls/test",
104+
items=[],
105+
options={},
106+
partition_key_definition={"paths": ["/id"], "kind": "Hash"},
107+
max_concurrency=None
108+
)
109+
110+
# Verify None is passed through
111+
self.assertIsNone(helper.max_concurrency)
112+
52113

53114
if __name__ == '__main__':
54115
unittest.main()

0 commit comments

Comments
 (0)