2424"""
2525
2626import threading
27+ from typing import Tuple
2728
2829from azure .core .exceptions import AzureError
2930
3031from . import _constants as constants
3132from . import exceptions
33+ from .documents import DatabaseAccount
3234from ._location_cache import LocationCache
3335
3436
3537# pylint: disable=protected-access
3638
3739
38- class _GlobalEndpointManager (object ):
40+ class _GlobalEndpointManager (object ): # pylint: disable=too-many-instance-attributes
3941 """
4042 This internal class implements the logic for endpoint management for
4143 geo-replicated database accounts.
@@ -51,25 +53,24 @@ def __init__(self, client):
5153 self .PreferredLocations ,
5254 self .DefaultEndpoint ,
5355 self .EnableEndpointDiscovery ,
54- client .connection_policy .UseMultipleWriteLocations ,
55- self .refresh_time_interval_in_ms ,
56+ client .connection_policy .UseMultipleWriteLocations
5657 )
5758 self .refresh_needed = False
5859 self .refresh_lock = threading .RLock ()
5960 self .last_refresh_time = 0
6061 self ._database_account_cache = None
6162
def get_use_multiple_write_locations(self):
    """Report whether the location cache allows multiple write locations.

    Thin delegate to ``self.location_cache.can_use_multiple_write_locations()``.

    :returns: Whatever the location cache reports (presumably a bool -- confirm
        against ``LocationCache``).
    """
    return self.location_cache.can_use_multiple_write_locations()
65+
def get_refresh_time_interval_in_ms_stub(self):
    """Return the default interval between endpoint refreshes.

    The value is read from ``constants._Constants.DefaultEndpointsRefreshTime``.
    Judging by the method name the unit is milliseconds -- confirm against the
    constant's definition.

    :returns: The configured default endpoints refresh time.
    """
    return constants._Constants.DefaultEndpointsRefreshTime
6468
def get_write_endpoint(self):
    """Return the current write target chosen by the location cache.

    Delegates to ``self.location_cache.get_write_regional_routing_context()``;
    the return value is whatever the cache calls a "regional routing context".

    :returns: The write regional routing context from the location cache.
    """
    return self.location_cache.get_write_regional_routing_context()
6771
def get_read_endpoint(self):
    """Return the current read target chosen by the location cache.

    Delegates to ``self.location_cache.get_read_regional_routing_context()``;
    the return value is whatever the cache calls a "regional routing context".

    :returns: The read regional routing context from the location cache.
    """
    return self.location_cache.get_read_regional_routing_context()
7374
def resolve_service_endpoint(self, request):
    """Resolve the service endpoint to use for ``request``.

    :param request: The request object, passed through unchanged to
        ``self.location_cache.resolve_service_endpoint``.
    :returns: The endpoint selected by the location cache.
    """
    return self.location_cache.resolve_service_endpoint(request)
@@ -89,7 +90,7 @@ def get_ordered_read_locations(self):
def can_use_multiple_write_locations(self, request):
    """Report whether ``request`` may be served by multiple write locations.

    :param request: The request object, passed through unchanged to
        ``self.location_cache.can_use_multiple_write_locations_for_request``.
    :returns: The location cache's answer for this request.
    """
    return self.location_cache.can_use_multiple_write_locations_for_request(request)
9192
def force_refresh_on_startup(self, database_account):
    """Force an endpoint refresh regardless of the normal refresh schedule.

    Sets ``self.refresh_needed`` so the next refresh is not skipped, then
    immediately calls ``self.refresh_endpoint_list``.

    :param database_account: Passed through unchanged to
        ``self.refresh_endpoint_list``.
    """
    self.refresh_needed = True
    self.refresh_endpoint_list(database_account)
9596
@@ -118,61 +119,88 @@ def _refresh_endpoint_list_private(self, database_account=None, **kwargs):
118119 if self .location_cache .should_refresh_endpoints () or self .refresh_needed :
119120 self .refresh_needed = False
120121 self .last_refresh_time = self .location_cache .current_time_millis ()
121- database_account = self ._GetDatabaseAccount (** kwargs )
122- self .location_cache .perform_on_database_account_read (database_account )
123122 # this will perform getDatabaseAccount calls to check endpoint health
124123 self ._endpoints_health_check (** kwargs )
125124
def _GetDatabaseAccount(self, **kwargs) -> Tuple[DatabaseAccount, str]:
    """Gets the database account.

    First tries by using the default endpoint, and if that doesn't work,
    use the endpoints for the preferred locations in the order they are
    specified, to get the database account.

    Every endpoint that answers is marked available in the location cache and
    the fetched account is stored in ``self._database_account_cache``; every
    endpoint that fails is marked unavailable for both reads and writes.

    :returns: A `DatabaseAccount` instance representing the Cosmos DB Database Account
        and the endpoint that was used for the request.
    :rtype: tuple of (~azure.cosmos.DatabaseAccount, str)
    :raises ~azure.cosmos.exceptions.CosmosHttpResponseError, ~azure.core.exceptions.AzureError:
        The failure from the default endpoint is re-raised when none of the
        preferred locations could serve the request either.
    """
    try:
        database_account = self._GetDatabaseAccountStub(self.DefaultEndpoint, **kwargs)
        self._database_account_cache = database_account
        self.location_cache.mark_endpoint_available(self.DefaultEndpoint)
        return database_account, self.DefaultEndpoint
    # If for any reason (non-globaldb related) we are not able to get the database
    # account from the above call, try each of the preferred locations the user
    # specified (by creating a locational endpoint), swallowing the per-location
    # failures until one succeeds. If none succeeds, the original failure from the
    # default endpoint is re-raised at the end.
    except (exceptions.CosmosHttpResponseError, AzureError):
        # NOTE (carried over from the original author): once ATM is available, the
        # handling of the global endpoint here should be revisited, as the global
        # endpoint shouldn't be used for dataplane operations anymore.
        self.mark_endpoint_unavailable_for_read(self.DefaultEndpoint, False)
        self.mark_endpoint_unavailable_for_write(self.DefaultEndpoint, False)
        for location_name in self.PreferredLocations:
            locational_endpoint = LocationCache.GetLocationalEndpoint(self.DefaultEndpoint, location_name)
            try:
                database_account = self._GetDatabaseAccountStub(locational_endpoint, **kwargs)
                self._database_account_cache = database_account
                self.location_cache.mark_endpoint_available(locational_endpoint)
                return database_account, locational_endpoint
            except (exceptions.CosmosHttpResponseError, AzureError):
                self.mark_endpoint_unavailable_for_read(locational_endpoint, False)
                self.mark_endpoint_unavailable_for_write(locational_endpoint, False)
        # Bare raise: the inner handlers have finished, so this re-raises the
        # exception caught from the default endpoint attempt.
        raise
155162
def _endpoints_health_check(self, **kwargs):
    """Gets the database account for each endpoint.

    Validating if the endpoint is healthy else marking it as unavailable.

    The database account is fetched first via ``self._GetDatabaseAccount`` and
    fed to the location cache. Then the endpoints returned by
    ``location_cache.endpoints_to_health_check()`` are probed one by one,
    skipping the endpoint that already served the account read, until four
    probes have succeeded. Finally the location cache is updated.

    :keyword kwargs: Forwarded unchanged to the database account calls.
    """
    # Stop probing once this many endpoints have answered successfully.
    max_successful_checks = 4

    endpoints_attempted = set()
    database_account, attempted_endpoint = self._GetDatabaseAccount(**kwargs)
    endpoints_attempted.add(attempted_endpoint)
    self.location_cache.perform_on_database_account_read(database_account)
    # get all the regional routing contexts to check
    endpoints = self.location_cache.endpoints_to_health_check()
    success_count = 0
    for endpoint in endpoints:
        if endpoint in endpoints_attempted:
            continue
        if success_count >= max_successful_checks:
            break
        endpoints_attempted.add(endpoint)
        # save current dba timeouts so they can be restored after the probe
        connection_policy = self.Client.connection_policy
        previous_dba_read_timeout = connection_policy.DBAReadTimeout
        previous_dba_connection_timeout = connection_policy.DBAConnectionTimeout
        try:
            if endpoint in self.location_cache.location_unavailability_info_by_endpoint:
                # if the endpoint is unavailable, we need to lower the timeouts to be more
                # aggressive in the health check. This helps reduce the time the health
                # check is blocking all requests.
                connection_policy.override_dba_timeouts(
                    constants._Constants.UnavailableEndpointDBATimeouts,
                    constants._Constants.UnavailableEndpointDBATimeouts,
                )
            self.Client._GetDatabaseAccountCheck(endpoint, **kwargs)
            success_count += 1
            self.location_cache.mark_endpoint_available(endpoint)
        except (exceptions.CosmosHttpResponseError, AzureError):
            self.mark_endpoint_unavailable_for_read(endpoint, False)
            self.mark_endpoint_unavailable_for_write(endpoint, False)
        finally:
            # after the health check for that endpoint, set the timeouts back to
            # their original values (done unconditionally, even if not overridden)
            connection_policy.override_dba_timeouts(
                previous_dba_read_timeout,
                previous_dba_connection_timeout,
            )
    self.location_cache.update_location_cache()
177205
def _GetDatabaseAccountStub(self, endpoint, **kwargs):
    """Stub for getting database account from the client.

    When ``endpoint`` is currently recorded in
    ``location_cache.location_unavailability_info_by_endpoint``, the call is made
    with the shorter ``UnavailableEndpointDBATimeouts`` and the previous timeouts
    are restored afterwards, whether or not the call succeeds.

    :param endpoint: The endpoint to read the database account from
        (presumably a URL string -- confirm against callers).
    :keyword kwargs: Forwarded unchanged to ``self.Client.GetDatabaseAccount``.
    :returns: A `DatabaseAccount` instance representing the Cosmos DB Database Account.
    :rtype: ~azure.cosmos.DatabaseAccount
    """
    if endpoint not in self.location_cache.location_unavailability_info_by_endpoint:
        return self.Client.GetDatabaseAccount(endpoint, **kwargs)

    connection_policy = self.Client.connection_policy
    previous_dba_read_timeout = connection_policy.DBAReadTimeout
    previous_dba_connection_timeout = connection_policy.DBAConnectionTimeout
    try:
        # if the endpoint is unavailable, we need to lower the timeouts to be more
        # aggressive in the health check. This helps reduce the time the health
        # check is blocking all requests.
        connection_policy.override_dba_timeouts(
            constants._Constants.UnavailableEndpointDBATimeouts,
            constants._Constants.UnavailableEndpointDBATimeouts,
        )
        return self.Client.GetDatabaseAccount(endpoint, **kwargs)
    finally:
        # after the health check for that endpoint, set the timeouts back to
        # their original values
        connection_policy.override_dba_timeouts(
            previous_dba_read_timeout,
            previous_dba_connection_timeout,
        )
0 commit comments