2727from copy import copy
2828from functools import partial , reduce , wraps
2929from itertools import groupby , count , chain
30+ import itertools
3031import json
3132import logging
3233from typing import Optional , Union
@@ -1683,14 +1684,7 @@ def protocol_downgrade(self, host_endpoint, previous_version):
16831684 "http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version" , self .protocol_version , new_version , host_endpoint )
16841685 self .protocol_version = new_version
16851686
1686- def _add_resolved_hosts (self ):
1687- for endpoint in self .endpoints_resolved :
1688- host , new = self .add_host (endpoint , signal = False )
1689- if new :
1690- host .set_up ()
1691- for listener in self .listeners :
1692- listener .on_add (host )
1693-
1687+ def _populate_hosts (self ):
16941688 self .profile_manager .populate (
16951689 weakref .proxy (self ), self .metadata .all_hosts ())
16961690 self .load_balancing_policy .populate (
@@ -1717,17 +1711,10 @@ def connect(self, keyspace=None, wait_for_all_pools=False):
17171711 self .contact_points , self .protocol_version )
17181712 self .connection_class .initialize_reactor ()
17191713 _register_cluster_shutdown (self )
1720-
1721- self ._add_resolved_hosts ()
17221714
17231715 try :
17241716 self .control_connection .connect ()
1725-
1726- # we set all contact points up for connecting, but we won't infer state after this
1727- for endpoint in self .endpoints_resolved :
1728- h = self .metadata .get_host (endpoint )
1729- if h and self .profile_manager .distance (h ) == HostDistance .IGNORED :
1730- h .is_up = None
1717+ self ._populate_hosts ()
17311718
17321719 log .debug ("Control connection created" )
17331720 except Exception :
@@ -3534,28 +3521,23 @@ def _set_new_connection(self, conn):
35343521 if old :
35353522 log .debug ("[control connection] Closing old connection %r, replacing with %r" , old , conn )
35363523 old .close ()
3537-
3538- def _connect_host_in_lbp (self ):
3524+
3525+ def _try_connect_to_hosts (self ):
35393526 errors = {}
3540- lbp = (
3541- self ._cluster .load_balancing_policy
3542- if self ._cluster ._config_mode == _ConfigMode .LEGACY else
3543- self ._cluster ._default_load_balancing_policy
3544- )
35453527
3546- for host in lbp .make_query_plan ():
3528+ lbp = self ._cluster .load_balancing_policy \
3529+ if self ._cluster ._config_mode == _ConfigMode .LEGACY else self ._cluster ._default_load_balancing_policy
3530+
3531+ # use endpoints from the default LBP if it is already initialized
3532+ for endpoint in itertools .chain ((host .endpoint for host in lbp .make_query_plan ()), self ._cluster .endpoints_resolved ):
35473533 try :
3548- return (self ._try_connect (host ), None )
3549- except ConnectionException as exc :
3550- errors [str (host .endpoint )] = exc
3551- log .warning ("[control connection] Error connecting to %s:" , host , exc_info = True )
3552- self ._cluster .signal_connection_failure (host , exc , is_host_addition = False )
3534+ return (self ._try_connect (endpoint ), None )
35533535 except Exception as exc :
3554- errors [str (host . endpoint )] = exc
3555- log .warning ("[control connection] Error connecting to %s:" , host , exc_info = True )
3536+ errors [str (endpoint )] = exc
3537+ log .warning ("[control connection] Error connecting to %s:" , endpoint , exc_info = True )
35563538 if self ._is_shutdown :
35573539 raise DriverException ("[control connection] Reconnection in progress during shutdown" )
3558-
3540+
35593541 return (None , errors )
35603542
35613543 def _reconnect_internal (self ):
@@ -3567,43 +3549,43 @@ def _reconnect_internal(self):
35673549 to the exception that was raised when an attempt was made to open
35683550 a connection to that host.
35693551 """
3570- (conn , _ ) = self ._connect_host_in_lbp ()
3552+ (conn , _ ) = self ._try_connect_to_hosts ()
35713553 if conn is not None :
35723554 return conn
35733555
35743556 # Try to re-resolve hostnames as a fallback when all hosts are unreachable
35753557 self ._cluster ._resolve_hostnames ()
35763558
3577- self ._cluster ._add_resolved_hosts ()
3559+ self ._cluster ._populate_hosts ()
35783560
3579- (conn , errors ) = self ._connect_host_in_lbp ()
3561+ (conn , errors ) = self ._try_connect_to_hosts ()
35803562 if conn is not None :
35813563 return conn
3582-
3564+
35833565 raise NoHostAvailable ("Unable to connect to any servers" , errors )
35843566
3585- def _try_connect (self , host ):
3567+ def _try_connect (self , endpoint ):
35863568 """
35873569 Creates a new Connection, registers for pushed events, and refreshes
35883570 node/token and schema metadata.
35893571 """
3590- log .debug ("[control connection] Opening new connection to %s" , host )
3572+ log .debug ("[control connection] Opening new connection to %s" , endpoint )
35913573
35923574 while True :
35933575 try :
3594- connection = self ._cluster .connection_factory (host . endpoint , is_control_connection = True )
3576+ connection = self ._cluster .connection_factory (endpoint , is_control_connection = True )
35953577 if self ._is_shutdown :
35963578 connection .close ()
35973579 raise DriverException ("Reconnecting during shutdown" )
35983580 break
35993581 except ProtocolVersionUnsupported as e :
3600- self ._cluster .protocol_downgrade (host . endpoint , e .startup_version )
3582+ self ._cluster .protocol_downgrade (endpoint , e .startup_version )
36013583 except ProtocolException as e :
36023584 # protocol v5 is out of beta in C* >=4.0-beta5 and is now the default driver
36033585 # protocol version. If the protocol version was not explicitly specified,
36043586 # and that the server raises a beta protocol error, we should downgrade.
36053587 if not self ._cluster ._protocol_version_explicit and e .is_beta_protocol_error :
3606- self ._cluster .protocol_downgrade (host . endpoint , self ._cluster .protocol_version )
3588+ self ._cluster .protocol_downgrade (endpoint , self ._cluster .protocol_version )
36073589 else :
36083590 raise
36093591
@@ -3879,6 +3861,9 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
38793861
38803862 self ._cluster .metadata .update_host (host , old_endpoint = connection .endpoint )
38813863 connection .original_endpoint = connection .endpoint = host .endpoint
3864+ else :
3865+ log .info ("Consider local host new found host" )
3866+ peers_result .append (local_row )
38823867 # Check metadata.partitioner to see if we haven't built anything yet. If
38833868 # every node in the cluster was in the contact points, we won't discover
38843869 # any new nodes, so we need this additional check. (See PYTHON-90)
@@ -4177,8 +4162,8 @@ def _get_peers_query(self, peers_query_type, connection=None):
41774162 query_template = (self ._SELECT_SCHEMA_PEERS_TEMPLATE
41784163 if peers_query_type == self .PeersQueryType .PEERS_SCHEMA
41794164 else self ._SELECT_PEERS_NO_TOKENS_TEMPLATE )
4180- host_release_version = self ._cluster .metadata .get_host (connection .original_endpoint ).release_version
4181- host_dse_version = self ._cluster .metadata .get_host (connection .original_endpoint ).dse_version
4165+ host_release_version = None if self . _cluster . metadata . get_host ( connection . original_endpoint ) == None else self ._cluster .metadata .get_host (connection .original_endpoint ).release_version
4166+ host_dse_version = None if self . _cluster . metadata . get_host ( connection . original_endpoint ) == None else self ._cluster .metadata .get_host (connection .original_endpoint ).dse_version
41824167 uses_native_address_query = (
41834168 host_dse_version and Version (host_dse_version ) >= self ._MINIMUM_NATIVE_ADDRESS_DSE_VERSION )
41844169
0 commit comments