@@ -48,152 +48,143 @@ impl LoadBalancingPolicy for SingleTargetLBP {
48
48
49
49
#[ tokio:: test]
50
50
#[ cfg_attr( not( ccm_tests) , ignore) ]
51
- async fn test_schema_agreement_all_nodes ( ) {
51
+ async fn test_schema_agreement ( ) {
52
52
setup_tracing ( ) ;
53
- async fn test ( cluster : Arc < Mutex < Cluster > > ) {
54
- let cluster = cluster. lock ( ) . await ;
55
- let session = cluster. make_session_builder ( ) . await . build ( ) . await . unwrap ( ) ;
56
-
57
- // Create keyspace
58
- let keyspace = unique_keyspace_name ( ) ;
59
- session
60
- . query_unpaged (
61
- format ! (
62
- "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}" ,
63
- keyspace
64
- ) ,
65
- & [ ] ,
66
- )
67
- . await
68
- . unwrap ( ) ;
69
-
70
- // Use keyspace
71
- session. use_keyspace ( keyspace, true ) . await . unwrap ( ) ;
72
-
73
- // Create a table and check schema agreement
74
- let _result = session
75
- . query_unpaged (
76
- "CREATE TABLE test_schema_agreement_all (k int primary key, v int)" ,
77
- & [ ] ,
78
- )
79
- . await
80
- . unwrap ( ) ;
81
-
82
- // Check if schema is in agreement
83
- let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
84
- assert ! ( schema_agreement. is_some( ) ) ;
85
- }
86
- run_ccm_test ( cluster_3_nodes, test) . await ;
53
+ run_ccm_test ( cluster_3_nodes, test_schema_agreement_all_nodes) . await ;
54
+ run_ccm_test ( cluster_3_nodes, test_schema_agreement_with_stopped_node) . await ;
55
+ run_ccm_test ( cluster_3_nodes, test_schema_agreement_with_paused_node) . await ;
87
56
}
88
57
89
- #[ tokio:: test]
90
- #[ cfg_attr( not( ccm_tests) , ignore) ]
91
- async fn test_schema_agreement_with_stopped_node ( ) {
92
- setup_tracing ( ) ;
93
- async fn test ( cluster : Arc < Mutex < Cluster > > ) {
94
- let cluster = cluster. lock ( ) . await ;
95
-
96
- // Create keyspace
97
- let session = cluster. make_session_builder ( ) . await . build ( ) . await . unwrap ( ) ;
98
-
99
- let keyspace = unique_keyspace_name ( ) ;
100
- session
101
- . query_unpaged (
102
- format ! (
103
- "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}" ,
104
- keyspace
105
- ) ,
106
- & [ ] ,
107
- )
108
- . await
109
- . unwrap ( ) ;
110
-
111
- // Use keyspace
112
- session. use_keyspace ( keyspace, true ) . await . unwrap ( ) ;
113
-
114
- // Stop node 2
115
- let node = cluster. nodes ( ) . get_by_id ( 2 ) . await . unwrap ( ) ;
116
- node. write ( ) . await . stop ( None ) . await . unwrap ( ) ;
117
-
118
- // Create a table while one node is stopped
119
- let _result = session
120
- . query_unpaged (
121
- "CREATE TABLE test_schema_agreement_stopped (k int primary key, v int)" ,
122
- & [ ] ,
123
- )
124
- . await
125
- . unwrap ( ) ;
126
-
127
- // Schema agreement should succeed with remaining up nodes
128
- let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
129
- assert ! ( schema_agreement. is_some( ) ) ;
130
-
131
- // Start the node back
132
- node. write ( ) . await . start ( None ) . await . unwrap ( ) ;
133
- let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
134
- assert ! ( schema_agreement. is_some( ) ) ;
135
- }
136
- run_ccm_test ( cluster_3_nodes, test) . await ;
58
+ async fn test_schema_agreement_all_nodes ( cluster : Arc < Mutex < Cluster > > ) {
59
+ let cluster = cluster. lock ( ) . await ;
60
+ let session = cluster. make_session_builder ( ) . await . build ( ) . await . unwrap ( ) ;
61
+
62
+ // Create keyspace
63
+ let keyspace = unique_keyspace_name ( ) ;
64
+ session
65
+ . query_unpaged (
66
+ format ! (
67
+ "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}" ,
68
+ keyspace
69
+ ) ,
70
+ & [ ] ,
71
+ )
72
+ . await
73
+ . unwrap ( ) ;
74
+
75
+ // Use keyspace
76
+ session. use_keyspace ( keyspace, true ) . await . unwrap ( ) ;
77
+
78
+ // Create a table and check schema agreement
79
+ let _result = session
80
+ . query_unpaged (
81
+ "CREATE TABLE test_schema_agreement_all (k int primary key, v int)" ,
82
+ & [ ] ,
83
+ )
84
+ . await
85
+ . unwrap ( ) ;
86
+
87
+ // Check if schema is in agreement
88
+ let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
89
+ assert ! ( schema_agreement. is_some( ) ) ;
137
90
}
138
91
139
- #[ tokio:: test]
140
- #[ cfg_attr( not( ccm_tests) , ignore) ]
141
- async fn test_schema_agreement_with_paused_node ( ) {
142
- setup_tracing ( ) ;
143
- async fn test ( cluster : Arc < Mutex < Cluster > > ) {
144
- let cluster = cluster. lock ( ) . await ;
145
-
146
- let session = cluster. make_session_builder ( ) . await . build ( ) . await . unwrap ( ) ;
147
-
148
- let keyspace = unique_keyspace_name ( ) ;
149
- session
150
- . query_unpaged (
151
- format ! (
152
- "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}" ,
153
- keyspace
154
- ) ,
155
- & [ ] ,
156
- )
157
- . await
158
- . unwrap ( ) ;
159
-
160
- session. use_keyspace ( keyspace, true ) . await . unwrap ( ) ;
161
-
162
- // Stop node 2
163
- let node_id = 2 ;
164
- let ccm_node = cluster. nodes ( ) . get_by_id ( node_id) . await . unwrap ( ) ;
165
- let ccm_node_addr = ccm_node. read ( ) . await . broadcast_rpc_address ( ) . clone ( ) ;
166
- ccm_node. write ( ) . await . pause ( ) . await . unwrap ( ) ;
167
-
168
- // Find the corresponding Scylla node from the session to avoid querying it directly
169
- let cluster_state = session. get_cluster_state ( ) ;
170
- let scylla_node = cluster_state
171
- . get_nodes_info ( )
172
- . iter ( )
173
- . find ( |n| n. address . ip ( ) != ccm_node_addr)
174
- . expect ( "Could not find unpaused Scylla node for querying" ) ;
175
-
176
- let policy = SingleTargetLBP {
177
- target : ( scylla_node. clone ( ) , Some ( 0 ) ) ,
178
- } ;
179
- let execution_profile = ExecutionProfile :: builder ( )
180
- . load_balancing_policy ( Arc :: new ( policy) )
181
- . build ( ) ;
182
- let mut stmt =
183
- Query :: new ( "CREATE TABLE test_schema_agreement_paused (k int primary key, v int)" ) ;
184
- stmt. set_execution_profile_handle ( Some ( execution_profile. into_handle ( ) ) ) ;
185
- // Create a table while one node is paused
186
- let _result = session. query_unpaged ( stmt, & [ ] ) . await . unwrap ( ) ;
187
-
188
- // Schema agreement should succeed with remaining up nodes
189
- let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
190
- assert ! ( schema_agreement. is_some( ) ) ;
191
-
192
- // Start the node back
193
- ccm_node. write ( ) . await . resume ( ) . await . unwrap ( ) ;
194
-
195
- let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
196
- assert ! ( schema_agreement. is_some( ) ) ;
197
- }
198
- run_ccm_test ( cluster_3_nodes, test) . await ;
92
+ async fn test_schema_agreement_with_stopped_node ( cluster : Arc < Mutex < Cluster > > ) {
93
+ let cluster = cluster. lock ( ) . await ;
94
+
95
+ // Create keyspace
96
+ let session = cluster. make_session_builder ( ) . await . build ( ) . await . unwrap ( ) ;
97
+
98
+ let keyspace = unique_keyspace_name ( ) ;
99
+ session
100
+ . query_unpaged (
101
+ format ! (
102
+ "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}" ,
103
+ keyspace
104
+ ) ,
105
+ & [ ] ,
106
+ )
107
+ . await
108
+ . unwrap ( ) ;
109
+
110
+ // Use keyspace
111
+ session. use_keyspace ( keyspace, true ) . await . unwrap ( ) ;
112
+
113
+ // Stop node 2
114
+ let node = cluster. nodes ( ) . get_by_id ( 2 ) . await . unwrap ( ) ;
115
+ node. write ( ) . await . stop ( None ) . await . unwrap ( ) ;
116
+
117
+ // Create a table while one node is stopped
118
+ let _result = session
119
+ . query_unpaged (
120
+ "CREATE TABLE test_schema_agreement_stopped (k int primary key, v int)" ,
121
+ & [ ] ,
122
+ )
123
+ . await
124
+ . unwrap ( ) ;
125
+
126
+ // Schema agreement should succeed with remaining up nodes
127
+ let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
128
+ assert ! ( schema_agreement. is_some( ) ) ;
129
+
130
+ // Start the node back
131
+ node. write ( ) . await . start ( None ) . await . unwrap ( ) ;
132
+ let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
133
+ assert ! ( schema_agreement. is_some( ) ) ;
134
+ }
135
+
136
+ async fn test_schema_agreement_with_paused_node ( cluster : Arc < Mutex < Cluster > > ) {
137
+ let cluster = cluster. lock ( ) . await ;
138
+
139
+ let session = cluster. make_session_builder ( ) . await . build ( ) . await . unwrap ( ) ;
140
+
141
+ let keyspace = unique_keyspace_name ( ) ;
142
+ session
143
+ . query_unpaged (
144
+ format ! (
145
+ "CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}" ,
146
+ keyspace
147
+ ) ,
148
+ & [ ] ,
149
+ )
150
+ . await
151
+ . unwrap ( ) ;
152
+
153
+ session. use_keyspace ( keyspace, true ) . await . unwrap ( ) ;
154
+
155
+ // Stop node 2
156
+ let node_id = 2 ;
157
+ let ccm_node = cluster. nodes ( ) . get_by_id ( node_id) . await . unwrap ( ) ;
158
+ let ccm_node_addr = ccm_node. read ( ) . await . broadcast_rpc_address ( ) . clone ( ) ;
159
+ ccm_node. write ( ) . await . pause ( ) . await . unwrap ( ) ;
160
+
161
+ // Find the corresponding Scylla node from the session to avoid querying it directly
162
+ let cluster_state = session. get_cluster_state ( ) ;
163
+ let scylla_node = cluster_state
164
+ . get_nodes_info ( )
165
+ . iter ( )
166
+ . find ( |n| n. address . ip ( ) != ccm_node_addr)
167
+ . expect ( "Could not find unpaused Scylla node for querying" ) ;
168
+
169
+ let policy = SingleTargetLBP {
170
+ target : ( scylla_node. clone ( ) , Some ( 0 ) ) ,
171
+ } ;
172
+ let execution_profile = ExecutionProfile :: builder ( )
173
+ . load_balancing_policy ( Arc :: new ( policy) )
174
+ . build ( ) ;
175
+ let mut stmt =
176
+ Query :: new ( "CREATE TABLE test_schema_agreement_paused (k int primary key, v int)" ) ;
177
+ stmt. set_execution_profile_handle ( Some ( execution_profile. into_handle ( ) ) ) ;
178
+ // Create a table while one node is paused
179
+ let _result = session. query_unpaged ( stmt, & [ ] ) . await . unwrap ( ) ;
180
+
181
+ // Schema agreement should succeed with remaining up nodes
182
+ let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
183
+ assert ! ( schema_agreement. is_some( ) ) ;
184
+
185
+ // Start the node back
186
+ ccm_node. write ( ) . await . resume ( ) . await . unwrap ( ) ;
187
+
188
+ let schema_agreement = session. check_schema_agreement ( ) . await . unwrap ( ) ;
189
+ assert ! ( schema_agreement. is_some( ) ) ;
199
190
}
0 commit comments