Skip to content

Commit 210ab0b

Browse files
authored
Separate out Cosmos Python transactional batch sample (#36726)
* Separate out Cosmos Python transactional batch sample * revert document_management.py * add async transactional batch sample * revert document_management.py
1 parent 90bb6da commit 210ab0b

File tree

2 files changed

+278
-0
lines changed

2 files changed

+278
-0
lines changed
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
# -------------------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# Licensed under the MIT License. See LICENSE.txt in the project root for
4+
# license information.
5+
# -------------------------------------------------------------------------
6+
import azure.cosmos.cosmos_client as cosmos_client
7+
import azure.cosmos.exceptions as exceptions
8+
from azure.cosmos.http_constants import StatusCodes
9+
from azure.cosmos.partition_key import PartitionKey
10+
import datetime
11+
12+
import config
13+
14+
# ----------------------------------------------------------------------------------------------------------
15+
# Prerequisites -
16+
#
17+
# 1. An Azure Cosmos account -
18+
# https://learn.microsoft.com/azure/cosmos-db/nosql/quickstart-portal#create-account
19+
#
20+
# 2. Microsoft Azure Cosmos PyPi package -
21+
# https://pypi.python.org/pypi/azure-cosmos/
22+
# ----------------------------------------------------------------------------------------------------------
23+
# Sample - demonstrates Transactional Batch for Azure Cosmos DB Python SDK
24+
# ----------------------------------------------------------------------------------------------------------
25+
26+
HOST = config.settings['host']
27+
MASTER_KEY = config.settings['master_key']
28+
DATABASE_ID = config.settings['database_id']
29+
CONTAINER_ID = "batch_container"
30+
31+
32+
def execute_item_batch(database, container):
33+
print('\n1.11 Executing Batch Item operations\n')
34+
35+
# We create three items to use for the sample. These are not part of the batch operations
36+
container.create_item(get_sales_order("read_item"))
37+
container.create_item(get_sales_order("delete_item"))
38+
container.create_item(get_sales_order("replace_item"))
39+
40+
# We create our batch operations
41+
create_item_operation = ("create", (get_sales_order("create_item"),))
42+
upsert_item_operation = ("upsert", (get_sales_order("upsert_item"),))
43+
read_item_operation = ("read", ("read_item",))
44+
delete_item_operation = ("delete", ("delete_item",))
45+
replace_item_operation = ("replace", ("replace_item", {"id": "replace_item", 'account_number': 'Account1',
46+
"message": "item was replaced"}))
47+
replace_item_if_match_operation = ("replace",
48+
("replace_item", {"id": "replace_item", 'account_number': 'Account1',
49+
"message": "item was replaced"}),
50+
{"if_match_etag": container.client_connection.last_response_headers.get("etag")})
51+
replace_item_if_none_match_operation = ("replace",
52+
("replace_item", {"id": "replace_item", 'account_number': 'Account1',
53+
"message": "item was replaced"}),
54+
{"if_none_match_etag":
55+
container.client_connection.last_response_headers.get("etag")})
56+
57+
# Put our operations into a list
58+
batch_operations = [
59+
create_item_operation,
60+
upsert_item_operation,
61+
read_item_operation,
62+
delete_item_operation,
63+
replace_item_operation,
64+
# This below operation fails with status code 412, causing batch to fail and all operations to roll back
65+
replace_item_if_match_operation, # -> Comment this line out to see batch operations succeeding.
66+
replace_item_if_none_match_operation]
67+
68+
# Run that list of operations
69+
try:
70+
# Batch results are returned as a list of item operation results - or raise a CosmosBatchOperationError if
71+
# one of the operations failed within your batch request.
72+
batch_results = container.execute_item_batch(batch_operations=batch_operations, partition_key="Account1")
73+
print("\nResults for the batch operations: {}\n".format(batch_results))
74+
75+
# For error handling, use try/ except with CosmosBatchOperationError and use the information in the
76+
# error returned for your application debugging, making it easy to pinpoint the failing operation
77+
except exceptions.CosmosBatchOperationError as e:
78+
error_operation_index = e.error_index
79+
error_operation_response = e.operation_responses[error_operation_index]
80+
error_operation = batch_operations[error_operation_index]
81+
print("\nError operation: {}, error operation response: {}\n".format(error_operation, error_operation_response))
82+
print("\nAn error occurred in the batch operation. All operations have been rolled back.\n")
83+
84+
85+
# You can also use this logic to read directly from a file into the batch you'd like to create:
86+
# with open("file_name.txt", "r") as data_file:
87+
# container.execute_item_batch([("upsert", (t,)) for t in data_file.readlines()])
88+
89+
90+
def get_sales_order(item_id):
91+
order1 = {'id': item_id,
92+
'account_number': 'Account1',
93+
'purchase_order_number': 'PO18009186470',
94+
'order_date': datetime.date(2005, 1, 10).strftime('%c'),
95+
'subtotal': 419.4589,
96+
'tax_amount': 12.5838,
97+
'freight': 472.3108,
98+
'total_due': 985.018,
99+
'items': [
100+
{'order_qty': 1,
101+
'product_id': 100,
102+
'unit_price': 418.4589,
103+
'line_price': 418.4589
104+
}
105+
],
106+
'ttl': 60 * 60 * 24 * 30
107+
}
108+
109+
return order1
110+
111+
112+
def run_sample():
113+
client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY})
114+
try:
115+
# setup database for this sample
116+
db = client.create_database_if_not_exists(id=DATABASE_ID)
117+
# setup container for this sample
118+
container = db.create_container_if_not_exists(id=CONTAINER_ID,
119+
partition_key=PartitionKey(path='/account_number'))
120+
execute_item_batch(db, container)
121+
122+
# cleanup database after sample
123+
try:
124+
client.delete_database(db)
125+
126+
except exceptions.CosmosResourceNotFoundError:
127+
pass
128+
129+
except exceptions.CosmosHttpResponseError as e:
130+
print('\nrun_sample has caught an error. {0}'.format(e.message))
131+
132+
finally:
133+
print("\nrun_sample done")
134+
135+
136+
if __name__ == '__main__':
137+
run_sample()
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
# -------------------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# Licensed under the MIT License. See LICENSE.txt in the project root for
4+
# license information.
5+
# -------------------------------------------------------------------------
6+
from azure.cosmos.aio import CosmosClient
7+
import azure.cosmos.exceptions as exceptions
8+
from azure.cosmos.http_constants import StatusCodes
9+
from azure.cosmos.partition_key import PartitionKey
10+
import datetime
11+
12+
import asyncio
13+
import config
14+
15+
# ----------------------------------------------------------------------------------------------------------
16+
# Prerequisites -
17+
#
18+
# 1. An Azure Cosmos account -
19+
# https://learn.microsoft.com/azure/cosmos-db/nosql/quickstart-portal#create-account
20+
#
21+
# 2. Microsoft Azure Cosmos PyPi package -
22+
# https://pypi.python.org/pypi/azure-cosmos/
23+
# ----------------------------------------------------------------------------------------------------------
24+
# Sample - demonstrates Transactional Batch for Azure Cosmos DB Python SDK async
25+
# ----------------------------------------------------------------------------------------------------------
26+
27+
HOST = config.settings['host']
28+
MASTER_KEY = config.settings['master_key']
29+
DATABASE_ID = config.settings['database_id']
30+
CONTAINER_ID = "batch_container"
31+
32+
33+
async def execute_item_batch(database, container):
34+
print('\n1.11 Executing Batch Item operations\n')
35+
36+
# We create three items to use for the sample. These are not part of the batch operations
37+
await container.create_item(get_sales_order("read_item"))
38+
await container.create_item(get_sales_order("delete_item"))
39+
await container.create_item(get_sales_order("replace_item"))
40+
41+
# We create our batch operations
42+
create_item_operation = ("create", (get_sales_order("create_item"),))
43+
upsert_item_operation = ("upsert", (get_sales_order("upsert_item"),))
44+
read_item_operation = ("read", ("read_item",))
45+
delete_item_operation = ("delete", ("delete_item",))
46+
replace_item_operation = ("replace", ("replace_item", {"id": "replace_item", 'account_number': 'Account1',
47+
"message": "item was replaced"}))
48+
replace_item_if_match_operation = ("replace",
49+
("replace_item", {"id": "replace_item", 'account_number': 'Account1',
50+
"message": "item was replaced"}),
51+
{"if_match_etag": container.client_connection.last_response_headers.get("etag")})
52+
replace_item_if_none_match_operation = ("replace",
53+
("replace_item", {"id": "replace_item", 'account_number': 'Account1',
54+
"message": "item was replaced"}),
55+
{"if_none_match_etag":
56+
container.client_connection.last_response_headers.get("etag")})
57+
58+
# Put our operations into a list
59+
batch_operations = [
60+
create_item_operation,
61+
upsert_item_operation,
62+
read_item_operation,
63+
delete_item_operation,
64+
replace_item_operation,
65+
# This below operation fails with status code 412, causing batch to fail and all operations to roll back
66+
replace_item_if_match_operation, # -> Comment this line out to see batch operations succeeding.
67+
replace_item_if_none_match_operation]
68+
69+
# Run that list of operations
70+
try:
71+
# Batch results are returned as a list of item operation results - or raise a CosmosBatchOperationError if
72+
# one of the operations failed within your batch request.
73+
batch_results = await container.execute_item_batch(batch_operations=batch_operations, partition_key="Account1")
74+
print("\nResults for the batch operations: {}\n".format(batch_results))
75+
76+
# For error handling, use try/ except with CosmosBatchOperationError and use the information in the
77+
# error returned for your application debugging, making it easy to pinpoint the failing operation
78+
except exceptions.CosmosBatchOperationError as e:
79+
error_operation_index = e.error_index
80+
error_operation_response = e.operation_responses[error_operation_index]
81+
error_operation = batch_operations[error_operation_index]
82+
print("\nError operation: {}, error operation response: {}\n".format(error_operation, error_operation_response))
83+
print("\nAn error occurred in the batch operation. All operations have been rolled back.\n")
84+
85+
86+
# You can also use this logic to read directly from a file into the batch you'd like to create:
87+
# with open("file_name.txt", "r") as data_file:
88+
# container.execute_item_batch([("upsert", (t,)) for t in data_file.readlines()])
89+
90+
91+
def get_sales_order(item_id):
92+
order1 = {'id': item_id,
93+
'account_number': 'Account1',
94+
'purchase_order_number': 'PO18009186470',
95+
'order_date': datetime.date(2005, 1, 10).strftime('%c'),
96+
'subtotal': 419.4589,
97+
'tax_amount': 12.5838,
98+
'freight': 472.3108,
99+
'total_due': 985.018,
100+
'items': [
101+
{'order_qty': 1,
102+
'product_id': 100,
103+
'unit_price': 418.4589,
104+
'line_price': 418.4589
105+
}
106+
],
107+
'ttl': 60 * 60 * 24 * 30
108+
}
109+
110+
return order1
111+
112+
113+
async def run_sample():
114+
async with CosmosClient(HOST, {'masterKey': MASTER_KEY}) as client:
115+
try:
116+
# setup database for this sample
117+
db = await client.create_database_if_not_exists(id=DATABASE_ID)
118+
119+
# setup container for this sample
120+
container = await db.create_container_if_not_exists(id="batch_container",
121+
partition_key=PartitionKey(path='/account_number'))
122+
print('Container with id \'{0}\' created'.format(CONTAINER_ID))
123+
124+
await execute_item_batch(db, container)
125+
126+
# cleanup database after sample
127+
try:
128+
await client.delete_database(db)
129+
130+
except exceptions.CosmosResourceNotFoundError:
131+
pass
132+
133+
except exceptions.CosmosHttpResponseError as e:
134+
print('\nrun_sample has caught an error. {0}'.format(e.message))
135+
136+
finally:
137+
print("\nrun_sample done")
138+
139+
140+
if __name__ == '__main__':
141+
asyncio.run(run_sample())

0 commit comments

Comments
 (0)