@@ -4,6 +4,10 @@ use crate::ccm::cluster::{Cluster, ClusterOptions};
4
4
use crate :: ccm:: { run_ccm_test, CLUSTER_VERSION } ;
5
5
use crate :: common:: utils:: { setup_tracing, unique_keyspace_name} ;
6
6
7
+ use scylla:: client:: execution_profile:: ExecutionProfile ;
8
+ use scylla:: cluster:: { ClusterState , Node , NodeRef } ;
9
+ use scylla:: policies:: load_balancing:: { FallbackPlan , LoadBalancingPolicy , RoutingInfo } ;
10
+ use scylla:: query:: Query ;
7
11
use tokio:: sync:: Mutex ;
8
12
9
13
fn cluster_3_nodes ( ) -> ClusterOptions {
@@ -15,6 +19,33 @@ fn cluster_3_nodes() -> ClusterOptions {
15
19
}
16
20
}
17
21
22
+ #[ derive( Debug ) ]
23
+ struct SingleTargetLBP {
24
+ target : ( Arc < Node > , Option < u32 > ) ,
25
+ }
26
+
27
+ impl LoadBalancingPolicy for SingleTargetLBP {
28
+ fn pick < ' a > (
29
+ & ' a self ,
30
+ _query : & ' a RoutingInfo ,
31
+ _cluster : & ' a ClusterState ,
32
+ ) -> Option < ( NodeRef < ' a > , Option < u32 > ) > {
33
+ Some ( ( & self . target . 0 , self . target . 1 ) )
34
+ }
35
+
36
+ fn fallback < ' a > (
37
+ & ' a self ,
38
+ _query : & ' a RoutingInfo ,
39
+ _cluster : & ' a ClusterState ,
40
+ ) -> FallbackPlan < ' a > {
41
+ Box :: new ( std:: iter:: empty ( ) )
42
+ }
43
+
44
+ fn name ( & self ) -> String {
45
+ "SingleTargetLBP" . to_owned ( )
46
+ }
47
+ }
48
+
18
49
#[ tokio:: test]
19
50
#[ cfg_attr( not( ccm_tests) , ignore) ]
20
51
async fn test_schema_agreement_all_nodes ( ) {
@@ -105,15 +136,13 @@ async fn test_schema_agreement_with_stopped_node() {
105
136
run_ccm_test ( cluster_3_nodes, test) . await ;
106
137
}
107
138
108
-
109
139
#[ tokio:: test]
110
140
#[ cfg_attr( not( ccm_tests) , ignore) ]
111
141
async fn test_schema_agreement_with_paused_node ( ) {
112
142
setup_tracing ( ) ;
113
143
async fn test ( cluster : Arc < Mutex < Cluster > > ) {
114
144
let cluster = cluster. lock ( ) . await ;
115
145
116
- // Create keyspace
117
146
let session = cluster. make_session_builder ( ) . await . build ( ) . await . unwrap ( ) ;
118
147
119
148
let keyspace = unique_keyspace_name ( ) ;
@@ -128,35 +157,41 @@ async fn test_schema_agreement_with_paused_node() {
128
157
. await
129
158
. unwrap ( ) ;
130
159
131
- // Use keyspace
132
160
session. use_keyspace ( keyspace, true ) . await . unwrap ( ) ;
133
161
134
162
// Stop node 2
135
- let node = cluster. nodes ( ) . get_by_id ( 2 ) . await . unwrap ( ) ;
136
- node. write ( ) . await . pause ( ) . await . unwrap ( ) ;
137
-
138
- // Try a simple query to verify database is responsive
139
- let result = session
140
- . query_unpaged ( "SELECT * FROM system.local" , & [ ] )
141
- . await
142
- . unwrap ( ) ;
143
- println ! ( "Result after pause: {result:?}" ) ;
144
-
163
+ let node_id = 2 ;
164
+ let ccm_node = cluster. nodes ( ) . get_by_id ( node_id) . await . unwrap ( ) ;
165
+ let ccm_node_addr = ccm_node. read ( ) . await . broadcast_rpc_address ( ) . clone ( ) ;
166
+ ccm_node. write ( ) . await . pause ( ) . await . unwrap ( ) ;
167
+
168
+ // Find the corresponding Scylla node from the session to avoid querying it directly
169
+ let cluster_state = session. get_cluster_state ( ) ;
170
+ let scylla_node = cluster_state
171
+ . get_nodes_info ( )
172
+ . iter ( )
173
+ . find ( |n| n. address . ip ( ) != ccm_node_addr)
174
+ . expect ( "Could not find unpaused Scylla node for querying" ) ;
175
+
176
+ let policy = SingleTargetLBP {
177
+ target : ( scylla_node. clone ( ) , Some ( 0 ) ) ,
178
+ } ;
179
+ let execution_profile = ExecutionProfile :: builder ( )
180
+ . load_balancing_policy ( Arc :: new ( policy) )
181
+ . build ( ) ;
182
+ let mut stmt =
183
+ Query :: new ( "CREATE TABLE test_schema_agreement_paused (k int primary key, v int)" ) ;
184
+ stmt. set_execution_profile_handle ( Some ( execution_profile. into_handle ( ) ) ) ;
145
185
// Create a table while one node is paused
146
- let _result = session
147
- . query_unpaged (
148
- "CREATE TABLE test_schema_agreement_paused (k int primary key, v int)" ,
149
- & [ ] ,
150
- )
151
- . await
152
- . unwrap ( ) ;
186
+ let _result = session. query_unpaged ( stmt, & [ ] ) . await . unwrap ( ) ;
153
187
154
188
// Schema agreement should succeed with remaining up nodes
155
189
let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
156
190
assert ! ( schema_agreement. is_some( ) ) ;
157
191
158
192
// Start the node back
159
- node. write ( ) . await . resume ( ) . await . unwrap ( ) ;
193
+ ccm_node. write ( ) . await . resume ( ) . await . unwrap ( ) ;
194
+
160
195
let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
161
196
assert ! ( schema_agreement. is_some( ) ) ;
162
197
}
0 commit comments