diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py
index 29d2ccee8452..385c9f299f0a 100644
--- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py
+++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py
@@ -26,6 +26,7 @@
 import json
 import logging
 import time
+from datetime import datetime
 from typing import Optional, Union, Dict, Any, TYPE_CHECKING, Callable, Mapping
 import types
 
@@ -121,9 +122,9 @@ def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None:
             resource_type = self._resource_map['docs']
         if self._should_log(verb=verb,database_name=database_name,collection_name=collection_name,
                             resource_type=resource_type, is_request=True):
-            self._log_client_settings()
-            self._log_database_account_settings()
-            super().on_request(request)
+            # self._log_client_settings()
+            # self._log_database_account_settings()
+            # super().on_request(request)
             self.__request_already_logged = True
 
     # pylint: disable=too-many-statements
@@ -173,26 +174,40 @@ def on_response(
         self,
         request: PipelineRequest[HTTPRequestType],
         response: PipelineResponse[HTTPRequestType, HTTPResponseType],
     ) -> None:
         if self._should_log(duration=duration, status_code=status_code, sub_status_code=sub_status_code,
                             verb=verb, http_version=http_version_obj, database_name=database_name,
                             collection_name=collection_name, resource_type=resource_type, is_request=False):
             if not self.__request_already_logged:
-                self._log_client_settings()
-                self._log_database_account_settings()
-                super().on_request(request)
+                # self._log_client_settings()
+                # self._log_database_account_settings()
+                # super().on_request(request)
+                self.__request_already_logged = True
             else:
                 self.__request_already_logged = False
-            super().on_response(request, response)
+            # super().on_response(request, response)
             if self._enable_diagnostics_logging:
                 http_response = response.http_response
                 options = response.context.options
                 logger = request.context.setdefault("logger", options.pop("logger", self.logger))
                 try:
-                    if "start_time" in request.context:
-                        logger.info("Elapsed time in seconds: {}".format(duration))
-                    else:
-                        logger.info("Elapsed time in seconds: unknown")
+                    logger.info("{} - Response status code: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"),
+                                                                       http_response.status_code))
+                    logger.info("{} - Response URL: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"),
+                                                               request.http_request.url))
+                    # Thin Client headers
+                    ThinClientProxyOperationType = "x-ms-thinclient-proxy-operation-type"
+                    ThinClientProxyResourceType = "x-ms-thinclient-proxy-resource-type"
+                    logger.info("{} - Operation type: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"),
+                                                                 request.http_request.headers[ThinClientProxyOperationType]))
+                    logger.info("{} - Resource type: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"),
+                                                                request.http_request.headers[ThinClientProxyResourceType]))
+                    # if "start_time" in request.context:
+                    #     logger.info("Elapsed time in seconds: {}".format(duration))
+                    # else:
+                    #     logger.info("Elapsed time in seconds: unknown")
                     if http_response.status_code >= 400:
-                        logger.info("Response error message: %r", _format_error(http_response.text()))
+                        logger.info("Response error message: %r",
+                                    _format_error(http_response.text()))
                 except Exception as err:  # pylint: disable=broad-except
-                    logger.warning("Failed to log request: %s", repr(err))  # pylint: disable=do-not-log-exceptions
+                    logger.warning("Failed to log request: %s",
+                                   repr(err))  # pylint: disable=do-not-log-exceptions
 
     # pylint: disable=unused-argument
     def _default_should_log(
diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py
index d7d68a4563ef..42ea5847e897 100644
--- a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py
+++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py
@@ -25,6 +25,7 @@
 import collections
 import logging
 import time
+from datetime import datetime
 from urllib.parse import urlparse
 
 from . import documents
@@ -35,11 +36,13 @@
 
 logger = logging.getLogger("azure.cosmos.LocationCache")
 
+
 class EndpointOperationType(object):
     NoneType = "None"
     ReadType = "Read"
     WriteType = "Write"
 
+
 class RegionalEndpoint(object):
     def __init__(self, c_endpoint: str, p_endpoint: str):
         self.current_endpoint = c_endpoint
@@ -81,8 +84,7 @@ def get_endpoints_by_location(new_locations,
     endpoints_by_location = collections.OrderedDict()
     parsed_locations = []
 
-
-    for new_location in new_locations: # pylint: disable=too-many-nested-blocks
+    for new_location in new_locations:  # pylint: disable=too-many-nested-blocks
         # if name in new_location and same for database account endpoint
         if "name" in new_location and "databaseAccountEndpoint" in new_location:
             if not new_location["name"]:
@@ -93,11 +95,16 @@ def get_endpoints_by_location(new_locations,
                 parsed_locations.append(new_location["name"])
                 if new_location["name"] in old_endpoints_by_location:
                     regional_object = old_endpoints_by_location[new_location["name"]]
+                    logger.info("%s - In location cache: Existing regional object: %s",
+                                datetime.now().strftime("%Y%m%d-%H%M%S"), str(regional_object))
                     current = regional_object.get_current()
                     # swap the previous with current and current with new region_uri received from the gateway
                     if current != region_uri:
                         regional_object.set_previous(current)
                         regional_object.set_current(region_uri)
+                        logger.info("%s - In location cache: Updated regional object: %s",
+                                    datetime.now().strftime("%Y%m%d-%H%M%S"),
+                                    str(regional_object))
                 # This is the bootstrapping condition
                 else:
                     regional_object = RegionalEndpoint(region_uri, region_uri)
@@ -109,10 +116,15 @@ def get_endpoints_by_location(new_locations,
                         if region_uri != default_regional_endpoint.get_current():
                             regional_object.set_previous(default_regional_endpoint.get_current())
                         else:
-                            constructed_region_uri = LocationCache.GetLocationalEndpoint(
+                            constructed_region_uri = LocationCache.GetLocationalEndpoint(
                                 default_regional_endpoint.get_current(),
                                 new_location["name"])
                             regional_object.set_previous(constructed_region_uri)
+
+                    logger.info("%s - In location cache: This is regional object on initialization: %s",
+                                datetime.now().strftime("%Y%m%d-%H%M%S"),
+                                str(regional_object))
+
                 # pass in object with region uri , last known good, curr etc
                 endpoints_by_location.update({new_location["name"]: regional_object})
         except Exception as e:
@@ -126,12 +138,12 @@ def current_time_millis(self):
         return int(round(time.time() * 1000))
 
     def __init__(
-        self,
-        preferred_locations,
-        default_endpoint,
-        enable_endpoint_discovery,
-        use_multiple_write_locations,
-        refresh_time_interval_in_ms,
+            self,
+            preferred_locations,
+            default_endpoint,
+            enable_endpoint_discovery,
+            use_multiple_write_locations,
+            refresh_time_interval_in_ms,
     ):
         self.preferred_locations = preferred_locations
         self.default_regional_endpoint = RegionalEndpoint(default_endpoint, default_endpoint)
@@ -143,15 +155,15 @@ def __init__(
         self.location_unavailability_info_by_endpoint = {}
         self.refresh_time_interval_in_ms = refresh_time_interval_in_ms
        self.last_cache_update_time_stamp = 0
-        self.available_read_regional_endpoints_by_location = {} # pylint: disable=name-too-long
-        self.available_write_regional_endpoints_by_location = {} # pylint: disable=name-too-long
+        self.available_read_regional_endpoints_by_location = {}  # pylint: disable=name-too-long
+        self.available_write_regional_endpoints_by_location = {}  # pylint: disable=name-too-long
         self.available_write_locations = []
         self.available_read_locations = []
 
     def check_and_update_cache(self):
         if (
-            self.location_unavailability_info_by_endpoint
-            and self.current_time_millis() - self.last_cache_update_time_stamp > self.refresh_time_interval_in_ms
+                self.location_unavailability_info_by_endpoint
+                and self.current_time_millis() - self.last_cache_update_time_stamp > self.refresh_time_interval_in_ms
         ):
             self.update_location_cache()
 
@@ -200,7 +212,8 @@ def swap_regional_endpoint_values(self, request):
             )
             regional_endpoint = regional_endpoints[location_index % len(regional_endpoints)]
             if request.location_endpoint_to_route == regional_endpoint.get_current():
-                logger.warning("Swapping regional endpoint values: %s",
+                logger.warning("%s - Swapping regional endpoint values: %s",
+                               datetime.now().strftime("%Y%m%d-%H%M%S"),
                                str(regional_endpoint))
                 regional_endpoint.swap()
 
@@ -214,8 +227,8 @@ def resolve_service_endpoint(self, request):
         )
 
         if not use_preferred_locations or (
-            documents._OperationType.IsWriteOperation(request.operation_type)
-            and not self.can_use_multiple_write_locations_for_request(request)
+                documents._OperationType.IsWriteOperation(request.operation_type)
+                and not self.can_use_multiple_write_locations_for_request(request)
         ):
             # For non-document resource types in case of client can use multiple write locations
             # or when client cannot use multiple write locations, flip-flop between the
@@ -287,12 +300,12 @@ def should_refresh_endpoints(self):  # pylint: disable=too-many-return-statement
     def clear_stale_endpoint_unavailability_info(self):
         new_location_unavailability_info = {}
         if self.location_unavailability_info_by_endpoint:
-            for unavailable_endpoint in self.location_unavailability_info_by_endpoint: #pylint: disable=consider-using-dict-items
+            for unavailable_endpoint in self.location_unavailability_info_by_endpoint:  # pylint: disable=consider-using-dict-items
                 unavailability_info = self.location_unavailability_info_by_endpoint[unavailable_endpoint]
                 if not (
-                    unavailability_info
-                    and self.current_time_millis() - unavailability_info["lastUnavailabilityCheckTimeStamp"]
-                    > self.refresh_time_interval_in_ms
+                        unavailability_info
+                        and self.current_time_millis() - unavailability_info["lastUnavailabilityCheckTimeStamp"]
+                        > self.refresh_time_interval_in_ms
                 ):
                     new_location_unavailability_info[
                         unavailable_endpoint
@@ -318,15 +331,15 @@ def is_endpoint_unavailable_internal(self, endpoint: str, expected_available_ope
         )
 
         if (
-            expected_available_operation == EndpointOperationType.NoneType
-            or not unavailability_info
-            or expected_available_operation not in unavailability_info["operationType"]
+                expected_available_operation == EndpointOperationType.NoneType
+                or not unavailability_info
+                or expected_available_operation not in unavailability_info["operationType"]
         ):
             return False
 
         if (
-            self.current_time_millis() - unavailability_info["lastUnavailabilityCheckTimeStamp"]
-            > self.refresh_time_interval_in_ms
+                self.current_time_millis() - unavailability_info["lastUnavailabilityCheckTimeStamp"]
+                > self.refresh_time_interval_in_ms
         ):
             return False
        # Unexpired entry present. Endpoint is unavailable
@@ -401,16 +414,16 @@ def update_location_cache(self, write_locations=None, read_locations=None, enabl
         )
         self.last_cache_update_timestamp = self.current_time_millis()  # pylint: disable=attribute-defined-outside-init
 
-    def get_preferred_available_regional_endpoints( # pylint: disable=name-too-long
-        self, endpoints_by_location, orderedLocations, expected_available_operation, fallback_endpoint
+    def get_preferred_available_regional_endpoints(  # pylint: disable=name-too-long
+            self, endpoints_by_location, orderedLocations, expected_available_operation, fallback_endpoint
     ):
         regional_endpoints = []
         # if enableEndpointDiscovery is false, we always use the defaultEndpoint that
         # user passed in during documentClient init
         if self.enable_endpoint_discovery and endpoints_by_location:  # pylint: disable=too-many-nested-blocks
             if (
-                self.can_use_multiple_write_locations()
-                or expected_available_operation == EndpointOperationType.ReadType
+                    self.can_use_multiple_write_locations()
+                    or expected_available_operation == EndpointOperationType.ReadType
             ):
                 unavailable_endpoints = []
                 if self.preferred_locations:
@@ -448,11 +461,11 @@ def can_use_multiple_write_locations(self):
 
     def can_use_multiple_write_locations_for_request(self, request):  # pylint: disable=name-too-long
         return self.can_use_multiple_write_locations() and (
-            request.resource_type == http_constants.ResourceType.Document
-            or (
-                request.resource_type == http_constants.ResourceType.StoredProcedure
-                and request.operation_type == documents._OperationType.ExecuteJavaScript
-            )
+                request.resource_type == http_constants.ResourceType.Document
+                or (
+                        request.resource_type == http_constants.ResourceType.StoredProcedure
+                        and request.operation_type == documents._OperationType.ExecuteJavaScript
+                )
         )
 
     @staticmethod
diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_service_request_retry_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_service_request_retry_policy.py
index 1b406a88ae4d..5ce05ceaa41e 100644
--- a/sdk/cosmos/azure-cosmos/azure/cosmos/_service_request_retry_policy.py
+++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_service_request_retry_policy.py
@@ -8,6 +8,8 @@
 """
 import logging
+from datetime import datetime
+
 from azure.cosmos.documents import _OperationType
 from azure.cosmos.http_constants import ResourceType
diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py
index 735b8bd1698a..689f310c4420 100644
--- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py
+++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py
@@ -23,7 +23,9 @@
 """Document client class for the Azure Cosmos database service.
""" +import logging import os +from datetime import datetime from urllib.parse import urlparse import uuid from typing import ( @@ -81,6 +83,7 @@ PartitionKeyType = Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]], Type[NonePartitionKeyValue]] # pylint: disable=line-too-long +logger = logging.getLogger("azure.cosmos.CosmosClientConnectionAsync") class CredentialDict(TypedDict, total=False): masterKey: str @@ -444,6 +447,12 @@ async def GetDatabaseAccount( self.UseMultipleWriteLocations = ( self.connection_policy.UseMultipleWriteLocations and database_account._EnableMultipleWritableLocations ) + logger.info("%s - Database account - writable locations: %s", + datetime.now().strftime("%Y%m%d-%H%M%S"), + database_account.WritableLocations) + logger.info("%s - Database account - readable locations: %s", + datetime.now().strftime("%Y%m%d-%H%M%S"), + database_account.ReadableLocations) return database_account async def _GetDatabaseAccountCheck( diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py index 9d1120c5a4ca..036682f339e7 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py @@ -23,7 +23,9 @@ """ import asyncio # pylint: disable=do-not-import-asyncio import json +import logging import time +from datetime import datetime from azure.core.exceptions import AzureError, ClientAuthenticationError, ServiceRequestError, ServiceResponseError from azure.core.pipeline.policies import AsyncRetryPolicy @@ -42,6 +44,7 @@ _has_database_account_header) from ..http_constants import HttpHeaders, StatusCodes, SubStatusCodes +logger = logging.getLogger("azure.cosmos.RetryUtilityAsync") # pylint: disable=protected-access, disable=too-many-lines, disable=too-many-statements, disable=too-many-branches @@ -274,6 +277,8 @@ async def send(self, request): raise except ServiceRequestError as err: retry_error = err + logger.warning("{} - Received ServiceRequestError {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"), + str(retry_error))) # the request ran into a socket timeout or failed to establish a new connection # since request wasn't sent, raise exception immediately to be dealt with in client retry policies if not _has_database_account_header(request.http_request.headers): @@ -285,6 +290,8 @@ async def send(self, request): raise err except ServiceResponseError as err: retry_error = err + logger.warning("{} - Received ServiceResponseError {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"), + str(retry_error))) if _has_database_account_header(request.http_request.headers): raise err # Since this is ClientConnectionError, it is safe to be retried on both read and write requests @@ -304,6 +311,8 @@ async def send(self, request): raise err except AzureError as err: retry_error = err + logger.warning("{} - Received AzureError {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"), + str(retry_error))) if _has_database_account_header(request.http_request.headers): raise err if self._is_method_retryable(retry_settings, request.http_request): diff --git a/sdk/cosmos/azure-cosmos/test/demo-1.py b/sdk/cosmos/azure-cosmos/test/demo-1.py new file mode 100644 index 000000000000..f14c4f99e0ff --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/demo-1.py @@ -0,0 +1,105 @@ +import os +import random +import sys + +sys.path.append(r"") + +from azure.cosmos.aio import CosmosClient as AsyncClient +import asyncio + +import time 
+from datetime import datetime
+
+import sys
+import logging
+
+# Replace with your Cosmos DB details
+preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP']
+COSMOS_URI2 = ""
+COSMOS_KEY2 = ""
+
+
+async def write_item_concurrently(container, num_upserts, start_id):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(container.upsert_item({"id": "Simon-" + str(start_id), "pk": "pk-" + str(start_id)}))
+        start_id += 1
+    await asyncio.gather(*tasks)
+
+
+def get_random_item():
+    random_int = random.randint(1, 10000)
+    return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)}
+
+
+async def upsert_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(container.upsert_item(get_random_item()))
+    await asyncio.gather(*tasks)
+
+
+async def read_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        item = get_random_item()
+        tasks.append(container.read_item(item["id"], item["pk"]))
+    await asyncio.gather(*tasks)
+
+
+async def query_items_concurrently(container, num_queries):
+    tasks = []
+    for _ in range(num_queries):
+        tasks.append(perform_query(container))
+    await asyncio.gather(*tasks)
+
+
+async def perform_query(container):
+    random_item = get_random_item()
+    results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk",
+                                    parameters=[{"name": "@id", "value": random_item["id"]},
+                                                {"name": "@pk", "value": random_item["pk"]}],
+                                    partition_key=random_item["pk"])
+    items = [item async for item in results]
+
+
+async def change_feed(container):
+    response = container.query_items_change_feed(is_start_from_beginning=True)
+
+    count = 0
+    async for doc in response:
+        count += 1
+
+
+async def multi_region(client_id):
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-VM-" + str(client_id) + datetime.now().strftime(
+                               "%Y%m%d-%H%M%S")) as client:
+        db = client.get_database_client("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)  # time.sleep would block the event loop inside a coroutine
+
+        while True:
+            try:
+                await upsert_item_concurrently(cont, 5)  # Number of concurrent upserts
+                await asyncio.sleep(1)
+                await read_item_concurrently(cont, 5)  # Number of concurrent reads
+                await asyncio.sleep(1)
+                await query_items_concurrently(cont, 2)  # Number of concurrent queries
+                await asyncio.sleep(1)
+                # await change_feed(cont)
+                # time.sleep(1)
+            except Exception as e:
+                raise e
+
+
+if __name__ == "__main__":
+    logger = logging.getLogger('azure.cosmos')
+    file_name = os.path.basename(__file__)
+    file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log')
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(file_handler)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+    asyncio.run(multi_region(file_name))
+    # asyncio.run(create_item())
diff --git a/sdk/cosmos/azure-cosmos/test/demo-2.py b/sdk/cosmos/azure-cosmos/test/demo-2.py
new file mode 100644
index 000000000000..aeb664dc5de2
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/test/demo-2.py
@@ -0,0 +1,102 @@
+import os
+import random
+import sys
+
+sys.path.append(r"")
+
+from azure.cosmos.aio import CosmosClient as AsyncClient
+import asyncio
+
+import time
+from datetime import datetime
+
+import sys
+import logging
+
+# Replace with your Cosmos DB details
+preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP']
+COSMOS_URI2 = ""
+COSMOS_KEY2 = ""
+
+
+async def write_item_concurrently(container, num_upserts, start_id):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(container.upsert_item({"id": "Simon-" + str(start_id), "pk": "pk-" + str(start_id)}))
+        start_id += 1
+    await asyncio.gather(*tasks)
+
+
+def get_random_item():
+    random_int = random.randint(1, 10000)
+    return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)}
+
+
+async def upsert_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(container.upsert_item(get_random_item()))
+    await asyncio.gather(*tasks)
+
+
+async def read_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        item = get_random_item()
+        tasks.append(container.read_item(item["id"], item["pk"]))
+    await asyncio.gather(*tasks)
+
+
+async def query_items_concurrently(container, num_queries):
+    tasks = []
+    for _ in range(num_queries):
+        tasks.append(perform_query(container))
+    await asyncio.gather(*tasks)
+
+
+async def perform_query(container):
+    random_item = get_random_item()
+    results = container.query_items(query="SELECT * FROM c", partition_key=random_item["pk"])
+    items = [item async for item in results]
+
+
+async def change_feed(container):
+    response = container.query_items_change_feed(is_start_from_beginning=True)
+
+    count = 0
+    async for doc in response:
+        count += 1
+
+
+async def multi_region(client_id):
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-VM-" + str(client_id) + datetime.now().strftime(
+                               "%Y%m%d-%H%M%S")) as client:
+        db = client.get_database_client("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        while True:
+            try:
+                await upsert_item_concurrently(cont, 5)  # Number of concurrent upserts
+                await asyncio.sleep(1)
+                await read_item_concurrently(cont, 5)  # Number of concurrent reads
+                await asyncio.sleep(1)
+                # await query_items_concurrently(cont, 500)  # Number of concurrent queries
+                # time.sleep(1)
+                # await change_feed(cont)
+                # time.sleep(1)
+            except Exception as e:
+                raise e
+
+
+if __name__ == "__main__":
+    logger = logging.getLogger('azure.cosmos')
+    file_name = os.path.basename(__file__)
+    file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log')
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(file_handler)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+    asyncio.run(multi_region(file_name))
+    # asyncio.run(create_item())
diff --git a/sdk/cosmos/azure-cosmos/test/demo-3.py b/sdk/cosmos/azure-cosmos/test/demo-3.py
new file mode 100644
index 000000000000..aeb664dc5de2
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/test/demo-3.py
@@ -0,0 +1,102 @@
+import os
+import random
+import sys
+
+sys.path.append(r"")
+
+from azure.cosmos.aio import CosmosClient as AsyncClient
+import asyncio
+
+import time
+from datetime import datetime
+
+import sys
+import logging
+
+# Replace with your Cosmos DB details
+preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP']
+COSMOS_URI2 = ""
+COSMOS_KEY2 = ""
+
+
+async def write_item_concurrently(container, num_upserts, start_id):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(container.upsert_item({"id": "Simon-" + str(start_id), "pk": "pk-" + str(start_id)}))
+        start_id += 1
+    await asyncio.gather(*tasks)
+
+
+def get_random_item():
+    random_int = random.randint(1, 10000)
"pk": "pk-" + str(random_int)} + + +async def upsert_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item(get_random_item())) + await asyncio.gather(*tasks) + + +async def read_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + item = get_random_item() + tasks.append(container.read_item(item["id"], item["pk"])) + await asyncio.gather(*tasks) + + +async def query_items_concurrently(container, num_queries): + tasks = [] + for _ in range(num_queries): + tasks.append(perform_query(container)) + await asyncio.gather(*tasks) + + +async def perform_query(container): + random_item = get_random_item() + results = container.query_items(query="SELECT * FROM c", partition_key=random_item["pk"]) + items = [item async for item in results] + + +async def change_feed(container): + response = container.query_items_change_feed(is_start_from_beginning=True) + + count = 0 + async for doc in response: + count += 1 + + +async def multi_region(client_id): + async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2, + enable_diagnostics_logging=True, logger=logger, + user_agent="Concurrent-VM-" + str(client_id) + datetime.now().strftime( + "%Y%m%d-%H%M%S")) as client: + db = client.get_database_client("SimonDB") + cont = await db.create_container_if_not_exists("SimonContainer", "/pk") + time.sleep(1) + + while True: + try: + await upsert_item_concurrently(cont, 5) # Number of concurrent upserts + time.sleep(1) + await read_item_concurrently(cont, 5) # Number of concurrent reads + time.sleep(1) + # await query_items_concurrently(cont, 500) # Number of concurrent queries + # time.sleep(1) + # await change_feed(cont) + # time.sleep(1) + except Exception as e: + raise e + + +if __name__ == "__main__": + logger = logging.getLogger('azure.cosmos') + file_name = os.path.basename(__file__) + file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log') + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(logging.StreamHandler(sys.stdout)) + asyncio.run(multi_region(file_name)) + # asyncio.run(create_item()) diff --git a/sdk/cosmos/azure-cosmos/test/demo-4.py b/sdk/cosmos/azure-cosmos/test/demo-4.py new file mode 100644 index 000000000000..aeb664dc5de2 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/demo-4.py @@ -0,0 +1,102 @@ +import os +import random +import sys + +sys.path.append(r"") + +from azure.cosmos.aio import CosmosClient as AsyncClient +import asyncio + +import time +from datetime import datetime + +import sys +import logging + +# Replace with your Cosmos DB details +preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP'] +COSMOS_URI2 = "" +COSMOS_KEY2 = "" + + +async def write_item_concurrently(container, num_upserts, start_id): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item({"id": "Simon-" + str(start_id), "pk": "pk-" + str(start_id)})) + start_id += 1 + await asyncio.gather(*tasks) + + +def get_random_item(): + random_int = random.randint(1, 10000) + return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)} + + +async def upsert_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + tasks.append(container.upsert_item(get_random_item())) + await asyncio.gather(*tasks) + + +async def read_item_concurrently(container, num_upserts): + tasks = [] + for _ in range(num_upserts): + item = get_random_item() + 
+        tasks.append(container.read_item(item["id"], item["pk"]))
+    await asyncio.gather(*tasks)
+
+
+async def query_items_concurrently(container, num_queries):
+    tasks = []
+    for _ in range(num_queries):
+        tasks.append(perform_query(container))
+    await asyncio.gather(*tasks)
+
+
+async def perform_query(container):
+    random_item = get_random_item()
+    results = container.query_items(query="SELECT * FROM c", partition_key=random_item["pk"])
+    items = [item async for item in results]
+
+
+async def change_feed(container):
+    response = container.query_items_change_feed(is_start_from_beginning=True)
+
+    count = 0
+    async for doc in response:
+        count += 1
+
+
+async def multi_region(client_id):
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-VM-" + str(client_id) + datetime.now().strftime(
+                               "%Y%m%d-%H%M%S")) as client:
+        db = client.get_database_client("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        while True:
+            try:
+                await upsert_item_concurrently(cont, 5)  # Number of concurrent upserts
+                await asyncio.sleep(1)
+                await read_item_concurrently(cont, 5)  # Number of concurrent reads
+                await asyncio.sleep(1)
+                # await query_items_concurrently(cont, 500)  # Number of concurrent queries
+                # time.sleep(1)
+                # await change_feed(cont)
+                # time.sleep(1)
+            except Exception as e:
+                raise e
+
+
+if __name__ == "__main__":
+    logger = logging.getLogger('azure.cosmos')
+    file_name = os.path.basename(__file__)
+    file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log')
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(file_handler)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+    asyncio.run(multi_region(file_name))
+    # asyncio.run(create_item())
diff --git a/sdk/cosmos/azure-cosmos/test/demo-5.py b/sdk/cosmos/azure-cosmos/test/demo-5.py
new file mode 100644
index 000000000000..e1d949af78ea
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/test/demo-5.py
@@ -0,0 +1,102 @@
+import os.path
+import random
+import sys
+
+sys.path.append(r"")
+
+from azure.cosmos.aio import CosmosClient as AsyncClient
+import asyncio
+
+import time
+from datetime import datetime
+
+import sys
+import logging
+
+# Replace with your Cosmos DB details
+preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP']
+COSMOS_URI2 = ""
+COSMOS_KEY2 = ""
+
+
+async def write_item_concurrently(container, num_upserts, start_id):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(container.upsert_item({"id": "Simon-" + str(start_id), "pk": "pk-" + str(start_id)}))
+        start_id += 1
+    await asyncio.gather(*tasks)
+
+
+def get_random_item():
+    random_int = random.randint(1, 10000)
+    return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)}
+
+
+async def upsert_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(container.upsert_item(get_random_item()))
+    await asyncio.gather(*tasks)
+
+
+async def read_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        item = get_random_item()
+        tasks.append(container.read_item(item["id"], item["pk"]))
+    await asyncio.gather(*tasks)
+
+
+async def query_items_concurrently(container, num_queries):
+    tasks = []
+    for _ in range(num_queries):
+        tasks.append(perform_query(container))
+    await asyncio.gather(*tasks)
+
+
+async def perform_query(container):
+    random_item = get_random_item()
+    results = container.query_items(query="SELECT * FROM c", partition_key=random_item["pk"])
+    items = [item async for item in results]
+
+
+async def change_feed(container):
+    response = container.query_items_change_feed(is_start_from_beginning=True)
+
+    count = 0
+    async for doc in response:
+        count += 1
+
+
+async def multi_region(client_id):
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-VM-" + str(client_id) + datetime.now().strftime(
+                               "%Y%m%d-%H%M%S")) as client:
+        db = client.get_database_client("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        while True:
+            try:
+                await upsert_item_concurrently(cont, 5)  # Number of concurrent upserts
+                await asyncio.sleep(1)
+                await read_item_concurrently(cont, 5)  # Number of concurrent reads
+                await asyncio.sleep(1)
+                # await query_items_concurrently(cont, 500)  # Number of concurrent queries
+                # time.sleep(1)
+                # await change_feed(cont)
+                # time.sleep(1)
+            except Exception as e:
+                raise e
+
+
+if __name__ == "__main__":
+    logger = logging.getLogger('azure.cosmos')
+    file_name = os.path.basename(__file__)
+    file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log')
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(file_handler)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+    asyncio.run(multi_region(file_name))
+    # asyncio.run(create_item())
diff --git a/sdk/cosmos/azure-cosmos/test/demo-reads-1.py b/sdk/cosmos/azure-cosmos/test/demo-reads-1.py
new file mode 100644
index 000000000000..34187f7370a9
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/test/demo-reads-1.py
@@ -0,0 +1,102 @@
+import os
+import random
+import sys
+
+sys.path.append(r"")
+
+from azure.cosmos.aio import CosmosClient as AsyncClient
+import asyncio
+
+import time
+from datetime import datetime
+
+import sys
+import logging
+
+# Replace with your Cosmos DB details
+preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP']
+COSMOS_URI2 = ""
+COSMOS_KEY2 = ""
+
+
+async def write_item_concurrently(container, num_upserts, start_id):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(container.upsert_item({"id": "Simon-" + str(start_id), "pk": "pk-" + str(start_id)}))
+        start_id += 1
+    await asyncio.gather(*tasks)
+
+
+def get_random_item():
+    random_int = random.randint(1, 10000)
+    return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)}
+
+
+async def upsert_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(container.upsert_item(get_random_item()))
+    await asyncio.gather(*tasks)
+
+
+async def read_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        item = get_random_item()
+        tasks.append(container.read_item(item["id"], item["pk"]))
+    await asyncio.gather(*tasks)
+
+
+async def query_items_concurrently(container, num_queries):
+    tasks = []
+    for _ in range(num_queries):
+        tasks.append(perform_query(container))
+    await asyncio.gather(*tasks)
+
+
+async def perform_query(container):
+    random_item = get_random_item()
+    results = container.query_items(query="SELECT * FROM c", partition_key=random_item["pk"])
+    items = [item async for item in results]
+
+
+async def change_feed(container):
+    response = container.query_items_change_feed(is_start_from_beginning=True)
+
+    count = 0
+    async for doc in response:
+        count += 1
+
+
+async def multi_region(client_id):
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-VM-reads-" + str(client_id) + datetime.now().strftime(
+                               "%Y%m%d-%H%M%S")) as client:
+        db = client.get_database_client("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        while True:
+            try:
+                await upsert_item_concurrently(cont, 5)  # Number of concurrent upserts
+                await asyncio.sleep(1)
+                await read_item_concurrently(cont, 5)  # Number of concurrent reads
+                await asyncio.sleep(1)
+                # await query_items_concurrently(cont, 500)  # Number of concurrent queries
+                # time.sleep(1)
+                # await change_feed(cont)
+                # time.sleep(1)
+            except Exception as e:
+                raise e
+
+
+if __name__ == "__main__":
+    logger = logging.getLogger('azure.cosmos')
+    file_name = os.path.basename(__file__)
+    file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log')
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(file_handler)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+    asyncio.run(multi_region(file_name))
+    # asyncio.run(create_item())
diff --git a/sdk/cosmos/azure-cosmos/test/demo-reads-2.py b/sdk/cosmos/azure-cosmos/test/demo-reads-2.py
new file mode 100644
index 000000000000..34187f7370a9
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/test/demo-reads-2.py
@@ -0,0 +1,102 @@
+import os
+import random
+import sys
+
+sys.path.append(r"")
+
+from azure.cosmos.aio import CosmosClient as AsyncClient
+import asyncio
+
+import time
+from datetime import datetime
+
+import sys
+import logging
+
+# Replace with your Cosmos DB details
+preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP']
+COSMOS_URI2 = ""
+COSMOS_KEY2 = ""
+
+
+async def write_item_concurrently(container, num_upserts, start_id):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(container.upsert_item({"id": "Simon-" + str(start_id), "pk": "pk-" + str(start_id)}))
+        start_id += 1
+    await asyncio.gather(*tasks)
+
+
+def get_random_item():
+    random_int = random.randint(1, 10000)
+    return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)}
+
+
+async def upsert_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(container.upsert_item(get_random_item()))
+    await asyncio.gather(*tasks)
+
+
+async def read_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        item = get_random_item()
+        tasks.append(container.read_item(item["id"], item["pk"]))
+    await asyncio.gather(*tasks)
+
+
+async def query_items_concurrently(container, num_queries):
+    tasks = []
+    for _ in range(num_queries):
+        tasks.append(perform_query(container))
+    await asyncio.gather(*tasks)
+
+
+async def perform_query(container):
+    random_item = get_random_item()
+    results = container.query_items(query="SELECT * FROM c", partition_key=random_item["pk"])
+    items = [item async for item in results]
+
+
+async def change_feed(container):
+    response = container.query_items_change_feed(is_start_from_beginning=True)
+
+    count = 0
+    async for doc in response:
+        count += 1
+
+
+async def multi_region(client_id):
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-VM-reads-" + str(client_id) + datetime.now().strftime(
+                               "%Y%m%d-%H%M%S")) as client:
+        db = client.get_database_client("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        while True:
+            try:
+                await upsert_item_concurrently(cont, 5)  # Number of concurrent upserts
+                await asyncio.sleep(1)
+                await read_item_concurrently(cont, 5)  # Number of concurrent reads
+                await asyncio.sleep(1)
+                # await query_items_concurrently(cont, 500)  # Number of concurrent queries
+                # time.sleep(1)
+                # await change_feed(cont)
+                # time.sleep(1)
+            except Exception as e:
+                raise e
+
+
+if __name__ == "__main__":
+    logger = logging.getLogger('azure.cosmos')
+    file_name = os.path.basename(__file__)
+    file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log')
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(file_handler)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+    asyncio.run(multi_region(file_name))
+    # asyncio.run(create_item())
diff --git a/sdk/cosmos/azure-cosmos/test/demo-reads-3.py b/sdk/cosmos/azure-cosmos/test/demo-reads-3.py
new file mode 100644
index 000000000000..34187f7370a9
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/test/demo-reads-3.py
@@ -0,0 +1,102 @@
+import os
+import random
+import sys
+
+sys.path.append(r"")
+
+from azure.cosmos.aio import CosmosClient as AsyncClient
+import asyncio
+
+import time
+from datetime import datetime
+
+import sys
+import logging
+
+# Replace with your Cosmos DB details
+preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP']
+COSMOS_URI2 = ""
+COSMOS_KEY2 = ""
+
+
+async def write_item_concurrently(container, num_upserts, start_id):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(container.upsert_item({"id": "Simon-" + str(start_id), "pk": "pk-" + str(start_id)}))
+        start_id += 1
+    await asyncio.gather(*tasks)
+
+
+def get_random_item():
+    random_int = random.randint(1, 10000)
+    return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)}
+
+
+async def upsert_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(container.upsert_item(get_random_item()))
+    await asyncio.gather(*tasks)
+
+
+async def read_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        item = get_random_item()
+        tasks.append(container.read_item(item["id"], item["pk"]))
+    await asyncio.gather(*tasks)
+
+
+async def query_items_concurrently(container, num_queries):
+    tasks = []
+    for _ in range(num_queries):
+        tasks.append(perform_query(container))
+    await asyncio.gather(*tasks)
+
+
+async def perform_query(container):
+    random_item = get_random_item()
+    results = container.query_items(query="SELECT * FROM c", partition_key=random_item["pk"])
+    items = [item async for item in results]
+
+
+async def change_feed(container):
+    response = container.query_items_change_feed(is_start_from_beginning=True)
+
+    count = 0
+    async for doc in response:
+        count += 1
+
+
+async def multi_region(client_id):
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-VM-reads-" + str(client_id) + datetime.now().strftime(
+                               "%Y%m%d-%H%M%S")) as client:
+        db = client.get_database_client("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        while True:
+            try:
+                await upsert_item_concurrently(cont, 5)  # Number of concurrent upserts
+                await asyncio.sleep(1)
+                await read_item_concurrently(cont, 5)  # Number of concurrent reads
+                await asyncio.sleep(1)
+                # await query_items_concurrently(cont, 500)  # Number of concurrent queries
+                # time.sleep(1)
+                # await change_feed(cont)
+                # time.sleep(1)
+            except Exception as e:
+                raise e
+
+
+if __name__ == "__main__":
+    logger = logging.getLogger('azure.cosmos')
+    file_name = os.path.basename(__file__)
+    file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log')
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(file_handler)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+    asyncio.run(multi_region(file_name))
+    # asyncio.run(create_item())
diff --git a/sdk/cosmos/azure-cosmos/test/demo-sync.py b/sdk/cosmos/azure-cosmos/test/demo-sync.py
new file mode 100644
index 000000000000..b01cdeae022a
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/test/demo-sync.py
@@ -0,0 +1,81 @@
+import os
+import random
+import sys
+
+from azure.cosmos import CosmosClient
+
+sys.path.append(r"")
+
+
+import time
+from datetime import datetime
+
+import sys
+import logging
+
+# Replace with your Cosmos DB details
+preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP']
+COSMOS_URI2 = ""
+COSMOS_KEY2 = ""
+
+
+def get_random_item():
+    random_int = random.randint(1, 10000)
+    return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)}
+
+
+def upsert_item(container, num_upserts):
+    for _ in range(num_upserts):
+        container.upsert_item(get_random_item())
+
+
+def read_item(container, num_upserts):
+    for _ in range(num_upserts):
+        item = get_random_item()
+        container.read_item(item["id"], item["pk"])
+
+def perform_query(container):
+    random_item = get_random_item()
+    results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk",
+                                    parameters=[{"name": "@id", "value": random_item["id"]},
+                                                {"name": "@pk", "value": random_item["pk"]}],
+                                    partition_key=random_item["pk"])
+    items = [item for item in results]
+
+
+def query_items(cont, num_queries):
+    for _ in range(num_queries):
+        perform_query(cont)
+
+
+def multi_region(client_id):
+    with CosmosClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                      enable_diagnostics_logging=True, logger=logger,
+                      user_agent="Concurrent-VM-Sync" + str(client_id) + datetime.now().strftime(
+                          "%Y%m%d-%H%M%S")) as client:
+        db = client.get_database_client("SimonDB")
+        cont = db.create_container_if_not_exists("SimonContainer", "/pk")
+        time.sleep(1)
+
+        while True:
+            try:
+                upsert_item(cont, 5)  # Number of concurrent upserts
+                time.sleep(1)
+                read_item(cont, 5)  # Number of concurrent reads
+                time.sleep(1)
+                query_items(cont, 2)  # Number of concurrent queries
+                time.sleep(1)
+                # await change_feed(cont)
+                # time.sleep(1)
+            except Exception as e:
+                raise e
+
+
+if __name__ == "__main__":
+    logger = logging.getLogger('azure.cosmos')
+    file_name = os.path.basename(__file__)
+    file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log')
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(file_handler)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+    multi_region(file_name)
diff --git a/sdk/cosmos/azure-cosmos/test/demo-writes-1.py b/sdk/cosmos/azure-cosmos/test/demo-writes-1.py
new file mode 100644
index 000000000000..04fcffd65112
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/test/demo-writes-1.py
@@ -0,0 +1,153 @@
+import os
+import random
+import sys
+
+sys.path.append(r"")
+
+import uuid
+from azure.cosmos.aio import CosmosClient as AsyncClient
+import asyncio
+
+import time
+from datetime import datetime
+
+import sys
+import logging
+
+# Create a logger for the "azure" SDK
+# logger = logging.getLogger("azure")
+# logger.setLevel(logging.DEBUG)
+# handler = logging.StreamHandler(stream=sys.stdout)
+# logger.addHandler(handler)
+
+# Replace with your Cosmos DB details
+preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP']
+COSMOS_URI2 = ""
+COSMOS_KEY2 = ""
+
+
+async def write_item_concurrently_initial(container, num_upserts, initial):
+    tasks = []
+    for i in range(initial, initial + num_upserts):
+        tasks.append(container.upsert_item({"id": "Simon-" + str(i), "pk": "pk-" + str(i)}))
+    await asyncio.gather(*tasks)
+
+
+async def write_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(
+            container.upsert_item({"id": "Simon-" + str(uuid.uuid4()), "pk": "pk-" + str(random.randint(1, 10000))}))
+    await asyncio.gather(*tasks)
+
+
+def get_random_item():
+    random_int = random.randint(1, 10000)
+    return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)}
+
+
+def get_upsert_random_item():
+    random_int = random.randint(1, 1000000000)
+    return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)}
+
+
+async def upsert_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(container.upsert_item(get_upsert_random_item()))
+    await asyncio.gather(*tasks)
+
+
+async def read_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        item = get_random_item()
+        print(item["id"], item["pk"])
+        tasks.append(container.read_item(item["id"], item["pk"]))
+    await asyncio.gather(*tasks)
+
+
+async def multi_region():
+    logger = logging.getLogger('azure.cosmos')
+    file_handler = logging.FileHandler('fiddler_testing_2.txt')
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(file_handler)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-Tomas") as client:
+        db = client.get_database_client("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        while True:
+            try:
+                await upsert_item_concurrently(cont, 100)  # Number of concurrent upserts
+                await asyncio.sleep(1)
+                await read_item_concurrently(cont, 100)  # Number of concurrent reads
+            except Exception as e:
+                raise e
+
+
+async def create_items(client_id):
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-Write-Tomas" + str(client_id)
+                           + datetime.now().strftime("%Y%m%d-%H%M%S")) as client:
+        db = await client.create_database_if_not_exists("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        try:
+            while True:
+                await write_item_concurrently(cont, 4)  # Number of concurrent upserts
+                await asyncio.sleep(1)
+        except Exception as e:
+            raise e
+
+
+async def create_items_initial():
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-Write-VM-" + datetime.now().strftime("%Y%m%d-%H%M%S")) as client:
+        db = await client.create_database_if_not_exists("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        try:
+            for i in range(0, 10000, 1000):
+                await write_item_concurrently_initial(cont, 1000, i)  # Number of concurrent upserts
+                await asyncio.sleep(1)
+        except Exception as e:
+            raise e
+
+
+async def create_item():
+    logger = logging.getLogger('azure.cosmos')
+    file_handler = logging.FileHandler('create_item_testing.log')
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(file_handler)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="-Concurrent-Write-Tomas") as client:
+        db = await client.create_database_if_not_exists("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        try:
+            # await cont.create_item({"id": "Simon-4082", "pk": "pk-4082"})
+            await cont.read_item("Simon-4082", "pk-4082")
+        except Exception as e:
+            raise e
+
+
+if __name__ == "__main__":
+    logger = logging.getLogger('azure.cosmos')
+    file_name = os.path.basename(__file__)
+    file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log')
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(file_handler)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+    asyncio.run(create_items(file_name))
+    # asyncio.run(create_items_initial())
diff --git a/sdk/cosmos/azure-cosmos/test/demo-writes-2.py b/sdk/cosmos/azure-cosmos/test/demo-writes-2.py
new file mode 100644
index 000000000000..438440e0fa82
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/test/demo-writes-2.py
@@ -0,0 +1,153 @@
+import os
+import random
+import sys
+
+sys.path.append(r"")
+
+import uuid
+from azure.cosmos.aio import CosmosClient as AsyncClient
+import asyncio
+
+import time
+from datetime import datetime
+
+import sys
+import logging
+
+# Create a logger for the "azure" SDK
+# logger = logging.getLogger("azure")
+# logger.setLevel(logging.DEBUG)
+# handler = logging.StreamHandler(stream=sys.stdout)
+# logger.addHandler(handler)
+
+# Replace with your Cosmos DB details
+preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP']
+COSMOS_URI2 = ""
+COSMOS_KEY2 = ""
+
+
+async def write_item_concurrently_initial(container, num_upserts, initial):
+    tasks = []
+    for i in range(initial, initial + num_upserts):
+        tasks.append(container.upsert_item({"id": "Simon-" + str(i), "pk": "pk-" + str(i)}))
+    await asyncio.gather(*tasks)
+
+
+async def write_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(
+            container.upsert_item({"id": "Simon-" + str(uuid.uuid4()), "pk": "pk-" + str(random.randint(1, 10000))}))
+    await asyncio.gather(*tasks)
+
+
+def get_random_item():
+    random_int = random.randint(1, 10000)
+    return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)}
+
+
+def get_upsert_random_item():
+    random_int = random.randint(1, 1000000000)
+    return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)}
+
+
+async def upsert_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(container.upsert_item(get_upsert_random_item()))
+    await asyncio.gather(*tasks)
+
+
+async def read_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        item = get_random_item()
+        print(item["id"], item["pk"])
+        tasks.append(container.read_item(item["id"], item["pk"]))
+    await asyncio.gather(*tasks)
+
+
+async def multi_region():
+    logger = logging.getLogger('azure.cosmos')
+    file_handler = logging.FileHandler('fiddler_testing_2.txt')
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(file_handler)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-Tomas") as client:
+        db = client.get_database_client("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        while True:
+            try:
+                await upsert_item_concurrently(cont, 100)  # Number of concurrent upserts
+                await asyncio.sleep(1)
+                await read_item_concurrently(cont, 100)  # Number of concurrent reads
+            except Exception as e:
+                raise e
+
+
+async def create_items(client_id):
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-Write-Tomas" + str(client_id)
+                           + datetime.now().strftime("%Y%m%d-%H%M%S")) as client:
+        db = await client.create_database_if_not_exists("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        try:
+            while True:
+                await write_item_concurrently(cont, 4)  # Number of concurrent upserts
+                await asyncio.sleep(1)
+        except Exception as e:
+            raise e
+
+
+async def create_items_initial():
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-Write-Tomas-") as client:
+        db = await client.create_database_if_not_exists("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        try:
+            for i in range(0, 10000, 1000):
+                await write_item_concurrently_initial(cont, 1000, i)  # Number of concurrent upserts
+                await asyncio.sleep(1)
+        except Exception as e:
+            raise e
+
+
+async def create_item():
+    logger = logging.getLogger('azure.cosmos')
+    file_handler = logging.FileHandler('create_item_testing.log')
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(file_handler)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-Write-VM-" + datetime.now().strftime("%Y%m%d-%H%M%S")) as client:
+        db = await client.create_database_if_not_exists("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        try:
+            # await cont.create_item({"id": "Simon-4082", "pk": "pk-4082"})
+            await cont.read_item("Simon-4082", "pk-4082")
+        except Exception as e:
+            raise e
+
+
+if __name__ == "__main__":
+    logger = logging.getLogger('azure.cosmos')
+    file_name = os.path.basename(__file__)
+    file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log')
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(file_handler)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+    asyncio.run(create_items(file_name))
+    # asyncio.run(create_items_initial())
diff --git a/sdk/cosmos/azure-cosmos/test/demo-writes-3.py b/sdk/cosmos/azure-cosmos/test/demo-writes-3.py
new file mode 100644
index 000000000000..11f64a208a64
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/test/demo-writes-3.py
@@ -0,0 +1,153 @@
+import os
+import random
+import sys
+
+sys.path.append(r"")
+
+import uuid
+from azure.cosmos.aio import CosmosClient as AsyncClient
+import asyncio
+
+import time
+from datetime import datetime
+
+import sys
+import logging
+
+# Create a logger for the "azure" SDK
+# logger = logging.getLogger("azure")
+# logger.setLevel(logging.DEBUG)
+# handler = logging.StreamHandler(stream=sys.stdout)
+# logger.addHandler(handler)
+
+# Replace with your Cosmos DB details
+preferred_regions_2 = ['East US 2 EUAP', 'Central US EUAP']
+COSMOS_URI2 = ""
+COSMOS_KEY2 = ""
+
+
+async def write_item_concurrently_initial(container, num_upserts, initial):
+    tasks = []
+    for i in range(initial, initial + num_upserts):
+        tasks.append(container.upsert_item({"id": "Simon-" + str(i), "pk": "pk-" + str(i)}))
+    await asyncio.gather(*tasks)
+
+
+async def write_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(
+            container.upsert_item({"id": "Simon-" + str(uuid.uuid4()), "pk": "pk-" + str(random.randint(1, 10000))}))
+    await asyncio.gather(*tasks)
+
+
+def get_random_item():
+    random_int = random.randint(1, 10000)
+    return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)}
+
+
+def get_upsert_random_item():
+    random_int = random.randint(1, 1000000000)
+    return {"id": "Simon-" + str(random_int), "pk": "pk-" + str(random_int)}
+
+
+async def upsert_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        tasks.append(container.upsert_item(get_upsert_random_item()))
+    await asyncio.gather(*tasks)
+
+
+async def read_item_concurrently(container, num_upserts):
+    tasks = []
+    for _ in range(num_upserts):
+        item = get_random_item()
+        print(item["id"], item["pk"])
+        tasks.append(container.read_item(item["id"], item["pk"]))
+    await asyncio.gather(*tasks)
+
+
+async def multi_region():
+    logger = logging.getLogger('azure.cosmos')
+    file_handler = logging.FileHandler('fiddler_testing_2.txt')
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(file_handler)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-Tomas") as client:
+        db = client.get_database_client("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        while True:
+            try:
+                await upsert_item_concurrently(cont, 100)  # Number of concurrent upserts
+                await asyncio.sleep(1)
+                await read_item_concurrently(cont, 100)  # Number of concurrent reads
+            except Exception as e:
+                raise e
+
+
+async def create_items(client_id):
+    logger = logging.getLogger('azure.cosmos')
+    file_handler = logging.FileHandler('fiddler_testing.txt')
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(file_handler)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-Write-Tomas" + str(client_id) + datetime.now().strftime(
+                               "%Y%m%d-%H%M%S")) as client:
+        db = await client.create_database_if_not_exists("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        try:
+            while True:
+                await write_item_concurrently(cont, 4)  # Number of concurrent upserts
+                await asyncio.sleep(1)
+        except Exception as e:
+            raise e
+
+
+async def create_items_initial():
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-Write-Tomas-") as client:
+        db = await client.create_database_if_not_exists("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        try:
+            for i in range(0, 10000, 1000):
+                await write_item_concurrently_initial(cont, 1000, i)  # Number of concurrent upserts
+                await asyncio.sleep(1)
+        except Exception as e:
+            raise e
+
+
+async def create_item():
+    async with AsyncClient(COSMOS_URI2, COSMOS_KEY2, preferred_locations=preferred_regions_2,
+                           enable_diagnostics_logging=True, logger=logger,
+                           user_agent="Concurrent-Write-VM-" + datetime.now().strftime("%Y%m%d-%H%M%S")) as client:
+        db = await client.create_database_if_not_exists("SimonDB")
+        cont = await db.create_container_if_not_exists("SimonContainer", "/pk")
+        await asyncio.sleep(1)
+
+        try:
+            # await cont.create_item({"id": "Simon-4082", "pk": "pk-4082"})
+            await cont.read_item("Simon-4082", "pk-4082")
+        except Exception as e:
+            raise e
+
+
+if __name__ == "__main__":
+    logger = logging.getLogger('azure.cosmos')
+    file_name = os.path.basename(__file__)
+    file_handler = logging.FileHandler(file_name + '-testing-' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.log')
+    logger.setLevel(logging.DEBUG)
+    logger.addHandler(file_handler)
+    logger.addHandler(logging.StreamHandler(sys.stdout))
+    asyncio.run(create_items(file_name))
+    # asyncio.run(create_items_initial())