@@ -2,8 +2,8 @@ use std::sync::Arc;
2
2
3
3
use crate :: utils:: {
4
4
execute_prepared_statement_everywhere, execute_unprepared_statement_everywhere,
5
- scylla_supports_tablets, setup_tracing, test_with_3_node_cluster , unique_keyspace_name ,
6
- PerformDDL ,
5
+ scylla_supports_tablets, setup_tracing, supports_feature , test_with_3_node_cluster ,
6
+ unique_keyspace_name , PerformDDL ,
7
7
} ;
8
8
9
9
use futures:: future:: try_join_all;
@@ -19,7 +19,7 @@ use scylla_proxy::{
19
19
ShardAwareness , TargetShard , WorkerError ,
20
20
} ;
21
21
22
- use tokio:: sync:: mpsc;
22
+ use tokio:: sync:: mpsc:: { self , UnboundedReceiver } ;
23
23
use tracing:: info;
24
24
use uuid:: Uuid ;
25
25
@@ -29,6 +29,7 @@ struct SelectedTablet {
29
29
replicas : Vec < ( Uuid , i32 ) > ,
30
30
}
31
31
32
+ #[ derive( Eq , PartialEq ) ]
32
33
struct Tablet {
33
34
first_token : i64 ,
34
35
last_token : i64 ,
@@ -168,31 +169,123 @@ fn count_tablet_feedbacks(
168
169
}
169
170
170
171
async fn prepare_schema ( session : & Session , ks : & str , table : & str , tablet_count : usize ) {
172
+ let supports_table_tablet_options = supports_feature ( session, "TABLET_OPTIONS" ) . await ;
173
+ let ( keyspace_tablet_opts, table_tablet_opts) = if supports_table_tablet_options {
174
+ (
175
+ "AND tablets = { 'enabled': true }" . to_string ( ) ,
176
+ format ! ( "WITH tablets = {{ 'min_tablet_count': {tablet_count} }}" ) ,
177
+ )
178
+ } else {
179
+ (
180
+ format ! ( "AND tablets = {{ 'initial': {tablet_count} }}" ) ,
181
+ String :: new ( ) ,
182
+ )
183
+ } ;
184
+
171
185
session
172
186
. ddl ( format ! (
173
187
"CREATE KEYSPACE IF NOT EXISTS {ks}
174
188
WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 2}}
175
- AND tablets = {{ 'initial': {tablet_count} } }"
189
+ {keyspace_tablet_opts }"
176
190
) )
177
191
. await
178
192
. unwrap ( ) ;
179
193
session
180
194
. ddl ( format ! (
181
- "CREATE TABLE IF NOT EXISTS {ks}.{table} (a int, b int, c text, primary key (a, b))"
195
+ "CREATE TABLE IF NOT EXISTS {ks}.{table} (a int, b int, c text, primary key (a, b))
196
+ {table_tablet_opts}"
182
197
) )
183
198
. await
184
199
. unwrap ( ) ;
185
200
}
186
201
187
- /// Tests that, when using DefaultPolicy with TokenAwareness and querying table
188
- /// that uses tablets:
189
- /// 1. When querying data that belongs to tablet we didn't receive yet we will
190
- /// receive this tablet at some point.
191
- /// 2. When we have all tablets info locally then we'll never receive tablet info.
202
+ async fn populate_internal_driver_tablet_info (
203
+ session : & Session ,
204
+ prepared : & PreparedStatement ,
205
+ value_per_tablet : & [ ( i32 , i32 ) ] ,
206
+ feedback_rxs : & mut [ UnboundedReceiver < ( ResponseFrame , Option < u16 > ) > ] ,
207
+ ) -> Result < ( ) , String > {
208
+ let mut total_tablets_with_feedback = 0 ;
209
+ for values in value_per_tablet. iter ( ) {
210
+ info ! (
211
+ "First loop, trying key {:?}, token: {}" ,
212
+ values,
213
+ prepared. calculate_token( & values) . unwrap( ) . unwrap( ) . value( )
214
+ ) ;
215
+ execute_prepared_statement_everywhere (
216
+ session,
217
+ & session. get_cluster_state ( ) ,
218
+ prepared,
219
+ values,
220
+ )
221
+ . await
222
+ . unwrap ( ) ;
223
+ let feedbacks: usize = feedback_rxs. iter_mut ( ) . map ( count_tablet_feedbacks) . sum ( ) ;
224
+ if feedbacks > 0 {
225
+ total_tablets_with_feedback += 1 ;
226
+ }
227
+ }
228
+
229
+ if total_tablets_with_feedback == value_per_tablet. len ( ) {
230
+ Ok ( ( ) )
231
+ } else {
232
+ Err ( format ! (
233
+ "Expected feedback for {} tablets, got it for {}" ,
234
+ value_per_tablet. len( ) ,
235
+ total_tablets_with_feedback
236
+ ) )
237
+ }
238
+ }
239
+
240
+ async fn verify_queries_routed_to_correct_tablets (
241
+ session : & Session ,
242
+ prepared : & PreparedStatement ,
243
+ value_per_tablet : & [ ( i32 , i32 ) ] ,
244
+ feedback_rxs : & mut [ UnboundedReceiver < ( ResponseFrame , Option < u16 > ) > ] ,
245
+ ) -> Result < ( ) , String > {
246
+ for values in value_per_tablet. iter ( ) {
247
+ info ! (
248
+ "Second loop, trying key {:?}, token: {}" ,
249
+ values,
250
+ prepared. calculate_token( & values) . unwrap( ) . unwrap( ) . value( )
251
+ ) ;
252
+ try_join_all ( ( 0 ..100 ) . map ( |_| async { session. execute_unpaged ( prepared, values) . await } ) )
253
+ . await
254
+ . unwrap ( ) ;
255
+ let feedbacks: usize = feedback_rxs. iter_mut ( ) . map ( count_tablet_feedbacks) . sum ( ) ;
256
+ if feedbacks != 0 {
257
+ return Err ( format ! ( "Expected 0 tablet feedbacks, received {feedbacks}" ) ) ;
258
+ }
259
+ }
260
+
261
+ Ok ( ( ) )
262
+ }
263
+
264
+ async fn run_test_default_policy_is_tablet_aware_attempt (
265
+ session : & Session ,
266
+ prepared : & PreparedStatement ,
267
+ value_per_tablet : & [ ( i32 , i32 ) ] ,
268
+ feedback_rxs : & mut [ UnboundedReceiver < ( ResponseFrame , Option < u16 > ) > ] ,
269
+ ) -> Result < ( ) , String > {
270
+ populate_internal_driver_tablet_info ( session, prepared, value_per_tablet, feedback_rxs) . await ?;
271
+
272
+ // Now we must have info about all the tablets. It should not be
273
+ // possible to receive any feedback if DefaultPolicy is properly
274
+ // tablet-aware.
275
+ verify_queries_routed_to_correct_tablets ( session, prepared, value_per_tablet, feedback_rxs)
276
+ . await
277
+ }
278
+
279
+ /// Tests that, when:
280
+ /// - Using DefaultPolicy with TokenAwareness
281
+ /// - Querying table that uses tablets
282
+ /// - Driver has all driver info fetched locally
283
+ /// Then we'll never receive tablet info.
192
284
///
193
- /// The test first sends 100 queries per tablet and expects to receive tablet info.
194
- /// After that we know we have all the info. The test sends the statements again
195
- /// and expects to not receive any tablet info.
285
+ /// The test first sends, to each possible target, one insert request per tablet.
286
+ /// It expects to get tablet feedback for each tablet.
287
+ /// After that we know we have all the info. The test sends the same insert request
288
+ /// per tablet, this time using DefaultPolicy, and expects to not get any feedbacks.
196
289
#[ cfg_attr( scylla_cloud_tests, ignore) ]
197
290
#[ tokio:: test]
198
291
#[ ntest:: timeout( 30000 ) ]
@@ -220,15 +313,11 @@ async fn test_default_policy_is_tablet_aware() {
220
313
/* Prepare schema */
221
314
prepare_schema ( & session, & ks, "t" , TABLET_COUNT ) . await ;
222
315
223
- let tablets = get_tablets ( & session, & ks, "t" ) . await ;
224
-
225
316
let prepared = session
226
317
. prepare ( format ! ( "INSERT INTO {ks}.t (a, b, c) VALUES (?, ?, 'abc')" ) )
227
318
. await
228
319
. unwrap ( ) ;
229
320
230
- let value_lists = calculate_key_per_tablet ( & tablets, & prepared) ;
231
-
232
321
let ( feedback_txs, mut feedback_rxs) : ( Vec < _ > , Vec < _ > ) = ( 0 ..3 )
233
322
. map ( |_| mpsc:: unbounded_channel :: < ( ResponseFrame , Option < TargetShard > ) > ( ) )
234
323
. unzip ( ) ;
@@ -240,71 +329,36 @@ async fn test_default_policy_is_tablet_aware() {
240
329
) ] ) ) ;
241
330
}
242
331
243
- // When the driver never received tablet info for any tablet in a given table,
244
- // then it will not be aware that the table is tablet-based and fall back
245
- // to token-ring routing for this table.
246
- // After it receives any tablet for this table:
247
- // - tablet-aware routing will be used for tablets that the driver has locally
248
- // - non-token-aware routing will be used for other tablets (which basically means
249
- // sending the requests to random nodes)
250
- // In the following code I want to make sure that the driver fetches info
251
- // about all the tablets in the table.
252
- // Obvious way to do this would be to, for each tablet, send some requests (here some == 100)
253
- // and expect that at least one will land on non-replica and return tablet feedback.
254
- // This mostly works, but there is a problem: initially driver has no
255
- // tablet information at all for this table so it will fall back to token-ring routing.
256
- // It is possible that token-ring replicas and tablet replicas are the same
257
- // for some tokens. If it happens for the first token that we use in this loop,
258
- // then no matter how many requests we send we are not going to receive any tablet feedback.
259
- // The solution is to iterate over tablets twice.
260
- //
261
- // First iteration guarantees that the driver will receive at least one tablet
262
- // for this table (it is statistically improbable for all tokens used here to have the same
263
- // set of replicas for tablets and token-ring). In practice it will receive all or almost all of the tablets.
264
- //
265
- // Second iteration will not use token-ring routing (because the driver has some tablets
266
- // for this table, so it is aware that the table is tablet based),
267
- // which means that for unknown tablets it will send requests to random nodes,
268
- // and definitely fetch the rest of the tablets.
269
- let mut total_tablets_with_feedback = 0 ;
270
- for values in value_lists. iter ( ) . chain ( value_lists. iter ( ) ) {
271
- info ! (
272
- "First loop, trying key {:?}, token: {}" ,
273
- values,
274
- prepared. calculate_token( & values) . unwrap( ) . unwrap( ) . value( )
275
- ) ;
276
- try_join_all (
277
- ( 0 ..100 ) . map ( |_| async { session. execute_unpaged ( & prepared, values) . await } ) ,
332
+ // Test attempt can fail because of tablet migrations.
333
+ // Let's try a few times if there are migrations.
334
+ let mut last_error = None ;
335
+ for _ in 0 ..5 {
336
+ let tablets = get_tablets ( & session, & ks, "t" ) . await ;
337
+ let value_per_tablet = calculate_key_per_tablet ( & tablets, & prepared) ;
338
+ match run_test_default_policy_is_tablet_aware_attempt (
339
+ & session,
340
+ & prepared,
341
+ & value_per_tablet,
342
+ & mut feedback_rxs,
278
343
)
279
344
. await
280
- . unwrap ( ) ;
281
- let feedbacks: usize = feedback_rxs. iter_mut ( ) . map ( count_tablet_feedbacks) . sum ( ) ;
282
- if feedbacks > 0 {
283
- total_tablets_with_feedback += 1 ;
345
+ {
346
+ Ok ( _) => return running_proxy, // Test succeeded
347
+ Err ( e) => {
348
+ let new_tablets = get_tablets ( & session, & ks, "t" ) . await ;
349
+ if tablets == new_tablets {
350
+ // We failed, but there was no migration.
351
+ panic ! ( "Test attempt failed despite no migration. Error: {e}" ) ;
352
+ }
353
+ last_error = Some ( e) ;
354
+ // There was a migration, let's try again
355
+ }
284
356
}
285
357
}
286
-
287
- assert_eq ! ( total_tablets_with_feedback, TABLET_COUNT ) ;
288
-
289
- // Now we must have info about all the tablets. It should not be
290
- // possible to receive any feedback if DefaultPolicy is properly
291
- // tablet-aware.
292
- for values in value_lists. iter ( ) {
293
- info ! (
294
- "Second loop, trying key {:?}, token: {}" ,
295
- values,
296
- prepared. calculate_token( & values) . unwrap( ) . unwrap( ) . value( )
297
- ) ;
298
- try_join_all (
299
- ( 0 ..100 ) . map ( |_| async { session. execute_unpaged ( & prepared, values) . await } ) ,
300
- )
301
- . await
302
- . unwrap ( ) ;
303
- let feedbacks: usize = feedback_rxs. iter_mut ( ) . map ( count_tablet_feedbacks) . sum ( ) ;
304
- assert_eq ! ( feedbacks, 0 ) ;
305
- }
306
-
307
- running_proxy
358
+ panic ! (
359
+ "There was a tablet migration during each attempt! Last error: {}" ,
360
+ last_error. unwrap( )
361
+ ) ;
308
362
} ,
309
363
)
310
364
. await ;
0 commit comments