Skip to content

Commit 279a3ce

Browse files
Revert Dual Endpoint Tracking (#40451)
* revert dual endpoints and update relevant tests * simplify health check * fix tests * fix pylint * Removed marking global endpoint as unavailable * Add to changelog for future reference * merge main and react to comments * fix ci tests * fix pylint * Update documentation for retries and update location cache * react to comment * fix tests * fix tests * fix tests * fix tests * fix tests * fix tests * remove unused vars * fix test * Apply suggestion from @allenkim0129 Co-authored-by: Allen Kim <[email protected]> --------- Co-authored-by: Allen Kim <[email protected]>
1 parent 6a0aab8 commit 279a3ce

23 files changed

+200
-312
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#### Bugs Fixed
1010

1111
#### Other Changes
12+
* Removed dual endpoint tracking from the sdk. See [PR 40451](https://github.com/Azure/azure-sdk-for-python/pull/40451).
1213

1314
### 4.14.0b4 (2025-09-11)
1415

sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,6 @@ def _GetDatabaseAccount(self, **kwargs) -> Tuple[DatabaseAccount, str]:
143143
try:
144144
database_account = self._GetDatabaseAccountStub(self.DefaultEndpoint, **kwargs)
145145
self._database_account_cache = database_account
146-
self.location_cache.mark_endpoint_available(self.DefaultEndpoint)
147146
return database_account, self.DefaultEndpoint
148147
# If for any reason(non-globaldb related), we are not able to get the database
149148
# account from the above call to GetDatabaseAccount, we would try to get this
@@ -152,9 +151,6 @@ def _GetDatabaseAccount(self, **kwargs) -> Tuple[DatabaseAccount, str]:
152151
# until we get the database account and return None at the end, if we are not able
153152
# to get that info from any endpoints
154153
except (exceptions.CosmosHttpResponseError, AzureError):
155-
# when atm is available, L: 145, 146 should be removed as the global endpoint shouldn't be used
156-
# for dataplane operations anymore
157-
self._mark_endpoint_unavailable(self.DefaultEndpoint)
158154
for location_name in self.PreferredLocations:
159155
locational_endpoint = LocationCache.GetLocationalEndpoint(self.DefaultEndpoint, location_name)
160156
try:

sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py

Lines changed: 23 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131
from . import documents, _base as base
3232
from .http_constants import ResourceType
33-
from .documents import _OperationType, ConnectionPolicy
33+
from .documents import ConnectionPolicy
3434
from ._request_object import RequestObject
3535

3636
# pylint: disable=protected-access
@@ -43,40 +43,27 @@ class EndpointOperationType(object):
4343
WriteType = "Write"
4444

4545
class RegionalRoutingContext(object):
46-
def __init__(self, primary_endpoint: str, alternate_endpoint: str):
46+
def __init__(self, primary_endpoint: str):
4747
self.primary_endpoint: str = primary_endpoint
48-
self.alternate_endpoint: str = alternate_endpoint
4948

5049
def set_primary(self, endpoint: str):
5150
self.primary_endpoint = endpoint
5251

53-
def set_alternate(self, endpoint: str):
54-
self.alternate_endpoint = endpoint
55-
5652
def get_primary(self):
5753
return self.primary_endpoint
5854

59-
def get_alternate(self):
60-
return self.alternate_endpoint
61-
6255
def __eq__(self, other):
63-
return (self.primary_endpoint == other.primary_endpoint
64-
and self.alternate_endpoint == other.alternate_endpoint)
56+
return self.primary_endpoint == other.primary_endpoint
6557

6658
def __str__(self):
67-
return "Primary: " + self.primary_endpoint + ", Alternate: " + self.alternate_endpoint
59+
return "Primary: " + self.primary_endpoint
6860

69-
def get_endpoints_by_location(new_locations: List[Dict[str, str]],
70-
old_regional_routing_contexts_by_location: Dict[str, RegionalRoutingContext],
71-
default_regional_endpoint: RegionalRoutingContext,
72-
writes: bool,
73-
use_multiple_write_locations: bool):
61+
def get_regional_routing_contexts_by_loc(new_locations: List[Dict[str, str]]):
7462
# construct from previous object
75-
regional_routing_context_by_location: OrderedDict[str, RegionalRoutingContext] = collections.OrderedDict()
63+
regional_routing_contexts_by_location: OrderedDict[str, RegionalRoutingContext] = collections.OrderedDict()
7664
parsed_locations = []
7765

78-
79-
for new_location in new_locations: # pylint: disable=too-many-nested-blocks
66+
for new_location in new_locations:
8067
# if name in new_location and same for database account endpoint
8168
if "name" in new_location and "databaseAccountEndpoint" in new_location:
8269
if not new_location["name"]:
@@ -85,44 +72,19 @@ def get_endpoints_by_location(new_locations: List[Dict[str, str]],
8572
try:
8673
region_uri = new_location["databaseAccountEndpoint"]
8774
parsed_locations.append(new_location["name"])
88-
if not writes or use_multiple_write_locations:
89-
regional_object = RegionalRoutingContext(region_uri, region_uri)
90-
elif new_location["name"] in old_regional_routing_contexts_by_location:
91-
regional_object = old_regional_routing_contexts_by_location[new_location["name"]]
92-
current = regional_object.get_primary()
93-
# swap the previous with current and current with new region_uri received from the gateway
94-
if current != region_uri:
95-
regional_object.set_alternate(current)
96-
regional_object.set_primary(region_uri)
97-
# This is the bootstrapping condition
98-
else:
99-
regional_object = RegionalRoutingContext(region_uri, region_uri)
100-
# if it is for writes, then we update the previous to default_endpoint
101-
if writes:
102-
# if region_uri is different than global endpoint set global endpoint
103-
# as fallback
104-
# else construct regional uri
105-
if region_uri != default_regional_endpoint.get_primary():
106-
regional_object.set_alternate(default_regional_endpoint.get_primary())
107-
else:
108-
constructed_region_uri = LocationCache.GetLocationalEndpoint(
109-
default_regional_endpoint.get_primary(),
110-
new_location["name"])
111-
regional_object.set_alternate(constructed_region_uri)
112-
regional_routing_context_by_location.update({new_location["name"]: regional_object})
75+
regional_object = RegionalRoutingContext(region_uri)
76+
regional_routing_contexts_by_location.update({new_location["name"]: regional_object})
11377
except Exception as e:
11478
raise e
11579

11680
# Also store a hash map of endpoints for each location
117-
locations_by_endpoints = {value.get_primary(): key for key, value in regional_routing_context_by_location.items()}
81+
locations_by_endpoints = {value.get_primary(): key for key, value in regional_routing_contexts_by_location.items()}
11882

119-
return regional_routing_context_by_location, locations_by_endpoints, parsed_locations
83+
return regional_routing_contexts_by_location, locations_by_endpoints, parsed_locations
12084

12185
def _get_health_check_endpoints(regional_routing_contexts) -> Set[str]:
12286
# should use the endpoints in the order returned from gateway and only the ones specified in preferred locations
123-
preferred_endpoints = {context.get_primary() for context in regional_routing_contexts}.union(
124-
{context.get_alternate() for context in regional_routing_contexts}
125-
)
87+
preferred_endpoints = {context.get_primary() for context in regional_routing_contexts}
12688
return preferred_endpoints
12789

12890
def _get_applicable_regional_routing_contexts(regional_routing_contexts: List[RegionalRoutingContext],
@@ -157,8 +119,7 @@ def __init__(
157119
default_endpoint: str,
158120
connection_policy: ConnectionPolicy,
159121
):
160-
self.default_regional_routing_context: RegionalRoutingContext = RegionalRoutingContext(default_endpoint,
161-
default_endpoint)
122+
self.default_regional_routing_context: RegionalRoutingContext = RegionalRoutingContext(default_endpoint)
162123
self.effective_preferred_locations: List[str] = []
163124
self.enable_multiple_writable_locations: bool = False
164125
self.write_regional_routing_contexts: List[RegionalRoutingContext] = [self.default_regional_routing_context]
@@ -205,9 +166,8 @@ def perform_on_database_account_read(self, database_account):
205166

206167
def get_all_write_endpoints(self) -> Set[str]:
207168
return {
208-
endpoint
169+
context.get_primary()
209170
for context in self.get_write_regional_routing_contexts()
210-
for endpoint in (context.get_primary(), context.get_alternate())
211171
}
212172

213173
def get_ordered_write_locations(self):
@@ -272,10 +232,6 @@ def resolve_service_endpoint(self, request):
272232
request.use_preferred_locations if request.use_preferred_locations is not None else True
273233
)
274234

275-
# whether to check for write or read unavailable
276-
endpoint_operation_type = EndpointOperationType.WriteType if (
277-
documents._OperationType.IsWriteOperation(request.operation_type)) else EndpointOperationType.ReadType
278-
279235
if not use_preferred_locations or (
280236
documents._OperationType.IsWriteOperation(request.operation_type)
281237
and not self.can_use_multiple_write_locations_for_request(request)
@@ -290,14 +246,6 @@ def resolve_service_endpoint(self, request):
290246
and write_location in self.account_write_regional_routing_contexts_by_location):
291247
write_regional_routing_context = (
292248
self.account_write_regional_routing_contexts_by_location)[write_location]
293-
if (
294-
request.last_routed_location_endpoint_within_region is not None
295-
and request.last_routed_location_endpoint_within_region
296-
== write_regional_routing_context.get_primary()
297-
or self.is_endpoint_unavailable_internal(write_regional_routing_context.get_primary(),
298-
endpoint_operation_type)
299-
):
300-
return write_regional_routing_context.get_alternate()
301249
return write_regional_routing_context.get_primary()
302250
# if endpoint discovery is off for reads it should use passed in endpoint
303251
return self.default_regional_routing_context.get_primary()
@@ -308,14 +256,6 @@ def resolve_service_endpoint(self, request):
308256
else self._get_applicable_read_regional_routing_contexts(request)
309257
)
310258
regional_routing_context = regional_routing_contexts[location_index % len(regional_routing_contexts)]
311-
if (
312-
request.last_routed_location_endpoint_within_region is not None
313-
and request.last_routed_location_endpoint_within_region
314-
== regional_routing_context.get_primary()
315-
or self.is_endpoint_unavailable_internal(regional_routing_context.get_primary(),
316-
endpoint_operation_type)
317-
):
318-
return regional_routing_context.get_alternate()
319259
return regional_routing_context.get_primary()
320260

321261
def should_refresh_endpoints(self): # pylint: disable=too-many-return-statements
@@ -342,7 +282,7 @@ def should_refresh_endpoints(self): # pylint: disable=too-many-return-statement
342282
return True
343283

344284
if not self.can_use_multiple_write_locations():
345-
if self.is_location_unavailable(self.write_regional_routing_contexts[0],
285+
if self.is_endpoint_unavailable(self.write_regional_routing_contexts[0].get_primary(),
346286
EndpointOperationType.WriteType):
347287
# same logic as other
348288
# Since most preferred write endpoint is unavailable, we can only refresh in background if
@@ -360,16 +300,7 @@ def should_refresh_endpoints(self): # pylint: disable=too-many-return-statement
360300
return should_refresh
361301
return False
362302

363-
def is_location_unavailable(self, endpoint: RegionalRoutingContext, operation_type: str):
364-
# For writes with single write region accounts only mark it unavailable if both are down
365-
if not _OperationType.IsReadOnlyOperation(operation_type) and not self.can_use_multiple_write_locations():
366-
return (self.is_endpoint_unavailable_internal(endpoint.get_primary(), operation_type)
367-
and self.is_endpoint_unavailable_internal(endpoint.get_alternate(), operation_type))
368-
369-
# For reads mark the region as down if primary endpoint is unavailable
370-
return self.is_endpoint_unavailable_internal(endpoint.get_primary(), operation_type)
371-
372-
def is_endpoint_unavailable_internal(self, endpoint: str, expected_available_operation: str):
303+
def is_endpoint_unavailable(self, endpoint: str, expected_available_operation: str):
373304
unavailability_info = (
374305
self.location_unavailability_info_by_endpoint[endpoint]
375306
if endpoint in self.location_unavailability_info_by_endpoint
@@ -420,24 +351,12 @@ def update_location_cache(self, write_locations=None, read_locations=None, enabl
420351
if read_locations:
421352
(self.account_read_regional_routing_contexts_by_location,
422353
self.account_locations_by_read_endpoints,
423-
self.account_read_locations) = get_endpoints_by_location(
424-
read_locations,
425-
self.account_read_regional_routing_contexts_by_location,
426-
self.default_regional_routing_context,
427-
False,
428-
self.connection_policy.UseMultipleWriteLocations
429-
)
354+
self.account_read_locations) = get_regional_routing_contexts_by_loc(read_locations)
430355

431356
if write_locations:
432357
(self.account_write_regional_routing_contexts_by_location,
433358
self.account_locations_by_write_endpoints,
434-
self.account_write_locations) = get_endpoints_by_location(
435-
write_locations,
436-
self.account_write_regional_routing_contexts_by_location,
437-
self.default_regional_routing_context,
438-
True,
439-
self.connection_policy.UseMultipleWriteLocations
440-
)
359+
self.account_write_locations) = get_regional_routing_contexts_by_loc(write_locations)
441360

442361
# if preferred locations is empty and the default endpoint is a global endpoint,
443362
# we should use the read locations from gateway as effective preferred locations
@@ -482,7 +401,8 @@ def get_preferred_regional_routing_contexts(
482401
regional_endpoint = endpoints_by_location[location] if location in endpoints_by_location \
483402
else None
484403
if regional_endpoint:
485-
if self.is_location_unavailable(regional_endpoint, expected_available_operation):
404+
if self.is_endpoint_unavailable(regional_endpoint.get_primary(),
405+
expected_available_operation):
486406
unavailable_endpoints.append(regional_endpoint)
487407
else:
488408
regional_endpoints.append(regional_endpoint)
@@ -525,12 +445,10 @@ def can_use_multiple_write_locations_for_request(self, request): # pylint: disa
525445

526446
def endpoints_to_health_check(self) -> Set[str]:
527447
# add read endpoints from gateway and in preferred locations
528-
health_check_endpoints = _get_health_check_endpoints(
529-
self.read_regional_routing_contexts
530-
)
448+
health_check_endpoints = _get_health_check_endpoints(self.read_regional_routing_contexts)
531449
# add first write endpoint in case that the write region is not in preferred locations
532-
health_check_endpoints = health_check_endpoints.union(_get_health_check_endpoints(
533-
self.write_regional_routing_contexts[:1]
450+
health_check_endpoints = health_check_endpoints.union(
451+
_get_health_check_endpoints(self.write_regional_routing_contexts[:1]
534452
))
535453

536454
return health_check_endpoints

sdk/cosmos/azure-cosmos/azure/cosmos/_request_object.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ def __init__(
4343
self.use_preferred_locations: Optional[bool] = None
4444
self.location_index_to_route: Optional[int] = None
4545
self.location_endpoint_to_route: Optional[str] = None
46-
self.last_routed_location_endpoint_within_region: Optional[str] = None
4746
self.excluded_locations: Optional[List[str]] = None
4847
self.excluded_locations_circuit_breaker: List[str] = []
4948
self.healthy_tentative_location: Optional[str] = None

0 commit comments

Comments
 (0)