diff --git a/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java b/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java index 287a55cbdb3..91c408f3dba 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java @@ -24,14 +24,19 @@ import static com.datastax.driver.core.ProtocolVersion.V4; import com.datastax.driver.core.policies.RetryPolicy; +import com.google.common.base.Splitter; import com.google.common.collect.ImmutableMap; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DefaultPreparedStatement implements PreparedStatement { - + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPreparedStatement.class); private static final String SCYLLA_CDC_LOG_SUFFIX = "_scylla_cdc_log"; + private static final Splitter SPACE_SPLITTER = Splitter.onPattern("\\s+"); + private static final Splitter COMMA_SPLITTER = Splitter.onPattern(","); final PreparedId preparedId; @@ -50,6 +55,7 @@ public class DefaultPreparedStatement implements PreparedStatement { volatile RetryPolicy retryPolicy; volatile ImmutableMap outgoingPayload; volatile Boolean idempotent; + volatile boolean skipMetadata; private DefaultPreparedStatement( PreparedId id, @@ -66,6 +72,7 @@ private DefaultPreparedStatement( this.cluster = cluster; this.isLWT = isLWT; this.partitioner = partitioner; + this.skipMetadata = this.calculateSkipMetadata(); } static DefaultPreparedStatement fromMessage( @@ -172,6 +179,62 @@ private static Token.Factory partitioner(ColumnDefinitions defs, Cluster cluster return null; } + private boolean calculateSkipMetadata() { + if (cluster.manager.protocolVersion() == ProtocolVersion.V1 + || preparedId.resultSetMetadata.variables == null) { + // CQL1 does not support it. + // If no rows returned there is no reason to send this flag, consequently, no metadata. + return false; + } + + if (preparedId.resultSetMetadata.id != null + && preparedId.resultSetMetadata.id.bytes.length > 0) { + // It is CQL 5 or higher. + // Prepared statement invalidation works perfectly no need to disable skip metadata + return true; + } + + switch (cluster.getConfiguration().getQueryOptions().getSkipCQL4MetadataResolveMethod()) { + case ENABLED: + return true; + case DISABLED: + return false; + } + + if (isWildcardSelect(query)) { + LOGGER.warn( + "Prepared statement {} is a wildcard select, which can cause prepared statement invalidation issues when executed on CQL4. " + + "These issues may lead to broken deserialization or data corruption. " + + "To mitigate this, the driver ensures that the server returns metadata with each query for such statements, " + + "though this negatively impacts performance. " + + "To avoid this, consider using a targeted select instead. " + + "Alternatively, you can enable the skip-cql4-metadata-resolve-method option in the execution profile by setting it to `always-on`, " + + "allowing the driver to ignore this issue and proceed regardless, risking broken deserialization or data corruption.", + query); + return false; + } + // Disable skipping metadata if results contains udt and + for (ColumnDefinitions.Definition columnDefinition : preparedId.resultSetMetadata.variables) { + if (containsUDT(columnDefinition.getType())) { + LOGGER.warn( + "Prepared statement {} contains UDT in result, which can cause prepared statement invalidation issues when executed on CQL4. " + + "These issues may lead to broken deserialization or data corruption. " + + "To mitigate this, the driver ensures that the server returns metadata with each query for such statements, " + + "though this negatively impacts performance. " + + "To avoid this, consider using a targeted select instead. " + + "Alternatively, you can enable the skip-cql4-metadata-resolve-method option in the execution profile by setting it to `always-on`, " + + "allowing the driver to ignore this issue and proceed regardless, risking broken deserialization or data corruption.", + query); + return false; + } + } + return true; + } + + public boolean isSkipMetadata() { + return skipMetadata; + } + @Override public ColumnDefinitions getVariables() { return preparedId.boundValuesMetadata.variables; @@ -315,4 +378,44 @@ public Boolean isIdempotent() { public boolean isLWT() { return isLWT; } + + private static boolean containsUDT(DataType dataType) { + if (dataType.isCollection()) { + for (DataType elementType : dataType.getTypeArguments()) { + if (containsUDT(elementType)) { + return true; + } + } + return false; + } + return dataType instanceof UserType; + } + + private static boolean isWildcardSelect(String query) { + List chunks = SPACE_SPLITTER.splitToList(query.trim().toLowerCase()); + if (chunks.size() < 2) { + // Weird query, assuming no result expected + return false; + } + + if (!chunks.get(0).equals("select")) { + // In case if non-select sneaks in, disable skip metadata for it no result expected. + return false; + } + + for (String chunk : chunks) { + if (chunk.equals("from")) { + return false; + } + if (chunk.equals("*")) { + return true; + } + for (String part : COMMA_SPLITTER.split(chunk)) { + if (part.equals("*")) { + return true; + } + } + } + return false; + } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java b/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java index c8cc565c36b..3fa3e335b51 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java +++ b/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java @@ -69,6 +69,8 @@ public class QueryOptions { private volatile boolean reprepareOnUp = true; private volatile Cluster.Manager manager; private volatile boolean prepareOnAllHosts = true; + private volatile CQL4SkipMetadataResolveMethod skipCQL4MetadataResolveMethod = + CQL4SkipMetadataResolveMethod.SMART; private volatile boolean schemaQueriesPaged = true; @@ -193,6 +195,38 @@ public boolean getDefaultIdempotence() { return defaultIdempotence; } + /** + * There is known problem in CQL 4.x when prepared statement invalidation could be voided: more info When it happens metadata + * on client side does not match data and deserialization can go wrong in many ways To avoid + * driver can disable skip metadata flag to make server respond with metadata on every query. + * Unfortunately it causes excessive network traffic and CPU overhead on both server and driver + * side. This option controls how driver resolves skip metadata flag for CQL4 prepared statements. + * **SMART** - disable flag only for wildcard selects (select * from) and selects that return + * UDTs, including collections of UDTs and maps that contain UDTs **ENABLED** - flag is always set + * **DISABLED** - flag is always disabled Default is SMART Required: yes Modifiable at runtime: + * yes, the new value will be used for requests issued after the change. Overridable in a profile: + * yes + * + * @param method the new value to set as skip metadata resolve method. + * @return this {@code QueryOptions} instance. + */ + public QueryOptions setSkipCQL4MetadataResolveMethod(CQL4SkipMetadataResolveMethod method) { + this.skipCQL4MetadataResolveMethod = method; + return this; + } + + /** + * Skip metadata resolve method . + * + *

It defaults to {@link #skipCQL4MetadataResolveMethod.SMART}. + * + * @return the default idempotence for queries. + */ + public CQL4SkipMetadataResolveMethod getSkipCQL4MetadataResolveMethod() { + return this.skipCQL4MetadataResolveMethod; + } + /** * Set whether the driver should prepare statements on all hosts in the cluster. * @@ -583,4 +617,10 @@ public int hashCode() { public boolean isConsistencySet() { return consistencySet; } + + public enum CQL4SkipMetadataResolveMethod { + ENABLED, + DISABLED, + SMART + } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java b/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java index 1bf36867558..54cceda6126 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java +++ b/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java @@ -654,11 +654,16 @@ else if (fetchSize != Integer.MAX_VALUE) } if (protocolVersion.compareTo(ProtocolVersion.V4) < 0) bs.ensureAllSet(); - // skip resultset metadata if version > 1 (otherwise this feature is not supported) - // and if we already have metadata for the prepared statement being executed. - boolean skipMetadata = - protocolVersion != ProtocolVersion.V1 - && bs.statement.getPreparedId().resultSetMetadata.variables != null; + boolean skipMetadata; + if (bs.statement instanceof DefaultPreparedStatement) { + skipMetadata = ((DefaultPreparedStatement) bs.statement).isSkipMetadata(); + } else { + skipMetadata = + protocolVersion != ProtocolVersion.V1 + && bs.statement.getPreparedId().resultSetMetadata.variables != null; + // skip resultset metadata if version > 1 (otherwise this feature is not supported) + // and if we already have metadata for the prepared statement being executed. + } Requests.QueryProtocolOptions options = new Requests.QueryProtocolOptions( diff --git a/driver-core/src/test/java/com/datastax/driver/core/PreparedStatementInvalidationTest.java b/driver-core/src/test/java/com/datastax/driver/core/PreparedStatementInvalidationTest.java index 86bc7c49648..55d045cd647 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/PreparedStatementInvalidationTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/PreparedStatementInvalidationTest.java @@ -32,15 +32,16 @@ package com.datastax.driver.core; import static com.datastax.driver.core.Assertions.assertThat; +import static com.datastax.driver.core.ProtocolVersion.V4; import static junit.framework.TestCase.fail; import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.datastax.driver.core.utils.CassandraVersion; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -@CassandraVersion("4.0") public class PreparedStatementInvalidationTest extends CCMTestsSupport { @BeforeMethod(groups = "short", alwaysRun = true) @@ -57,6 +58,7 @@ public void teardown() throws Exception { execute("DROP TABLE prepared_statement_invalidation_test"); } + @CassandraVersion("4.0") @Test(groups = "short") public void should_update_statement_id_when_metadata_changed_across_executions() { // given @@ -79,6 +81,7 @@ public void should_update_statement_id_when_metadata_changed_across_executions() assertThat(rows.getColumnDefinitions()).hasSize(4).containsVariable("d", DataType.cint()); } + @CassandraVersion("4.0") @Test(groups = "short") public void should_update_statement_id_when_metadata_changed_across_pages() throws Exception { // given @@ -114,6 +117,7 @@ public void should_update_statement_id_when_metadata_changed_across_pages() thro assertThat(definitionsAfter).hasSize(4).containsVariable("d", DataType.cint()); } + @CassandraVersion("4.0") @Test(groups = "short") public void should_update_statement_id_when_metadata_changed_across_sessions() { Session session1 = cluster().connect(); @@ -164,6 +168,7 @@ public void should_update_statement_id_when_metadata_changed_across_sessions() { assertThat(rows2.getColumnDefinitions()).hasSize(4).containsVariable("d", DataType.cint()); } + @CassandraVersion("4.0") @Test(groups = "short", expectedExceptions = NoHostAvailableException.class) public void should_not_reprepare_invalid_statements() { // given @@ -176,6 +181,7 @@ public void should_not_reprepare_invalid_statements() { session().execute(ps.bind()); } + @CassandraVersion("4.0") @Test(groups = "short") public void should_never_update_statement_id_for_conditional_updates_in_modern_protocol() { should_never_update_statement_id_for_conditional_updates(session()); @@ -240,6 +246,7 @@ private void should_never_update_statement_id_for_conditional_updates(Session se assertThat(ps.getPreparedId().resultSetMetadata.id).isEqualTo(idBefore); } + @CassandraVersion("4.0") @Test(groups = "short") public void should_never_update_statement_for_conditional_updates_in_legacy_protocols() { // Given @@ -248,9 +255,151 @@ public void should_never_update_statement_for_conditional_updates_in_legacy_prot Cluster.builder() .addContactPoints(getContactPoints()) .withPort(ccm().getBinaryPort()) - .withProtocolVersion(ccm().getProtocolVersion(ProtocolVersion.V4)) + .withProtocolVersion(ccm().getProtocolVersion(V4)) .build()); Session session = cluster.connect(keyspace); should_never_update_statement_id_for_conditional_updates(session); } + + @DataProvider(name = "resolverName") + public static Object[][] resolverName() { + return new Object[][] { + { + QueryOptions.CQL4SkipMetadataResolveMethod.SMART, + }, + { + QueryOptions.CQL4SkipMetadataResolveMethod.ENABLED, + }, + { + QueryOptions.CQL4SkipMetadataResolveMethod.DISABLED, + } + }; + } + + @Test(groups = "short", dataProvider = "resolverName") + public void prepared_stmt_metadata_update_loopholes_test( + QueryOptions.CQL4SkipMetadataResolveMethod resolver) { + // v0 is an int column, but we'll bind a String to it + try (Session session = sessionWithSkipCQL4MetadataResolveMethod(resolver)) { + String resolverNameFixed = resolver.name().toLowerCase().replace("-", "_"); + + String udtName = String.format("skip_metadata_test_%s_udt", resolverNameFixed); + String udtTable = String.format("skip_metadata_test_%s_udttable", resolverNameFixed); + String table = String.format("skip_metadata_test_%s_table", resolverNameFixed); + session.execute(String.format("CREATE TYPE IF NOT EXISTS %s (x int, y int)", udtName)); + + session.execute( + String.format("CREATE TABLE %s (pk int, v %s, PRIMARY KEY (pk))", udtTable, udtName)); + session.execute(String.format("CREATE TABLE %s (pk int, v int, PRIMARY KEY (pk))", table)); + + session.execute(String.format("INSERT INTO %s (pk, v) VALUES (1, 1)", table)); + session.execute(String.format("INSERT INTO %s (pk, v) VALUES (1, {x: 1, y: 1})", udtTable)); + + PreparedStatement stmtRegularTableWCS = + session.prepare(String.format("SELECT * FROM %s WHERE pk = ?", table)); + PreparedStatement stmtRegularTableTS = + session.prepare(String.format("SELECT pk, v FROM %s WHERE pk = ?", table)); + PreparedStatement stmtUDTTableWCS = + session.prepare(String.format("SELECT * FROM %s WHERE pk = ?", udtTable)); + PreparedStatement stmtUDTTableTS = + session.prepare(String.format("SELECT pk, v FROM %s WHERE pk = ?", udtTable)); + + boolean isCQL4orLower = stmtRegularTableWCS.getPreparedId().resultSetMetadata.id == null; + boolean isPreparedStatementInvalidationBroken = + isCQL4orLower && resolver == QueryOptions.CQL4SkipMetadataResolveMethod.ENABLED; + + if (isCQL4orLower) { + switch (resolver) { + case ENABLED: + assertThat(((DefaultPreparedStatement) stmtRegularTableTS).isSkipMetadata()) + .isEqualTo(true); + assertThat(((DefaultPreparedStatement) stmtRegularTableWCS).isSkipMetadata()) + .isEqualTo(true); + assertThat(((DefaultPreparedStatement) stmtUDTTableWCS).isSkipMetadata()) + .isEqualTo(true); + assertThat(((DefaultPreparedStatement) stmtUDTTableTS).isSkipMetadata()) + .isEqualTo(true); + break; + case DISABLED: + assertThat(((DefaultPreparedStatement) stmtRegularTableTS).isSkipMetadata()) + .isEqualTo(false); + assertThat(((DefaultPreparedStatement) stmtRegularTableWCS).isSkipMetadata()) + .isEqualTo(false); + assertThat(((DefaultPreparedStatement) stmtUDTTableWCS).isSkipMetadata()) + .isEqualTo(false); + assertThat(((DefaultPreparedStatement) stmtUDTTableTS).isSkipMetadata()) + .isEqualTo(false); + break; + default: // SMART + assertThat(((DefaultPreparedStatement) stmtRegularTableTS).isSkipMetadata()) + .isEqualTo(true); + assertThat(((DefaultPreparedStatement) stmtRegularTableWCS).isSkipMetadata()) + .isEqualTo(false); + assertThat(((DefaultPreparedStatement) stmtUDTTableWCS).isSkipMetadata()) + .isEqualTo(false); + assertThat(((DefaultPreparedStatement) stmtUDTTableTS).isSkipMetadata()) + .isEqualTo(false); + } + } + + Row row = session.execute(stmtUDTTableWCS.bind(1)).one(); + assertThat(row.getColumnDefinitions().size()).isEqualTo(2); + assertThat(getUDTColumnCount(row.getColumnDefinitions().asList().get(1))).isEqualTo(2); + row = session.execute(stmtUDTTableTS.bind(1)).one(); + assertThat(getUDTColumnCount(row.getColumnDefinitions().asList().get(1))).isEqualTo(2); + assertThat(row.getColumnDefinitions().size()).isEqualTo(2); + row = session.execute(stmtRegularTableWCS.bind(1)).one(); + assertThat(row.getColumnDefinitions().size()).isEqualTo(2); + + session.execute(String.format("ALTER TABLE %s ADD z int;", table)); + session.execute(String.format("ALTER TYPE %s ADD z int;", udtName)); + + int expectedUDTColumnCount = 3; + int expectedTableColumnCount = 3; + if (isPreparedStatementInvalidationBroken) { + // When case of CQL4 and skip metadata is set prepared statements will not be invalidated. + expectedUDTColumnCount = 2; + expectedTableColumnCount = 2; + } + + row = session.execute(stmtUDTTableWCS.bind(1)).one(); + assertThat(row.getUDTValue(1).getType().getFieldNames().size()) + .isEqualTo(expectedUDTColumnCount); + assertThat(row.getColumnDefinitions().size()).isEqualTo(2); + assertThat(getUDTColumnCount(row.getColumnDefinitions().asList().get(1))) + .isEqualTo(expectedUDTColumnCount); + + row = session.execute(stmtUDTTableTS.bind(1)).one(); + assertThat(row.getUDTValue(1).getType().getFieldNames().size()) + .isEqualTo(expectedUDTColumnCount); + assertThat(row.getColumnDefinitions().size()).isEqualTo(2); + assertThat(getUDTColumnCount(row.getColumnDefinitions().asList().get(1))) + .isEqualTo(expectedUDTColumnCount); + + row = session.execute(stmtRegularTableWCS.bind(1)).one(); + assertThat(row.getColumnDefinitions().size()).isEqualTo(expectedTableColumnCount); + } + } + + private int getUDTColumnCount(ColumnDefinitions.Definition cd) { + return ((UserType) cd.getType()).getFieldNames().size(); + } + + private Session sessionWithSkipCQL4MetadataResolveMethod( + QueryOptions.CQL4SkipMetadataResolveMethod resolver) { + Cluster cluster = + register( + Cluster.builder() + .addContactPoints(getContactPoints()) + .withPort(ccm().getBinaryPort()) + .withProtocolVersion(V4) + .withQueryOptions(new QueryOptions().setSkipCQL4MetadataResolveMethod(resolver)) + .build()) + .init(); + Session session = cluster.connect(); + session.execute( + "CREATE KEYSPACE IF NOT EXISTS cql4_loopholes_test WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor': '1' }"); + session.execute("USE cql4_loopholes_test"); + return session; + } }