5656from cassandra .cluster import Cluster as ClusterDriver
5757from cassandra .cluster import NoHostAvailable
5858from cassandra .policies import RetryPolicy
59+ < << << << HEAD
5960from cassandra .policies import WhiteListRoundRobinPolicy , HostFilterPolicy , RoundRobinPolicy
61+ | | | | | | | parent of 0 b4a70a12 (chore (serverless ): remove cloud_config option )
62+ from cassandra .policies import WhiteListRoundRobinPolicy , HostFilterPolicy , RoundRobinPolicy , RackAwareRoundRobinPolicy , LoadBalancingPolicy
63+ == == == =
64+ from cassandra .policies import WhiteListRoundRobinPolicy , RackAwareRoundRobinPolicy , LoadBalancingPolicy
65+ > >> >> >> 0 b4a70a12 (chore (serverless ): remove cloud_config option )
6066from cassandra .query import SimpleStatement
6167from argus .common .enums import ResourceState
6268from argus .client .sct .types import LogLink
@@ -2882,10 +2888,6 @@ def print_node_running_nemesis(self, node_ip):
28822888
28832889 return f' ({ node .running_nemesis } nemesis target node)' if node .running_nemesis else ' (not target node)'
28842890
2885- @property
2886- def is_cqlsh_support_cloud_bundle (self ):
2887- return bool (self .parent_cluster .connection_bundle_file )
2888-
28892891 @property
28902892 def is_replacement_by_host_id_supported (self ):
28912893 return ComparableScyllaVersion (self .scylla_version ) > '5.2.0~dev'
@@ -2907,12 +2909,6 @@ def _gen_cqlsh_cmd(self, command, keyspace, timeout, connect_timeout):
29072909 command = '"{}"' .format (command .strip ().replace ('"' , '\\ "' ))
29082910
29092911 cqlsh_cmd = self .add_install_prefix ('/usr/bin/cqlsh' )
2910- if self .is_cqlsh_support_cloud_bundle :
2911- connection_bundle_file = self .parent_cluster .connection_bundle_file
2912- target_connection_bundle_file = str (Path ('/tmp/' ) / connection_bundle_file .name )
2913- self .remoter .send_files (str (connection_bundle_file ), target_connection_bundle_file )
2914-
2915- return f'{ cqlsh_cmd } { options } -e { command } --cloudconf { target_connection_bundle_file } '
29162912 return f'{ cqlsh_cmd } { options } -e { command } { host } '
29172913
29182914 def run_cqlsh (self , cmd , keyspace = None , timeout = 120 , verbose = True , split = False , connect_timeout = 60 ,
@@ -3620,7 +3616,7 @@ def create_ssl_context(keyfile: str, certfile: str, truststore: str):
36203616 return ssl_context
36213617
36223618 def _create_session (self , node , keyspace , user , password , compression , protocol_version , load_balancing_policy = None , port = None , # noqa: PLR0913
3623- ssl_context = None , node_ips = None , connect_timeout = None , verbose = True , connection_bundle_file = None ):
3619+ ssl_context = None , node_ips = None , connect_timeout = None , verbose = True ):
36243620 if not port :
36253621 port = node .CQL_PORT
36263622
@@ -3646,8 +3642,6 @@ def _create_session(self, node, keyspace, user, password, compression, protocol_
36463642 self .log .debug ("ssl_context: %s" , str (ssl_context ))
36473643
36483644 kwargs = dict (contact_points = node_ips , port = port , ssl_context = ssl_context )
3649- if connection_bundle_file :
3650- kwargs = dict (scylla_cloud = connection_bundle_file )
36513645 cluster_driver = ClusterDriver (auth_provider = auth_provider ,
36523646 compression = compression ,
36533647 protocol_version = protocol_version ,
@@ -3697,21 +3691,49 @@ def cql_connection(self, node, keyspace=None, user=None,
36973691 - If a connection bundle file is available in the parent cluster, it will be used for the connection.
36983692 - If no connection bundle file is provided, the method will use the WhiteListRoundRobinPolicy with the specified nodes.
36993693 """
3694+ < << << << HEAD
37003695 if connection_bundle_file := node .parent_cluster .connection_bundle_file :
37013696 wlrr = None
37023697 node_ips = []
37033698 else :
37043699 node_ips = self .get_node_cql_ips (nodes = whitelist_nodes )
37053700 wlrr = WhiteListRoundRobinPolicy (node_ips )
3701+ | | | | | | | parent of 0 b4a70a12 (chore (serverless ): remove cloud_config option )
3702+ if connection_bundle_file := node .parent_cluster .connection_bundle_file :
3703+ wlrr = None
3704+ node_ips = []
3705+ else :
3706+ wlrr , node_ips = self .get_load_balancing_policy (whitelist_nodes = whitelist_nodes )
3707+
3708+ == == == =
3709+
3710+ wlrr , node_ips = self .get_load_balancing_policy (whitelist_nodes = whitelist_nodes )
3711+ >> >> >> > 0 b4a70a12 (chore (serverless ): remove cloud_config option )
37063712 return self ._create_session (node = node , keyspace = keyspace , user = user , password = password , compression = compression ,
37073713 protocol_version = protocol_version , load_balancing_policy = wlrr , port = port , ssl_context = ssl_context ,
3708- node_ips = node_ips , connect_timeout = connect_timeout , verbose = verbose ,
3709- connection_bundle_file = connection_bundle_file )
3714+ node_ips = node_ips , connect_timeout = connect_timeout , verbose = verbose )
37103715
37113716 def cql_connection_exclusive (self , node , keyspace = None , user = None ,
37123717 password = None , compression = True ,
37133718 protocol_version = None , port = None ,
37143719 ssl_context = None , connect_timeout = 100 , verbose = True ):
3720+ < << << << HEAD
3721+ if connection_bundle_file := node .parent_cluster .connection_bundle_file :
3722+ # TODO: handle the case of multiple datacenters
3723+ bundle_yaml = yaml .safe_load (connection_bundle_file .open ('r' , encoding = 'utf-8' ))
3724+ node_domain = None
3725+ for _ , connection_data in bundle_yaml .get ('datacenters' , {}).items ():
3726+ node_domain = connection_data .get ('nodeDomain' ).strip ()
3727+ assert node_domain , f"didn't found nodeDomain in bundle [{ connection_bundle_file } ]"
3728+
3729+ def host_filter (host ):
3730+ return str (host .host_id ) == str (node .host_id ) or node_domain == host .endpoint ._server_name
3731+ wlrr = HostFilterPolicy (child_policy = RoundRobinPolicy (), predicate = host_filter )
3732+ node_ips = []
3733+ else :
3734+ node_ips = [node .cql_address ]
3735+ wlrr = WhiteListRoundRobinPolicy (node_ips )
3736+ | | | | | | | parent of 0 b4a70a12 (chore (serverless ): remove cloud_config option )
37153737 if connection_bundle_file := node .parent_cluster .connection_bundle_file :
37163738 # TODO: handle the case of multiple datacenters
37173739 bundle_yaml = yaml .safe_load (connection_bundle_file .open ('r' , encoding = 'utf-8' ))
@@ -3725,12 +3747,21 @@ def host_filter(host):
37253747 wlrr = HostFilterPolicy (child_policy = RoundRobinPolicy (), predicate = host_filter )
37263748 node_ips = []
37273749 else :
3750+ # Use WhiteListRoundRobinPolicy with a single node IP.
3751+ # RackAwareRoundRobinPolicy is not applicable for exclusive node connections,
3752+ # as it operates based on rack and datacenter, not individual nodes.
37283753 node_ips = [node .cql_address ]
37293754 wlrr = WhiteListRoundRobinPolicy (node_ips )
3755+ == == == =
3756+ # Use WhiteListRoundRobinPolicy with a single node IP.
3757+ # RackAwareRoundRobinPolicy is not applicable for exclusive node connections,
3758+ # as it operates based on rack and datacenter, not individual nodes.
3759+ node_ips = [node .cql_address ]
3760+ wlrr = WhiteListRoundRobinPolicy (node_ips )
3761+ >> >> >> > 0 b4a70a12 (chore (serverless ): remove cloud_config option )
37303762 return self ._create_session (node = node , keyspace = keyspace , user = user , password = password , compression = compression ,
37313763 protocol_version = protocol_version , load_balancing_policy = wlrr , port = port , ssl_context = ssl_context ,
3732- node_ips = node_ips , connect_timeout = connect_timeout , verbose = verbose ,
3733- connection_bundle_file = connection_bundle_file )
3764+ node_ips = node_ips , connect_timeout = connect_timeout , verbose = verbose )
37343765
37353766 @retrying (n = 8 , sleep_time = 15 , allowed_exceptions = (NoHostAvailable ,))
37363767 def cql_connection_patient (self , node , keyspace = None ,
@@ -4198,11 +4229,6 @@ def proposed_scylla_yaml(self) -> ScyllaYaml:
41984229 )
41994230 return ScyllaYaml (** cluster_params_builder .model_dump (exclude_unset = True , exclude_none = True ))
42004231
4201- @property
4202- def connection_bundle_file (self ) -> Path | None :
4203- bundle_file = self .params .get ("k8s_connection_bundle_file" )
4204- return Path (bundle_file ) if bundle_file else None
4205-
42064232 @property
42074233 def racks (self ) -> Set [int ]:
42084234 return {node .rack for node in self .nodes }
0 commit comments