Commit c13ec18

bambriz and simorenoh authored
Add Concurrency Sample (#33637)
* create concurrency sample

  This adds a new sample file to showcase using concurrency to improve performance.

* update changelog

* update Changelog and Sample Comments

  Removed changelog entry as this is just a sample file. I also added a note to remove confusion between the technical use of the word batch and the feature of batch operations in Cosmos DB.

* update readme

  Added an entry for the sample in the README. Also included a warning that the sample will consume a lot of RUs.

* Update sdk/cosmos/azure-cosmos/README.md

  Co-authored-by: Simon Moreno <[email protected]>

---------

Co-authored-by: Simon Moreno <[email protected]>
1 parent 75d6c26 commit c13ec18

2 files changed: +116 -0 lines changed


sdk/cosmos/azure-cosmos/README.md

Lines changed: 8 additions & 0 deletions
@@ -178,6 +178,13 @@ Streamable queries like `SELECT * FROM WHERE` *do* support continuation tokens.

 Typically, you can use [Azure Portal](https://portal.azure.com/), [Azure Cosmos DB Resource Provider REST API](https://docs.microsoft.com/rest/api/cosmos-db-resource-provider), [Azure CLI](https://docs.microsoft.com/cli/azure/azure-cli-reference-for-cosmos-db) or [PowerShell](https://docs.microsoft.com/azure/cosmos-db/manage-with-powershell) for the control plane unsupported limitations.

+### Using The Async Client as a Workaround to Bulk
+While the SDK supports transactional batch, support for bulk requests is not yet implemented in the Python SDK. You can use the async client along with this [concurrency sample][cosmos_concurrency_sample] we have developed as a reference for a possible workaround.
+> [!WARNING]
+> Using the asynchronous client for concurrent operations as shown in this sample will consume a lot of RUs very quickly. We **strongly recommend** testing this against the Azure Cosmos DB emulator first to verify that your code works well and to avoid incurring charges.
+
+
+
 ## Boolean Data Type

 While the Python language [uses](https://docs.python.org/3/library/stdtypes.html?highlight=boolean#truth-value-testing) "True" and "False" for boolean types, Cosmos DB [accepts](https://docs.microsoft.com/azure/cosmos-db/sql-query-is-bool) "true" and "false" only. In other words, the Python language uses Boolean values with the first uppercase letter and all other lowercase letters, while Cosmos DB and its SQL language use only lowercase letters for those same Boolean values. How to deal with this challenge?
@@ -757,6 +764,7 @@ For more extensive documentation on the Cosmos DB service, see the [Azure Cosmos
 [telemetry_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/tracing_open_telemetry.py
 [timeouts_document]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/docs/TimeoutAndRetriesConfig.md
 [cosmos_transactional_batch]: https://learn.microsoft.com/azure/cosmos-db/nosql/transactional-batch
+[cosmos_concurrency_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/concurrency_sample.py

 ## Contributing

sdk/cosmos/azure-cosmos/samples/concurrency_sample.py

Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See LICENSE.txt in the project root for
+# license information.
+# -------------------------------------------------------------------------
+# These examples are ingested by the documentation system, and are
+# displayed in the SDK reference documentation. When editing these
+# example snippets, take into consideration how this might affect
+# the readability and usability of the reference documentation.
+
+import os
+from azure.cosmos import PartitionKey, ThroughputProperties
+from azure.cosmos.aio import CosmosClient
+import asyncio
+import time
+
+# Specify information to connect to the client.
+CLEAR_DATABASE = True
+CONN_STR = os.environ['CONN_STR']
+# Specify information for Database and container.
+DB_ID = "Cosmos_Concurrency_DB"
+CONT_ID = "Cosmos_Concurrency_Cont"
+# Specify partition key for the container.
+pk = PartitionKey(path="/id")
+
+# Batch the creation of items to improve performance.
+# Note: Error handling should be in the method being batched, since you will get
+# an error for each failed Cosmos DB operation.
+# Note: While the word `Batch` here is used to describe the subsets of data being created, it does not refer
+# to batch operations such as `Transactional Batch`, which is a feature of Cosmos DB.
+async def create_all_the_items(prefix, c, i):
+    await asyncio.wait(
+        [asyncio.create_task(c.create_item({"id": prefix + str(j)})) for j in range(100)]
+    )
+    print(f"Batch {i} done!")
+
+# The following demonstrates the performance difference between sequential item creation,
+# sequential item creation in batches, and concurrent item creation in batches. This is to show best practice
+# in using Cosmos DB for performance.
+# It’s important to note that batching a bunch of operations can affect throughput/RUs.
+# To avoid consuming billable resources, it’s recommended to test things on the Cosmos DB emulator first.
+# The performance improvement shown on the emulator is relative to what you will see on a live account.
+async def main():
+    try:
+        async with CosmosClient.from_connection_string(CONN_STR) as client:
+            # For emulator: default Throughput needs to be increased
+            # throughput_properties = ThroughputProperties(auto_scale_max_throughput=5000)
+            # db = await client.create_database_if_not_exists(id=DB_ID, offer_throughput=throughput_properties)
+            db = await client.create_database_if_not_exists(id=DB_ID)
+            container = await db.create_container_if_not_exists(CONT_ID, partition_key=pk)
+
+            # A: Sequential without batching
+            timer = time.time()
+            print("Starting Sequential Item Creation.")
+            for i in range(20):
+                for j in range(100):
+                    await container.create_item({"id": f"{i}-sequential-{j}"})
+                print(f"{(i + 1) * 100} items created!")
+            sequential_item_time = time.time() - timer
+            print("Time taken: " + str(sequential_item_time))
+
+
+            # B: Sequential batches
+            # Batching operations can improve performance by dealing with multiple operations at a time.
+            timer = time.time()
+            print("Starting Sequential Batched Item Creation.")
+            for i in range(20):
+                await create_all_the_items(f"{i}-sequential-Batch-", container, i)
+            sequential_batch_time = time.time() - timer
+            print("Time taken: " + str(sequential_batch_time))
+
+            # C: Concurrent batches
+            # By using asyncio with batching, we can create multiple batches of items concurrently, which means that
+            # while one connection is waiting for IO (like waiting for data to arrive),
+            # Python can switch context to another connection and make progress there.
+            # This can lead to better utilization of system resources and can give the appearance of parallelism,
+            # as multiple connections are making progress seemingly at the same time.
+            timer = time.time()
+            print("Starting Concurrent Batched Item Creation.")
+            await asyncio.wait(
+                [asyncio.create_task(create_all_the_items(f"{i}-concurrent-Batch", container, i)) for i in range(20)]
+            )
+            concurrent_batch_time = time.time() - timer
+            print("Time taken: " + str(concurrent_batch_time))
+
+            # Calculate performance improvement on time metrics.
+            sequential_per = round((sequential_item_time - sequential_batch_time) / sequential_item_time * 100, 2)
+            print(f"Sequential Batching is {sequential_per}% faster than Sequential Item Creation")
+            concurrent_per = round((sequential_item_time - concurrent_batch_time) / sequential_item_time * 100, 2)
+            print(f"Concurrent Batching is {concurrent_per}% faster than Sequential Item Creation")
+
+            item_list = [i async for i in container.read_all_items()]
+            print(f"End of the test. Read {len(item_list)} items.")
+
+    finally:
+        if CLEAR_DATABASE:
+            await clear_database()
+
+
+async def clear_database():
+    async with CosmosClient.from_connection_string(CONN_STR) as client:
+        await client.delete_database(DB_ID)
+        print(f"Deleted {DB_ID} database.")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
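
The sample's comments note that error handling belongs in the method being batched, and the README warns that unbounded concurrency consumes RUs quickly. One possible way to address both is to cap the number of in-flight requests with `asyncio.Semaphore` and collect failures with `asyncio.gather(..., return_exceptions=True)`. The following is a minimal sketch under stated assumptions, not part of the commit: `FakeContainer`, `create_items_bounded`, and the `max_in_flight` values are hypothetical names chosen for illustration, and the fake container only mimics the shape of an async `create_item` call so the example runs without a Cosmos DB account.

```python
import asyncio


class FakeContainer:
    """Hypothetical stand-in for an async container (not part of the SDK)."""

    def __init__(self):
        self.items = {}

    async def create_item(self, body):
        await asyncio.sleep(0)  # yield to the event loop, like a network call would
        if body["id"] in self.items:
            raise ValueError(f"conflict on id {body['id']}")
        self.items[body["id"]] = body
        return body


async def create_items_bounded(container, ids, max_in_flight=50):
    """Create one item per id with at most max_in_flight requests pending."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def create_one(item_id):
        # Only max_in_flight coroutines can be inside this block at once.
        async with semaphore:
            return await container.create_item({"id": item_id})

    # return_exceptions=True returns failures as values instead of raising
    # the first one, so every failed operation can be inspected or retried.
    results = await asyncio.gather(
        *(create_one(item_id) for item_id in ids), return_exceptions=True
    )
    created = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]
    return created, failed


async def demo():
    container = FakeContainer()
    # The last id duplicates the first one, so exactly one create fails.
    ids = [f"bounded-{j}" for j in range(200)] + ["bounded-0"]
    return await create_items_bounded(container, ids, max_in_flight=20)


created, failed = asyncio.run(demo())
print(f"{len(created)} created, {len(failed)} failed")
```

With a real container, `max_in_flight` would need tuning against the provisioned throughput; the value here is arbitrary.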
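
The improvement figure the sample prints is the share of the baseline time that was saved: (baseline - improved) / baseline × 100. A small sketch with hypothetical timings (not measurements from the sample) shows why the subtraction has to be parenthesized before the division; `percent_time_saved` is an illustrative helper name, not something defined in the commit.

```python
def percent_time_saved(baseline_seconds, improved_seconds):
    """Percentage of the baseline time saved by the improved run."""
    return round((baseline_seconds - improved_seconds) / baseline_seconds * 100, 2)


# Hypothetical timings: 40 s sequential, 10 s concurrent.
saved = percent_time_saved(40.0, 10.0)

# Without the inner parentheses, division binds tighter than subtraction,
# so this computes (40.0 - 0.25) * 100 instead of a percentage.
misparenthesized = round((40.0 - 10.0 / 40.0) * 100, 2)

print(saved, misparenthesized)
```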
