From d992344ba3b48940b3cbfebb6e82eedfefd080f4 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Sat, 1 Feb 2025 09:34:15 -0800 Subject: [PATCH 01/17] Logging changes --- .../cosmos/_cosmos_http_logging_policy.py | 39 ++++++++++++------- .../azure/cosmos/_location_cache.py | 4 ++ .../aio/_cosmos_client_connection_async.py | 4 ++ 3 files changed, 34 insertions(+), 13 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py index 29d2ccee8452..656ca95a1b73 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py @@ -121,9 +121,9 @@ def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None: resource_type = self._resource_map['docs'] if self._should_log(verb=verb,database_name=database_name,collection_name=collection_name, resource_type=resource_type, is_request=True): - self._log_client_settings() - self._log_database_account_settings() - super().on_request(request) + # self._log_client_settings() + # self._log_database_account_settings() + # super().on_request(request) self.__request_already_logged = True # pylint: disable=too-many-statements @@ -173,26 +173,39 @@ def on_response( if self._should_log(duration=duration, status_code=status_code, sub_status_code=sub_status_code, verb=verb, http_version=http_version_obj, database_name=database_name, collection_name=collection_name, resource_type=resource_type, is_request=False): + ThinClientProxyResourceType = "x-ms-thinclient-proxy-resource-type" if not self.__request_already_logged: - self._log_client_settings() - self._log_database_account_settings() - super().on_request(request) + # self._log_client_settings() + # self._log_database_account_settings() + # super().on_request(request) + self.__request_already_logged = True else: self.__request_already_logged = False - super().on_response(request, response) + # super().on_response(request, response) if self._enable_diagnostics_logging: http_response = response.http_response options = response.context.options logger = request.context.setdefault("logger", options.pop("logger", self.logger)) try: - if "start_time" in request.context: - logger.info("Elapsed time in seconds: {}".format(duration)) - else: - logger.info("Elapsed time in seconds: unknown") + logger.info("Response status code: {}".format(http_response.status_code)) + logger.info("Response URL: {}".format(request.http_request.url)) + # Thin Client headers + ThinClientProxyOperationType = "x-ms-thinclient-proxy-operation-type" + ThinClientProxyResourceType = "x-ms-thinclient-proxy-resource-type" + logger.info("Operation type: {}".format( + request.http_request.headers[ThinClientProxyOperationType])) + logger.info("Resource type: {}".format( + request.http_request.headers[ThinClientProxyResourceType])) + # if "start_time" in request.context: + # logger.info("Elapsed time in seconds: {}".format(duration)) + # else: + # logger.info("Elapsed time in seconds: unknown") if http_response.status_code >= 400: - logger.info("Response error message: %r", _format_error(http_response.text())) + logger.info("Response error message: %r", + _format_error(http_response.text())) except Exception as err: # pylint: disable=broad-except - logger.warning("Failed to log request: %s", repr(err)) # pylint: disable=do-not-log-exceptions + logger.warning("Failed to log request: %s", + repr(err)) # pylint: disable=do-not-log-exceptions # pylint: disable=unused-argument def _default_should_log( diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py index f6e2d5cb5e48..79271fce72f2 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py @@ -90,11 +90,13 @@ def get_endpoints_by_location(new_locations, parsed_locations.append(new_location["name"]) if new_location["name"] in old_endpoints_by_location: regional_object = old_endpoints_by_location[new_location["name"]] + logger.info("In location cache: Existing regional object: %s", str(regional_object)) current = regional_object.get_current() # swap the previous with current and current with new region_uri received from the gateway if current != region_uri: regional_object.set_previous(current) regional_object.set_current(region_uri) + logger.info("In location cache: Updated regional object: %s", str(regional_object)) # This is the bootstrapping condition else: regional_object = RegionalEndpoint(region_uri, region_uri) @@ -102,6 +104,8 @@ def get_endpoints_by_location(new_locations, if writes and not use_multiple_write_locations: regional_object = RegionalEndpoint(region_uri, default_regional_endpoint.get_current()) + logger.info("In location cache: This is regional object on initialization: %s", str(regional_object)) + # pass in object with region uri , last known good, curr etc endpoints_by_location.update({new_location["name"]: regional_object}) except Exception as e: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py index cbd38a2f86b8..30b74ab62f59 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py @@ -23,6 +23,7 @@ """Document client class for the Azure Cosmos database service. """ +import logging import os from urllib.parse import urlparse import uuid @@ -81,6 +82,7 @@ PartitionKeyType = Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]], Type[NonePartitionKeyValue]] # pylint: disable=line-too-long +logger = logging.getLogger("azure.cosmos.CosmosClientConnectionAsync") class CredentialDict(TypedDict, total=False): masterKey: str @@ -444,6 +446,8 @@ async def GetDatabaseAccount( self.UseMultipleWriteLocations = ( self.connection_policy.UseMultipleWriteLocations and database_account._EnableMultipleWritableLocations ) + logger.info("Database account - writable locations: %s", database_account.WritableLocations) + logger.info("Database account - readable locations: %s", database_account.ReadableLocations) return database_account async def CreateDatabase( From 8300626e3629d7092dadc5098792ed024c3c2be3 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Sat, 1 Feb 2025 21:09:53 -0800 Subject: [PATCH 02/17] Added timestamp logging changes --- .../cosmos/_cosmos_http_logging_policy.py | 11 ++++++---- .../azure/cosmos/_location_cache.py | 20 ++++++++++++++----- .../cosmos/_service_request_retry_policy.py | 10 ++++++++-- .../aio/_cosmos_client_connection_async.py | 9 +++++++-- 4 files changed, 37 insertions(+), 13 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py index 656ca95a1b73..385c9f299f0a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py @@ -26,6 +26,7 @@ import json import logging import time +from datetime import datetime from typing import Optional, Union, Dict, Any, TYPE_CHECKING, Callable, Mapping import types @@ -187,14 +188,16 @@ def on_response( options = response.context.options logger = request.context.setdefault("logger", options.pop("logger", self.logger)) try: - logger.info("Response status code: {}".format(http_response.status_code)) - logger.info("Response URL: {}".format(request.http_request.url)) + logger.info("{} - Response status code: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"), + http_response.status_code)) + logger.info("{} - Response URL: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"), + request.http_request.url)) # Thin Client headers ThinClientProxyOperationType = "x-ms-thinclient-proxy-operation-type" ThinClientProxyResourceType = "x-ms-thinclient-proxy-resource-type" - logger.info("Operation type: {}".format( + logger.info("{} - Operation type: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"), request.http_request.headers[ThinClientProxyOperationType])) - logger.info("Resource type: {}".format( + logger.info("{} - Resource type: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"), request.http_request.headers[ThinClientProxyResourceType])) # if "start_time" in request.context: # logger.info("Elapsed time in seconds: {}".format(duration)) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py index 79271fce72f2..8327dfb3ba4e 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py @@ -25,6 +25,7 @@ import collections import logging import time +from datetime import datetime from . import documents from . import http_constants @@ -90,13 +91,16 @@ def get_endpoints_by_location(new_locations, parsed_locations.append(new_location["name"]) if new_location["name"] in old_endpoints_by_location: regional_object = old_endpoints_by_location[new_location["name"]] - logger.info("In location cache: Existing regional object: %s", str(regional_object)) + logger.info("%s - In location cache: Existing regional object: %s", + datetime.now().strftime("%Y%m%d-%H%M%S"), str(regional_object)) current = regional_object.get_current() # swap the previous with current and current with new region_uri received from the gateway if current != region_uri: regional_object.set_previous(current) regional_object.set_current(region_uri) - logger.info("In location cache: Updated regional object: %s", str(regional_object)) + logger.info("%s - In location cache: Updated regional object: %s", + datetime.now().strftime("%Y%m%d-%H%M%S"), + str(regional_object)) # This is the bootstrapping condition else: regional_object = RegionalEndpoint(region_uri, region_uri) @@ -104,7 +108,9 @@ def get_endpoints_by_location(new_locations, if writes and not use_multiple_write_locations: regional_object = RegionalEndpoint(region_uri, default_regional_endpoint.get_current()) - logger.info("In location cache: This is regional object on initialization: %s", str(regional_object)) + logger.info("%s - In location cache: This is regional object on initialization: %s", + datetime.now().strftime("%Y%m%d-%H%M%S"), + str(regional_object)) # pass in object with region uri , last known good, curr etc endpoints_by_location.update({new_location["name"]: regional_object}) @@ -193,9 +199,13 @@ def swap_regional_endpoint_values(self, request): ) regional_endpoint = regional_endpoints[location_index % len(regional_endpoints)] if request.location_endpoint_to_route == regional_endpoint.get_current(): - logger.warning("Swapping regional endpoint values: %s", str(regional_endpoint)) + logger.warning("%s - Swapping regional endpoint values: %s", + datetime.now().strftime("%Y%m%d-%H%M%S"), + str(regional_endpoint)) regional_endpoint.swap() - logger.warning("Swapped regional endpoint values: %s", str(regional_endpoint)) + logger.warning("%s - Swapped regional endpoint values: %s", + datetime.now().strftime("%Y%m%d-%H%M%S"), + str(regional_endpoint)) def resolve_service_endpoint(self, request): if request.location_endpoint_to_route: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_service_request_retry_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_service_request_retry_policy.py index 735591549efe..73b84c07fde4 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_service_request_retry_policy.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_service_request_retry_policy.py @@ -8,6 +8,8 @@ """ import logging +from datetime import datetime + from azure.cosmos.documents import _OperationType from azure.cosmos.http_constants import ResourceType @@ -114,10 +116,14 @@ def resolve_next_region_service_endpoint(self): def mark_endpoint_unavailable(self, unavailable_endpoint, refresh_cache: bool): if _OperationType.IsReadOnlyOperation(self.request.operation_type): - self.logger.warning("Marking %s unavailable for read", unavailable_endpoint) + self.logger.warning("%s - Marking %s unavailable for read", + datetime.now().strftime("%Y%m%d-%H%M%S"), + unavailable_endpoint) self.global_endpoint_manager.mark_endpoint_unavailable_for_read(unavailable_endpoint, True) else: - self.logger.warning("Marking %s unavailable for write", unavailable_endpoint) + self.logger.warning("%s - Marking %s unavailable for write", + datetime.now().strftime("%Y%m%d-%H%M%S"), + unavailable_endpoint) self.global_endpoint_manager.mark_endpoint_unavailable_for_write(unavailable_endpoint, refresh_cache) def update_location_cache(self): diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py index 30b74ab62f59..63242845a527 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py @@ -25,6 +25,7 @@ """ import logging import os +from datetime import datetime from urllib.parse import urlparse import uuid from typing import ( @@ -446,8 +447,12 @@ async def GetDatabaseAccount( self.UseMultipleWriteLocations = ( self.connection_policy.UseMultipleWriteLocations and database_account._EnableMultipleWritableLocations ) - logger.info("Database account - writable locations: %s", database_account.WritableLocations) - logger.info("Database account - readable locations: %s", database_account.ReadableLocations) + logger.info("%s - Database account - writable locations: %s", + datetime.now().strftime("%Y%m%d-%H%M%S"), + database_account.WritableLocations) + logger.info("%s - Database account - readable locations: %s", + datetime.now().strftime("%Y%m%d-%H%M%S"), + database_account.ReadableLocations) return database_account async def CreateDatabase( From f48cf3d45e6c1e7498f6465d9c594affd2f5fd2f Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Sat, 1 Feb 2025 21:18:53 -0800 Subject: [PATCH 03/17] Added error logging --- .../azure/cosmos/aio/_retry_utility_async.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py index 3628e1ea1878..3124faacf9f3 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py @@ -23,7 +23,9 @@ """ import asyncio # pylint: disable=do-not-import-asyncio import json +import logging import time +from datetime import datetime from azure.core.exceptions import AzureError, ClientAuthenticationError, ServiceRequestError, ServiceResponseError from azure.core.pipeline.policies import AsyncRetryPolicy @@ -41,6 +43,7 @@ _handle_service_response_retries, _handle_service_request_retries) from ..http_constants import HttpHeaders, StatusCodes, SubStatusCodes +logger = logging.getLogger("azure.cosmos.RetryUtilityAsync") # pylint: disable=protected-access, disable=too-many-lines, disable=too-many-statements, disable=too-many-branches @@ -261,6 +264,8 @@ async def send(self, request): raise except ServiceRequestError as err: retry_error = err + logger.warning("{} - Received ServiceRequestError {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"), + str(retry_error))) # the request ran into a socket timeout or failed to establish a new connection # since request wasn't sent, raise exception immediately to be dealt with in client retry policies if retry_settings['connect'] > 0: @@ -271,6 +276,8 @@ async def send(self, request): raise err except ServiceResponseError as err: retry_error = err + logger.warning("{} - Received ServiceResponseError {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"), + str(retry_error))) # Since this is ClientConnectionError, it is safe to be retried on both read and write requests try: from aiohttp.client_exceptions import ( @@ -288,6 +295,8 @@ async def send(self, request): raise err except AzureError as err: retry_error = err + logger.warning("{} - Received AzureError {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"), + str(retry_error))) if self._is_method_retryable(retry_settings, request.http_request): retry_active = self.increment(retry_settings, response=request, error=err) if retry_active: From 325a9f9d4146f5ec4f2195620aeeb63b5faee349 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Sun, 2 Feb 2025 09:33:47 -0800 Subject: [PATCH 04/17] Updated default connection timeout to 5 seconds --- sdk/cosmos/azure-cosmos/azure/cosmos/documents.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/documents.py b/sdk/cosmos/azure-cosmos/azure/cosmos/documents.py index 1af278596804..7e1ec72ad2b9 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/documents.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/documents.py @@ -329,7 +329,7 @@ class ConnectionPolicy: # pylint: disable=too-many-instance-attributes Indicates whether service should be instructed to skip sending response payloads """ - __defaultRequestTimeout: int = 60 # seconds + __defaultRequestTimeout: int = 5 # seconds __defaultMaxBackoff: int = 1 # seconds def __init__(self) -> None: From dde37ee7e6beae6fc7e2cd21c5dd9a318934c036 Mon Sep 17 00:00:00 2001 From: Tomas Varon Saldarriaga Date: Sun, 2 Feb 2025 16:35:11 -0800 Subject: [PATCH 05/17] Add write files --- .../azure-cosmos/test/writes_concurrent.py | 149 ++++++++++++++++++ .../azure-cosmos/test/writes_concurrent1.py | 149 ++++++++++++++++++ .../azure-cosmos/test/writes_concurrent2.py | 149 ++++++++++++++++++ 3 files changed, 447 insertions(+) create mode 100644 sdk/cosmos/azure-cosmos/test/writes_concurrent.py create mode 100644 sdk/cosmos/azure-cosmos/test/writes_concurrent1.py create mode 100644 sdk/cosmos/azure-cosmos/test/writes_concurrent2.py diff --git a/sdk/cosmos/azure-cosmos/test/writes_concurrent.py b/sdk/cosmos/azure-cosmos/test/writes_concurrent.py new file mode 100644 index 000000000000..64049a0dfd70 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/writes_concurrent.py @@ -0,0 +1,149 @@ +import random +import sys +import os + +from aiohttp.hdrs import USER_AGENT + +sys.path.append(r"") + +import threading +import uuid +from azure.cosmos import CosmosClient, PartitionKey, exceptions, ThroughputProperties +from azure.cosmos.aio import CosmosClient as AsyncClient +import asyncio + +import time +from datetime import datetime + +import sys +import logging + +# Create a logger for the "azure" SDK +# logger = logging.getLogger("azure") +# logger.setLevel(logging.DEBUG) +# handler = logging.StreamHandler(stream=sys.stdout) +# logger.addHandler(handler) + +# Replace with your Cosmos DB details +preferred_regions_2 = ['East US 2', 'North Central US'] +COSMOS_URI2 = "" +COSMOS_KEY2 = "" + +async def write_item_concurrently_initial(container, num_upserts, initial): + tasks = [] + for i in range(initial, initial + num_upserts): + tasks.append(container.upsert_item({"id": "Simon-" + str(i), "pk": "pk-" + str(i)})) + await asyncio.gather(*tasks) + +async def write_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item({"id": "Simon-" + str(uuid.uuid4()), "pk": "pk-" + str(random.randint(1, 10000))})) + await asyncio.gather(*tasks) + +def get_random_item(): + random_int = random.randint(1, 10000) + return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + + +def get_upsert_random_item(): + random_int = random.randint(1, 1000000000) + return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + +async def upsert_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item(get_upsert_random_item())) + await asyncio.gather(*tasks) + +async def read_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + item = get_random_item() + print(item["id"], item["pk"]) + tasks.append(container.read_item(item["id"], item["pk"])) + await asyncio.gather(*tasks) + + +async def multi_region(): + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing_2.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-Tomas") as client: + db = client.get_database_client("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + time.sleep(1) + + while True: + try: + await upsert_item_concurrently(cont, 100) # Number of concurrent upserts + time.sleep(1) + await read_item_concurrently(cont, 100) # Number of concurrent reads + except Exception as e: + raise e + + +async def create_items(client_id): + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-Tomas" + str(client_id) + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: + db = await client.create_database_if_not_exists("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + time.sleep(1) + + try: + while True: + await write_item_concurrently(cont, 4) # Number of concurrent upserts + time.sleep(1) + except Exception as e: + raise e + +async def create_items_initial(): + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-Tomas-") as client: + db = await client.create_database_if_not_exists("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + time.sleep(1) + + try: + for i in range (0, 10000, 1000): + await write_item_concurrently_initial(cont, 1000, i) # Number of concurrent upserts + time.sleep(1) + except Exception as e: + raise e + +async def create_item(): + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, user_agent="-Concurrent-Write-Tomas") as client: + db = await client.create_database_if_not_exists("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + time.sleep(1) + + try: + # await cont.create_item({"id": "Simon-4082", "pk": "pk-4082"}) + await cont.read_item("Simon-4082", "pk-4082") + except Exception as e: + raise e + + +if __name__ == "__main__": + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + asyncio.run(create_items(0)) + # asyncio.run(create_items_initial()) \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/test/writes_concurrent1.py b/sdk/cosmos/azure-cosmos/test/writes_concurrent1.py new file mode 100644 index 000000000000..67d9790c7534 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/writes_concurrent1.py @@ -0,0 +1,149 @@ +import random +import sys +import os + +from aiohttp.hdrs import USER_AGENT + +sys.path.append(r"") + +import threading +import uuid +from azure.cosmos import CosmosClient, PartitionKey, exceptions, ThroughputProperties +from azure.cosmos.aio import CosmosClient as AsyncClient +import asyncio + +import time +from datetime import datetime + +import sys +import logging + +# Create a logger for the "azure" SDK +# logger = logging.getLogger("azure") +# logger.setLevel(logging.DEBUG) +# handler = logging.StreamHandler(stream=sys.stdout) +# logger.addHandler(handler) + +# Replace with your Cosmos DB details +preferred_regions_2 = ['East US 2', 'North Central US'] +COSMOS_URI2 = "" +COSMOS_KEY2 = "" + +async def write_item_concurrently_initial(container, num_upserts, initial): + tasks = [] + for i in range(initial, initial + num_upserts): + tasks.append(container.upsert_item({"id": "Simon-" + str(i), "pk": "pk-" + str(i)})) + await asyncio.gather(*tasks) + +async def write_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item({"id": "Simon-" + str(uuid.uuid4()), "pk": "pk-" + str(random.randint(1, 10000))})) + await asyncio.gather(*tasks) + +def get_random_item(): + random_int = random.randint(1, 10000) + return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + + +def get_upsert_random_item(): + random_int = random.randint(1, 1000000000) + return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + +async def upsert_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item(get_upsert_random_item())) + await asyncio.gather(*tasks) + +async def read_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + item = get_random_item() + print(item["id"], item["pk"]) + tasks.append(container.read_item(item["id"], item["pk"])) + await asyncio.gather(*tasks) + + +async def multi_region(): + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing_2.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-Tomas") as client: + db = client.get_database_client("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + time.sleep(1) + + while True: + try: + await upsert_item_concurrently(cont, 100) # Number of concurrent upserts + time.sleep(1) + await read_item_concurrently(cont, 100) # Number of concurrent reads + except Exception as e: + raise e + + +async def create_items(client_id): + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-Tomas" + str(client_id)+ str(client_id) + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: + db = await client.create_database_if_not_exists("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + time.sleep(1) + + try: + while True: + await write_item_concurrently(cont, 4) # Number of concurrent upserts + time.sleep(1) + except Exception as e: + raise e + +async def create_items_initial(): + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-Tomas-") as client: + db = await client.create_database_if_not_exists("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + time.sleep(1) + + try: + for i in range (0, 10000, 1000): + await write_item_concurrently_initial(cont, 1000, i) # Number of concurrent upserts + time.sleep(1) + except Exception as e: + raise e + +async def create_item(): + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, user_agent="-Concurrent-Write-Tomas") as client: + db = await client.create_database_if_not_exists("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + time.sleep(1) + + try: + # await cont.create_item({"id": "Simon-4082", "pk": "pk-4082"}) + await cont.read_item("Simon-4082", "pk-4082") + except Exception as e: + raise e + + +if __name__ == "__main__": + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + asyncio.run(create_items(1)) + # asyncio.run(create_items_initial()) \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/test/writes_concurrent2.py b/sdk/cosmos/azure-cosmos/test/writes_concurrent2.py new file mode 100644 index 000000000000..ea7dcb19b073 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/writes_concurrent2.py @@ -0,0 +1,149 @@ +import random +import sys +import os + +from aiohttp.hdrs import USER_AGENT + +sys.path.append(r"") + +import threading +import uuid +from azure.cosmos import CosmosClient, PartitionKey, exceptions, ThroughputProperties +from azure.cosmos.aio import CosmosClient as AsyncClient +import asyncio + +import time +from datetime import datetime + +import sys +import logging + +# Create a logger for the "azure" SDK +# logger = logging.getLogger("azure") +# logger.setLevel(logging.DEBUG) +# handler = logging.StreamHandler(stream=sys.stdout) +# logger.addHandler(handler) + +# Replace with your Cosmos DB details +preferred_regions_2 = ['East US 2', 'North Central US'] +COSMOS_URI2 = "" +COSMOS_KEY2 = "" + +async def write_item_concurrently_initial(container, num_upserts, initial): + tasks = [] + for i in range(initial, initial + num_upserts): + tasks.append(container.upsert_item({"id": "Simon-" + str(i), "pk": "pk-" + str(i)})) + await asyncio.gather(*tasks) + +async def write_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item({"id": "Simon-" + str(uuid.uuid4()), "pk": "pk-" + str(random.randint(1, 10000))})) + await asyncio.gather(*tasks) + +def get_random_item(): + random_int = random.randint(1, 10000) + return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + + +def get_upsert_random_item(): + random_int = random.randint(1, 1000000000) + return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + +async def upsert_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item(get_upsert_random_item())) + await asyncio.gather(*tasks) + +async def read_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + item = get_random_item() + print(item["id"], item["pk"]) + tasks.append(container.read_item(item["id"], item["pk"])) + await asyncio.gather(*tasks) + + +async def multi_region(): + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing_2.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-Tomas") as client: + db = client.get_database_client("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + time.sleep(1) + + while True: + try: + await upsert_item_concurrently(cont, 100) # Number of concurrent upserts + time.sleep(1) + await read_item_concurrently(cont, 100) # Number of concurrent reads + except Exception as e: + raise e + + +async def create_items(client_id): + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-Tomas" + str(client_id)+ str(client_id) + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: + db = await client.create_database_if_not_exists("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + time.sleep(1) + + try: + while True: + await write_item_concurrently(cont, 4) # Number of concurrent upserts + time.sleep(1) + except Exception as e: + raise e + +async def create_items_initial(): + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-Tomas-") as client: + db = await client.create_database_if_not_exists("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + time.sleep(1) + + try: + for i in range (0, 10000, 1000): + await write_item_concurrently_initial(cont, 1000, i) # Number of concurrent upserts + time.sleep(1) + except Exception as e: + raise e + +async def create_item(): + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, user_agent="-Concurrent-Write-Tomas") as client: + db = await client.create_database_if_not_exists("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + time.sleep(1) + + try: + # await cont.create_item({"id": "Simon-4082", "pk": "pk-4082"}) + await cont.read_item("Simon-4082", "pk-4082") + except Exception as e: + raise e + + +if __name__ == "__main__": + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + asyncio.run(create_items(2)) + # asyncio.run(create_items_initial()) \ No newline at end of file From 933d1b43fe44c9f4bc5b092d14a6b6f4263c7ae3 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Sun, 2 Feb 2025 17:29:48 -0800 Subject: [PATCH 06/17] Added sub status code logging --- .../azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py index 385c9f299f0a..4d698b57cbdd 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py @@ -188,8 +188,8 @@ def on_response( options = response.context.options logger = request.context.setdefault("logger", options.pop("logger", self.logger)) try: - logger.info("{} - Response status code: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"), - http_response.status_code)) + logger.info("{} - Response status code: {}, sub status code: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"), + http_response.status_code, http_response.headers["x-ms-substatus"])) logger.info("{} - Response URL: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"), request.http_request.url)) # Thin Client headers From 675c659762adedae5b07db9c7cbbc80c7c5118fb Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Sun, 2 Feb 2025 17:31:18 -0800 Subject: [PATCH 07/17] Added demo.py for stress testing --- sdk/cosmos/azure-cosmos/test/demo.py | 100 +++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 sdk/cosmos/azure-cosmos/test/demo.py diff --git a/sdk/cosmos/azure-cosmos/test/demo.py b/sdk/cosmos/azure-cosmos/test/demo.py new file mode 100644 index 000000000000..5fe9d4c274a8 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/demo.py @@ -0,0 +1,100 @@ +import random +import sys + +sys.path.append(r"") + +from azure.cosmos.aio import CosmosClient as AsyncClient +import asyncio + +import time +from datetime import datetime + +import sys +import logging + +# Replace with your Cosmos DB details +preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP'] +COSMOS_URI2 = "" +COSMOS_KEY2 = "" + + +async def write_item_concurrently(container, num_upserts, start_id): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item({"id": "Simon-" + str(start_id), "pk": "pk-" + str(start_id)})) + start_id += 1 + await asyncio.gather(*tasks) + + +def get_random_item(): + random_int = random.randint(1, 10000) + return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + + +async def upsert_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item(get_random_item())) + await asyncio.gather(*tasks) + + +async def read_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + item = get_random_item() + tasks.append(container.read_item(item["id"], item["pk"])) + await asyncio.gather(*tasks) + + +async def query_items_concurrently(container, num_queries): + tasks = [] + for _ in range(num_queries): + tasks.append(perform_query(container)) + await asyncio.gather(*tasks) + + +async def perform_query(container): + random_item = get_random_item() + results = container.query_items(query="SELECT * FROM c", partition_key=random_item["pk"]) + items = [item async for item in results] + + +async def change_feed(container): + response = container.query_items_change_feed(is_start_from_beginning=True) + + count = 0 + async for doc in response: + count += 1 + + +async def multi_region(client_id): + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-VM-" + str(client_id) + datetime.now().strftime( + "%Y%m%d-%H%M%S")) as client: + db = client.get_database_client("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") + time.sleep(1) + + while True: + try: + await upsert_item_concurrently(cont, 5) # Number of concurrent upserts + time.sleep(1) + await read_item_concurrently(cont, 5) # Number of concurrent reads + time.sleep(1) + # await query_items_concurrently(cont, 500) # Number of concurrent queries + # time.sleep(1) + # await change_feed(cont) + # time.sleep(1) + except Exception as e: + raise e + + +if __name__ == "__main__": + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + asyncio.run(multi_region(2)) + # asyncio.run(create_item()) From e089117ade5627b0017be9b4b1d9dbff15575fca Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Sun, 2 Feb 2025 17:35:13 -0800 Subject: [PATCH 08/17] Added more tests for stress testing --- sdk/cosmos/azure-cosmos/test/demo-1.py | 100 ++++++++++++++++++ sdk/cosmos/azure-cosmos/test/demo-2.py | 100 ++++++++++++++++++ sdk/cosmos/azure-cosmos/test/demo-3.py | 100 ++++++++++++++++++ sdk/cosmos/azure-cosmos/test/demo-4.py | 100 ++++++++++++++++++ sdk/cosmos/azure-cosmos/test/demo-reads-1.py | 100 ++++++++++++++++++ sdk/cosmos/azure-cosmos/test/demo-reads-2.py | 100 ++++++++++++++++++ sdk/cosmos/azure-cosmos/test/demo-reads.py | 100 ++++++++++++++++++ .../azure-cosmos/test/writes_concurrent.py | 5 +- .../azure-cosmos/test/writes_concurrent1.py | 5 +- .../azure-cosmos/test/writes_concurrent2.py | 5 +- 10 files changed, 709 insertions(+), 6 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos/test/demo-1.py create mode 100644 sdk/cosmos/azure-cosmos/test/demo-2.py create mode 100644 sdk/cosmos/azure-cosmos/test/demo-3.py create mode 100644 sdk/cosmos/azure-cosmos/test/demo-4.py create mode 100644 sdk/cosmos/azure-cosmos/test/demo-reads-1.py create mode 100644 sdk/cosmos/azure-cosmos/test/demo-reads-2.py create mode 100644 sdk/cosmos/azure-cosmos/test/demo-reads.py diff --git a/sdk/cosmos/azure-cosmos/test/demo-1.py b/sdk/cosmos/azure-cosmos/test/demo-1.py new file mode 100644 index 000000000000..5fe9d4c274a8 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/demo-1.py @@ -0,0 +1,100 @@ +import random +import sys + +sys.path.append(r"") + +from azure.cosmos.aio import CosmosClient as AsyncClient +import asyncio + +import time +from datetime import datetime + +import sys +import logging + +# Replace with your Cosmos DB details +preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP'] +COSMOS_URI2 = "" +COSMOS_KEY2 = "" + + +async def write_item_concurrently(container, num_upserts, start_id): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item({"id": "Simon-" + str(start_id), "pk": "pk-" + str(start_id)})) + start_id += 1 + await asyncio.gather(*tasks) + + +def get_random_item(): + random_int = random.randint(1, 10000) + return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + + +async def upsert_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item(get_random_item())) + await asyncio.gather(*tasks) + + +async def read_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + item = get_random_item() + tasks.append(container.read_item(item["id"], item["pk"])) + await asyncio.gather(*tasks) + + +async def query_items_concurrently(container, num_queries): + tasks = [] + for _ in range(num_queries): + tasks.append(perform_query(container)) + await asyncio.gather(*tasks) + + +async def perform_query(container): + random_item = get_random_item() + results = container.query_items(query="SELECT * FROM c", partition_key=random_item["pk"]) + items = [item async for item in results] + + +async def change_feed(container): + response = container.query_items_change_feed(is_start_from_beginning=True) + + count = 0 + async for doc in response: + count += 1 + + +async def multi_region(client_id): + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-VM-" + str(client_id) + datetime.now().strftime( + "%Y%m%d-%H%M%S")) as client: + db = client.get_database_client("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") + time.sleep(1) + + while True: + try: + await upsert_item_concurrently(cont, 5) # Number of concurrent upserts + time.sleep(1) + await read_item_concurrently(cont, 5) # Number of concurrent reads + time.sleep(1) + # await query_items_concurrently(cont, 500) # Number of concurrent queries + # time.sleep(1) + # await change_feed(cont) + # time.sleep(1) + except Exception as e: + raise e + + +if __name__ == "__main__": + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + asyncio.run(multi_region(2)) + # asyncio.run(create_item()) diff --git a/sdk/cosmos/azure-cosmos/test/demo-2.py b/sdk/cosmos/azure-cosmos/test/demo-2.py new file mode 100644 index 000000000000..5fe9d4c274a8 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/demo-2.py @@ -0,0 +1,100 @@ +import random +import sys + +sys.path.append(r"") + +from azure.cosmos.aio import CosmosClient as AsyncClient +import asyncio + +import time +from datetime import datetime + +import sys +import logging + +# Replace with your Cosmos DB details +preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP'] +COSMOS_URI2 = "" +COSMOS_KEY2 = "" + + +async def write_item_concurrently(container, num_upserts, start_id): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item({"id": "Simon-" + str(start_id), "pk": "pk-" + str(start_id)})) + start_id += 1 + await asyncio.gather(*tasks) + + +def get_random_item(): + random_int = random.randint(1, 10000) + return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + + +async def upsert_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item(get_random_item())) + await asyncio.gather(*tasks) + + +async def read_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + item = get_random_item() + tasks.append(container.read_item(item["id"], item["pk"])) + await asyncio.gather(*tasks) + + +async def query_items_concurrently(container, num_queries): + tasks = [] + for _ in range(num_queries): + tasks.append(perform_query(container)) + await asyncio.gather(*tasks) + + +async def perform_query(container): + random_item = get_random_item() + results = container.query_items(query="SELECT * FROM c", partition_key=random_item["pk"]) + items = [item async for item in results] + + +async def change_feed(container): + response = container.query_items_change_feed(is_start_from_beginning=True) + + count = 0 + async for doc in response: + count += 1 + + +async def multi_region(client_id): + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-VM-" + str(client_id) + datetime.now().strftime( + "%Y%m%d-%H%M%S")) as client: + db = client.get_database_client("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") + time.sleep(1) + + while True: + try: + await upsert_item_concurrently(cont, 5) # Number of concurrent upserts + time.sleep(1) + await read_item_concurrently(cont, 5) # Number of concurrent reads + time.sleep(1) + # await query_items_concurrently(cont, 500) # Number of concurrent queries + # time.sleep(1) + # await change_feed(cont) + # time.sleep(1) + except Exception as e: + raise e + + +if __name__ == "__main__": + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + asyncio.run(multi_region(2)) + # asyncio.run(create_item()) diff --git a/sdk/cosmos/azure-cosmos/test/demo-3.py b/sdk/cosmos/azure-cosmos/test/demo-3.py new file mode 100644 index 000000000000..5fe9d4c274a8 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/demo-3.py @@ -0,0 +1,100 @@ +import random +import sys + +sys.path.append(r"") + +from azure.cosmos.aio import CosmosClient as AsyncClient +import asyncio + +import time +from datetime import datetime + +import sys +import logging + +# Replace with your Cosmos DB details +preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP'] +COSMOS_URI2 = "" +COSMOS_KEY2 = "" + + +async def write_item_concurrently(container, num_upserts, start_id): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item({"id": "Simon-" + str(start_id), "pk": "pk-" + str(start_id)})) + start_id += 1 + await asyncio.gather(*tasks) + + +def get_random_item(): + random_int = random.randint(1, 10000) + return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + + +async def upsert_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item(get_random_item())) + await asyncio.gather(*tasks) + + +async def read_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + item = get_random_item() + tasks.append(container.read_item(item["id"], item["pk"])) + await asyncio.gather(*tasks) + + +async def query_items_concurrently(container, num_queries): + tasks = [] + for _ in range(num_queries): + tasks.append(perform_query(container)) + await asyncio.gather(*tasks) + + +async def perform_query(container): + random_item = get_random_item() + results = container.query_items(query="SELECT * FROM c", partition_key=random_item["pk"]) + items = [item async for item in results] + + +async def change_feed(container): + response = container.query_items_change_feed(is_start_from_beginning=True) + + count = 0 + async for doc in response: + count += 1 + + +async def multi_region(client_id): + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-VM-" + str(client_id) + datetime.now().strftime( + "%Y%m%d-%H%M%S")) as client: + db = client.get_database_client("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") + time.sleep(1) + + while True: + try: + await upsert_item_concurrently(cont, 5) # Number of concurrent upserts + time.sleep(1) + await read_item_concurrently(cont, 5) # Number of concurrent reads + time.sleep(1) + # await query_items_concurrently(cont, 500) # Number of concurrent queries + # time.sleep(1) + # await change_feed(cont) + # time.sleep(1) + except Exception as e: + raise e + + +if __name__ == "__main__": + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + asyncio.run(multi_region(2)) + # asyncio.run(create_item()) diff --git a/sdk/cosmos/azure-cosmos/test/demo-4.py b/sdk/cosmos/azure-cosmos/test/demo-4.py new file mode 100644 index 000000000000..5fe9d4c274a8 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/demo-4.py @@ -0,0 +1,100 @@ +import random +import sys + +sys.path.append(r"") + +from azure.cosmos.aio import CosmosClient as AsyncClient +import asyncio + +import time +from datetime import datetime + +import sys +import logging + +# Replace with your Cosmos DB details +preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP'] +COSMOS_URI2 = "" +COSMOS_KEY2 = "" + + +async def write_item_concurrently(container, num_upserts, start_id): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item({"id": "Simon-" + str(start_id), "pk": "pk-" + str(start_id)})) + start_id += 1 + await asyncio.gather(*tasks) + + +def get_random_item(): + random_int = random.randint(1, 10000) + return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + + +async def upsert_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item(get_random_item())) + await asyncio.gather(*tasks) + + +async def read_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + item = get_random_item() + tasks.append(container.read_item(item["id"], item["pk"])) + await asyncio.gather(*tasks) + + +async def query_items_concurrently(container, num_queries): + tasks = [] + for _ in range(num_queries): + tasks.append(perform_query(container)) + await asyncio.gather(*tasks) + + +async def perform_query(container): + random_item = get_random_item() + results = container.query_items(query="SELECT * FROM c", partition_key=random_item["pk"]) + items = [item async for item in results] + + +async def change_feed(container): + response = container.query_items_change_feed(is_start_from_beginning=True) + + count = 0 + async for doc in response: + count += 1 + + +async def multi_region(client_id): + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-VM-" + str(client_id) + datetime.now().strftime( + "%Y%m%d-%H%M%S")) as client: + db = client.get_database_client("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") + time.sleep(1) + + while True: + try: + await upsert_item_concurrently(cont, 5) # Number of concurrent upserts + time.sleep(1) + await read_item_concurrently(cont, 5) # Number of concurrent reads + time.sleep(1) + # await query_items_concurrently(cont, 500) # Number of concurrent queries + # time.sleep(1) + # await change_feed(cont) + # time.sleep(1) + except Exception as e: + raise e + + +if __name__ == "__main__": + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + asyncio.run(multi_region(2)) + # asyncio.run(create_item()) diff --git a/sdk/cosmos/azure-cosmos/test/demo-reads-1.py b/sdk/cosmos/azure-cosmos/test/demo-reads-1.py new file mode 100644 index 000000000000..13315176f9ac --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/demo-reads-1.py @@ -0,0 +1,100 @@ +import random +import sys + +sys.path.append(r"") + +from azure.cosmos.aio import CosmosClient as AsyncClient +import asyncio + +import time +from datetime import datetime + +import sys +import logging + +# Replace with your Cosmos DB details +preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP'] +COSMOS_URI2 = "" +COSMOS_KEY2 = "" + + +async def write_item_concurrently(container, num_upserts, start_id): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item({"id": "Simon-" + str(start_id), "pk": "pk-" + str(start_id)})) + start_id += 1 + await asyncio.gather(*tasks) + + +def get_random_item(): + random_int = random.randint(1, 10000) + return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + + +async def upsert_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item(get_random_item())) + await asyncio.gather(*tasks) + + +async def read_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + item = get_random_item() + tasks.append(container.read_item(item["id"], item["pk"])) + await asyncio.gather(*tasks) + + +async def query_items_concurrently(container, num_queries): + tasks = [] + for _ in range(num_queries): + tasks.append(perform_query(container)) + await asyncio.gather(*tasks) + + +async def perform_query(container): + random_item = get_random_item() + results = container.query_items(query="SELECT * FROM c", partition_key=random_item["pk"]) + items = [item async for item in results] + + +async def change_feed(container): + response = container.query_items_change_feed(is_start_from_beginning=True) + + count = 0 + async for doc in response: + count += 1 + + +async def multi_region(client_id): + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-VM-reads-" + str(client_id) + datetime.now().strftime( + "%Y%m%d-%H%M%S")) as client: + db = client.get_database_client("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") + time.sleep(1) + + while True: + try: + await upsert_item_concurrently(cont, 5) # Number of concurrent upserts + time.sleep(1) + await read_item_concurrently(cont, 5) # Number of concurrent reads + time.sleep(1) + # await query_items_concurrently(cont, 500) # Number of concurrent queries + # time.sleep(1) + # await change_feed(cont) + # time.sleep(1) + except Exception as e: + raise e + + +if __name__ == "__main__": + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + asyncio.run(multi_region(2)) + # asyncio.run(create_item()) diff --git a/sdk/cosmos/azure-cosmos/test/demo-reads-2.py b/sdk/cosmos/azure-cosmos/test/demo-reads-2.py new file mode 100644 index 000000000000..13315176f9ac --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/demo-reads-2.py @@ -0,0 +1,100 @@ +import random +import sys + +sys.path.append(r"") + +from azure.cosmos.aio import CosmosClient as AsyncClient +import asyncio + +import time +from datetime import datetime + +import sys +import logging + +# Replace with your Cosmos DB details +preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP'] +COSMOS_URI2 = "" +COSMOS_KEY2 = "" + + +async def write_item_concurrently(container, num_upserts, start_id): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item({"id": "Simon-" + str(start_id), "pk": "pk-" + str(start_id)})) + start_id += 1 + await asyncio.gather(*tasks) + + +def get_random_item(): + random_int = random.randint(1, 10000) + return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + + +async def upsert_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item(get_random_item())) + await asyncio.gather(*tasks) + + +async def read_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + item = get_random_item() + tasks.append(container.read_item(item["id"], item["pk"])) + await asyncio.gather(*tasks) + + +async def query_items_concurrently(container, num_queries): + tasks = [] + for _ in range(num_queries): + tasks.append(perform_query(container)) + await asyncio.gather(*tasks) + + +async def perform_query(container): + random_item = get_random_item() + results = container.query_items(query="SELECT * FROM c", partition_key=random_item["pk"]) + items = [item async for item in results] + + +async def change_feed(container): + response = container.query_items_change_feed(is_start_from_beginning=True) + + count = 0 + async for doc in response: + count += 1 + + +async def multi_region(client_id): + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-VM-reads-" + str(client_id) + datetime.now().strftime( + "%Y%m%d-%H%M%S")) as client: + db = client.get_database_client("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") + time.sleep(1) + + while True: + try: + await upsert_item_concurrently(cont, 5) # Number of concurrent upserts + time.sleep(1) + await read_item_concurrently(cont, 5) # Number of concurrent reads + time.sleep(1) + # await query_items_concurrently(cont, 500) # Number of concurrent queries + # time.sleep(1) + # await change_feed(cont) + # time.sleep(1) + except Exception as e: + raise e + + +if __name__ == "__main__": + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + asyncio.run(multi_region(2)) + # asyncio.run(create_item()) diff --git a/sdk/cosmos/azure-cosmos/test/demo-reads.py b/sdk/cosmos/azure-cosmos/test/demo-reads.py new file mode 100644 index 000000000000..13315176f9ac --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/demo-reads.py @@ -0,0 +1,100 @@ +import random +import sys + +sys.path.append(r"") + +from azure.cosmos.aio import CosmosClient as AsyncClient +import asyncio + +import time +from datetime import datetime + +import sys +import logging + +# Replace with your Cosmos DB details +preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP'] +COSMOS_URI2 = "" +COSMOS_KEY2 = "" + + +async def write_item_concurrently(container, num_upserts, start_id): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item({"id": "Simon-" + str(start_id), "pk": "pk-" + str(start_id)})) + start_id += 1 + await asyncio.gather(*tasks) + + +def get_random_item(): + random_int = random.randint(1, 10000) + return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + + +async def upsert_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item(get_random_item())) + await asyncio.gather(*tasks) + + +async def read_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + item = get_random_item() + tasks.append(container.read_item(item["id"], item["pk"])) + await asyncio.gather(*tasks) + + +async def query_items_concurrently(container, num_queries): + tasks = [] + for _ in range(num_queries): + tasks.append(perform_query(container)) + await asyncio.gather(*tasks) + + +async def perform_query(container): + random_item = get_random_item() + results = container.query_items(query="SELECT * FROM c", partition_key=random_item["pk"]) + items = [item async for item in results] + + +async def change_feed(container): + response = container.query_items_change_feed(is_start_from_beginning=True) + + count = 0 + async for doc in response: + count += 1 + + +async def multi_region(client_id): + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-VM-reads-" + str(client_id) + datetime.now().strftime( + "%Y%m%d-%H%M%S")) as client: + db = client.get_database_client("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") + time.sleep(1) + + while True: + try: + await upsert_item_concurrently(cont, 5) # Number of concurrent upserts + time.sleep(1) + await read_item_concurrently(cont, 5) # Number of concurrent reads + time.sleep(1) + # await query_items_concurrently(cont, 500) # Number of concurrent queries + # time.sleep(1) + # await change_feed(cont) + # time.sleep(1) + except Exception as e: + raise e + + +if __name__ == "__main__": + logger = logging.getLogger('azure.cosmos') + file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + asyncio.run(multi_region(2)) + # asyncio.run(create_item()) diff --git a/sdk/cosmos/azure-cosmos/test/writes_concurrent.py b/sdk/cosmos/azure-cosmos/test/writes_concurrent.py index 64049a0dfd70..a7aaa66bcc6e 100644 --- a/sdk/cosmos/azure-cosmos/test/writes_concurrent.py +++ b/sdk/cosmos/azure-cosmos/test/writes_concurrent.py @@ -25,7 +25,7 @@ # logger.addHandler(handler) # Replace with your Cosmos DB details -preferred_regions_2 = ['East US 2', 'North Central US'] +preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP'] COSMOS_URI2 = "" COSMOS_KEY2 = "" @@ -127,7 +127,8 @@ async def create_item(): logger.addHandler(file_handler) logger.addHandler(logging.StreamHandler(sys.stdout)) async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, - enable_diagnostics_logging=True, logger=logger, user_agent="-Concurrent-Write-Tomas") as client: + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-Write-VM-" + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: db = await client.create_database_if_not_exists("SimonDB") cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) time.sleep(1) diff --git a/sdk/cosmos/azure-cosmos/test/writes_concurrent1.py b/sdk/cosmos/azure-cosmos/test/writes_concurrent1.py index 67d9790c7534..ef32cf05a97d 100644 --- a/sdk/cosmos/azure-cosmos/test/writes_concurrent1.py +++ b/sdk/cosmos/azure-cosmos/test/writes_concurrent1.py @@ -25,7 +25,7 @@ # logger.addHandler(handler) # Replace with your Cosmos DB details -preferred_regions_2 = ['East US 2', 'North Central US'] +preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP'] COSMOS_URI2 = "" COSMOS_KEY2 = "" @@ -108,7 +108,8 @@ async def create_items(client_id): async def create_items_initial(): async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, - enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-Tomas-") as client: + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-Write-VM-" + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: db = await client.create_database_if_not_exists("SimonDB") cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) time.sleep(1) diff --git a/sdk/cosmos/azure-cosmos/test/writes_concurrent2.py b/sdk/cosmos/azure-cosmos/test/writes_concurrent2.py index ea7dcb19b073..bd781c92ac1f 100644 --- a/sdk/cosmos/azure-cosmos/test/writes_concurrent2.py +++ b/sdk/cosmos/azure-cosmos/test/writes_concurrent2.py @@ -25,7 +25,7 @@ # logger.addHandler(handler) # Replace with your Cosmos DB details -preferred_regions_2 = ['East US 2', 'North Central US'] +preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP'] COSMOS_URI2 = "" COSMOS_KEY2 = "" @@ -127,7 +127,8 @@ async def create_item(): logger.addHandler(file_handler) logger.addHandler(logging.StreamHandler(sys.stdout)) async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, - enable_diagnostics_logging=True, logger=logger, user_agent="-Concurrent-Write-Tomas") as client: + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-Write-VM-" + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: db = await client.create_database_if_not_exists("SimonDB") cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) time.sleep(1) From 7630f387b09a527f5389bb716f34ee60cb734e48 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Sun, 2 Feb 2025 17:48:01 -0800 Subject: [PATCH 09/17] Removed substatus code --- .../azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py index 4d698b57cbdd..385c9f299f0a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py @@ -188,8 +188,8 @@ def on_response( options = response.context.options logger = request.context.setdefault("logger", options.pop("logger", self.logger)) try: - logger.info("{} - Response status code: {}, sub status code: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"), - http_response.status_code, http_response.headers["x-ms-substatus"])) + logger.info("{} - Response status code: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"), + http_response.status_code)) logger.info("{} - Response URL: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"), request.http_request.url)) # Thin Client headers From e592b1126b84bdeb95a5315be0b29bebd6e0d211 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Mon, 3 Feb 2025 15:55:33 -0800 Subject: [PATCH 10/17] Improved logging --- sdk/cosmos/azure-cosmos/test/demo-1.py | 6 ++- sdk/cosmos/azure-cosmos/test/demo-2.py | 6 ++- sdk/cosmos/azure-cosmos/test/demo-3.py | 6 ++- sdk/cosmos/azure-cosmos/test/demo-4.py | 6 ++- .../azure-cosmos/test/{demo.py => demo-5.py} | 8 ++-- sdk/cosmos/azure-cosmos/test/demo-reads-1.py | 6 ++- sdk/cosmos/azure-cosmos/test/demo-reads-2.py | 6 ++- .../test/{demo-reads.py => demo-reads-3.py} | 6 ++- ...writes_concurrent2.py => demo-writes-1.py} | 44 ++++++++++--------- ...writes_concurrent1.py => demo-writes-2.py} | 44 ++++++++++--------- ...{writes_concurrent.py => demo-writes-3.py} | 40 +++++++++-------- 11 files changed, 103 insertions(+), 75 deletions(-) rename sdk/cosmos/azure-cosmos/test/{demo.py => demo-5.py} (92%) rename sdk/cosmos/azure-cosmos/test/{demo-reads.py => demo-reads-3.py} (93%) rename sdk/cosmos/azure-cosmos/test/{writes_concurrent2.py => demo-writes-1.py} (82%) rename sdk/cosmos/azure-cosmos/test/{writes_concurrent1.py => demo-writes-2.py} (82%) rename sdk/cosmos/azure-cosmos/test/{writes_concurrent.py => demo-writes-3.py} (84%) diff --git a/sdk/cosmos/azure-cosmos/test/demo-1.py b/sdk/cosmos/azure-cosmos/test/demo-1.py index 5fe9d4c274a8..aeb664dc5de2 100644 --- a/sdk/cosmos/azure-cosmos/test/demo-1.py +++ b/sdk/cosmos/azure-cosmos/test/demo-1.py @@ -1,3 +1,4 @@ +import os import random import sys @@ -92,9 +93,10 @@ async def multi_region(client_id): if __name__ == "__main__": logger = logging.getLogger('azure.cosmos') - file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + file_name = os.path.basename(__file__) + file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log') logger.setLevel(logging.DEBUG) logger.addHandler(file_handler) logger.addHandler(logging.StreamHandler(sys.stdout)) - asyncio.run(multi_region(2)) + asyncio.run(multi_region(file_name)) # asyncio.run(create_item()) diff --git a/sdk/cosmos/azure-cosmos/test/demo-2.py b/sdk/cosmos/azure-cosmos/test/demo-2.py index 5fe9d4c274a8..aeb664dc5de2 100644 --- a/sdk/cosmos/azure-cosmos/test/demo-2.py +++ b/sdk/cosmos/azure-cosmos/test/demo-2.py @@ -1,3 +1,4 @@ +import os import random import sys @@ -92,9 +93,10 @@ async def multi_region(client_id): if __name__ == "__main__": logger = logging.getLogger('azure.cosmos') - file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + file_name = os.path.basename(__file__) + file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log') logger.setLevel(logging.DEBUG) logger.addHandler(file_handler) logger.addHandler(logging.StreamHandler(sys.stdout)) - asyncio.run(multi_region(2)) + asyncio.run(multi_region(file_name)) # asyncio.run(create_item()) diff --git a/sdk/cosmos/azure-cosmos/test/demo-3.py b/sdk/cosmos/azure-cosmos/test/demo-3.py index 5fe9d4c274a8..aeb664dc5de2 100644 --- a/sdk/cosmos/azure-cosmos/test/demo-3.py +++ b/sdk/cosmos/azure-cosmos/test/demo-3.py @@ -1,3 +1,4 @@ +import os import random import sys @@ -92,9 +93,10 @@ async def multi_region(client_id): if __name__ == "__main__": logger = logging.getLogger('azure.cosmos') - file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + file_name = os.path.basename(__file__) + file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log') logger.setLevel(logging.DEBUG) logger.addHandler(file_handler) logger.addHandler(logging.StreamHandler(sys.stdout)) - asyncio.run(multi_region(2)) + asyncio.run(multi_region(file_name)) # asyncio.run(create_item()) diff --git a/sdk/cosmos/azure-cosmos/test/demo-4.py b/sdk/cosmos/azure-cosmos/test/demo-4.py index 5fe9d4c274a8..aeb664dc5de2 100644 --- a/sdk/cosmos/azure-cosmos/test/demo-4.py +++ b/sdk/cosmos/azure-cosmos/test/demo-4.py @@ -1,3 +1,4 @@ +import os import random import sys @@ -92,9 +93,10 @@ async def multi_region(client_id): if __name__ == "__main__": logger = logging.getLogger('azure.cosmos') - file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + file_name = os.path.basename(__file__) + file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log') logger.setLevel(logging.DEBUG) logger.addHandler(file_handler) logger.addHandler(logging.StreamHandler(sys.stdout)) - asyncio.run(multi_region(2)) + asyncio.run(multi_region(file_name)) # asyncio.run(create_item()) diff --git a/sdk/cosmos/azure-cosmos/test/demo.py b/sdk/cosmos/azure-cosmos/test/demo-5.py similarity index 92% rename from sdk/cosmos/azure-cosmos/test/demo.py rename to sdk/cosmos/azure-cosmos/test/demo-5.py index 5fe9d4c274a8..e1d949af78ea 100644 --- a/sdk/cosmos/azure-cosmos/test/demo.py +++ b/sdk/cosmos/azure-cosmos/test/demo-5.py @@ -1,7 +1,8 @@ +import os.path import random import sys -sys.path.append(r"") +sys.path.append(r"") from azure.cosmos.aio import CosmosClient as AsyncClient import asyncio @@ -92,9 +93,10 @@ async def multi_region(client_id): if __name__ == "__main__": logger = logging.getLogger('azure.cosmos') - file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + file_name = os.path.basename(__file__) + file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log') logger.setLevel(logging.DEBUG) logger.addHandler(file_handler) logger.addHandler(logging.StreamHandler(sys.stdout)) - asyncio.run(multi_region(2)) + asyncio.run(multi_region(file_name)) # asyncio.run(create_item()) diff --git a/sdk/cosmos/azure-cosmos/test/demo-reads-1.py b/sdk/cosmos/azure-cosmos/test/demo-reads-1.py index 13315176f9ac..34187f7370a9 100644 --- a/sdk/cosmos/azure-cosmos/test/demo-reads-1.py +++ b/sdk/cosmos/azure-cosmos/test/demo-reads-1.py @@ -1,3 +1,4 @@ +import os import random import sys @@ -92,9 +93,10 @@ async def multi_region(client_id): if __name__ == "__main__": logger = logging.getLogger('azure.cosmos') - file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + file_name = os.path.basename(__file__) + file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log') logger.setLevel(logging.DEBUG) logger.addHandler(file_handler) logger.addHandler(logging.StreamHandler(sys.stdout)) - asyncio.run(multi_region(2)) + asyncio.run(multi_region(file_name)) # asyncio.run(create_item()) diff --git a/sdk/cosmos/azure-cosmos/test/demo-reads-2.py b/sdk/cosmos/azure-cosmos/test/demo-reads-2.py index 13315176f9ac..34187f7370a9 100644 --- a/sdk/cosmos/azure-cosmos/test/demo-reads-2.py +++ b/sdk/cosmos/azure-cosmos/test/demo-reads-2.py @@ -1,3 +1,4 @@ +import os import random import sys @@ -92,9 +93,10 @@ async def multi_region(client_id): if __name__ == "__main__": logger = logging.getLogger('azure.cosmos') - file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + file_name = os.path.basename(__file__) + file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log') logger.setLevel(logging.DEBUG) logger.addHandler(file_handler) logger.addHandler(logging.StreamHandler(sys.stdout)) - asyncio.run(multi_region(2)) + asyncio.run(multi_region(file_name)) # asyncio.run(create_item()) diff --git a/sdk/cosmos/azure-cosmos/test/demo-reads.py b/sdk/cosmos/azure-cosmos/test/demo-reads-3.py similarity index 93% rename from sdk/cosmos/azure-cosmos/test/demo-reads.py rename to sdk/cosmos/azure-cosmos/test/demo-reads-3.py index 13315176f9ac..34187f7370a9 100644 --- a/sdk/cosmos/azure-cosmos/test/demo-reads.py +++ b/sdk/cosmos/azure-cosmos/test/demo-reads-3.py @@ -1,3 +1,4 @@ +import os import random import sys @@ -92,9 +93,10 @@ async def multi_region(client_id): if __name__ == "__main__": logger = logging.getLogger('azure.cosmos') - file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + file_name = os.path.basename(__file__) + file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log') logger.setLevel(logging.DEBUG) logger.addHandler(file_handler) logger.addHandler(logging.StreamHandler(sys.stdout)) - asyncio.run(multi_region(2)) + asyncio.run(multi_region(file_name)) # asyncio.run(create_item()) diff --git a/sdk/cosmos/azure-cosmos/test/writes_concurrent2.py b/sdk/cosmos/azure-cosmos/test/demo-writes-1.py similarity index 82% rename from sdk/cosmos/azure-cosmos/test/writes_concurrent2.py rename to sdk/cosmos/azure-cosmos/test/demo-writes-1.py index bd781c92ac1f..8e57f695d92e 100644 --- a/sdk/cosmos/azure-cosmos/test/writes_concurrent2.py +++ b/sdk/cosmos/azure-cosmos/test/demo-writes-1.py @@ -1,14 +1,11 @@ +import os import random import sys -import os - -from aiohttp.hdrs import USER_AGENT sys.path.append(r"") -import threading import uuid -from azure.cosmos import CosmosClient, PartitionKey, exceptions, ThroughputProperties +from azure.cosmos import PartitionKey from azure.cosmos.aio import CosmosClient as AsyncClient import asyncio @@ -29,18 +26,22 @@ COSMOS_URI2 = "" COSMOS_KEY2 = "" + async def write_item_concurrently_initial(container, num_upserts, initial): tasks = [] for i in range(initial, initial + num_upserts): tasks.append(container.upsert_item({"id": "Simon-" + str(i), "pk": "pk-" + str(i)})) await asyncio.gather(*tasks) + async def write_item_concurrently(container, num_upserts): tasks = [] for _ in range(num_upserts): - tasks.append(container.upsert_item({"id": "Simon-" + str(uuid.uuid4()), "pk": "pk-" + str(random.randint(1, 10000))})) + tasks.append( + container.upsert_item({"id": "Simon-" + str(uuid.uuid4()), "pk": "pk-" + str(random.randint(1, 10000))})) await asyncio.gather(*tasks) + def get_random_item(): random_int = random.randint(1, 10000) return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} @@ -50,12 +51,14 @@ def get_upsert_random_item(): random_int = random.randint(1, 1000000000) return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + async def upsert_item_concurrently(container, num_upserts): tasks = [] for _ in range(num_upserts): tasks.append(container.upsert_item(get_upsert_random_item())) await asyncio.gather(*tasks) + async def read_item_concurrently(container, num_upserts): tasks = [] for _ in range(num_upserts): @@ -84,17 +87,14 @@ async def multi_region(): time.sleep(1) await read_item_concurrently(cont, 100) # Number of concurrent reads except Exception as e: - raise e + raise e async def create_items(client_id): - logger = logging.getLogger('azure.cosmos') - file_handler = logging.FileHandler('fiddler_testing.txt') - logger.setLevel(logging.DEBUG) - logger.addHandler(file_handler) - logger.addHandler(logging.StreamHandler(sys.stdout)) async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, - enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-Tomas" + str(client_id)+ str(client_id) + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-Write-Tomas" + str(client_id) + str( + client_id) + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: db = await client.create_database_if_not_exists("SimonDB") cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) time.sleep(1) @@ -106,29 +106,32 @@ async def create_items(client_id): except Exception as e: raise e + async def create_items_initial(): async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, - enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-Tomas-") as client: + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-Write-VM-" + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: db = await client.create_database_if_not_exists("SimonDB") cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) time.sleep(1) try: - for i in range (0, 10000, 1000): + for i in range(0, 10000, 1000): await write_item_concurrently_initial(cont, 1000, i) # Number of concurrent upserts time.sleep(1) except Exception as e: raise e + async def create_item(): logger = logging.getLogger('azure.cosmos') - file_handler = logging.FileHandler('fiddler_testing.txt') + file_handler = logging.FileHandler('create_item_testing.log') logger.setLevel(logging.DEBUG) logger.addHandler(file_handler) logger.addHandler(logging.StreamHandler(sys.stdout)) async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, enable_diagnostics_logging=True, logger=logger, - user_agent="Concurrent-Write-VM-" + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: + user_agent="-Concurrent-Write-Tomas") as client: db = await client.create_database_if_not_exists("SimonDB") cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) time.sleep(1) @@ -142,9 +145,10 @@ async def create_item(): if __name__ == "__main__": logger = logging.getLogger('azure.cosmos') - file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + file_name = os.path.basename(__file__) + file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log') logger.setLevel(logging.DEBUG) logger.addHandler(file_handler) logger.addHandler(logging.StreamHandler(sys.stdout)) - asyncio.run(create_items(2)) - # asyncio.run(create_items_initial()) \ No newline at end of file + asyncio.run(create_items(file_name)) + # asyncio.run(create_items_initial()) diff --git a/sdk/cosmos/azure-cosmos/test/writes_concurrent1.py b/sdk/cosmos/azure-cosmos/test/demo-writes-2.py similarity index 82% rename from sdk/cosmos/azure-cosmos/test/writes_concurrent1.py rename to sdk/cosmos/azure-cosmos/test/demo-writes-2.py index ef32cf05a97d..3cf992b6f9e8 100644 --- a/sdk/cosmos/azure-cosmos/test/writes_concurrent1.py +++ b/sdk/cosmos/azure-cosmos/test/demo-writes-2.py @@ -1,14 +1,11 @@ +import os import random import sys -import os - -from aiohttp.hdrs import USER_AGENT sys.path.append(r"") -import threading import uuid -from azure.cosmos import CosmosClient, PartitionKey, exceptions, ThroughputProperties +from azure.cosmos import PartitionKey from azure.cosmos.aio import CosmosClient as AsyncClient import asyncio @@ -29,18 +26,22 @@ COSMOS_URI2 = "" COSMOS_KEY2 = "" + async def write_item_concurrently_initial(container, num_upserts, initial): tasks = [] for i in range(initial, initial + num_upserts): tasks.append(container.upsert_item({"id": "Simon-" + str(i), "pk": "pk-" + str(i)})) await asyncio.gather(*tasks) + async def write_item_concurrently(container, num_upserts): tasks = [] for _ in range(num_upserts): - tasks.append(container.upsert_item({"id": "Simon-" + str(uuid.uuid4()), "pk": "pk-" + str(random.randint(1, 10000))})) + tasks.append( + container.upsert_item({"id": "Simon-" + str(uuid.uuid4()), "pk": "pk-" + str(random.randint(1, 10000))})) await asyncio.gather(*tasks) + def get_random_item(): random_int = random.randint(1, 10000) return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} @@ -50,12 +51,14 @@ def get_upsert_random_item(): random_int = random.randint(1, 1000000000) return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + async def upsert_item_concurrently(container, num_upserts): tasks = [] for _ in range(num_upserts): tasks.append(container.upsert_item(get_upsert_random_item())) await asyncio.gather(*tasks) + async def read_item_concurrently(container, num_upserts): tasks = [] for _ in range(num_upserts): @@ -84,17 +87,14 @@ async def multi_region(): time.sleep(1) await read_item_concurrently(cont, 100) # Number of concurrent reads except Exception as e: - raise e + raise e async def create_items(client_id): - logger = logging.getLogger('azure.cosmos') - file_handler = logging.FileHandler('fiddler_testing.txt') - logger.setLevel(logging.DEBUG) - logger.addHandler(file_handler) - logger.addHandler(logging.StreamHandler(sys.stdout)) async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, - enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-Tomas" + str(client_id)+ str(client_id) + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-Write-Tomas" + str(client_id) + str( + client_id) + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: db = await client.create_database_if_not_exists("SimonDB") cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) time.sleep(1) @@ -106,29 +106,32 @@ async def create_items(client_id): except Exception as e: raise e + async def create_items_initial(): async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, enable_diagnostics_logging=True, logger=logger, - user_agent="Concurrent-Write-VM-" + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: + user_agent="Concurrent-Write-Tomas-") as client: db = await client.create_database_if_not_exists("SimonDB") cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) time.sleep(1) try: - for i in range (0, 10000, 1000): + for i in range(0, 10000, 1000): await write_item_concurrently_initial(cont, 1000, i) # Number of concurrent upserts time.sleep(1) except Exception as e: raise e + async def create_item(): logger = logging.getLogger('azure.cosmos') - file_handler = logging.FileHandler('fiddler_testing.txt') + file_handler = logging.FileHandler('create_item_testing.log') logger.setLevel(logging.DEBUG) logger.addHandler(file_handler) logger.addHandler(logging.StreamHandler(sys.stdout)) async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, - enable_diagnostics_logging=True, logger=logger, user_agent="-Concurrent-Write-Tomas") as client: + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-Write-VM-" + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: db = await client.create_database_if_not_exists("SimonDB") cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) time.sleep(1) @@ -142,9 +145,10 @@ async def create_item(): if __name__ == "__main__": logger = logging.getLogger('azure.cosmos') - file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + file_name = os.path.basename(__file__) + file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log') logger.setLevel(logging.DEBUG) logger.addHandler(file_handler) logger.addHandler(logging.StreamHandler(sys.stdout)) - asyncio.run(create_items(1)) - # asyncio.run(create_items_initial()) \ No newline at end of file + asyncio.run(create_items(file_name)) + # asyncio.run(create_items_initial()) diff --git a/sdk/cosmos/azure-cosmos/test/writes_concurrent.py b/sdk/cosmos/azure-cosmos/test/demo-writes-3.py similarity index 84% rename from sdk/cosmos/azure-cosmos/test/writes_concurrent.py rename to sdk/cosmos/azure-cosmos/test/demo-writes-3.py index a7aaa66bcc6e..eae8a26bfd4c 100644 --- a/sdk/cosmos/azure-cosmos/test/writes_concurrent.py +++ b/sdk/cosmos/azure-cosmos/test/demo-writes-3.py @@ -1,14 +1,11 @@ +import os import random import sys -import os - -from aiohttp.hdrs import USER_AGENT sys.path.append(r"") -import threading import uuid -from azure.cosmos import CosmosClient, PartitionKey, exceptions, ThroughputProperties +from azure.cosmos import PartitionKey from azure.cosmos.aio import CosmosClient as AsyncClient import asyncio @@ -29,18 +26,22 @@ COSMOS_URI2 = "" COSMOS_KEY2 = "" + async def write_item_concurrently_initial(container, num_upserts, initial): tasks = [] for i in range(initial, initial + num_upserts): tasks.append(container.upsert_item({"id": "Simon-" + str(i), "pk": "pk-" + str(i)})) await asyncio.gather(*tasks) + async def write_item_concurrently(container, num_upserts): tasks = [] for _ in range(num_upserts): - tasks.append(container.upsert_item({"id": "Simon-" + str(uuid.uuid4()), "pk": "pk-" + str(random.randint(1, 10000))})) + tasks.append( + container.upsert_item({"id": "Simon-" + str(uuid.uuid4()), "pk": "pk-" + str(random.randint(1, 10000))})) await asyncio.gather(*tasks) + def get_random_item(): random_int = random.randint(1, 10000) return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} @@ -50,12 +51,14 @@ def get_upsert_random_item(): random_int = random.randint(1, 1000000000) return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + async def upsert_item_concurrently(container, num_upserts): tasks = [] for _ in range(num_upserts): tasks.append(container.upsert_item(get_upsert_random_item())) await asyncio.gather(*tasks) + async def read_item_concurrently(container, num_upserts): tasks = [] for _ in range(num_upserts): @@ -84,7 +87,7 @@ async def multi_region(): time.sleep(1) await read_item_concurrently(cont, 100) # Number of concurrent reads except Exception as e: - raise e + raise e async def create_items(client_id): @@ -94,7 +97,9 @@ async def create_items(client_id): logger.addHandler(file_handler) logger.addHandler(logging.StreamHandler(sys.stdout)) async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, - enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-Tomas" + str(client_id) + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-Write-Tomas" + str(client_id) + datetime.now().strftime( + "%Y%m%d-%H%M%S")) as client: db = await client.create_database_if_not_exists("SimonDB") cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) time.sleep(1) @@ -106,26 +111,24 @@ async def create_items(client_id): except Exception as e: raise e + async def create_items_initial(): async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, - enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-Tomas-") as client: + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-Write-Tomas-") as client: db = await client.create_database_if_not_exists("SimonDB") cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) time.sleep(1) try: - for i in range (0, 10000, 1000): + for i in range(0, 10000, 1000): await write_item_concurrently_initial(cont, 1000, i) # Number of concurrent upserts time.sleep(1) except Exception as e: raise e + async def create_item(): - logger = logging.getLogger('azure.cosmos') - file_handler = logging.FileHandler('fiddler_testing.txt') - logger.setLevel(logging.DEBUG) - logger.addHandler(file_handler) - logger.addHandler(logging.StreamHandler(sys.stdout)) async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-VM-" + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: @@ -142,9 +145,10 @@ async def create_item(): if __name__ == "__main__": logger = logging.getLogger('azure.cosmos') - file_handler = logging.FileHandler('fiddler_testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.txt') + file_name = os.path.basename(__file__) + file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log') logger.setLevel(logging.DEBUG) logger.addHandler(file_handler) logger.addHandler(logging.StreamHandler(sys.stdout)) - asyncio.run(create_items(0)) - # asyncio.run(create_items_initial()) \ No newline at end of file + asyncio.run(create_items(file_name)) + # asyncio.run(create_items_initial()) From a317aecfcd795d34dc1d405352de21bf332b0d9d Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Mon, 3 Feb 2025 16:08:55 -0800 Subject: [PATCH 11/17] Moved connection timeout to 5 seconds --- sdk/cosmos/azure-cosmos/azure/cosmos/documents.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/documents.py b/sdk/cosmos/azure-cosmos/azure/cosmos/documents.py index 9b0e1ea8ba4d..ca474552b275 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/documents.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/documents.py @@ -329,7 +329,7 @@ class ConnectionPolicy: # pylint: disable=too-many-instance-attributes Indicates whether service should be instructed to skip sending response payloads """ - __defaultRequestTimeout: int = 60 # seconds + __defaultRequestTimeout: int = 5 # seconds __defaultRequestTimeoutGetDatabaseAccount: int = 5 # seconds __defaultMaxBackoff: int = 1 # seconds From eb23946a03ee54046de847f406b8516f254bc93c Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Mon, 3 Feb 2025 16:55:30 -0800 Subject: [PATCH 12/17] Fixed partition key issue --- sdk/cosmos/azure-cosmos/test/demo-writes-1.py | 9 ++++----- sdk/cosmos/azure-cosmos/test/demo-writes-2.py | 9 ++++----- sdk/cosmos/azure-cosmos/test/demo-writes-3.py | 9 ++++----- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/test/demo-writes-1.py b/sdk/cosmos/azure-cosmos/test/demo-writes-1.py index 8e57f695d92e..04fcffd65112 100644 --- a/sdk/cosmos/azure-cosmos/test/demo-writes-1.py +++ b/sdk/cosmos/azure-cosmos/test/demo-writes-1.py @@ -5,7 +5,6 @@ sys.path.append(r"") import uuid -from azure.cosmos import PartitionKey from azure.cosmos.aio import CosmosClient as AsyncClient import asyncio @@ -78,7 +77,7 @@ async def multi_region(): enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Tomas") as client: db = client.get_database_client("SimonDB") - cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") time.sleep(1) while True: @@ -96,7 +95,7 @@ async def create_items(client_id): user_agent="Concurrent-Write-Tomas" + str(client_id) + str( client_id) + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: db = await client.create_database_if_not_exists("SimonDB") - cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") time.sleep(1) try: @@ -112,7 +111,7 @@ async def create_items_initial(): enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-VM-" + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: db = await client.create_database_if_not_exists("SimonDB") - cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") time.sleep(1) try: @@ -133,7 +132,7 @@ async def create_item(): enable_diagnostics_logging=True, logger=logger, user_agent="-Concurrent-Write-Tomas") as client: db = await client.create_database_if_not_exists("SimonDB") - cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") time.sleep(1) try: diff --git a/sdk/cosmos/azure-cosmos/test/demo-writes-2.py b/sdk/cosmos/azure-cosmos/test/demo-writes-2.py index 3cf992b6f9e8..438440e0fa82 100644 --- a/sdk/cosmos/azure-cosmos/test/demo-writes-2.py +++ b/sdk/cosmos/azure-cosmos/test/demo-writes-2.py @@ -5,7 +5,6 @@ sys.path.append(r"") import uuid -from azure.cosmos import PartitionKey from azure.cosmos.aio import CosmosClient as AsyncClient import asyncio @@ -78,7 +77,7 @@ async def multi_region(): enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Tomas") as client: db = client.get_database_client("SimonDB") - cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") time.sleep(1) while True: @@ -96,7 +95,7 @@ async def create_items(client_id): user_agent="Concurrent-Write-Tomas" + str(client_id) + str( client_id) + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: db = await client.create_database_if_not_exists("SimonDB") - cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") time.sleep(1) try: @@ -112,7 +111,7 @@ async def create_items_initial(): enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-Tomas-") as client: db = await client.create_database_if_not_exists("SimonDB") - cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") time.sleep(1) try: @@ -133,7 +132,7 @@ async def create_item(): enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-VM-" + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: db = await client.create_database_if_not_exists("SimonDB") - cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") time.sleep(1) try: diff --git a/sdk/cosmos/azure-cosmos/test/demo-writes-3.py b/sdk/cosmos/azure-cosmos/test/demo-writes-3.py index eae8a26bfd4c..11f64a208a64 100644 --- a/sdk/cosmos/azure-cosmos/test/demo-writes-3.py +++ b/sdk/cosmos/azure-cosmos/test/demo-writes-3.py @@ -5,7 +5,6 @@ sys.path.append(r"") import uuid -from azure.cosmos import PartitionKey from azure.cosmos.aio import CosmosClient as AsyncClient import asyncio @@ -78,7 +77,7 @@ async def multi_region(): enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Tomas") as client: db = client.get_database_client("SimonDB") - cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") time.sleep(1) while True: @@ -101,7 +100,7 @@ async def create_items(client_id): user_agent="Concurrent-Write-Tomas" + str(client_id) + datetime.now().strftime( "%Y%m%d-%H%M%S")) as client: db = await client.create_database_if_not_exists("SimonDB") - cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") time.sleep(1) try: @@ -117,7 +116,7 @@ async def create_items_initial(): enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-Tomas-") as client: db = await client.create_database_if_not_exists("SimonDB") - cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") time.sleep(1) try: @@ -133,7 +132,7 @@ async def create_item(): enable_diagnostics_logging=True, logger=logger, user_agent="Concurrent-Write-VM-" + datetime.now().strftime("%Y%m%d-%H%M%S")) as client: db = await client.create_database_if_not_exists("SimonDB") - cont = await db.create_container_if_not_exists("SimonContainer", PartitionKey("/pk")) + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") time.sleep(1) try: From 135306be3dd07285b70161c90a980315828e2be0 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Tue, 4 Feb 2025 09:39:33 -0800 Subject: [PATCH 13/17] Updated query to be single partition query --- sdk/cosmos/azure-cosmos/test/demo-1.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/test/demo-1.py b/sdk/cosmos/azure-cosmos/test/demo-1.py index aeb664dc5de2..f14c4f99e0ff 100644 --- a/sdk/cosmos/azure-cosmos/test/demo-1.py +++ b/sdk/cosmos/azure-cosmos/test/demo-1.py @@ -56,7 +56,10 @@ async def query_items_concurrently(container, num_queries): async def perform_query(container): random_item = get_random_item() - results = container.query_items(query="SELECT * FROM c", partition_key=random_item["pk"]) + results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk", + parameters=[{"name": "@id", "value": random_item["id"]}, + {"name": "@pk", "value": random_item["pk"]}], + partition_key=random_item["pk"]) items = [item async for item in results] @@ -83,8 +86,8 @@ async def multi_region(client_id): time.sleep(1) await read_item_concurrently(cont, 5) # Number of concurrent reads time.sleep(1) - # await query_items_concurrently(cont, 500) # Number of concurrent queries - # time.sleep(1) + await query_items_concurrently(cont, 2) # Number of concurrent queries + time.sleep(1) # await change_feed(cont) # time.sleep(1) except Exception as e: From a10d4f4d2cb80a144b7e3e298f37e76b4e89a67e Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Tue, 4 Feb 2025 14:16:32 -0800 Subject: [PATCH 14/17] Fixed indentation --- .../azure/cosmos/_location_cache.py | 134 +++++++++--------- 1 file changed, 68 insertions(+), 66 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py index a33af4294081..66a067ba0334 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py @@ -25,8 +25,8 @@ import collections import logging import time -from urllib.parse import urlparse from datetime import datetime +from urllib.parse import urlparse from . import documents from . import http_constants @@ -36,11 +36,13 @@ logger = logging.getLogger("azure.cosmos.LocationCache") + class EndpointOperationType(object): NoneType = "None" ReadType = "Read" WriteType = "Write" + class RegionalEndpoint(object): def __init__(self, c_endpoint: str, p_endpoint: str): self.current_endpoint = c_endpoint @@ -73,6 +75,7 @@ def swap(self): def unique_endpoints(self): return {self.current_endpoint, self.previous_endpoint} + def get_endpoints_by_location(new_locations, old_endpoints_by_location, default_regional_endpoint, @@ -82,8 +85,7 @@ def get_endpoints_by_location(new_locations, endpoints_by_location = collections.OrderedDict() parsed_locations = [] - - for new_location in new_locations: # pylint: disable=too-many-nested-blocks + for new_location in new_locations: # pylint: disable=too-many-nested-blocks # if name in new_location and same for database account endpoint if "name" in new_location and "databaseAccountEndpoint" in new_location: if not new_location["name"]: @@ -95,39 +97,39 @@ def get_endpoints_by_location(new_locations, if new_location["name"] in old_endpoints_by_location: regional_object = old_endpoints_by_location[new_location["name"]] logger.info("%s - In location cache: Existing regional object: %s", - datetime.now().strftime("%Y%m%d-%H%M%S"), str(regional_object)) - current = regional_object.get_current() - # swap the previous with current and current with new region_uri received from the gateway - if current != region_uri: - regional_object.set_previous(current) - regional_object.set_current(region_uri) - logger.info("%s - In location cache: Updated regional object: %s", - datetime.now().strftime("%Y%m%d-%H%M%S"), - str(regional_object)) - # This is the bootstrapping condition - else: - regional_object = RegionalEndpoint(region_uri, region_uri) - # if it is for writes, then we update the previous to default_endpoint - if writes and not use_multiple_write_locations: - # if region_uri is different than global endpoint set global endpoint - # as fallback - # else construct regional uri - if region_uri != default_regional_endpoint.get_current(): - regional_object.set_previous(default_regional_endpoint.get_current()) - else: - constructed_region_uri = LocationCache.GetLocationalEndpoint( - default_regional_endpoint.get_current(), - new_location["name"]) - regional_object.set_previous(constructed_region_uri) - - logger.info("%s - In location cache: This is regional object on initialization: %s", - datetime.now().strftime("%Y%m%d-%H%M%S"), - str(regional_object)) - - # pass in object with region uri , last known good, curr etc - endpoints_by_location.update({new_location["name"]: regional_object}) - except Exception as e: - raise e + datetime.now().strftime("%Y%m%d-%H%M%S"), str(regional_object)) + current = regional_object.get_current() + # swap the previous with current and current with new region_uri received from the gateway + if current != region_uri: + regional_object.set_previous(current) + regional_object.set_current(region_uri) + logger.info("%s - In location cache: Updated regional object: %s", + datetime.now().strftime("%Y%m%d-%H%M%S"), + str(regional_object)) + # This is the bootstrapping condition + else: + regional_object = RegionalEndpoint(region_uri, region_uri) + # if it is for writes, then we update the previous to default_endpoint + if writes and not use_multiple_write_locations: + # if region_uri is different than global endpoint set global endpoint + # as fallback + # else construct regional uri + if region_uri != default_regional_endpoint.get_current(): + regional_object.set_previous(default_regional_endpoint.get_current()) + else: + constructed_region_uri = LocationCache.GetLocationalEndpoint( + default_regional_endpoint.get_current(), + new_location["name"]) + regional_object.set_previous(constructed_region_uri) + + logger.info("%s - In location cache: This is regional object on initialization: %s", + datetime.now().strftime("%Y%m%d-%H%M%S"), + str(regional_object)) + + # pass in object with region uri , last known good, curr etc + endpoints_by_location.update({new_location["name"]: regional_object}) + except Exception as e: + raise e return endpoints_by_location, parsed_locations @@ -137,12 +139,12 @@ def current_time_millis(self): return int(round(time.time() * 1000)) def __init__( - self, - preferred_locations, - default_endpoint, - enable_endpoint_discovery, - use_multiple_write_locations, - refresh_time_interval_in_ms, + self, + preferred_locations, + default_endpoint, + enable_endpoint_discovery, + use_multiple_write_locations, + refresh_time_interval_in_ms, ): self.preferred_locations = preferred_locations self.default_regional_endpoint = RegionalEndpoint(default_endpoint, default_endpoint) @@ -156,15 +158,15 @@ def __init__( self.location_unavailability_info_by_endpoint = {} self.refresh_time_interval_in_ms = refresh_time_interval_in_ms self.last_cache_update_time_stamp = 0 - self.available_read_regional_endpoints_by_location = {} # pylint: disable=name-too-long - self.available_write_regional_endpoints_by_location = {} # pylint: disable=name-too-long + self.available_read_regional_endpoints_by_location = {} # pylint: disable=name-too-long + self.available_write_regional_endpoints_by_location = {} # pylint: disable=name-too-long self.available_write_locations = [] self.available_read_locations = [] def check_and_update_cache(self): if ( - self.location_unavailability_info_by_endpoint - and self.current_time_millis() - self.last_cache_update_time_stamp > self.refresh_time_interval_in_ms + self.location_unavailability_info_by_endpoint + and self.current_time_millis() - self.last_cache_update_time_stamp > self.refresh_time_interval_in_ms ): self.update_location_cache() @@ -231,8 +233,8 @@ def resolve_service_endpoint(self, request): ) if not use_preferred_locations or ( - documents._OperationType.IsWriteOperation(request.operation_type) - and not self.can_use_multiple_write_locations_for_request(request) + documents._OperationType.IsWriteOperation(request.operation_type) + and not self.can_use_multiple_write_locations_for_request(request) ): # For non-document resource types in case of client can use multiple write locations # or when client cannot use multiple write locations, flip-flop between the @@ -304,12 +306,12 @@ def should_refresh_endpoints(self): # pylint: disable=too-many-return-statement def clear_stale_endpoint_unavailability_info(self): new_location_unavailability_info = {} if self.location_unavailability_info_by_endpoint: - for unavailable_endpoint in self.location_unavailability_info_by_endpoint: #pylint: disable=consider-using-dict-items + for unavailable_endpoint in self.location_unavailability_info_by_endpoint: # pylint: disable=consider-using-dict-items unavailability_info = self.location_unavailability_info_by_endpoint[unavailable_endpoint] if not ( - unavailability_info - and self.current_time_millis() - unavailability_info["lastUnavailabilityCheckTimeStamp"] - > self.refresh_time_interval_in_ms + unavailability_info + and self.current_time_millis() - unavailability_info["lastUnavailabilityCheckTimeStamp"] + > self.refresh_time_interval_in_ms ): new_location_unavailability_info[ unavailable_endpoint @@ -335,15 +337,15 @@ def is_endpoint_unavailable_internal(self, endpoint: str, expected_available_ope ) if ( - expected_available_operation == EndpointOperationType.NoneType - or not unavailability_info - or expected_available_operation not in unavailability_info["operationType"] + expected_available_operation == EndpointOperationType.NoneType + or not unavailability_info + or expected_available_operation not in unavailability_info["operationType"] ): return False if ( - self.current_time_millis() - unavailability_info["lastUnavailabilityCheckTimeStamp"] - > self.refresh_time_interval_in_ms + self.current_time_millis() - unavailability_info["lastUnavailabilityCheckTimeStamp"] + > self.refresh_time_interval_in_ms ): return False # Unexpired entry present. Endpoint is unavailable @@ -415,8 +417,8 @@ def update_location_cache(self, write_locations=None, read_locations=None, enabl ) self.last_cache_update_timestamp = self.current_time_millis() # pylint: disable=attribute-defined-outside-init - def get_preferred_available_regional_endpoints( # pylint: disable=name-too-long - self, endpoints_by_location, orderedLocations, expected_available_operation, fallback_endpoint + def get_preferred_available_regional_endpoints( # pylint: disable=name-too-long + self, endpoints_by_location, orderedLocations, expected_available_operation, fallback_endpoint ): regional_endpoints = [] health_endpoints = set() @@ -424,8 +426,8 @@ def get_preferred_available_regional_endpoints( # pylint: disable=name-too-long # user passed in during documentClient init if self.enable_endpoint_discovery and endpoints_by_location: # pylint: disable=too-many-nested-blocks if ( - self.can_use_multiple_write_locations() - or expected_available_operation == EndpointOperationType.ReadType + self.can_use_multiple_write_locations() + or expected_available_operation == EndpointOperationType.ReadType ): unavailable_endpoints = [] if self.preferred_locations: @@ -473,11 +475,11 @@ def can_use_multiple_write_locations(self): def can_use_multiple_write_locations_for_request(self, request): # pylint: disable=name-too-long return self.can_use_multiple_write_locations() and ( - request.resource_type == http_constants.ResourceType.Document - or ( - request.resource_type == http_constants.ResourceType.StoredProcedure - and request.operation_type == documents._OperationType.ExecuteJavaScript - ) + request.resource_type == http_constants.ResourceType.Document + or ( + request.resource_type == http_constants.ResourceType.StoredProcedure + and request.operation_type == documents._OperationType.ExecuteJavaScript + ) ) @staticmethod From 5fc33335166c7391de195e79c43921b28cb92283 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Tue, 4 Feb 2025 18:20:04 -0800 Subject: [PATCH 15/17] Added logic for swapping on health check --- .../azure/cosmos/_global_endpoint_manager.py | 19 +++++++++++------- .../azure/cosmos/_location_cache.py | 20 +++---------------- .../aio/_global_endpoint_manager_async.py | 19 +++++++++++------- 3 files changed, 27 insertions(+), 31 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py index 01bb253f79d2..335d0b53424a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py @@ -158,16 +158,21 @@ def _endpoints_health_check(self, **kwargs): Validating if the endpoint is healthy else marking it as unavailable. """ - all_endpoints = set(self.location_cache.read_health_endpoints) - all_endpoints.update(self.location_cache.write_health_endpoints) + all_endpoints = set(self.location_cache.read_regional_endpoints[0]) + all_endpoints.update(self.location_cache.write_regional_endpoints) + count = 0 for endpoint in all_endpoints: + count += 1 + if count > 3: + break try: - self.Client._GetDatabaseAccountCheck(endpoint, **kwargs) + self.Client._GetDatabaseAccountCheck(endpoint.get_current(), **kwargs) except (exceptions.CosmosHttpResponseError, AzureError): - if endpoint in self.location_cache.read_health_endpoints: - self.mark_endpoint_unavailable_for_read(endpoint, False) - if endpoint in self.location_cache.write_health_endpoints: - self.mark_endpoint_unavailable_for_write(endpoint, False) + if endpoint in self.location_cache.read_regional_endpoints: + self.mark_endpoint_unavailable_for_read(endpoint.get_current(), False) + if endpoint in self.location_cache.write_regional_endpoints: + self.mark_endpoint_unavailable_for_write(endpoint.get_current(), False) + endpoint.swap() self.location_cache.update_location_cache() def _GetDatabaseAccountStub(self, endpoint, **kwargs): diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py index c5ddf9d66a3e..57f0993bd96a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py @@ -72,9 +72,6 @@ def swap(self): self.current_endpoint = self.previous_endpoint self.previous_endpoint = temp - def unique_endpoints(self): - return {self.current_endpoint, self.previous_endpoint} - def get_endpoints_by_location(new_locations, old_endpoints_by_location, @@ -153,8 +150,6 @@ def __init__( self.enable_multiple_writable_locations = False self.write_regional_endpoints = [self.default_regional_endpoint] self.read_regional_endpoints = [self.default_regional_endpoint] - self.read_health_endpoints = self.default_regional_endpoint.unique_endpoints() - self.write_health_endpoints = self.default_regional_endpoint.unique_endpoints() self.location_unavailability_info_by_endpoint = {} self.refresh_time_interval_in_ms = refresh_time_interval_in_ms self.last_cache_update_time_stamp = 0 @@ -403,13 +398,13 @@ def update_location_cache(self, write_locations=None, read_locations=None, enabl self.use_multiple_write_locations, ) - self.write_regional_endpoints, self.write_health_endpoints = self.get_preferred_available_regional_endpoints( + self.write_regional_endpoints = self.get_preferred_available_regional_endpoints( self.available_write_regional_endpoints_by_location, self.available_write_locations, EndpointOperationType.WriteType, self.default_regional_endpoint, ) - self.read_regional_endpoints, self.read_health_endpoints = self.get_preferred_available_regional_endpoints( + self.read_regional_endpoints = self.get_preferred_available_regional_endpoints( self.available_read_regional_endpoints_by_location, self.available_read_locations, EndpointOperationType.ReadType, @@ -421,7 +416,6 @@ def get_preferred_available_regional_endpoints( # pylint: disable=name-too-long self, endpoints_by_location, orderedLocations, expected_available_operation, fallback_endpoint ): regional_endpoints = [] - health_endpoints = set() # if enableEndpointDiscovery is false, we always use the defaultEndpoint that # user passed in during documentClient init if self.enable_endpoint_discovery and endpoints_by_location: # pylint: disable=too-many-nested-blocks @@ -443,11 +437,9 @@ def get_preferred_available_regional_endpoints( # pylint: disable=name-too-long unavailable_endpoints.append(regional_endpoint) else: regional_endpoints.append(regional_endpoint) - self.update_health_endpoints(health_endpoints, regional_endpoint.unique_endpoints()) if not regional_endpoints: regional_endpoints.append(fallback_endpoint) - self.update_health_endpoints(health_endpoints, fallback_endpoint.unique_endpoints()) regional_endpoints.extend(unavailable_endpoints) else: @@ -456,17 +448,11 @@ def get_preferred_available_regional_endpoints( # pylint: disable=name-too-long # location is empty during manual failover regional_endpoint = endpoints_by_location[location] regional_endpoints.append(regional_endpoint) - self.update_health_endpoints(health_endpoints, regional_endpoint.unique_endpoints()) if not regional_endpoints: regional_endpoints.append(fallback_endpoint) - self.update_health_endpoints(health_endpoints, fallback_endpoint.unique_endpoints()) - - return regional_endpoints, health_endpoints - def update_health_endpoints(self, endpoints, endpoints_to_add): - if len(endpoints) < 3: - endpoints.update(endpoints_to_add) + return regional_endpoints def can_use_multiple_write_locations(self): return self.use_multiple_write_locations and self.enable_multiple_writable_locations diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_endpoint_manager_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_endpoint_manager_async.py index 9b3e85c8b18e..174205a1748d 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_endpoint_manager_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_endpoint_manager_async.py @@ -124,16 +124,21 @@ async def _endpoints_health_check(self, **kwargs): Validating if the endpoint is healthy else marking it as unavailable. """ - all_endpoints = set(self.location_cache.read_health_endpoints) - all_endpoints.update(self.location_cache.write_health_endpoints) + all_endpoints = set(self.location_cache.read_regional_endpoints[0]) + all_endpoints.update(self.location_cache.write_regional_endpoints) + count = 0 for endpoint in all_endpoints: + count += 1 + if count > 3: + break try: - await self.client._GetDatabaseAccountCheck(endpoint, **kwargs) + await self.client._GetDatabaseAccountCheck(endpoint.get_current(), **kwargs) except (exceptions.CosmosHttpResponseError, AzureError): - if endpoint in self.location_cache.read_health_endpoints: - self.mark_endpoint_unavailable_for_read(endpoint, False) - if endpoint in self.location_cache.write_health_endpoints: - self.mark_endpoint_unavailable_for_write(endpoint, False) + if endpoint in self.location_cache.read_regional_endpoints: + self.mark_endpoint_unavailable_for_read(endpoint.get_current(), False) + if endpoint in self.location_cache.write_regional_endpoints: + self.mark_endpoint_unavailable_for_write(endpoint.get_current(), False) + endpoint.swap() self.location_cache.update_location_cache() async def _GetDatabaseAccount(self, **kwargs): From 0d70baa5a7cecc16649146024a0ae54935f50081 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Tue, 4 Feb 2025 18:37:17 -0800 Subject: [PATCH 16/17] Added logic for swapping on health check --- .../azure/cosmos/_global_endpoint_manager.py | 4 ++-- .../azure-cosmos/azure/cosmos/_location_cache.py | 10 +++++++--- .../azure/cosmos/_service_request_retry_policy.py | 6 ------ .../azure/cosmos/aio/_global_endpoint_manager_async.py | 4 ++-- 4 files changed, 11 insertions(+), 13 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py index 335d0b53424a..7a534c316dc5 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py @@ -158,8 +158,8 @@ def _endpoints_health_check(self, **kwargs): Validating if the endpoint is healthy else marking it as unavailable. """ - all_endpoints = set(self.location_cache.read_regional_endpoints[0]) - all_endpoints.update(self.location_cache.write_regional_endpoints) + all_endpoints = [self.location_cache.read_regional_endpoints[0]] + all_endpoints.extend(self.location_cache.write_regional_endpoints) count = 0 for endpoint in all_endpoints: count += 1 diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py index 57f0993bd96a..ddc2121eb686 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py @@ -71,6 +71,9 @@ def swap(self): temp = self.current_endpoint self.current_endpoint = self.previous_endpoint self.previous_endpoint = temp + logger.warning("%s - Swapped regional endpoint values: ", + datetime.now().strftime("%Y%m%d-%H%M%S"), + " - Current: " + self.current_endpoint + " ,Previous: " + self.previous_endpoint) def get_endpoints_by_location(new_locations, @@ -214,9 +217,6 @@ def swap_regional_endpoint_values(self, request): datetime.now().strftime("%Y%m%d-%H%M%S"), str(regional_endpoint)) regional_endpoint.swap() - logger.warning("%s - Swapped regional endpoint values: %s", - datetime.now().strftime("%Y%m%d-%H%M%S"), - str(regional_endpoint)) def resolve_service_endpoint(self, request): if request.location_endpoint_to_route: @@ -347,6 +347,10 @@ def is_endpoint_unavailable_internal(self, endpoint: str, expected_available_ope return True def mark_endpoint_unavailable(self, unavailable_endpoint: str, unavailable_operation_type, refresh_cache: bool): + logger.warning("%s - Marking %s unavailable for %s ", + datetime.now().strftime("%Y%m%d-%H%M%S"), + unavailable_endpoint, + unavailable_operation_type) unavailability_info = ( self.location_unavailability_info_by_endpoint[unavailable_endpoint] if unavailable_endpoint in self.location_unavailability_info_by_endpoint diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_service_request_retry_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_service_request_retry_policy.py index 73b84c07fde4..5ce05ceaa41e 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_service_request_retry_policy.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_service_request_retry_policy.py @@ -116,14 +116,8 @@ def resolve_next_region_service_endpoint(self): def mark_endpoint_unavailable(self, unavailable_endpoint, refresh_cache: bool): if _OperationType.IsReadOnlyOperation(self.request.operation_type): - self.logger.warning("%s - Marking %s unavailable for read", - datetime.now().strftime("%Y%m%d-%H%M%S"), - unavailable_endpoint) self.global_endpoint_manager.mark_endpoint_unavailable_for_read(unavailable_endpoint, True) else: - self.logger.warning("%s - Marking %s unavailable for write", - datetime.now().strftime("%Y%m%d-%H%M%S"), - unavailable_endpoint) self.global_endpoint_manager.mark_endpoint_unavailable_for_write(unavailable_endpoint, refresh_cache) def update_location_cache(self): diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_endpoint_manager_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_endpoint_manager_async.py index 174205a1748d..374fd940c184 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_endpoint_manager_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_endpoint_manager_async.py @@ -124,8 +124,8 @@ async def _endpoints_health_check(self, **kwargs): Validating if the endpoint is healthy else marking it as unavailable. """ - all_endpoints = set(self.location_cache.read_regional_endpoints[0]) - all_endpoints.update(self.location_cache.write_regional_endpoints) + all_endpoints = [self.location_cache.read_regional_endpoints[0]] + all_endpoints.extend(self.location_cache.write_regional_endpoints) count = 0 for endpoint in all_endpoints: count += 1 From 3408ae92d9f422524dcd2bd7981936a063a35bbf Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Tue, 4 Feb 2025 18:41:55 -0800 Subject: [PATCH 17/17] sync demo --- sdk/cosmos/azure-cosmos/test/demo-sync.py | 81 +++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 sdk/cosmos/azure-cosmos/test/demo-sync.py diff --git a/sdk/cosmos/azure-cosmos/test/demo-sync.py b/sdk/cosmos/azure-cosmos/test/demo-sync.py new file mode 100644 index 000000000000..b01cdeae022a --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/demo-sync.py @@ -0,0 +1,81 @@ +import os +import random +import sys + +from azure.cosmos import CosmosClient + +sys.path.append(r"") + + +import time +from datetime import datetime + +import sys +import logging + +# Replace with your Cosmos DB details +preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP'] +COSMOS_URI2 = "" +COSMOS_KEY2 = "" + + +def get_random_item(): + random_int = random.randint(1, 10000) + return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + + +def upsert_item(container, num_upserts): + for _ in range(num_upserts): + container.upsert_item(get_random_item()) + + +def read_item(container, num_upserts): + for _ in range(num_upserts): + item = get_random_item() + container.read_item(item["id"], item["pk"]) + +def perform_query(container): + random_item = get_random_item() + results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk", + parameters=[{"name": "@id", "value": random_item["id"]}, + {"name": "@pk", "value": random_item["pk"]}], + partition_key=random_item["pk"]) + items = [item for item in results] + + +def query_items(cont, num_queries): + for _ in range(num_queries): + perform_query(cont) + + +def multi_region(client_id): + with CosmosClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-VM-Sync" + str(client_id) + datetime.now().strftime( + "%Y%m%d-%H%M%S")) as client: + db = client.get_database_client("SimonDB") + cont = db.create_container_if_not_exists("SimonContainer", "/pk") + time.sleep(1) + + while True: + try: + upsert_item(cont, 5) # Number of concurrent upserts + time.sleep(1) + read_item(cont, 5) # Number of concurrent reads + time.sleep(1) + query_items(cont, 2) # Number of concurrent queries + time.sleep(1) + # await change_feed(cont) + # time.sleep(1) + except Exception as e: + raise e + + +if __name__ == "__main__": + logger = logging.getLogger('azure.cosmos') + file_name = os.path.basename(__file__) + file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + multi_region(file_name)