66import com .datastax .oss .driver .api .core .cql .PreparedStatement ;
77import com .datastax .oss .driver .api .core .cql .ResultSet ;
88import com .datastax .oss .driver .api .core .cql .SimpleStatement ;
9+ import com .datastax .oss .driver .api .core .cql .Statement ;
910import com .datastax .oss .driver .api .core .metadata .KeyspaceTableNamePair ;
11+ import com .datastax .oss .driver .api .core .metadata .Node ;
1012import com .datastax .oss .driver .api .core .metadata .Tablet ;
1113import com .datastax .oss .driver .api .testinfra .CassandraSkip ;
1214import com .datastax .oss .driver .api .testinfra .ScyllaRequirement ;
1618import com .datastax .oss .driver .internal .core .protocol .TabletInfo ;
1719import java .nio .ByteBuffer ;
1820import java .time .Duration ;
21+ import java .util .ArrayList ;
22+ import java .util .HashMap ;
23+ import java .util .List ;
1924import java .util .Map ;
2025import java .util .Set ;
2126import java .util .concurrent .ConcurrentMap ;
2227import java .util .concurrent .ConcurrentSkipListSet ;
28+ import java .util .function .Function ;
29+ import java .util .function .Supplier ;
2330import org .junit .Assert ;
31+ import org .junit .BeforeClass ;
2432import org .junit .ClassRule ;
2533import org .junit .Test ;
2634import org .junit .rules .RuleChain ;
@@ -55,9 +63,9 @@ public class DefaultMetadataTabletMapIT {
5563 private static final int INITIAL_TABLETS = 32 ;
5664 private static final int QUERIES = 1600 ;
5765 private static final int REPLICATION_FACTOR = 2 ;
58- private static String KEYSPACE_NAME = "tabletsTest" ;
59- private static String TABLE_NAME = "tabletsTable" ;
60- private static String CREATE_KEYSPACE_QUERY =
66+ private static final String KEYSPACE_NAME = "tabletsTest" ;
67+ private static final String TABLE_NAME = "tabletsTable" ;
68+ private static final String CREATE_KEYSPACE_QUERY =
6169 "CREATE KEYSPACE IF NOT EXISTS "
6270 + KEYSPACE_NAME
6371 + " WITH replication = {'class': "
@@ -68,49 +76,227 @@ public class DefaultMetadataTabletMapIT {
6876 + "{'initial': "
6977 + INITIAL_TABLETS
7078 + "};" ;
71- private static String CREATE_TABLE_QUERY =
79+ private static final String CREATE_TABLE_QUERY =
7280 "CREATE TABLE IF NOT EXISTS "
7381 + KEYSPACE_NAME
7482 + "."
7583 + TABLE_NAME
76- + " (pk int, ck int, PRIMARY KEY(pk, ck));" ;
84+ + " (pk int, ck int, val int, PRIMARY KEY(pk, ck));" ;
7785
78- @ Test
79- public void should_receive_each_tablet_exactly_once () {
80- CqlSession session = SESSION_RULE .session ();
86+ private static final SimpleStatement STMT_INSERT =
87+ buildStatement ("INSERT INTO %s.%s (pk, ck) VALUES (?, ?);" );
88+
89+ private static final SimpleStatement STMT_INSERT_NO_KS =
90+ buildStatement ("INSERT INTO %s (pk, ck) VALUES (?, ?);" );
91+
92+ private static final SimpleStatement STMT_INSERT_CONCRETE =
93+ buildStatement ("INSERT INTO %s.%s (pk, ck) VALUES (1, 1);" );
94+
95+ private static final SimpleStatement STMT_INSERT_PK_CONCRETE =
96+ buildStatement ("INSERT INTO %s.%s (pk, ck) VALUES (1, ?);" );
97+
98+ private static final SimpleStatement STMT_INSERT_CK_CONCRETE =
99+ buildStatement ("INSERT INTO %s.%s (pk, ck) VALUES (?, 1);" );
100+
101+ private static final SimpleStatement STMT_INSERT_LWT_IF_NOT_EXISTS =
102+ buildStatement ("INSERT INTO %s.%s (pk, ck) VALUES (?, ?) IF NOT EXISTS;" );
103+
104+ private static final SimpleStatement STMT_SELECT =
105+ buildStatement ("SELECT pk,ck FROM %s.%s WHERE pk = ? AND ck = ?" );
106+
107+ private static final SimpleStatement STMT_SELECT_NO_KS =
108+ buildStatement ("SELECT pk, ck FROM %s WHERE pk = ? AND ck = ?" );
109+
110+ private static final SimpleStatement STMT_SELECT_CONCRETE =
111+ buildStatement ("SELECT pk,ck FROM %s.%s WHERE pk = 1 AND ck = 1" );
112+
113+ private static final SimpleStatement STMT_SELECT_PK_CONCRETE =
114+ buildStatement ("SELECT pk, ck FROM %s.%s WHERE pk = 1 AND ck = ?" );
115+
116+ private static final SimpleStatement STMT_SELECT_CK_CONCRETE =
117+ buildStatement ("SELECT pk, ck FROM %s.%s WHERE pk = ? AND ck = 1" );
118+
119+ private static final SimpleStatement STMT_UPDATE =
120+ buildStatement ("UPDATE %s.%s SET val = 1 WHERE pk = ? AND ck = ?" );
121+
122+ private static final SimpleStatement STMT_UPDATE_NO_KS =
123+ buildStatement ("UPDATE %s SET val = 1 WHERE pk = ? AND ck = ?" );
124+
125+ private static final SimpleStatement STMT_UPDATE_CONCRETE =
126+ buildStatement ("UPDATE %s.%s SET val = 1 WHERE pk = 1 AND ck = 1" );
127+
128+ private static final SimpleStatement STMT_UPDATE_PK_CONCRETE =
129+ buildStatement ("UPDATE %s.%s SET val = 1 WHERE pk = 1 AND ck = ?" );
130+
131+ private static final SimpleStatement STMT_UPDATE_CK_CONCRETE =
132+ buildStatement ("UPDATE %s.%s SET val = 1 WHERE pk = ? AND ck = 1" );
133+
134+ private static final SimpleStatement STMT_UPDATE_LWT_IF_EXISTS =
135+ buildStatement ("UPDATE %s.%s SET val = 1 WHERE pk = ? AND ck = ? IF EXISTS" );
136+
137+ private static final SimpleStatement STMT_UPDATE_LWT_IF_VAL =
138+ buildStatement ("UPDATE %s.%s SET val = 1 WHERE pk = ? AND ck = ? IF val = 2" );
139+
140+ private static final SimpleStatement STMT_DELETE =
141+ buildStatement ("DELETE FROM %s.%s WHERE pk = ? AND ck = ?" );
142+
143+ private static final SimpleStatement STMT_DELETE_NO_KS =
144+ buildStatement ("DELETE FROM %s WHERE pk = ? AND ck = ?" );
145+
146+ private static final SimpleStatement STMT_DELETE_CONCRETE =
147+ buildStatement ("DELETE FROM %s.%s WHERE pk = 1 AND ck = 1" );
148+
149+ private static final SimpleStatement STMT_DELETE_PK_CONCRETE =
150+ buildStatement ("DELETE FROM %s.%s WHERE pk = 1 AND ck = ?" );
81151
152+ private static final SimpleStatement STMT_DELETE_CK_CONCRETE =
153+ buildStatement ("DELETE FROM %s.%s WHERE pk = ? AND ck = 1" );
154+
155+ private static final SimpleStatement STMT_DELETE_IF_EXISTS =
156+ buildStatement ("DELETE FROM %s.%s WHERE pk = ? AND ck = ? IF EXISTS" );
157+
158+ @ BeforeClass
159+ public static void setup () {
160+ CqlSession session = SESSION_RULE .session ();
82161 session .execute (CREATE_KEYSPACE_QUERY );
83162 session .execute (CREATE_TABLE_QUERY );
163+ }
84164
85- for (int i = 1 ; i <= QUERIES ; i ++) {
86- session .execute (
87- "INSERT INTO "
88- + KEYSPACE_NAME
89- + "."
90- + TABLE_NAME
91- + " (pk,ck) VALUES ("
92- + i
93- + ","
94- + i
95- + ");" );
165+ @ Test
166+ public void every_statement_should_deliver_tablet_info () {
167+ Map <String , Supplier <CqlSession >> sessions = new HashMap <>();
168+ sessions .put (
169+ "REGULAR" ,
170+ () -> CqlSession .builder ().addContactEndPoints (CCM_RULE .getContactPoints ()).build ());
171+ sessions .put (
172+ "WITH_KEYSPACE" ,
173+ () ->
174+ CqlSession .builder ()
175+ .addContactEndPoints (CCM_RULE .getContactPoints ())
176+ .withKeyspace (KEYSPACE_NAME )
177+ .build ());
178+ sessions .put (
179+ "USE_KEYSPACE" ,
180+ () -> {
181+ CqlSession s =
182+ CqlSession .builder ().addContactEndPoints (CCM_RULE .getContactPoints ()).build ();
183+ s .execute ("USE " + KEYSPACE_NAME );
184+ return s ;
185+ });
186+
187+ Map <String , Function <CqlSession , Statement >> statements = new HashMap <>();
188+ statements .put ("SELECT_CONCRETE" , s -> STMT_SELECT_CONCRETE );
189+ statements .put ("SELECT_PREPARED" , s -> s .prepare (STMT_SELECT ).bind (2 , 2 ));
190+ statements .put ("SELECT_NO_KS_PREPARED" , s -> s .prepare (STMT_SELECT_NO_KS ).bind (2 , 2 ));
191+ statements .put ("SELECT_CONCRETE_PREPARED" , s -> s .prepare (STMT_SELECT_CONCRETE ).bind ());
192+ statements .put ("SELECT_PK_CONCRETE_PREPARED" , s -> s .prepare (STMT_SELECT_PK_CONCRETE ).bind (2 ));
193+ statements .put ("SELECT_CK_CONCRETE_PREPARED" , s -> s .prepare (STMT_SELECT_CK_CONCRETE ).bind (2 ));
194+ statements .put ("INSERT_CONCRETE" , s -> STMT_INSERT_CONCRETE );
195+ statements .put ("INSERT_PREPARED" , s -> s .prepare (STMT_INSERT ).bind (2 , 2 ));
196+ statements .put ("INSERT_NO_KS_PREPARED" , s -> s .prepare (STMT_INSERT_NO_KS ).bind (2 , 2 ));
197+ statements .put ("INSERT_CONCRETE_PREPARED" , s -> s .prepare (STMT_INSERT_CONCRETE ).bind ());
198+ statements .put ("INSERT_PK_CONCRETE_PREPARED" , s -> s .prepare (STMT_INSERT_PK_CONCRETE ).bind (2 ));
199+ statements .put ("INSERT_CK_CONCRETE_PREPARED" , s -> s .prepare (STMT_INSERT_CK_CONCRETE ).bind (2 ));
200+ statements .put (
201+ "INSERT_LWT_IF_NOT_EXISTS" , s -> s .prepare (STMT_INSERT_LWT_IF_NOT_EXISTS ).bind (2 , 2 ));
202+ statements .put ("UPDATE_CONCRETE" , s -> STMT_UPDATE_CONCRETE );
203+ statements .put ("UPDATE_PREPARED" , s -> s .prepare (STMT_UPDATE ).bind (2 , 2 ));
204+ statements .put ("UPDATE_NO_KS_PREPARED" , s -> s .prepare (STMT_UPDATE_NO_KS ).bind (2 , 2 ));
205+ statements .put ("UPDATE_CONCRETE_PREPARED" , s -> s .prepare (STMT_UPDATE_CONCRETE ).bind ());
206+ statements .put ("UPDATE_PK_CONCRETE_PREPARED" , s -> s .prepare (STMT_UPDATE_PK_CONCRETE ).bind (2 ));
207+ statements .put ("UPDATE_CK_CONCRETE_PREPARED" , s -> s .prepare (STMT_UPDATE_CK_CONCRETE ).bind (2 ));
208+ statements .put ("UPDATE_LWT_IF_EXISTS" , s -> s .prepare (STMT_UPDATE_LWT_IF_EXISTS ).bind (2 , 2 ));
209+ statements .put ("STMT_UPDATE_LWT_IF_VAL" , s -> s .prepare (STMT_UPDATE_LWT_IF_VAL ).bind (2 , 2 ));
210+ statements .put ("DELETE_CONCRETE" , s -> STMT_DELETE_CONCRETE );
211+ statements .put ("DELETE_PREPARED" , s -> s .prepare (STMT_DELETE ).bind (2 , 2 ));
212+ statements .put ("DELETE_NO_KS_PREPARED" , s -> s .prepare (STMT_DELETE_NO_KS ).bind (2 , 2 ));
213+ statements .put ("DELETE_CONCRETE_PREPARED" , s -> s .prepare (STMT_DELETE_CONCRETE ).bind ());
214+ statements .put ("DELETE_PK_CONCRETE_PREPARED" , s -> s .prepare (STMT_DELETE_PK_CONCRETE ).bind (2 ));
215+ statements .put ("DELETE_CK_CONCRETE_PREPARED" , s -> s .prepare (STMT_DELETE_CK_CONCRETE ).bind (2 ));
216+ statements .put ("DELETE_LWT_IF_EXISTS" , s -> s .prepare (STMT_DELETE_IF_EXISTS ).bind (2 , 2 ));
217+
218+ List <String > testErrors = new ArrayList <>();
219+ for (Map .Entry <String , Supplier <CqlSession >> sessionEntry : sessions .entrySet ()) {
220+ for (Map .Entry <String , Function <CqlSession , Statement >> stmtEntry : statements .entrySet ()) {
221+ if (stmtEntry .getKey ().contains ("CONCRETE" )
222+ && !stmtEntry .getKey ().contains ("CK_CONCRETE" )) {
223+ // Scylla does not return tablet info for queries with PK built into query
224+ continue ;
225+ }
226+ if (stmtEntry .getKey ().contains ("LWT" )) {
227+ // LWT is not yet supported by scylla on tables with tablets
228+ continue ;
229+ }
230+ if (sessionEntry .getKey ().equals ("REGULAR" ) && stmtEntry .getKey ().contains ("NO_KS" )) {
231+ // Preparation of the statements without KS will fail on the session with no ks specified
232+ continue ;
233+ }
234+ CqlSession session = sessionEntry .getValue ().get ();
235+ // Empty out tablets information
236+ session .getMetadata ().getTabletMap ().removeByKeyspace (CqlIdentifier .fromCql (KEYSPACE_NAME ));
237+ Statement stmt ;
238+ try {
239+ stmt = stmtEntry .getValue ().apply (session );
240+ } catch (Exception e ) {
241+ RuntimeException ex =
242+ new RuntimeException (
243+ String .format (
244+ "Failed to build statement %s on session %s" ,
245+ stmtEntry .getKey (), sessionEntry .getKey ()));
246+ ex .addSuppressed (e );
247+ throw ex ;
248+ }
249+ try {
250+ if (!executeOnAllHostsAndReturnIfResultHasTabletsInfo (session , stmt )) {
251+ testErrors .add (
252+ String .format (
253+ "Statement %s on session %s got no tablet info" ,
254+ stmtEntry .getKey (), sessionEntry .getKey ()));
255+ continue ;
256+ }
257+ } catch (Exception e ) {
258+ testErrors .add (
259+ String .format (
260+ "Failed to execute statement %s on session %s: %s" ,
261+ stmtEntry .getKey (), sessionEntry .getKey (), e ));
262+ continue ;
263+ }
264+ if (!waitSessionLearnedTabletInfo (session )) {
265+ testErrors .add (
266+ String .format (
267+ "Statement %s on session %s did not trigger session tablets update" ,
268+ stmtEntry .getKey (), sessionEntry .getKey ()));
269+ }
270+ }
96271 }
97272
98- PreparedStatement preparedStatement =
99- session . prepare (
100- SimpleStatement . builder (
101- "select pk,ck from "
102- + KEYSPACE_NAME
103- + "."
104- + TABLE_NAME
105- + " WHERE pk = ? AND ck = ?" )
106- . setTracing ( true )
107- . build ());
108- // preparedStatement.enableTracing ();
273+ if (! testErrors . isEmpty ()) {
274+ throw new AssertionError (
275+ String . format (
276+ "Found queries that got no tablet info: \n %s" , String . join ( " \n " , testErrors )));
277+ }
278+ }
279+
280+ @ Test
281+ public void should_receive_each_tablet_exactly_once () {
282+ CqlSession session =
283+ CqlSession . builder (). addContactEndPoints ( CCM_RULE . getContactPoints ()). build ();
109284 int counter = 0 ;
285+ PreparedStatement preparedStatement = session .prepare (STMT_INSERT );
110286 for (int i = 1 ; i <= QUERIES ; i ++) {
111- ResultSet rs = session .execute (preparedStatement .bind (i , i ).setTracing (true ));
112- Map <String , ByteBuffer > payload = rs .getExecutionInfo ().getIncomingPayload ();
113- if (payload .containsKey (TabletInfo .TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY )) {
287+ if (executeAndReturnIfResultHasTabletsInfo (session , preparedStatement .bind (i , i ))) {
288+ counter ++;
289+ }
290+ }
291+ Assert .assertEquals (INITIAL_TABLETS , counter );
292+ assertSessionTabletMapIsFilled (session );
293+ session .close ();
294+
295+ session = CqlSession .builder ().addContactEndPoints (CCM_RULE .getContactPoints ()).build ();
296+ counter = 0 ;
297+ preparedStatement = session .prepare (STMT_SELECT );
298+ for (int i = 1 ; i <= QUERIES ; i ++) {
299+ if (executeAndReturnIfResultHasTabletsInfo (session , preparedStatement .bind (i , i ))) {
114300 counter ++;
115301 }
116302 }
@@ -119,7 +305,57 @@ public void should_receive_each_tablet_exactly_once() {
119305
120306 // With enough queries we should hit a wrong node for each tablet exactly once.
121307 Assert .assertEquals (INITIAL_TABLETS , counter );
308+ assertSessionTabletMapIsFilled (session );
122309
310+ // All tablet information should be available by now (unless for some reason cluster did sth on
311+ // its own)
312+ // We should not receive any tablet payloads now, since they are sent only on mismatch.
313+ for (int i = 1 ; i <= QUERIES ; i ++) {
314+
315+ ResultSet rs = session .execute (preparedStatement .bind (i , i ));
316+ Map <String , ByteBuffer > payload = rs .getExecutionInfo ().getIncomingPayload ();
317+
318+ if (payload .containsKey (TabletInfo .TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY )) {
319+ throw new RuntimeException (
320+ "Received non empty payload with tablets routing information: " + payload );
321+ }
322+ }
323+ }
324+
325+ private static boolean waitSessionLearnedTabletInfo (CqlSession session ) {
326+ if (isSessionLearnedTabletInfo (session )) {
327+ return true ;
328+ }
329+ // Wait till tablet update, which is async, is completed
330+ try {
331+ Thread .sleep (200 );
332+ } catch (InterruptedException e ) {
333+ Thread .currentThread ().interrupt ();
334+ }
335+ return isSessionLearnedTabletInfo (session );
336+ }
337+
338+ private static boolean isSessionLearnedTabletInfo (CqlSession session ) {
339+ ConcurrentMap <KeyspaceTableNamePair , ConcurrentSkipListSet <Tablet >> tabletMapping =
340+ session .getMetadata ().getTabletMap ().getMapping ();
341+ KeyspaceTableNamePair ktPair =
342+ new KeyspaceTableNamePair (
343+ CqlIdentifier .fromCql (KEYSPACE_NAME ), CqlIdentifier .fromCql (TABLE_NAME ));
344+
345+ Set <Tablet > tablets = tabletMapping .get (ktPair );
346+ if (tablets == null || tablets .isEmpty ()) {
347+ return false ;
348+ }
349+
350+ for (Tablet tab : tablets ) {
351+ if (tab .getReplicaNodes ().size () >= REPLICATION_FACTOR ) {
352+ return true ;
353+ }
354+ }
355+ return false ;
356+ }
357+
358+ private static void assertSessionTabletMapIsFilled (CqlSession session ) {
123359 ConcurrentMap <KeyspaceTableNamePair , ConcurrentSkipListSet <Tablet >> tabletMapping =
124360 session .getMetadata ().getTabletMap ().getMapping ();
125361 KeyspaceTableNamePair ktPair =
@@ -133,19 +369,31 @@ public void should_receive_each_tablet_exactly_once() {
133369 for (Tablet tab : tablets ) {
134370 Assert .assertEquals (REPLICATION_FACTOR , tab .getReplicaNodes ().size ());
135371 }
372+ }
136373
137- // All tablet information should be available by now (unless for some reason cluster did sth on
138- // its own)
139- // We should not receive any tablet payloads now, since they are sent only on mismatch.
140- for (int i = 1 ; i <= QUERIES ; i ++) {
374+ private static boolean executeOnAllHostsAndReturnIfResultHasTabletsInfo (
375+ CqlSession session , Statement stmt ) {
376+ session .refreshSchema ();
377+ for (Node node : session .getMetadata ().getNodes ().values ()) {
378+ if (executeAndReturnIfResultHasTabletsInfo (session , stmt .setNode (node ))) {
379+ return true ;
380+ }
381+ }
382+ return false ;
383+ }
141384
142- ResultSet rs = session .execute (preparedStatement .bind (i , i ));
143- Map <String , ByteBuffer > payload = rs .getExecutionInfo ().getIncomingPayload ();
385+ private static boolean executeAndReturnIfResultHasTabletsInfo (
386+ CqlSession session , Statement statement ) {
387+ ResultSet rs = session .execute (statement );
388+ return rs .getExecutionInfo ()
389+ .getIncomingPayload ()
390+ .containsKey (TabletInfo .TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY );
391+ }
144392
145- if (payload .containsKey (TabletInfo .TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY )) {
146- throw new RuntimeException (
147- "Received non empty payload with tablets routing information: " + payload );
148- }
393+ private static SimpleStatement buildStatement (String statement ) {
394+ if (statement .contains ("%s.%s" )) {
395+ return SimpleStatement .builder (String .format (statement , KEYSPACE_NAME , TABLE_NAME )).build ();
149396 }
397+ return SimpleStatement .builder (String .format (statement , TABLE_NAME )).build ();
150398 }
151399}
0 commit comments