5757from cassandra .cluster import Cluster as ClusterDriver
5858from cassandra .cluster import NoHostAvailable
5959from cassandra .policies import RetryPolicy
60+ < << << << HEAD
6061from cassandra .policies import WhiteListRoundRobinPolicy , HostFilterPolicy , RoundRobinPolicy
62+ | | | | | | | parent of 0 b4a70a12 (chore (serverless ): remove cloud_config option )
63+ from cassandra .policies import WhiteListRoundRobinPolicy , HostFilterPolicy , RoundRobinPolicy , RackAwareRoundRobinPolicy , LoadBalancingPolicy
64+ == == == =
65+ from cassandra .policies import WhiteListRoundRobinPolicy , RackAwareRoundRobinPolicy , LoadBalancingPolicy
66+ > >> >> >> 0 b4a70a12 (chore (serverless ): remove cloud_config option )
6167from cassandra .query import SimpleStatement
6268from argus .common .enums import ResourceState
6369from argus .client .sct .types import LogLink
@@ -2874,10 +2880,6 @@ def print_node_running_nemesis(self, node_ip):
28742880
28752881 return f' ({ node .running_nemesis } nemesis target node)' if node .running_nemesis else ' (not target node)'
28762882
2877- @property
2878- def is_cqlsh_support_cloud_bundle (self ):
2879- return bool (self .parent_cluster .connection_bundle_file )
2880-
28812883 @property
28822884 def is_replacement_by_host_id_supported (self ):
28832885 return ComparableScyllaVersion (self .scylla_version ) > '5.2.0~dev'
@@ -2899,12 +2901,6 @@ def _gen_cqlsh_cmd(self, command, keyspace, timeout, connect_timeout):
28992901 command = '"{}"' .format (command .strip ().replace ('"' , '\\ "' ))
29002902
29012903 cqlsh_cmd = self .add_install_prefix ('/usr/bin/cqlsh' )
2902- if self .is_cqlsh_support_cloud_bundle :
2903- connection_bundle_file = self .parent_cluster .connection_bundle_file
2904- target_connection_bundle_file = str (Path ('/tmp/' ) / connection_bundle_file .name )
2905- self .remoter .send_files (str (connection_bundle_file ), target_connection_bundle_file )
2906-
2907- return f'{ cqlsh_cmd } { options } -e { command } --cloudconf { target_connection_bundle_file } '
29082904 return f'{ cqlsh_cmd } { options } -e { command } { host } '
29092905
29102906 def run_cqlsh (self , cmd , keyspace = None , timeout = 120 , verbose = True , split = False , connect_timeout = 60 ,
@@ -3594,7 +3590,7 @@ def create_ssl_context(keyfile: str, certfile: str, truststore: str):
35943590 return ssl_context
35953591
35963592 def _create_session (self , node , keyspace , user , password , compression , protocol_version , load_balancing_policy = None , port = None , # noqa: PLR0913
3597- ssl_context = None , node_ips = None , connect_timeout = None , verbose = True , connection_bundle_file = None ):
3593+ ssl_context = None , node_ips = None , connect_timeout = None , verbose = True ):
35983594 if not port :
35993595 port = node .CQL_PORT
36003596
@@ -3620,8 +3616,6 @@ def _create_session(self, node, keyspace, user, password, compression, protocol_
36203616 self .log .debug ("ssl_context: %s" , str (ssl_context ))
36213617
36223618 kwargs = dict (contact_points = node_ips , port = port , ssl_context = ssl_context )
3623- if connection_bundle_file :
3624- kwargs = dict (scylla_cloud = connection_bundle_file )
36253619 cluster_driver = ClusterDriver (auth_provider = auth_provider ,
36263620 compression = compression ,
36273621 protocol_version = protocol_version ,
@@ -3671,21 +3665,49 @@ def cql_connection(self, node, keyspace=None, user=None,
36713665 - If a connection bundle file is available in the parent cluster, it will be used for the connection.
36723666 - If no connection bundle file is provided, the method will use the WhiteListRoundRobinPolicy with the specified nodes.
36733667 """
3668+ < << << << HEAD
36743669 if connection_bundle_file := node .parent_cluster .connection_bundle_file :
36753670 wlrr = None
36763671 node_ips = []
36773672 else :
36783673 node_ips = self .get_node_cql_ips (nodes = whitelist_nodes )
36793674 wlrr = WhiteListRoundRobinPolicy (node_ips )
3675+ | | | | | | | parent of 0 b4a70a12 (chore (serverless ): remove cloud_config option )
3676+ if connection_bundle_file := node .parent_cluster .connection_bundle_file :
3677+ wlrr = None
3678+ node_ips = []
3679+ else :
3680+ wlrr , node_ips = self .get_load_balancing_policy (whitelist_nodes = whitelist_nodes )
3681+
3682+ == == == =
3683+
3684+ wlrr , node_ips = self .get_load_balancing_policy (whitelist_nodes = whitelist_nodes )
3685+ >> >> >> > 0 b4a70a12 (chore (serverless ): remove cloud_config option )
36803686 return self ._create_session (node = node , keyspace = keyspace , user = user , password = password , compression = compression ,
36813687 protocol_version = protocol_version , load_balancing_policy = wlrr , port = port , ssl_context = ssl_context ,
3682- node_ips = node_ips , connect_timeout = connect_timeout , verbose = verbose ,
3683- connection_bundle_file = connection_bundle_file )
3688+ node_ips = node_ips , connect_timeout = connect_timeout , verbose = verbose )
36843689
36853690 def cql_connection_exclusive (self , node , keyspace = None , user = None ,
36863691 password = None , compression = True ,
36873692 protocol_version = None , port = None ,
36883693 ssl_context = None , connect_timeout = 100 , verbose = True ):
3694+ < << << << HEAD
3695+ if connection_bundle_file := node .parent_cluster .connection_bundle_file :
3696+ # TODO: handle the case of multiple datacenters
3697+ bundle_yaml = yaml .safe_load (connection_bundle_file .open ('r' , encoding = 'utf-8' ))
3698+ node_domain = None
3699+ for _ , connection_data in bundle_yaml .get ('datacenters' , {}).items ():
3700+ node_domain = connection_data .get ('nodeDomain' ).strip ()
3701+ assert node_domain , f"didn't found nodeDomain in bundle [{ connection_bundle_file } ]"
3702+
3703+ def host_filter (host ):
3704+ return str (host .host_id ) == str (node .host_id ) or node_domain == host .endpoint ._server_name
3705+ wlrr = HostFilterPolicy (child_policy = RoundRobinPolicy (), predicate = host_filter )
3706+ node_ips = []
3707+ else :
3708+ node_ips = [node .cql_address ]
3709+ wlrr = WhiteListRoundRobinPolicy (node_ips )
3710+ | | | | | | | parent of 0 b4a70a12 (chore (serverless ): remove cloud_config option )
36893711 if connection_bundle_file := node .parent_cluster .connection_bundle_file :
36903712 # TODO: handle the case of multiple datacenters
36913713 bundle_yaml = yaml .safe_load (connection_bundle_file .open ('r' , encoding = 'utf-8' ))
@@ -3699,12 +3721,21 @@ def host_filter(host):
36993721 wlrr = HostFilterPolicy (child_policy = RoundRobinPolicy (), predicate = host_filter )
37003722 node_ips = []
37013723 else :
3724+ # Use WhiteListRoundRobinPolicy with a single node IP.
3725+ # RackAwareRoundRobinPolicy is not applicable for exclusive node connections,
3726+ # as it operates based on rack and datacenter, not individual nodes.
37023727 node_ips = [node .cql_address ]
37033728 wlrr = WhiteListRoundRobinPolicy (node_ips )
3729+ == == == =
3730+ # Use WhiteListRoundRobinPolicy with a single node IP.
3731+ # RackAwareRoundRobinPolicy is not applicable for exclusive node connections,
3732+ # as it operates based on rack and datacenter, not individual nodes.
3733+ node_ips = [node .cql_address ]
3734+ wlrr = WhiteListRoundRobinPolicy (node_ips )
3735+ >> >> >> > 0 b4a70a12 (chore (serverless ): remove cloud_config option )
37043736 return self ._create_session (node = node , keyspace = keyspace , user = user , password = password , compression = compression ,
37053737 protocol_version = protocol_version , load_balancing_policy = wlrr , port = port , ssl_context = ssl_context ,
3706- node_ips = node_ips , connect_timeout = connect_timeout , verbose = verbose ,
3707- connection_bundle_file = connection_bundle_file )
3738+ node_ips = node_ips , connect_timeout = connect_timeout , verbose = verbose )
37083739
37093740 @retrying (n = 8 , sleep_time = 15 , allowed_exceptions = (NoHostAvailable ,))
37103741 def cql_connection_patient (self , node , keyspace = None ,
@@ -4172,11 +4203,6 @@ def proposed_scylla_yaml(self) -> ScyllaYaml:
41724203 )
41734204 return ScyllaYaml (** cluster_params_builder .dict (exclude_unset = True , exclude_none = True ))
41744205
4175- @property
4176- def connection_bundle_file (self ) -> Path | None :
4177- bundle_file = self .params .get ("k8s_connection_bundle_file" )
4178- return Path (bundle_file ) if bundle_file else None
4179-
41804206 @property
41814207 def racks (self ) -> Set [int ]:
41824208 return {node .rack for node in self .nodes }
0 commit comments