Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
d992344
Logging changes
kushagraThapar Feb 1, 2025
a5739f3
Merge branch 'tvaron3/regionalEndpoints' into testing_logging_experim…
kushagraThapar Feb 2, 2025
8300626
Added timestamp logging changes
kushagraThapar Feb 2, 2025
f48cf3d
Added error logging
kushagraThapar Feb 2, 2025
325a9f9
Updated default connection timeout to 5 seconds
kushagraThapar Feb 2, 2025
22495af
Merged latest commit and resolved conflicts
kushagraThapar Feb 2, 2025
dde37ee
Add write files
tvaron3 Feb 3, 2025
933d1b4
Added sub status code logging
kushagraThapar Feb 3, 2025
4721246
Merge branch 'testing_logging_experiments' of github.com:kushagraThap…
kushagraThapar Feb 3, 2025
675c659
Added demo.py for stress testing
kushagraThapar Feb 3, 2025
e089117
Added more tests for stress testing
kushagraThapar Feb 3, 2025
7630f38
Removed substatus code
kushagraThapar Feb 3, 2025
ee1ee52
Merged latest commit and resolved conflicts
kushagraThapar Feb 3, 2025
e592b11
Improved logging
kushagraThapar Feb 3, 2025
a317aec
Moved connection timeout to 5 seconds
kushagraThapar Feb 4, 2025
eb23946
Fixed partition key issue
kushagraThapar Feb 4, 2025
135306b
Updated query to be single partition query
kushagraThapar Feb 4, 2025
01ba6d6
Merged latest commit and resolved conflicts
kushagraThapar Feb 4, 2025
a10d4f4
Fixed indentation
kushagraThapar Feb 4, 2025
b801048
Merge branch 'tvaron3/regionalEndpoints' into testing_logging_experim…
kushagraThapar Feb 4, 2025
454ada6
Merge branch 'tvaron3/regionalEndpoints' into testing_logging_experim…
kushagraThapar Feb 4, 2025
9540d88
Merge branch 'tvaron3/regionalEndpoints' into testing_logging_experim…
kushagraThapar Feb 5, 2025
405c0bf
Merge branch 'tvaron3/regionalEndpoints' into testing_logging_experim…
kushagraThapar Feb 5, 2025
5fc3333
Added logic for swapping on health check
kushagraThapar Feb 5, 2025
0d70baa
Added logic for swapping on health check
kushagraThapar Feb 5, 2025
3408ae9
sync demo
tvaron3 Feb 5, 2025
a3e802d
Merge branch 'testing_logging_experiments' of https://github.com/kush…
tvaron3 Feb 5, 2025
f94fea6
Merged latest commit and resolved conflicts
kushagraThapar Feb 5, 2025
372a242
Merge branch 'testing_logging_experiments' of github.com:kushagraThap…
kushagraThapar Feb 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 29 additions & 13 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import json
import logging
import time
from datetime import datetime
from typing import Optional, Union, Dict, Any, TYPE_CHECKING, Callable, Mapping
import types

Expand Down Expand Up @@ -121,9 +122,9 @@ def on_request(self, request: PipelineRequest[HTTPRequestType]) -> None:
resource_type = self._resource_map['docs']
if self._should_log(verb=verb,database_name=database_name,collection_name=collection_name,
resource_type=resource_type, is_request=True):
self._log_client_settings()
self._log_database_account_settings()
super().on_request(request)
# self._log_client_settings()
# self._log_database_account_settings()
# super().on_request(request)
self.__request_already_logged = True

# pylint: disable=too-many-statements
Expand Down Expand Up @@ -173,26 +174,41 @@ def on_response(
if self._should_log(duration=duration, status_code=status_code, sub_status_code=sub_status_code,
verb=verb, http_version=http_version_obj, database_name=database_name,
collection_name=collection_name, resource_type=resource_type, is_request=False):
ThinClientProxyResourceType = "x-ms-thinclient-proxy-resource-type"
if not self.__request_already_logged:
self._log_client_settings()
self._log_database_account_settings()
super().on_request(request)
# self._log_client_settings()
# self._log_database_account_settings()
# super().on_request(request)
self.__request_already_logged = True
else:
self.__request_already_logged = False
super().on_response(request, response)
# super().on_response(request, response)
if self._enable_diagnostics_logging:
http_response = response.http_response
options = response.context.options
logger = request.context.setdefault("logger", options.pop("logger", self.logger))
try:
if "start_time" in request.context:
logger.info("Elapsed time in seconds: {}".format(duration))
else:
logger.info("Elapsed time in seconds: unknown")
logger.info("{} - Response status code: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"),
http_response.status_code))
logger.info("{} - Response URL: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"),
request.http_request.url))
# Thin Client headers
ThinClientProxyOperationType = "x-ms-thinclient-proxy-operation-type"
ThinClientProxyResourceType = "x-ms-thinclient-proxy-resource-type"
logger.info("{} - Operation type: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"),
request.http_request.headers[ThinClientProxyOperationType]))
logger.info("{} - Resource type: {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"),
request.http_request.headers[ThinClientProxyResourceType]))
# if "start_time" in request.context:
# logger.info("Elapsed time in seconds: {}".format(duration))
# else:
# logger.info("Elapsed time in seconds: unknown")
if http_response.status_code >= 400:
logger.info("Response error message: %r", _format_error(http_response.text()))
logger.info("Response error message: %r",
_format_error(http_response.text()))
except Exception as err: # pylint: disable=broad-except
logger.warning("Failed to log request: %s", repr(err)) # pylint: disable=do-not-log-exceptions
logger.warning("Failed to log request: %s",
repr(err)) # pylint: disable=do-not-log-exceptions

# pylint: disable=unused-argument
def _default_should_log(
Expand Down
81 changes: 47 additions & 34 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import collections
import logging
import time
from datetime import datetime
from urllib.parse import urlparse

from . import documents
Expand All @@ -35,11 +36,13 @@

logger = logging.getLogger("azure.cosmos.LocationCache")


class EndpointOperationType(object):
    """String constants naming the operation kind an endpoint is checked against.

    Elsewhere in this module these values are compared with an
    ``expected_available_operation`` argument and looked up in an
    unavailability record's ``"operationType"`` entry.
    """

    # No specific operation; availability checks treat this as "never unavailable".
    NoneType = "None"
    # Read operations.
    ReadType = "Read"
    # Write operations.
    WriteType = "Write"


class RegionalEndpoint(object):
def __init__(self, c_endpoint: str, p_endpoint: str):
self.current_endpoint = c_endpoint
Expand Down Expand Up @@ -81,8 +84,7 @@ def get_endpoints_by_location(new_locations,
endpoints_by_location = collections.OrderedDict()
parsed_locations = []


for new_location in new_locations: # pylint: disable=too-many-nested-blocks
for new_location in new_locations: # pylint: disable=too-many-nested-blocks
# if name in new_location and same for database account endpoint
if "name" in new_location and "databaseAccountEndpoint" in new_location:
if not new_location["name"]:
Expand All @@ -93,11 +95,16 @@ def get_endpoints_by_location(new_locations,
parsed_locations.append(new_location["name"])
if new_location["name"] in old_endpoints_by_location:
regional_object = old_endpoints_by_location[new_location["name"]]
logger.info("%s - In location cache: Existing regional object: %s",
datetime.now().strftime("%Y%m%d-%H%M%S"), str(regional_object))
current = regional_object.get_current()
# swap the previous with current and current with new region_uri received from the gateway
if current != region_uri:
regional_object.set_previous(current)
regional_object.set_current(region_uri)
logger.info("%s - In location cache: Updated regional object: %s",
datetime.now().strftime("%Y%m%d-%H%M%S"),
str(regional_object))
# This is the bootstrapping condition
else:
regional_object = RegionalEndpoint(region_uri, region_uri)
Expand All @@ -109,10 +116,15 @@ def get_endpoints_by_location(new_locations,
if region_uri != default_regional_endpoint.get_current():
regional_object.set_previous(default_regional_endpoint.get_current())
else:
constructed_region_uri = LocationCache.GetLocationalEndpoint(
constructed_region_uri = LocationCache.GetLocationalEndpoint(
default_regional_endpoint.get_current(),
new_location["name"])
regional_object.set_previous(constructed_region_uri)

logger.info("%s - In location cache: This is regional object on initialization: %s",
datetime.now().strftime("%Y%m%d-%H%M%S"),
str(regional_object))

# pass in object with region uri , last known good, curr etc
endpoints_by_location.update({new_location["name"]: regional_object})
except Exception as e:
Expand All @@ -126,12 +138,12 @@ def current_time_millis(self):
return int(round(time.time() * 1000))

def __init__(
self,
preferred_locations,
default_endpoint,
enable_endpoint_discovery,
use_multiple_write_locations,
refresh_time_interval_in_ms,
self,
preferred_locations,
default_endpoint,
enable_endpoint_discovery,
use_multiple_write_locations,
refresh_time_interval_in_ms,
):
self.preferred_locations = preferred_locations
self.default_regional_endpoint = RegionalEndpoint(default_endpoint, default_endpoint)
Expand All @@ -143,15 +155,15 @@ def __init__(
self.location_unavailability_info_by_endpoint = {}
self.refresh_time_interval_in_ms = refresh_time_interval_in_ms
self.last_cache_update_time_stamp = 0
self.available_read_regional_endpoints_by_location = {} # pylint: disable=name-too-long
self.available_write_regional_endpoints_by_location = {} # pylint: disable=name-too-long
self.available_read_regional_endpoints_by_location = {} # pylint: disable=name-too-long
self.available_write_regional_endpoints_by_location = {} # pylint: disable=name-too-long
self.available_write_locations = []
self.available_read_locations = []

def check_and_update_cache(self):
    """Refresh the location cache if endpoints are marked unavailable and the refresh interval has passed.

    Nothing happens when ``location_unavailability_info_by_endpoint`` is empty;
    in that case the clock is not even consulted. Otherwise
    ``update_location_cache()`` is called once more than
    ``refresh_time_interval_in_ms`` milliseconds have elapsed since
    ``last_cache_update_time_stamp``.
    """
    if not self.location_unavailability_info_by_endpoint:
        return

    # NOTE(review): ``__init__`` initialises ``last_cache_update_time_stamp`` to 0,
    # while ``update_location_cache`` assigns ``last_cache_update_timestamp``
    # (different spelling). Confirm which attribute is intended; this check reads
    # the former.
    elapsed_ms = self.current_time_millis() - self.last_cache_update_time_stamp
    if elapsed_ms > self.refresh_time_interval_in_ms:
        self.update_location_cache()

Expand Down Expand Up @@ -200,7 +212,8 @@ def swap_regional_endpoint_values(self, request):
)
regional_endpoint = regional_endpoints[location_index % len(regional_endpoints)]
if request.location_endpoint_to_route == regional_endpoint.get_current():
logger.warning("Swapping regional endpoint values: %s",
logger.warning("%s - Swapping regional endpoint values: %s",
datetime.now().strftime("%Y%m%d-%H%M%S"),
str(regional_endpoint))
regional_endpoint.swap()

Expand All @@ -214,8 +227,8 @@ def resolve_service_endpoint(self, request):
)

if not use_preferred_locations or (
documents._OperationType.IsWriteOperation(request.operation_type)
and not self.can_use_multiple_write_locations_for_request(request)
documents._OperationType.IsWriteOperation(request.operation_type)
and not self.can_use_multiple_write_locations_for_request(request)
):
# For non-document resource types in case of client can use multiple write locations
# or when client cannot use multiple write locations, flip-flop between the
Expand Down Expand Up @@ -287,12 +300,12 @@ def should_refresh_endpoints(self): # pylint: disable=too-many-return-statement
def clear_stale_endpoint_unavailability_info(self):
new_location_unavailability_info = {}
if self.location_unavailability_info_by_endpoint:
for unavailable_endpoint in self.location_unavailability_info_by_endpoint: #pylint: disable=consider-using-dict-items
for unavailable_endpoint in self.location_unavailability_info_by_endpoint: # pylint: disable=consider-using-dict-items
unavailability_info = self.location_unavailability_info_by_endpoint[unavailable_endpoint]
if not (
unavailability_info
and self.current_time_millis() - unavailability_info["lastUnavailabilityCheckTimeStamp"]
> self.refresh_time_interval_in_ms
unavailability_info
and self.current_time_millis() - unavailability_info["lastUnavailabilityCheckTimeStamp"]
> self.refresh_time_interval_in_ms
):
new_location_unavailability_info[
unavailable_endpoint
Expand All @@ -318,15 +331,15 @@ def is_endpoint_unavailable_internal(self, endpoint: str, expected_available_ope
)

if (
expected_available_operation == EndpointOperationType.NoneType
or not unavailability_info
or expected_available_operation not in unavailability_info["operationType"]
expected_available_operation == EndpointOperationType.NoneType
or not unavailability_info
or expected_available_operation not in unavailability_info["operationType"]
):
return False

if (
self.current_time_millis() - unavailability_info["lastUnavailabilityCheckTimeStamp"]
> self.refresh_time_interval_in_ms
self.current_time_millis() - unavailability_info["lastUnavailabilityCheckTimeStamp"]
> self.refresh_time_interval_in_ms
):
return False
# Unexpired entry present. Endpoint is unavailable
Expand Down Expand Up @@ -401,16 +414,16 @@ def update_location_cache(self, write_locations=None, read_locations=None, enabl
)
self.last_cache_update_timestamp = self.current_time_millis() # pylint: disable=attribute-defined-outside-init

def get_preferred_available_regional_endpoints( # pylint: disable=name-too-long
self, endpoints_by_location, orderedLocations, expected_available_operation, fallback_endpoint
def get_preferred_available_regional_endpoints( # pylint: disable=name-too-long
self, endpoints_by_location, orderedLocations, expected_available_operation, fallback_endpoint
):
regional_endpoints = []
# if enableEndpointDiscovery is false, we always use the defaultEndpoint that
# user passed in during documentClient init
if self.enable_endpoint_discovery and endpoints_by_location: # pylint: disable=too-many-nested-blocks
if (
self.can_use_multiple_write_locations()
or expected_available_operation == EndpointOperationType.ReadType
self.can_use_multiple_write_locations()
or expected_available_operation == EndpointOperationType.ReadType
):
unavailable_endpoints = []
if self.preferred_locations:
Expand Down Expand Up @@ -448,11 +461,11 @@ def can_use_multiple_write_locations(self):

def can_use_multiple_write_locations_for_request(self, request):  # pylint: disable=name-too-long
    """Tell whether ``request`` may be served using multiple write locations.

    The result is truthy only when ``can_use_multiple_write_locations()`` is
    truthy and the request either targets a document resource or is a stored
    procedure execution (``ExecuteJavaScript`` on a ``StoredProcedure``).

    :param request: object exposing ``resource_type`` and ``operation_type``.
    :return: the falsy result of ``can_use_multiple_write_locations()`` when
        that check fails (``request`` is not inspected in that case),
        otherwise a bool for the resource/operation test.
    """
    # Kept as a single short-circuiting expression so ``request`` is only
    # touched once the client-level check has passed.
    return self.can_use_multiple_write_locations() and (
        request.resource_type == http_constants.ResourceType.Document
        or (
            request.resource_type == http_constants.ResourceType.StoredProcedure
            and request.operation_type == documents._OperationType.ExecuteJavaScript
        )
    )

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
"""

import logging
from datetime import datetime

from azure.cosmos.documents import _OperationType
from azure.cosmos.http_constants import ResourceType

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@

"""Document client class for the Azure Cosmos database service.
"""
import logging
import os
from datetime import datetime
from urllib.parse import urlparse
import uuid
from typing import (
Expand Down Expand Up @@ -81,6 +83,7 @@

PartitionKeyType = Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]], Type[NonePartitionKeyValue]] # pylint: disable=line-too-long

logger = logging.getLogger("azure.cosmos.CosmosClientConnectionAsync")

class CredentialDict(TypedDict, total=False):
masterKey: str
Expand Down Expand Up @@ -444,6 +447,12 @@ async def GetDatabaseAccount(
self.UseMultipleWriteLocations = (
self.connection_policy.UseMultipleWriteLocations and database_account._EnableMultipleWritableLocations
)
logger.info("%s - Database account - writable locations: %s",
datetime.now().strftime("%Y%m%d-%H%M%S"),
database_account.WritableLocations)
logger.info("%s - Database account - readable locations: %s",
datetime.now().strftime("%Y%m%d-%H%M%S"),
database_account.ReadableLocations)
return database_account

async def _GetDatabaseAccountCheck(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
"""
import asyncio # pylint: disable=do-not-import-asyncio
import json
import logging
import time
from datetime import datetime

from azure.core.exceptions import AzureError, ClientAuthenticationError, ServiceRequestError, ServiceResponseError
from azure.core.pipeline.policies import AsyncRetryPolicy
Expand All @@ -42,6 +44,7 @@
_has_database_account_header)
from ..http_constants import HttpHeaders, StatusCodes, SubStatusCodes

logger = logging.getLogger("azure.cosmos.RetryUtilityAsync")

# pylint: disable=protected-access, disable=too-many-lines, disable=too-many-statements, disable=too-many-branches

Expand Down Expand Up @@ -274,6 +277,8 @@ async def send(self, request):
raise
except ServiceRequestError as err:
retry_error = err
logger.warning("{} - Received ServiceRequestError {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"),
str(retry_error)))
# the request ran into a socket timeout or failed to establish a new connection
# since request wasn't sent, raise exception immediately to be dealt with in client retry policies
if not _has_database_account_header(request.http_request.headers):
Expand All @@ -285,6 +290,8 @@ async def send(self, request):
raise err
except ServiceResponseError as err:
retry_error = err
logger.warning("{} - Received ServiceResponseError {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"),
str(retry_error)))
if _has_database_account_header(request.http_request.headers):
raise err
# Since this is ClientConnectionError, it is safe to be retried on both read and write requests
Expand All @@ -304,6 +311,8 @@ async def send(self, request):
raise err
except AzureError as err:
retry_error = err
logger.warning("{} - Received AzureError {}".format(datetime.now().strftime("%Y%m%d-%H%M%S"),
str(retry_error)))
if _has_database_account_header(request.http_request.headers):
raise err
if self._is_method_retryable(retry_settings, request.http_request):
Expand Down
Loading