Skip to content

Commit f32da5e

Browse files
andrewmathew1Andrew Mathew
andauthored
Adding Throughput Bucket Header (#40340)
* inital commit for throughput bucket * added test for throughput bucket * got rid of extra comment * sync part of throughput headers * added async part with new test file * added async part with tests * got rid of extra space * added _async label to test_headers_async * removed extra line * removed unnecessary syntax * made requested changes on pr, mostly for async tests * added another finally block to test_headers * edited replace container test * added sample files and section to the readme * got rid of trailing whitespace * added negative test, added to changelog * added header to changelog * got rid of extra , * edited readme for new sample file names * marked negative test as TODO --------- Co-authored-by: Andrew Mathew <[email protected]>
1 parent 1d19d94 commit f32da5e

16 files changed

+1039
-8
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
### 4.10.0b5 (Unreleased)
44

55
#### Features Added
6+
* Added ability to set `throughput_bucket` header at the client level and for all requests. See [PR 40340](https://github.com/Azure/azure-sdk-for-python/pull/40340).
67
* Added ability to use Filters from Logging module on Diagnostics Logging based on Http request/response related attributes. See [PR 39897](https://github.com/Azure/azure-sdk-for-python/pull/39897)
78

89
#### Breaking Changes

sdk/cosmos/azure-cosmos/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -874,6 +874,20 @@ may have additional latencies associated with searching in the service.
874874

875875
You can find our sync samples [here][cosmos_index_sample] and our async samples [here][cosmos_index_sample_async] as well for additional guidance.
876876

877+
### Public Preview - Throughput Buckets
878+
When multiple workloads share the same Azure Cosmos DB container, resource contention can lead to throttling, increased latency, and potential business impact.
879+
To address this, Cosmos DB allows you to allocate throughput buckets, which help manage resource consumption for workloads sharing a Cosmos DB container by limiting the maximum throughput a bucket can consume.
880+
However, throughput isn't reserved for any bucket, it remains shared across all workloads.
881+
882+
Up to five (5) throughput buckets can be configured per container, with an ID ranging from 1-5. Each bucket has a maximum throughput percentage, capping the fraction of the container’s total throughput that it can consume.
883+
Requests assigned to a bucket can consume throughput only up to this limit. If the bucket exceeds its configured limit, subsequent requests are throttled.
884+
This ensures that no single workload consumes excessive throughput and impacts others.
885+
886+
Throughput bucket configurations can be changed once every 10 minutes, otherwise the request is throttled with an HTTP 429 status code and substatus code 3213.
887+
Also, requests with an invalid bucket ID (less than 1 or greater than 5) results in an error, as only bucket IDs 1 to 5 are valid.
888+
889+
You can find our sync samples [here][cosmos_throughput_bucket_sample] and our async samples [here][cosmos_throughput_bucket_sample_async] as well for additional guidance.
890+
877891
## Troubleshooting
878892

879893
### General
@@ -1046,6 +1060,8 @@ For more extensive documentation on the Cosmos DB service, see the [Azure Cosmos
10461060
[BM25]: https://learn.microsoft.com/azure/search/index-similarity-and-scoring
10471061
[cosmos_fts]: https://aka.ms/cosmosfulltextsearch
10481062
[cosmos_index_policy_change]: https://learn.microsoft.com/azure/cosmos-db/index-policy#modifying-the-indexing-policy
1063+
[cosmos_throughput_bucket_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/throughput_bucket_management.py
1064+
[cosmos_throughput_bucket_sample_async]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/throughput_bucket_management_async.py
10491065
[cosmos_diagnostics_filter_sample]: https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/cosmos/azure-cosmos/samples/diagnostics_filter_sample.py
10501066

10511067
## Contributing

sdk/cosmos/azure-cosmos/azure/cosmos/_base.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@
6363
'priority': 'priorityLevel',
6464
'no_response': 'responsePayloadOnWriteDisabled',
6565
'max_item_count': 'maxItemCount',
66-
'excluded_locations': 'excludedLocations',
66+
'throughput_bucket': 'throughputBucket',
67+
'excluded_locations': 'excludedLocations'
6768
}
6869

6970
# Cosmos resource ID validation regex breakdown:
@@ -318,6 +319,9 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
318319
if options.get("correlatedActivityId"):
319320
headers[http_constants.HttpHeaders.CorrelatedActivityId] = options["correlatedActivityId"]
320321

322+
if options.get("throughputBucket"):
323+
headers[http_constants.HttpHeaders.ThroughputBucket] = options["throughputBucket"]
324+
321325
if resource_type == "docs" and verb != "get":
322326
if "responsePayloadOnWriteDisabled" in options:
323327
responsePayloadOnWriteDisabled = options["responsePayloadOnWriteDisabled"]

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ def __init__( # pylint: disable=too-many-statements
160160
http_constants.HttpHeaders.IsContinuationExpected: False,
161161
}
162162

163+
throughput_bucket = kwargs.pop('throughput_bucket', None)
164+
if throughput_bucket:
165+
self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket
166+
163167
# Keeps the latest response headers from the server.
164168
self.last_response_headers: CaseInsensitiveDict = CaseInsensitiveDict()
165169

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py

Lines changed: 44 additions & 0 deletions
Large diffs are not rendered by default.

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ class CosmosClient: # pylint: disable=client-accepts-api-version-keyword
171171
level (to log all requests) or at a single request level. Requests will be logged at INFO level.
172172
:keyword bool no_response_on_write: Indicates whether service should be instructed to skip sending
173173
response payloads for write operations on items by default unless specified differently per operation.
174+
:keyword int throughput_bucket: The desired throughput bucket for the client
174175
175176
.. admonition:: Example:
176177
@@ -255,6 +256,7 @@ async def create_database(
255256
*,
256257
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
257258
initial_headers: Optional[Dict[str, str]] = None,
259+
throughput_bucket: Optional[int] = None,
258260
**kwargs: Any
259261
) -> DatabaseProxy:
260262
"""
@@ -265,6 +267,7 @@ async def create_database(
265267
:paramtype offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties]
266268
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
267269
:keyword response_hook: A callable invoked with the response metadata.
270+
:keyword int throughput_bucket: The desired throughput bucket for the client
268271
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
269272
:raises ~azure.cosmos.exceptions.CosmosResourceExistsError: Database with the given ID already exists.
270273
:returns: A DatabaseProxy instance representing the new database.
@@ -300,6 +303,8 @@ async def create_database(
300303
DeprecationWarning)
301304
if initial_headers is not None:
302305
kwargs["initial_headers"] = initial_headers
306+
if throughput_bucket is not None:
307+
kwargs["throughput_bucket"] = throughput_bucket
303308
request_options = _build_options(kwargs)
304309
_set_throughput_options(offer=offer_throughput, request_options=request_options)
305310

@@ -313,6 +318,7 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin
313318
*,
314319
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
315320
initial_headers: Optional[Dict[str, str]] = None,
321+
throughput_bucket: Optional[int] = None,
316322
**kwargs: Any
317323
) -> DatabaseProxy:
318324
"""
@@ -329,6 +335,7 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin
329335
:paramtype offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties]
330336
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
331337
:keyword response_hook: A callable invoked with the response metadata.
338+
:keyword int throughput_bucket: The desired throughput bucket for the client
332339
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
333340
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The database read or creation failed.
334341
:returns: A DatabaseProxy instance representing the database.
@@ -352,6 +359,8 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin
352359
"The 'match_condition' flag does not apply to this method and is always ignored even if passed."
353360
" It will now be removed in the future.",
354361
DeprecationWarning)
362+
if throughput_bucket is not None:
363+
kwargs["throughput_bucket"] = throughput_bucket
355364
if initial_headers is not None:
356365
kwargs["initial_headers"] = initial_headers
357366
try:
@@ -389,6 +398,7 @@ def list_databases(
389398
max_item_count: Optional[int] = None,
390399
initial_headers: Optional[Dict[str, str]] = None,
391400
response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
401+
throughput_bucket: Optional[int] = None,
392402
**kwargs: Any
393403
) -> AsyncItemPaged[Dict[str, Any]]:
394404
"""List the databases in a Cosmos DB SQL database account.
@@ -397,6 +407,7 @@ def list_databases(
397407
:keyword str session_token: Token for use with Session consistency.
398408
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
399409
:keyword response_hook: A callable invoked with the response metadata.
410+
:keyword int throughput_bucket: The desired throughput bucket for the client
400411
:paramtype response_hook: Callable[[Mapping[str, Any]], None]
401412
:returns: An AsyncItemPaged of database properties (dicts).
402413
:rtype: AsyncItemPaged[Dict[str, str]]
@@ -409,6 +420,8 @@ def list_databases(
409420
DeprecationWarning)
410421
if initial_headers is not None:
411422
kwargs["initial_headers"] = initial_headers
423+
if throughput_bucket is not None:
424+
kwargs["throughput_bucket"] = throughput_bucket
412425
feed_options = _build_options(kwargs)
413426
if max_item_count is not None:
414427
feed_options["maxItemCount"] = max_item_count
@@ -427,6 +440,7 @@ def query_databases(
427440
max_item_count: Optional[int] = None,
428441
initial_headers: Optional[Dict[str, str]] = None,
429442
response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
443+
throughput_bucket: Optional[int] = None,
430444
**kwargs: Any
431445
) -> AsyncItemPaged[Dict[str, Any]]:
432446
"""Query the databases in a Cosmos DB SQL database account.
@@ -439,6 +453,7 @@ def query_databases(
439453
:keyword str session_token: Token for use with Session consistency.
440454
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
441455
:keyword response_hook: A callable invoked with the response metadata.
456+
:keyword int throughput_bucket: The desired throughput bucket for the client
442457
:paramtype response_hook: Callable[[Mapping[str, Any]], None]
443458
:returns: An AsyncItemPaged of database properties (dicts).
444459
:rtype: AsyncItemPaged[Dict[str, str]]
@@ -451,6 +466,8 @@ def query_databases(
451466
DeprecationWarning)
452467
if initial_headers is not None:
453468
kwargs["initial_headers"] = initial_headers
469+
if throughput_bucket is not None:
470+
kwargs['throughput_bucket'] = throughput_bucket
454471
feed_options = _build_options(kwargs)
455472
if max_item_count is not None:
456473
feed_options["maxItemCount"] = max_item_count
@@ -470,6 +487,7 @@ async def delete_database(
470487
*,
471488
initial_headers: Optional[Dict[str, str]] = None,
472489
response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
490+
throughput_bucket: Optional[int] = None,
473491
**kwargs: Any
474492
) -> None:
475493
"""Delete the database with the given ID (name).
@@ -479,6 +497,7 @@ async def delete_database(
479497
:type database: Union[str, ~azure.cosmos.DatabaseProxy, Dict[str, Any]]
480498
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
481499
:keyword response_hook: A callable invoked with the response metadata.
500+
:keyword int throughput_bucket: The desired throughput bucket for the client
482501
:paramtype response_hook: Callable[[Mapping[str, Any]], None]
483502
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the database couldn't be deleted.
484503
:rtype: None
@@ -501,7 +520,8 @@ async def delete_database(
501520
"The 'match_condition' flag does not apply to this method and is always ignored even if passed."
502521
" It will now be removed in the future.",
503522
DeprecationWarning)
504-
523+
if throughput_bucket is not None:
524+
kwargs['throughput_bucket'] = throughput_bucket
505525
if initial_headers is not None:
506526
kwargs["initial_headers"] = initial_headers
507527
request_options = _build_options(kwargs)

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ def __init__( # pylint: disable=too-many-statements
163163
http_constants.HttpHeaders.IsContinuationExpected: False,
164164
}
165165

166+
throughput_bucket = kwargs.pop('throughput_bucket', None)
167+
if throughput_bucket:
168+
self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket
169+
166170
if consistency_level is not None:
167171
self.default_headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level
168172

0 commit comments

Comments
 (0)