diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index c1eee844647..80eece271a8 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -480,17 +480,27 @@ private void setFinalResult( totalLatencyNanos, TimeUnit.NANOSECONDS); } - if (resultSet.getColumnDefinitions().size() > 0 - && resultSet - .getExecutionInfo() - .getIncomingPayload() - .containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)) { - context - .getMetadataManager() - .addTabletFromPayload( - resultSet.getColumnDefinitions().get(0).getKeyspace(), - resultSet.getColumnDefinitions().get(0).getTable(), - resultSet.getExecutionInfo().getIncomingPayload()); + if (resultSet + .getExecutionInfo() + .getIncomingPayload() + .containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)) { + CqlIdentifier keyspace = resultSet.getExecutionInfo().getRequest().getRoutingKeyspace(); + if (keyspace == null) { + keyspace = resultSet.getExecutionInfo().getRequest().getKeyspace(); + if (keyspace == null && resultSet.getColumnDefinitions().size() > 0) { + keyspace = resultSet.getColumnDefinitions().get(0).getKeyspace(); + } + } + CqlIdentifier table = resultSet.getExecutionInfo().getRequest().getRoutingTable(); + if (table == null && resultSet.getColumnDefinitions().size() > 0) { + table = resultSet.getColumnDefinitions().get(0).getTable(); + } + if (keyspace != null && table != null) { + context + .getMetadataManager() + .addTabletFromPayload( + keyspace, table, resultSet.getExecutionInfo().getIncomingPayload()); + } } } // log the warnings if they have NOT been disabled diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java index 1612caef687..bf772914f4b 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java @@ -6,7 +6,9 @@ import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.core.metadata.KeyspaceTableNamePair; +import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.metadata.Tablet; import com.datastax.oss.driver.api.testinfra.CassandraSkip; import com.datastax.oss.driver.api.testinfra.ScyllaRequirement; @@ -16,11 +18,17 @@ import com.datastax.oss.driver.internal.core.protocol.TabletInfo; import java.nio.ByteBuffer; import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.function.Function; +import java.util.function.Supplier; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.RuleChain; @@ -55,9 +63,9 @@ public class DefaultMetadataTabletMapIT { private static final int INITIAL_TABLETS = 32; private static final int QUERIES = 1600; private static final int REPLICATION_FACTOR = 2; - private static String KEYSPACE_NAME = "tabletsTest"; - private static String TABLE_NAME = "tabletsTable"; - private static String CREATE_KEYSPACE_QUERY = + private static final String KEYSPACE_NAME = "tabletsTest"; + private static final String TABLE_NAME = "tabletsTable"; + private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE_NAME + " WITH replication = {'class': " @@ -68,49 +76,233 @@ public class DefaultMetadataTabletMapIT { + "{'initial': " + INITIAL_TABLETS + "};"; - private static String CREATE_TABLE_QUERY = + private static final String CREATE_TABLE_QUERY = "CREATE TABLE IF NOT EXISTS " + KEYSPACE_NAME + "." + TABLE_NAME - + " (pk int, ck int, PRIMARY KEY(pk, ck));"; + + " (pk int, ck int, val int, PRIMARY KEY(pk, ck));"; - @Test - public void should_receive_each_tablet_exactly_once() { - CqlSession session = SESSION_RULE.session(); + private static final SimpleStatement STMT_INSERT = + buildStatement("INSERT INTO %s.%s (pk, ck) VALUES (?, ?);"); + + private static final SimpleStatement STMT_INSERT_NO_KS = + buildStatement("INSERT INTO %s (pk, ck) VALUES (?, ?);"); + + private static final SimpleStatement STMT_INSERT_CONCRETE = + buildStatement("INSERT INTO %s.%s (pk, ck) VALUES (1, 1);"); + + private static final SimpleStatement STMT_INSERT_PK_CONCRETE = + buildStatement("INSERT INTO %s.%s (pk, ck) VALUES (1, ?);"); + + private static final SimpleStatement STMT_INSERT_CK_CONCRETE = + buildStatement("INSERT INTO %s.%s (pk, ck) VALUES (?, 1);"); + + private static final SimpleStatement STMT_INSERT_LWT_IF_NOT_EXISTS = + buildStatement("INSERT INTO %s.%s (pk, ck) VALUES (?, ?) IF NOT EXISTS;"); + + private static final SimpleStatement STMT_SELECT = + buildStatement("SELECT pk,ck FROM %s.%s WHERE pk = ? AND ck = ?"); + + private static final SimpleStatement STMT_SELECT_NO_KS = + buildStatement("SELECT pk, ck FROM %s WHERE pk = ? AND ck = ?"); + + private static final SimpleStatement STMT_SELECT_CONCRETE = + buildStatement("SELECT pk,ck FROM %s.%s WHERE pk = 1 AND ck = 1"); + + private static final SimpleStatement STMT_SELECT_PK_CONCRETE = + buildStatement("SELECT pk, ck FROM %s.%s WHERE pk = 1 AND ck = ?"); + + private static final SimpleStatement STMT_SELECT_CK_CONCRETE = + buildStatement("SELECT pk, ck FROM %s.%s WHERE pk = ? AND ck = 1"); + + private static final SimpleStatement STMT_UPDATE = + buildStatement("UPDATE %s.%s SET val = 1 WHERE pk = ? AND ck = ?"); + + private static final SimpleStatement STMT_UPDATE_NO_KS = + buildStatement("UPDATE %s SET val = 1 WHERE pk = ? AND ck = ?"); + + private static final SimpleStatement STMT_UPDATE_CONCRETE = + buildStatement("UPDATE %s.%s SET val = 1 WHERE pk = 1 AND ck = 1"); + + private static final SimpleStatement STMT_UPDATE_PK_CONCRETE = + buildStatement("UPDATE %s.%s SET val = 1 WHERE pk = 1 AND ck = ?"); + + private static final SimpleStatement STMT_UPDATE_CK_CONCRETE = + buildStatement("UPDATE %s.%s SET val = 1 WHERE pk = ? AND ck = 1"); + + private static final SimpleStatement STMT_UPDATE_LWT_IF_EXISTS = + buildStatement("UPDATE %s.%s SET val = 1 WHERE pk = ? AND ck = ? IF EXISTS"); + + private static final SimpleStatement STMT_UPDATE_LWT_IF_VAL = + buildStatement("UPDATE %s.%s SET val = 1 WHERE pk = ? AND ck = ? IF val = 2"); + + private static final SimpleStatement STMT_DELETE = + buildStatement("DELETE FROM %s.%s WHERE pk = ? AND ck = ?"); + + private static final SimpleStatement STMT_DELETE_NO_KS = + buildStatement("DELETE FROM %s WHERE pk = ? AND ck = ?"); + + private static final SimpleStatement STMT_DELETE_CONCRETE = + buildStatement("DELETE FROM %s.%s WHERE pk = 1 AND ck = 1"); + + private static final SimpleStatement STMT_DELETE_PK_CONCRETE = + buildStatement("DELETE FROM %s.%s WHERE pk = 1 AND ck = ?"); + + private static final SimpleStatement STMT_DELETE_CK_CONCRETE = + buildStatement("DELETE FROM %s.%s WHERE pk = ? AND ck = 1"); + private static final SimpleStatement STMT_DELETE_IF_EXISTS = + buildStatement("DELETE FROM %s.%s WHERE pk = ? AND ck = ? IF EXISTS"); + + @BeforeClass + public static void setup() { + CqlSession session = SESSION_RULE.session(); session.execute(CREATE_KEYSPACE_QUERY); session.execute(CREATE_TABLE_QUERY); + } - for (int i = 1; i <= QUERIES; i++) { - session.execute( - "INSERT INTO " - + KEYSPACE_NAME - + "." - + TABLE_NAME - + " (pk,ck) VALUES (" - + i - + "," - + i - + ");"); + @Test + public void every_statement_should_deliver_tablet_info() { + Map> sessions = new HashMap<>(); + sessions.put( + "REGULAR", + () -> CqlSession.builder().addContactEndPoints(CCM_RULE.getContactPoints()).build()); + sessions.put( + "WITH_KEYSPACE", + () -> + CqlSession.builder() + .addContactEndPoints(CCM_RULE.getContactPoints()) + .withKeyspace(KEYSPACE_NAME) + .build()); + sessions.put( + "USE_KEYSPACE", + () -> { + CqlSession s = + CqlSession.builder().addContactEndPoints(CCM_RULE.getContactPoints()).build(); + s.execute("USE " + KEYSPACE_NAME); + return s; + }); + + Map> statements = new HashMap<>(); + statements.put("SELECT_CONCRETE", s -> STMT_SELECT_CONCRETE); + statements.put("SELECT_PREPARED", s -> s.prepare(STMT_SELECT).bind(2, 2)); + statements.put("SELECT_NO_KS_PREPARED", s -> s.prepare(STMT_SELECT_NO_KS).bind(2, 2)); + statements.put("SELECT_CONCRETE_PREPARED", s -> s.prepare(STMT_SELECT_CONCRETE).bind()); + statements.put("SELECT_PK_CONCRETE_PREPARED", s -> s.prepare(STMT_SELECT_PK_CONCRETE).bind(2)); + statements.put("SELECT_CK_CONCRETE_PREPARED", s -> s.prepare(STMT_SELECT_CK_CONCRETE).bind(2)); + statements.put("INSERT_CONCRETE", s -> STMT_INSERT_CONCRETE); + statements.put("INSERT_PREPARED", s -> s.prepare(STMT_INSERT).bind(2, 2)); + statements.put("INSERT_NO_KS_PREPARED", s -> s.prepare(STMT_INSERT_NO_KS).bind(2, 2)); + statements.put("INSERT_CONCRETE_PREPARED", s -> s.prepare(STMT_INSERT_CONCRETE).bind()); + statements.put("INSERT_PK_CONCRETE_PREPARED", s -> s.prepare(STMT_INSERT_PK_CONCRETE).bind(2)); + statements.put("INSERT_CK_CONCRETE_PREPARED", s -> s.prepare(STMT_INSERT_CK_CONCRETE).bind(2)); + statements.put( + "INSERT_LWT_IF_NOT_EXISTS", s -> s.prepare(STMT_INSERT_LWT_IF_NOT_EXISTS).bind(2, 2)); + statements.put("UPDATE_CONCRETE", s -> STMT_UPDATE_CONCRETE); + statements.put("UPDATE_PREPARED", s -> s.prepare(STMT_UPDATE).bind(2, 2)); + statements.put("UPDATE_NO_KS_PREPARED", s -> s.prepare(STMT_UPDATE_NO_KS).bind(2, 2)); + statements.put("UPDATE_CONCRETE_PREPARED", s -> s.prepare(STMT_UPDATE_CONCRETE).bind()); + statements.put("UPDATE_PK_CONCRETE_PREPARED", s -> s.prepare(STMT_UPDATE_PK_CONCRETE).bind(2)); + statements.put("UPDATE_CK_CONCRETE_PREPARED", s -> s.prepare(STMT_UPDATE_CK_CONCRETE).bind(2)); + statements.put("UPDATE_LWT_IF_EXISTS", s -> s.prepare(STMT_UPDATE_LWT_IF_EXISTS).bind(2, 2)); + statements.put("STMT_UPDATE_LWT_IF_VAL", s -> s.prepare(STMT_UPDATE_LWT_IF_VAL).bind(2, 2)); + statements.put("DELETE_CONCRETE", s -> STMT_DELETE_CONCRETE); + statements.put("DELETE_PREPARED", s -> s.prepare(STMT_DELETE).bind(2, 2)); + statements.put("DELETE_NO_KS_PREPARED", s -> s.prepare(STMT_DELETE_NO_KS).bind(2, 2)); + statements.put("DELETE_CONCRETE_PREPARED", s -> s.prepare(STMT_DELETE_CONCRETE).bind()); + statements.put("DELETE_PK_CONCRETE_PREPARED", s -> s.prepare(STMT_DELETE_PK_CONCRETE).bind(2)); + statements.put("DELETE_CK_CONCRETE_PREPARED", s -> s.prepare(STMT_DELETE_CK_CONCRETE).bind(2)); + statements.put("DELETE_LWT_IF_EXISTS", s -> s.prepare(STMT_DELETE_IF_EXISTS).bind(2, 2)); + + List testErrors = new ArrayList<>(); + for (Map.Entry> sessionEntry : sessions.entrySet()) { + for (Map.Entry> stmtEntry : statements.entrySet()) { + if (stmtEntry.getKey().contains("CONCRETE") + && !stmtEntry.getKey().contains("CK_CONCRETE")) { + // Scylla does not return tablet info for queries with PK built into query + continue; + } + if (stmtEntry.getKey().contains("LWT")) { + // LWT is not yet supported by scylla on tables with tablets + continue; + } + if (sessionEntry.getKey().equals("REGULAR") && stmtEntry.getKey().contains("NO_KS")) { + // Preparation of the statements without KS will fail on the session with no ks specified + continue; + } + CqlSession session = sessionEntry.getValue().get(); + // Empty out tablets information + if (session.getMetadata().getTabletMap().isPresent()) { + session + .getMetadata() + .getTabletMap() + .get() + .removeByKeyspace(CqlIdentifier.fromCql(KEYSPACE_NAME)); + } + Statement stmt; + try { + stmt = stmtEntry.getValue().apply(session); + } catch (Exception e) { + RuntimeException ex = + new RuntimeException( + String.format( + "Failed to build statement %s on session %s", + stmtEntry.getKey(), sessionEntry.getKey())); + ex.addSuppressed(e); + throw ex; + } + try { + if (!executeOnAllHostsAndReturnIfResultHasTabletsInfo(session, stmt)) { + testErrors.add( + String.format( + "Statement %s on session %s got no tablet info", + stmtEntry.getKey(), sessionEntry.getKey())); + continue; + } + } catch (Exception e) { + testErrors.add( + String.format( + "Failed to execute statement %s on session %s: %s", + stmtEntry.getKey(), sessionEntry.getKey(), e)); + continue; + } + if (!waitSessionLearnedTabletInfo(session)) { + testErrors.add( + String.format( + "Statement %s on session %s did not trigger session tablets update", + stmtEntry.getKey(), sessionEntry.getKey())); + } + } } - PreparedStatement preparedStatement = - session.prepare( - SimpleStatement.builder( - "select pk,ck from " - + KEYSPACE_NAME - + "." - + TABLE_NAME - + " WHERE pk = ? AND ck = ?") - .setTracing(true) - .build()); - // preparedStatement.enableTracing(); + if (!testErrors.isEmpty()) { + throw new AssertionError( + String.format( + "Found queries that got no tablet info: \n%s", String.join("\n", testErrors))); + } + } + + @Test + public void should_receive_each_tablet_exactly_once() { + CqlSession session = + CqlSession.builder().addContactEndPoints(CCM_RULE.getContactPoints()).build(); int counter = 0; + PreparedStatement preparedStatement = session.prepare(STMT_INSERT); for (int i = 1; i <= QUERIES; i++) { - ResultSet rs = session.execute(preparedStatement.bind(i, i).setTracing(true)); - Map payload = rs.getExecutionInfo().getIncomingPayload(); - if (payload.containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)) { + if (executeAndReturnIfResultHasTabletsInfo(session, preparedStatement.bind(i, i))) { + counter++; + } + } + Assert.assertEquals(INITIAL_TABLETS, counter); + assertSessionTabletMapIsFilled(session); + session.close(); + + session = CqlSession.builder().addContactEndPoints(CCM_RULE.getContactPoints()).build(); + counter = 0; + preparedStatement = session.prepare(STMT_SELECT); + for (int i = 1; i <= QUERIES; i++) { + if (executeAndReturnIfResultHasTabletsInfo(session, preparedStatement.bind(i, i))) { counter++; } } @@ -119,7 +311,60 @@ public void should_receive_each_tablet_exactly_once() { // With enough queries we should hit a wrong node for each tablet exactly once. Assert.assertEquals(INITIAL_TABLETS, counter); + assertSessionTabletMapIsFilled(session); + // All tablet information should be available by now (unless for some reason cluster did sth on + // its own) + // We should not receive any tablet payloads now, since they are sent only on mismatch. + for (int i = 1; i <= QUERIES; i++) { + + ResultSet rs = session.execute(preparedStatement.bind(i, i)); + Map payload = rs.getExecutionInfo().getIncomingPayload(); + + if (payload.containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)) { + throw new RuntimeException( + "Received non empty payload with tablets routing information: " + payload); + } + } + } + + private static boolean waitSessionLearnedTabletInfo(CqlSession session) { + if (isSessionLearnedTabletInfo(session)) { + return true; + } + // Wait till tablet update, which is async, is completed + try { + Thread.sleep(200); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return isSessionLearnedTabletInfo(session); + } + + private static boolean isSessionLearnedTabletInfo(CqlSession session) { + if (!session.getMetadata().getTabletMap().isPresent()) { + return false; + } + ConcurrentMap> tabletMapping = + session.getMetadata().getTabletMap().get().getMapping(); + KeyspaceTableNamePair ktPair = + new KeyspaceTableNamePair( + CqlIdentifier.fromCql(KEYSPACE_NAME), CqlIdentifier.fromCql(TABLE_NAME)); + + Set tablets = tabletMapping.get(ktPair); + if (tablets == null || tablets.isEmpty()) { + return false; + } + + for (Tablet tab : tablets) { + if (tab.getReplicaNodes().size() >= REPLICATION_FACTOR) { + return true; + } + } + return false; + } + + private static void assertSessionTabletMapIsFilled(CqlSession session) { Assert.assertTrue(session.getMetadata().getTabletMap().isPresent()); ConcurrentMap> tabletMapping = session.getMetadata().getTabletMap().get().getMapping(); @@ -134,19 +379,31 @@ public void should_receive_each_tablet_exactly_once() { for (Tablet tab : tablets) { Assert.assertEquals(REPLICATION_FACTOR, tab.getReplicaNodes().size()); } + } - // All tablet information should be available by now (unless for some reason cluster did sth on - // its own) - // We should not receive any tablet payloads now, since they are sent only on mismatch. - for (int i = 1; i <= QUERIES; i++) { + private static boolean executeOnAllHostsAndReturnIfResultHasTabletsInfo( + CqlSession session, Statement stmt) { + session.refreshSchema(); + for (Node node : session.getMetadata().getNodes().values()) { + if (executeAndReturnIfResultHasTabletsInfo(session, stmt.setNode(node))) { + return true; + } + } + return false; + } - ResultSet rs = session.execute(preparedStatement.bind(i, i)); - Map payload = rs.getExecutionInfo().getIncomingPayload(); + private static boolean executeAndReturnIfResultHasTabletsInfo( + CqlSession session, Statement statement) { + ResultSet rs = session.execute(statement); + return rs.getExecutionInfo() + .getIncomingPayload() + .containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY); + } - if (payload.containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)) { - throw new RuntimeException( - "Received non empty payload with tablets routing information: " + payload); - } + private static SimpleStatement buildStatement(String statement) { + if (statement.contains("%s.%s")) { + return SimpleStatement.builder(String.format(statement, KEYSPACE_NAME, TABLE_NAME)).build(); } + return SimpleStatement.builder(String.format(statement, TABLE_NAME)).build(); } }