From 16b78835de984f7f2cb239b7168a4585c41e378a Mon Sep 17 00:00:00 2001 From: janehe Date: Tue, 24 Jun 2025 16:31:29 -0700 Subject: [PATCH 01/24] be generous for should_evict_down_node_metrics_when_timeout_fires --- .../datastax/oss/driver/core/metrics/MetricsITBase.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java index e6121217619..ade4d974480 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java @@ -174,11 +174,13 @@ public void should_evict_down_node_metrics_when_timeout_fires() throws Exception // trigger node1 UP -> DOWN eventBus.fire(NodeStateEvent.changed(NodeState.UP, NodeState.DOWN, node1)); - Thread.sleep(expireAfter.toMillis()); + Thread.sleep(expireAfter.toMillis() + 100); // then node-level metrics should be evicted from node1, but // node2 and node3 metrics should not have been evicted - await().untilAsserted(() -> assertNodeMetricsEvicted(session, node1)); + await() + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertNodeMetricsEvicted(session, node1)); assertNodeMetricsNotEvicted(session, node2); assertNodeMetricsNotEvicted(session, node3); @@ -226,7 +228,8 @@ public void should_not_evict_down_node_metrics_when_node_is_back_up_before_timeo eventBus.fire(NodeStateEvent.changed(NodeState.FORCED_DOWN, NodeState.UP, node2)); eventBus.fire(NodeStateEvent.added(node3)); - Thread.sleep(expireAfter.toMillis()); + // Add a small buffer to ensure the timeout would have fired if it wasn't cancelled + Thread.sleep(expireAfter.toMillis() + 100); // then no node-level metrics should be evicted assertNodeMetricsNotEvicted(session, node1); From 6f31d18410e4a7b393132fbdb1d893d75a521e5d Mon Sep 17 00:00:00 2001 From: janehe Date: Tue, 24 Jun 2025 18:09:26 -0700 Subject: [PATCH 02/24] mavenbundle start level 1 --- .../internal/osgi/support/BundleOptions.java | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/osgi-tests/src/test/java/com/datastax/oss/driver/internal/osgi/support/BundleOptions.java b/osgi-tests/src/test/java/com/datastax/oss/driver/internal/osgi/support/BundleOptions.java index 3e6171ca530..04a8afaa046 100644 --- a/osgi-tests/src/test/java/com/datastax/oss/driver/internal/osgi/support/BundleOptions.java +++ b/osgi-tests/src/test/java/com/datastax/oss/driver/internal/osgi/support/BundleOptions.java @@ -35,12 +35,14 @@ public class BundleOptions { public static CompositeOption commonBundles() { return () -> options( - mavenBundle("org.apache.cassandra", "java-driver-guava-shaded").versionAsInProject(), - mavenBundle("io.dropwizard.metrics", "metrics-core").versionAsInProject(), - mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), - mavenBundle("org.hdrhistogram", "HdrHistogram").versionAsInProject(), - mavenBundle("com.typesafe", "config").versionAsInProject(), - mavenBundle("com.datastax.oss", "native-protocol").versionAsInProject(), + mavenBundle("org.apache.cassandra", "java-driver-guava-shaded") + .versionAsInProject() + .startLevel(1), + mavenBundle("io.dropwizard.metrics", "metrics-core").versionAsInProject().startLevel(1), + mavenBundle("org.slf4j", "slf4j-api").versionAsInProject().startLevel(1), + mavenBundle("org.hdrhistogram", "HdrHistogram").versionAsInProject().startLevel(1), + mavenBundle("com.typesafe", "config").versionAsInProject().startLevel(1), + mavenBundle("com.datastax.oss", "native-protocol").versionAsInProject().startLevel(1), logbackBundles(), debugOptions()); } @@ -51,7 +53,7 @@ public static CompositeOption applicationBundle() { systemProperty("cassandra.contactpoints").value("127.0.0.1"), systemProperty("cassandra.port").value("9042"), systemProperty("cassandra.keyspace").value("test_osgi"), - bundle("reference:file:target/classes")); + bundle("reference:file:target/classes").startLevel(3)); } public static UrlProvisionOption driverCoreBundle() { @@ -59,15 +61,15 @@ public static UrlProvisionOption driverCoreBundle() { } public static UrlProvisionOption driverCoreShadedBundle() { - return bundle("reference:file:../core-shaded/target/classes"); + return bundle("reference:file:../core-shaded/target/classes").startLevel(1); } public static UrlProvisionOption driverQueryBuilderBundle() { - return bundle("reference:file:../query-builder/target/classes"); + return bundle("reference:file:../query-builder/target/classes").startLevel(2); } public static UrlProvisionOption driverMapperRuntimeBundle() { - return bundle("reference:file:../mapper-runtime/target/classes"); + return bundle("reference:file:../mapper-runtime/target/classes").startLevel(2); } public static UrlProvisionOption driverTestInfraBundle() { From ff0cc04714249eecf5a5dc6ef1f0a3b0cf892bb6 Mon Sep 17 00:00:00 2001 From: janehe Date: Tue, 24 Jun 2025 23:51:25 -0700 Subject: [PATCH 03/24] should_evict_down_node_metrics_when_timeout_fires to 40 seconds --- .../com/datastax/oss/driver/core/metrics/MetricsITBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java index ade4d974480..9c177b680f8 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java @@ -179,7 +179,7 @@ public void should_evict_down_node_metrics_when_timeout_fires() throws Exception // then node-level metrics should be evicted from node1, but // node2 and node3 metrics should not have been evicted await() - .atMost(Duration.ofSeconds(15)) + .atMost(Duration.ofSeconds(40)) .untilAsserted(() -> assertNodeMetricsEvicted(session, node1)); assertNodeMetricsNotEvicted(session, node2); assertNodeMetricsNotEvicted(session, node3); From 6594f3b6f01e0ea34b1a3c3eeda3080b08d12321 Mon Sep 17 00:00:00 2001 From: janehe Date: Tue, 24 Jun 2025 23:57:21 -0700 Subject: [PATCH 04/24] udt codec IT simple statement raise time out to 20 seconds --- .../internal/core/type/codec/UdtCodecIT.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java index 804a078bbe0..6ada959de4c 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java @@ -22,12 +22,14 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.data.UdtValue; import com.datastax.oss.driver.api.core.type.UserDefinedType; import com.datastax.oss.driver.api.core.type.codec.TypeCodec; import com.datastax.oss.driver.api.testinfra.ccm.CcmRule; import com.datastax.oss.driver.api.testinfra.session.SessionRule; import com.datastax.oss.driver.categories.ParallelizableTests; +import java.time.Duration; import java.util.Objects; import org.junit.Rule; import org.junit.Test; @@ -47,8 +49,13 @@ public class UdtCodecIT { @Test public void should_decoding_udt_be_backward_compatible() { CqlSession session = sessionRule.session(); - session.execute("CREATE TYPE test_type_1 (a text, b int)"); - session.execute("CREATE TABLE test_table_1 (e int primary key, f frozen)"); + session.execute( + SimpleStatement.newInstance("CREATE TYPE test_type_1 (a text, b int)") + .setTimeout(Duration.ofSeconds(20))); + session.execute( + SimpleStatement.newInstance( + "CREATE TABLE test_table_1 (e int primary key, f frozen)") + .setTimeout(Duration.ofSeconds(20))); // insert a row using version 1 of the UDT schema session.execute("INSERT INTO test_table_1(e, f) VALUES(1, {a: 'a', b: 1})"); UserDefinedType udt = @@ -59,7 +66,9 @@ public void should_decoding_udt_be_backward_compatible() { .orElseThrow(IllegalStateException::new); TypeCodec oldCodec = session.getContext().getCodecRegistry().codecFor(udt); // update UDT schema - session.execute("ALTER TYPE test_type_1 add i text"); + session.execute( + SimpleStatement.newInstance("ALTER TYPE test_type_1 add i text") + .setTimeout(Duration.ofSeconds(20))); // insert a row using version 2 of the UDT schema session.execute("INSERT INTO test_table_1(e, f) VALUES(2, {a: 'b', b: 2, i: 'b'})"); Row row = From faeda6ed17b0f9ce47fc623f810d9125f6a25457 Mon Sep 17 00:00:00 2001 From: janehe Date: Wed, 25 Jun 2025 15:46:38 -0700 Subject: [PATCH 05/24] check schema agreement for PreparedStatementCachingIT --- .../oss/driver/core/cql/PreparedStatementCachingIT.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java index 617d489fb95..e929d205970 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java @@ -266,6 +266,8 @@ private void invalidationTestInner( // alter test_type_2 to trigger cache invalidation and above events session.execute("ALTER TYPE test_type_2 add i blob"); + session.checkSchemaAgreement(); + // wait for latches and fail if they don't reach zero before timeout assertThat( Uninterruptibles.awaitUninterruptibly( From 935cf34aa40b99e78f7aa1680552253189247846 Mon Sep 17 00:00:00 2001 From: janehe Date: Wed, 25 Jun 2025 16:32:06 -0700 Subject: [PATCH 06/24] DefaultReactiveResultSetIT raise METADATA_SCHEMA_REQUEST_TIMEOUT? --- .../core/cql/reactive/DefaultReactiveResultSetIT.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java index c00cf064e51..e3ffa39e824 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java @@ -34,12 +34,14 @@ import com.datastax.oss.driver.api.testinfra.ccm.CcmRule; import com.datastax.oss.driver.api.testinfra.ccm.SchemaChangeSynchronizer; import com.datastax.oss.driver.api.testinfra.session.SessionRule; +import com.datastax.oss.driver.api.testinfra.session.SessionUtils; import com.datastax.oss.driver.categories.ParallelizableTests; import com.datastax.oss.driver.internal.core.cql.EmptyColumnDefinitions; import com.tngtech.java.junit.dataprovider.DataProvider; import com.tngtech.java.junit.dataprovider.DataProviderRunner; import edu.umd.cs.findbugs.annotations.NonNull; import io.reactivex.Flowable; +import java.time.Duration; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; @@ -58,7 +60,14 @@ public class DefaultReactiveResultSetIT { private static CcmRule ccmRule = CcmRule.getInstance(); - private static SessionRule sessionRule = SessionRule.builder(ccmRule).build(); + private static SessionRule sessionRule = + SessionRule.builder(ccmRule) + .withConfigLoader( + SessionUtils.configLoaderBuilder() + .withDuration( + DefaultDriverOption.METADATA_SCHEMA_REQUEST_TIMEOUT, Duration.ofSeconds(20)) + .build()) + .build(); @ClassRule public static TestRule chain = RuleChain.outerRule(ccmRule).around(sessionRule); From ab7b1c1ac7a348e8196790ff5363b404030f5316 Mon Sep 17 00:00:00 2001 From: janehe Date: Mon, 30 Jun 2025 12:55:07 -0700 Subject: [PATCH 07/24] DefaultReactiveResultSetIT raise METADATA_SCHEMA_REQUEST_TIMEOUT? --- .../oss/driver/core/cql/PreparedStatementCachingIT.java | 4 ++-- .../com/datastax/oss/driver/core/metadata/SchemaIT.java | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java index e929d205970..dad7edd794f 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java @@ -271,10 +271,10 @@ private void invalidationTestInner( // wait for latches and fail if they don't reach zero before timeout assertThat( Uninterruptibles.awaitUninterruptibly( - preparedStmtCacheRemoveLatch, 10, TimeUnit.SECONDS)) + preparedStmtCacheRemoveLatch, 20, TimeUnit.SECONDS)) .withFailMessage("preparedStmtCacheRemoveLatch did not trigger before timeout") .isTrue(); - assertThat(Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 10, TimeUnit.SECONDS)) + assertThat(Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 20, TimeUnit.SECONDS)) .withFailMessage("typeChangeEventLatch did not trigger before timeout") .isTrue(); diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/SchemaIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/SchemaIT.java index df5571974c1..728bd3c6225 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/SchemaIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/SchemaIT.java @@ -151,11 +151,11 @@ public void should_disable_schema_programmatically_when_enabled_in_config() { sessionRule .session() .execute( - SimpleStatement.builder("CREATE TABLE foo(k int primary key)") + SimpleStatement.builder("CREATE TABLE foo_schema_it(k int primary key)") .setExecutionProfile(slowProfile) .build()); assertThat(session.getMetadata().getKeyspace(sessionRule.keyspace()).get().getTables()) - .doesNotContainKey(CqlIdentifier.fromInternal("foo")); + .doesNotContainKey(CqlIdentifier.fromInternal("foo_schema_it")); // Reset to config value (true), should refresh and load the new table session.setSchemaMetadataEnabled(null); @@ -167,7 +167,7 @@ public void should_disable_schema_programmatically_when_enabled_in_config() { () -> assertThat( session.getMetadata().getKeyspace(sessionRule.keyspace()).get().getTables()) - .containsKey(CqlIdentifier.fromInternal("foo"))); + .containsKey(CqlIdentifier.fromInternal("foo_schema_it"))); } @Test From 7ac257a08da1a1e77f8982ca1a5f206368facde7 Mon Sep 17 00:00:00 2001 From: janehe Date: Mon, 30 Jun 2025 16:37:53 -0700 Subject: [PATCH 08/24] change timeout to 120s or sth. --- .../core/cql/PreparedStatementCachingIT.java | 2 +- .../reactive/DefaultReactiveResultSetIT.java | 34 ++++++++----------- .../driver/core/metrics/MetricsITBase.java | 2 +- .../internal/core/type/codec/UdtCodecIT.java | 4 +-- 4 files changed, 18 insertions(+), 24 deletions(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java index dad7edd794f..b2fdeb6394a 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java @@ -271,7 +271,7 @@ private void invalidationTestInner( // wait for latches and fail if they don't reach zero before timeout assertThat( Uninterruptibles.awaitUninterruptibly( - preparedStmtCacheRemoveLatch, 20, TimeUnit.SECONDS)) + preparedStmtCacheRemoveLatch, 120, TimeUnit.SECONDS)) .withFailMessage("preparedStmtCacheRemoveLatch did not trigger before timeout") .isTrue(); assertThat(Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 20, TimeUnit.SECONDS)) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java index e3ffa39e824..77a449de44f 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java @@ -31,17 +31,16 @@ import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.testinfra.ccm.CcmRule; import com.datastax.oss.driver.api.testinfra.ccm.SchemaChangeSynchronizer; import com.datastax.oss.driver.api.testinfra.session.SessionRule; -import com.datastax.oss.driver.api.testinfra.session.SessionUtils; import com.datastax.oss.driver.categories.ParallelizableTests; import com.datastax.oss.driver.internal.core.cql.EmptyColumnDefinitions; import com.tngtech.java.junit.dataprovider.DataProvider; import com.tngtech.java.junit.dataprovider.DataProviderRunner; import edu.umd.cs.findbugs.annotations.NonNull; import io.reactivex.Flowable; -import java.time.Duration; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; @@ -60,14 +59,7 @@ public class DefaultReactiveResultSetIT { private static CcmRule ccmRule = CcmRule.getInstance(); - private static SessionRule sessionRule = - SessionRule.builder(ccmRule) - .withConfigLoader( - SessionUtils.configLoaderBuilder() - .withDuration( - DefaultDriverOption.METADATA_SCHEMA_REQUEST_TIMEOUT, Duration.ofSeconds(20)) - .build()) - .build(); + private static SessionRule sessionRule = SessionRule.builder(ccmRule).build(); @ClassRule public static TestRule chain = RuleChain.outerRule(ccmRule).around(sessionRule); @@ -76,19 +68,15 @@ public static void initialize() { CqlSession session = sessionRule.session(); SchemaChangeSynchronizer.withLock( () -> { - session.execute("DROP TABLE IF EXISTS test_reactive_read"); - session.execute("DROP TABLE IF EXISTS test_reactive_write"); + session.execute(createSlowStatement("DROP TABLE IF EXISTS test_reactive_read")); + session.execute(createSlowStatement("DROP TABLE IF EXISTS test_reactive_write")); session.checkSchemaAgreement(); session.execute( - SimpleStatement.builder( - "CREATE TABLE test_reactive_read (pk int, cc int, v int, PRIMARY KEY ((pk), cc))") - .setExecutionProfile(sessionRule.slowProfile()) - .build()); + createSlowStatement( + "CREATE TABLE test_reactive_read (pk int, cc int, v int, PRIMARY KEY ((pk), cc))")); session.execute( - SimpleStatement.builder( - "CREATE TABLE test_reactive_write (pk int, cc int, v int, PRIMARY KEY ((pk), cc))") - .setExecutionProfile(sessionRule.slowProfile()) - .build()); + createSlowStatement( + "CREATE TABLE test_reactive_write (pk int, cc int, v int, PRIMARY KEY ((pk), cc))")); session.checkSchemaAgreement(); }); for (int i = 0; i < 1000; i++) { @@ -101,6 +89,12 @@ public static void initialize() { } } + static Statement createSlowStatement(String statement) { + return SimpleStatement.builder(statement) + .setExecutionProfile(sessionRule.slowProfile()) + .build(); + } + @Before public void truncateTables() throws Exception { CqlSession session = sessionRule.session(); diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java index 9c177b680f8..8d5022c2393 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java @@ -179,7 +179,7 @@ public void should_evict_down_node_metrics_when_timeout_fires() throws Exception // then node-level metrics should be evicted from node1, but // node2 and node3 metrics should not have been evicted await() - .atMost(Duration.ofSeconds(40)) + .atMost(Duration.ofSeconds(120)) .untilAsserted(() -> assertNodeMetricsEvicted(session, node1)); assertNodeMetricsNotEvicted(session, node2); assertNodeMetricsNotEvicted(session, node3); diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java index 6ada959de4c..c2945b56ed7 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java @@ -51,11 +51,11 @@ public void should_decoding_udt_be_backward_compatible() { CqlSession session = sessionRule.session(); session.execute( SimpleStatement.newInstance("CREATE TYPE test_type_1 (a text, b int)") - .setTimeout(Duration.ofSeconds(20))); + .setTimeout(Duration.ofSeconds(120))); session.execute( SimpleStatement.newInstance( "CREATE TABLE test_table_1 (e int primary key, f frozen)") - .setTimeout(Duration.ofSeconds(20))); + .setTimeout(Duration.ofSeconds(120))); // insert a row using version 1 of the UDT schema session.execute("INSERT INTO test_table_1(e, f) VALUES(1, {a: 'a', b: 1})"); UserDefinedType udt = From 5593f5b6f29116c53e51c1d6da32c67828b7a1a1 Mon Sep 17 00:00:00 2001 From: janehe Date: Tue, 1 Jul 2025 14:45:33 -0700 Subject: [PATCH 09/24] change table and type names --- .../core/cql/PreparedStatementCachingIT.java | 74 +++++++++++-------- .../internal/core/type/codec/UdtCodecIT.java | 19 ++--- 2 files changed, 52 insertions(+), 41 deletions(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java index b2fdeb6394a..c2c359be280 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java @@ -194,8 +194,8 @@ private void invalidationResultSetTest( Consumer setupTestSchema, Set expectedChangedTypes) { invalidationTestInner( setupTestSchema, - "select f from test_table_1 where e = ?", - "select h from test_table_2 where g = ?", + "select f from test_table_caching_1 where e = ?", + "select h from test_table_caching_2 where g = ?", expectedChangedTypes); } @@ -206,8 +206,8 @@ private void invalidationVariableDefsTest( String condition = isCollection ? "contains ?" : "= ?"; invalidationTestInner( setupTestSchema, - String.format("select e from test_table_1 where f %s allow filtering", condition), - String.format("select g from test_table_2 where h %s allow filtering", condition), + String.format("select e from test_table_caching_1 where f %s allow filtering", condition), + String.format("select g from test_table_caching_2 where h %s allow filtering", condition), expectedChangedTypes); } @@ -263,8 +263,8 @@ private void invalidationTestInner( preparedStmtCacheRemoveLatch.countDown(); }); - // alter test_type_2 to trigger cache invalidation and above events - session.execute("ALTER TYPE test_type_2 add i blob"); + // alter test_type_caching_2 to trigger cache invalidation and above events + session.execute("ALTER TYPE test_type_caching_2 add i blob"); session.checkSchemaAgreement(); @@ -297,17 +297,20 @@ private void invalidationTestInner( Consumer setupCacheEntryTestBasic = (session) -> { - session.execute("CREATE TYPE test_type_1 (a text, b int)"); - session.execute("CREATE TYPE test_type_2 (c int, d text)"); - session.execute("CREATE TABLE test_table_1 (e int primary key, f frozen)"); - session.execute("CREATE TABLE test_table_2 (g int primary key, h frozen)"); + session.execute("CREATE TYPE test_type_caching_1 (a text, b int)"); + session.execute("CREATE TYPE test_type_caching_2 (c int, d text)"); + session.execute( + "CREATE TABLE test_table_caching_1 (e int primary key, f frozen)"); + session.execute( + "CREATE TABLE test_table_caching_2 (g int primary key, h frozen)"); }; @Test public void should_invalidate_cache_entry_on_basic_udt_change_result_set() { SchemaChangeSynchronizer.withLock( () -> { - invalidationResultSetTest(setupCacheEntryTestBasic, ImmutableSet.of("test_type_2")); + invalidationResultSetTest( + setupCacheEntryTestBasic, ImmutableSet.of("test_type_caching_2")); }); } @@ -316,25 +319,26 @@ public void should_invalidate_cache_entry_on_basic_udt_change_variable_defs() { SchemaChangeSynchronizer.withLock( () -> { invalidationVariableDefsTest( - setupCacheEntryTestBasic, false, ImmutableSet.of("test_type_2")); + setupCacheEntryTestBasic, false, ImmutableSet.of("test_type_caching_2")); }); } Consumer setupCacheEntryTestCollection = (session) -> { - session.execute("CREATE TYPE test_type_1 (a text, b int)"); - session.execute("CREATE TYPE test_type_2 (c int, d text)"); + session.execute("CREATE TYPE test_type_caching_1 (a text, b int)"); + session.execute("CREATE TYPE test_type_caching_2 (c int, d text)"); session.execute( - "CREATE TABLE test_table_1 (e int primary key, f list>)"); + "CREATE TABLE test_table_caching_1 (e int primary key, f list>)"); session.execute( - "CREATE TABLE test_table_2 (g int primary key, h list>)"); + "CREATE TABLE test_table_caching_2 (g int primary key, h list>)"); }; @Test public void should_invalidate_cache_entry_on_collection_udt_change_result_set() { SchemaChangeSynchronizer.withLock( () -> { - invalidationResultSetTest(setupCacheEntryTestCollection, ImmutableSet.of("test_type_2")); + invalidationResultSetTest( + setupCacheEntryTestCollection, ImmutableSet.of("test_type_caching_2")); }); } @@ -343,25 +347,26 @@ public void should_invalidate_cache_entry_on_collection_udt_change_variable_defs SchemaChangeSynchronizer.withLock( () -> { invalidationVariableDefsTest( - setupCacheEntryTestCollection, true, ImmutableSet.of("test_type_2")); + setupCacheEntryTestCollection, true, ImmutableSet.of("test_type_caching_2")); }); } Consumer setupCacheEntryTestTuple = (session) -> { - session.execute("CREATE TYPE test_type_1 (a text, b int)"); - session.execute("CREATE TYPE test_type_2 (c int, d text)"); + session.execute("CREATE TYPE test_type_caching_1 (a text, b int)"); + session.execute("CREATE TYPE test_type_caching_2 (c int, d text)"); session.execute( - "CREATE TABLE test_table_1 (e int primary key, f tuple)"); + "CREATE TABLE test_table_caching_1 (e int primary key, f tuple)"); session.execute( - "CREATE TABLE test_table_2 (g int primary key, h tuple)"); + "CREATE TABLE test_table_caching_2 (g int primary key, h tuple)"); }; @Test public void should_invalidate_cache_entry_on_tuple_udt_change_result_set() { SchemaChangeSynchronizer.withLock( () -> { - invalidationResultSetTest(setupCacheEntryTestTuple, ImmutableSet.of("test_type_2")); + invalidationResultSetTest( + setupCacheEntryTestTuple, ImmutableSet.of("test_type_caching_2")); }); } @@ -370,18 +375,20 @@ public void should_invalidate_cache_entry_on_tuple_udt_change_variable_defs() { SchemaChangeSynchronizer.withLock( () -> { invalidationVariableDefsTest( - setupCacheEntryTestTuple, false, ImmutableSet.of("test_type_2")); + setupCacheEntryTestTuple, false, ImmutableSet.of("test_type_caching_2")); }); } Consumer setupCacheEntryTestNested = (session) -> { - session.execute("CREATE TYPE test_type_1 (a text, b int)"); - session.execute("CREATE TYPE test_type_2 (c int, d text)"); - session.execute("CREATE TYPE test_type_3 (e frozen, f int)"); - session.execute("CREATE TYPE test_type_4 (g int, h frozen)"); - session.execute("CREATE TABLE test_table_1 (e int primary key, f frozen)"); - session.execute("CREATE TABLE test_table_2 (g int primary key, h frozen)"); + session.execute("CREATE TYPE test_type_caching_1 (a text, b int)"); + session.execute("CREATE TYPE test_type_caching_2 (c int, d text)"); + session.execute("CREATE TYPE test_type_caching_3 (e frozen, f int)"); + session.execute("CREATE TYPE test_type_caching_4 (g int, h frozen)"); + session.execute( + "CREATE TABLE test_table_caching_1 (e int primary key, f frozen)"); + session.execute( + "CREATE TABLE test_table_caching_2 (g int primary key, h frozen)"); }; @Test @@ -389,7 +396,8 @@ public void should_invalidate_cache_entry_on_nested_udt_change_result_set() { SchemaChangeSynchronizer.withLock( () -> { invalidationResultSetTest( - setupCacheEntryTestNested, ImmutableSet.of("test_type_2", "test_type_4")); + setupCacheEntryTestNested, + ImmutableSet.of("test_type_caching_2", "test_type_caching_4")); }); } @@ -398,7 +406,9 @@ public void should_invalidate_cache_entry_on_nested_udt_change_variable_defs() { SchemaChangeSynchronizer.withLock( () -> { invalidationVariableDefsTest( - setupCacheEntryTestNested, false, ImmutableSet.of("test_type_2", "test_type_4")); + setupCacheEntryTestNested, + false, + ImmutableSet.of("test_type_caching_2", "test_type_caching_4")); }); } diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java index c2945b56ed7..ff6f3a9d2c5 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java @@ -50,29 +50,30 @@ public class UdtCodecIT { public void should_decoding_udt_be_backward_compatible() { CqlSession session = sessionRule.session(); session.execute( - SimpleStatement.newInstance("CREATE TYPE test_type_1 (a text, b int)") - .setTimeout(Duration.ofSeconds(120))); + SimpleStatement.newInstance("CREATE TYPE test_type_udt_1 (a text, b int)") + .setTimeout(Duration.ofSeconds(20))); session.execute( SimpleStatement.newInstance( - "CREATE TABLE test_table_1 (e int primary key, f frozen)") - .setTimeout(Duration.ofSeconds(120))); + "CREATE TABLE test_table_udt_1 (e int primary key, f frozen)") + .setTimeout(Duration.ofSeconds(20))); // insert a row using version 1 of the UDT schema - session.execute("INSERT INTO test_table_1(e, f) VALUES(1, {a: 'a', b: 1})"); + session.execute("INSERT INTO test_table_udt_1(e, f) VALUES(1, {a: 'a', b: 1})"); UserDefinedType udt = session .getMetadata() .getKeyspace(sessionRule.keyspace()) - .flatMap(ks -> ks.getUserDefinedType("test_type_1")) + .flatMap(ks -> ks.getUserDefinedType("test_type_udt_1")) .orElseThrow(IllegalStateException::new); TypeCodec oldCodec = session.getContext().getCodecRegistry().codecFor(udt); // update UDT schema session.execute( - SimpleStatement.newInstance("ALTER TYPE test_type_1 add i text") + SimpleStatement.newInstance("ALTER TYPE test_type_udt_1 add i text") .setTimeout(Duration.ofSeconds(20))); // insert a row using version 2 of the UDT schema - session.execute("INSERT INTO test_table_1(e, f) VALUES(2, {a: 'b', b: 2, i: 'b'})"); + session.execute("INSERT INTO test_table_udt_1(e, f) VALUES(2, {a: 'b', b: 2, i: 'b'})"); Row row = - Objects.requireNonNull(session.execute("SELECT f FROM test_table_1 WHERE e = ?", 2).one()); + Objects.requireNonNull( + session.execute("SELECT f FROM test_table_udt_1 WHERE e = ?", 2).one()); // Try to read new row with old codec. Using row.getUdtValue() would not cause any issues, // because new codec will be automatically registered (using all 3 attributes). // If application leverages generic row.get(String, Codec) method, data reading with old codec From 646ffc9c830101370ad20bdca950ac1c46c25dc3 Mon Sep 17 00:00:00 2001 From: janehe Date: Tue, 1 Jul 2025 15:10:47 -0700 Subject: [PATCH 10/24] DefaultReactiveResultSetIT not paralell --- .../driver/core/cql/reactive/DefaultReactiveResultSetIT.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java index 77a449de44f..5c32638bf36 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java @@ -35,7 +35,6 @@ import com.datastax.oss.driver.api.testinfra.ccm.CcmRule; import com.datastax.oss.driver.api.testinfra.ccm.SchemaChangeSynchronizer; import com.datastax.oss.driver.api.testinfra.session.SessionRule; -import com.datastax.oss.driver.categories.ParallelizableTests; import com.datastax.oss.driver.internal.core.cql.EmptyColumnDefinitions; import com.tngtech.java.junit.dataprovider.DataProvider; import com.tngtech.java.junit.dataprovider.DataProviderRunner; @@ -48,13 +47,11 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.rules.RuleChain; import org.junit.rules.TestRule; import org.junit.runner.RunWith; @RunWith(DataProviderRunner.class) -@Category(ParallelizableTests.class) public class DefaultReactiveResultSetIT { private static CcmRule ccmRule = CcmRule.getInstance(); From 02dedffb214f2fa61ca1c4f883e864ec7aa3afed Mon Sep 17 00:00:00 2001 From: janehe Date: Tue, 1 Jul 2025 15:27:01 -0700 Subject: [PATCH 11/24] empty From e2b506200fc499b0eb3ebb4665c2a066dd8086ff Mon Sep 17 00:00:00 2001 From: janehe Date: Wed, 2 Jul 2025 12:52:11 -0700 Subject: [PATCH 12/24] compilation warning --- .../com/datastax/oss/driver/core/metrics/MetricsITBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java index 8d5022c2393..716dc1b66a6 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java @@ -179,7 +179,7 @@ public void should_evict_down_node_metrics_when_timeout_fires() throws Exception // then node-level metrics should be evicted from node1, but // node2 and node3 metrics should not have been evicted await() - .atMost(Duration.ofSeconds(120)) + .atMost(Duration.ofMinutes(2)) .untilAsserted(() -> assertNodeMetricsEvicted(session, node1)); assertNodeMetricsNotEvicted(session, node2); assertNodeMetricsNotEvicted(session, node3); From eb8822bf269649bf86d40692b8fd602586b7594c Mon Sep 17 00:00:00 2001 From: janehe Date: Wed, 2 Jul 2025 13:51:16 -0700 Subject: [PATCH 13/24] empty From d7165f26ff3ea60fc3cae14c83e136f0adea1520 Mon Sep 17 00:00:00 2001 From: janehe Date: Mon, 7 Jul 2025 22:01:52 -0700 Subject: [PATCH 14/24] add logging --- .../core/cql/PreparedStatementCachingIT.java | 41 +++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java index c2c359be280..02d0a5249a6 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java @@ -72,6 +72,8 @@ @Category(IsolatedTests.class) public class PreparedStatementCachingIT { + private static final Logger LOG = LoggerFactory.getLogger(PreparedStatementCachingIT.class); + private CustomCcmRule ccmRule = CustomCcmRule.builder().build(); private SessionRule sessionRule = @@ -266,15 +268,46 @@ private void invalidationTestInner( // alter test_type_caching_2 to trigger cache invalidation and above events session.execute("ALTER TYPE test_type_caching_2 add i blob"); + // Give a small delay to allow the schema change to propagate before checking agreement + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); session.checkSchemaAgreement(); // wait for latches and fail if they don't reach zero before timeout - assertThat( - Uninterruptibles.awaitUninterruptibly( - preparedStmtCacheRemoveLatch, 120, TimeUnit.SECONDS)) + // Use longer timeout for cache removal as it depends on complex event chain + boolean cacheRemovalSuccess = + Uninterruptibles.awaitUninterruptibly( + preparedStmtCacheRemoveLatch, 180, TimeUnit.SECONDS); + boolean typeChangeSuccess = + Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 60, TimeUnit.SECONDS); + + // Provide detailed diagnostics if either latch fails + if (!cacheRemovalSuccess || !typeChangeSuccess) { + String diagnostics = + String.format( + "Test failure diagnostics:\n" + + " - Cache removal latch success: %s (count: %d)\n" + + " - Type change latch success: %s (count: %d)\n" + + " - Current cache size: %d\n" + + " - Changed types detected: %s\n" + + " - Removed query IDs: %s\n" + + " - Type change errors: %s\n" + + " - Removal event errors: %s", + cacheRemovalSuccess, + preparedStmtCacheRemoveLatch.getCount(), + typeChangeSuccess, + typeChangeEventLatch.getCount(), + getPreparedCacheSize(session), + changedTypes.keySet(), + removedQueryIds.get(), + typeChangeEventError.get(), + removedQueryEventError.get()); + LOG.error("Prepared statement cache invalidation test failed: {}", diagnostics); + } + + assertThat(cacheRemovalSuccess) .withFailMessage("preparedStmtCacheRemoveLatch did not trigger before timeout") .isTrue(); - assertThat(Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 20, TimeUnit.SECONDS)) + assertThat(typeChangeSuccess) .withFailMessage("typeChangeEventLatch did not trigger before timeout") .isTrue(); From ec7a107014fa5c91b28bee7b0a717ebc42568b8e Mon Sep 17 00:00:00 2001 From: janehe Date: Tue, 8 Jul 2025 01:05:44 -0700 Subject: [PATCH 15/24] add logging --- .../core/cql/PreparedStatementCachingIT.java | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java index 02d0a5249a6..b23b6886b05 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java @@ -123,9 +123,11 @@ private static RemovalListener buildCacheRemoveCallback( @NonNull Optional context) { return (evt) -> { try { + LOG.info("Cache removal callback triggered, cause: {}", evt.getCause()); CompletableFuture future = (CompletableFuture) evt.getValue(); ByteBuffer queryId = Uninterruptibles.getUninterruptibly(future).getId(); + LOG.info("Firing PreparedStatementRemovalEvent for queryId: {}", queryId); context.ifPresent( ctx -> ctx.getEventBus().fire(new PreparedStatementRemovalEvent(queryId))); } catch (Exception e) { @@ -224,10 +226,15 @@ private void invalidationTestInner( assertThat(getPreparedCacheSize(session)).isEqualTo(0); setupTestSchema.accept(session); - session.prepare(preparedStmtQueryType1); - ByteBuffer queryId2 = session.prepare(preparedStmtQueryType2).getId(); + PreparedStatement stmt1 = session.prepare(preparedStmtQueryType1); + PreparedStatement stmt2 = session.prepare(preparedStmtQueryType2); + ByteBuffer queryId2 = stmt2.getId(); assertThat(getPreparedCacheSize(session)).isEqualTo(2); + LOG.info("Prepared statements in cache:"); + LOG.info(" Statement 1: {} (queryId: {})", preparedStmtQueryType1, stmt1.getId()); + LOG.info(" Statement 2: {} (queryId: {})", preparedStmtQueryType2, stmt2.getId()); + CountDownLatch preparedStmtCacheRemoveLatch = new CountDownLatch(1); CountDownLatch typeChangeEventLatch = new CountDownLatch(expectedChangedTypes.size()); @@ -244,6 +251,8 @@ private void invalidationTestInner( TypeChangeEvent.class, (e) -> { // expect one event per type changed and for every parent type that nests it + LOG.info("Received TypeChangeEvent for type: {} (changeType: {})", + e.oldType.getName(), e.changeType); if (Boolean.TRUE.equals( changedTypes.putIfAbsent(e.oldType.getName().toString(), true))) { // store an error if we see duplicate change event @@ -256,16 +265,20 @@ private void invalidationTestInner( .register( PreparedStatementRemovalEvent.class, (e) -> { + LOG.info("Received PreparedStatementRemovalEvent for queryId: {}", e.queryId); if (!removedQueryIds.compareAndSet(Optional.empty(), Optional.of(e.queryId))) { // store an error if we see multiple cache invalidation events // any non-empty error will fail the test so it's OK to do this multiple times removedQueryEventError.set( Optional.of("Unable to set reference for PS removal event")); + LOG.warn("Multiple PreparedStatementRemovalEvents received, ignoring subsequent ones"); } preparedStmtCacheRemoveLatch.countDown(); }); // alter test_type_caching_2 to trigger cache invalidation and above events + LOG.info("Executing ALTER TYPE test_type_caching_2 add i blob"); + LOG.info("Expected to invalidate statement 2 (queryId: {}) due to type change", queryId2); session.execute("ALTER TYPE test_type_caching_2 add i blob"); // Give a small delay to allow the schema change to propagate before checking agreement @@ -288,8 +301,10 @@ private void invalidationTestInner( + " - Cache removal latch success: %s (count: %d)\n" + " - Type change latch success: %s (count: %d)\n" + " - Current cache size: %d\n" - + " - Changed types detected: %s\n" - + " - Removed query IDs: %s\n" + + " - Expected changed types: %s\n" + + " - Actual changed types detected: %s\n" + + " - Expected removed query ID: %s\n" + + " - Actual removed query IDs: %s\n" + " - Type change errors: %s\n" + " - Removal event errors: %s", cacheRemovalSuccess, @@ -297,7 +312,9 @@ private void invalidationTestInner( typeChangeSuccess, typeChangeEventLatch.getCount(), getPreparedCacheSize(session), + expectedChangedTypes, changedTypes.keySet(), + queryId2, removedQueryIds.get(), typeChangeEventError.get(), removedQueryEventError.get()); From cf054fa174d56df50a46aeb8a909d745f6965871 Mon Sep 17 00:00:00 2001 From: janehe Date: Wed, 9 Jul 2025 08:47:35 -0700 Subject: [PATCH 16/24] empty From 904526568fd58e1fbd77b8564dbfbda726a31666 Mon Sep 17 00:00:00 2001 From: janehe Date: Wed, 9 Jul 2025 08:47:37 -0700 Subject: [PATCH 17/24] empty From 933bafbf67d6b91b28de8b9b63393f2c0aa9905f Mon Sep 17 00:00:00 2001 From: janehe Date: Wed, 9 Jul 2025 09:04:52 -0700 Subject: [PATCH 18/24] fmt --- .../oss/driver/core/cql/PreparedStatementCachingIT.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java index b23b6886b05..02818ec40ef 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java @@ -251,8 +251,10 @@ private void invalidationTestInner( TypeChangeEvent.class, (e) -> { // expect one event per type changed and for every parent type that nests it - LOG.info("Received TypeChangeEvent for type: {} (changeType: {})", - e.oldType.getName(), e.changeType); + LOG.info( + "Received TypeChangeEvent for type: {} (changeType: {})", + e.oldType.getName(), + e.changeType); if (Boolean.TRUE.equals( changedTypes.putIfAbsent(e.oldType.getName().toString(), true))) { // store an error if we see duplicate change event @@ -271,7 +273,8 @@ private void invalidationTestInner( // any non-empty error will fail the test so it's OK to do this multiple times removedQueryEventError.set( Optional.of("Unable to set reference for PS removal event")); - LOG.warn("Multiple PreparedStatementRemovalEvents received, ignoring subsequent ones"); + LOG.warn( + "Multiple PreparedStatementRemovalEvents received, ignoring subsequent ones"); } preparedStmtCacheRemoveLatch.countDown(); }); From 6ec3c738168d0028676cdbee1b93e29bb1957a83 Mon Sep 17 00:00:00 2001 From: janehe Date: Wed, 9 Jul 2025 17:41:38 -0700 Subject: [PATCH 19/24] error --- .../core/cql/PreparedStatementCachingIT.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java index 02818ec40ef..f21a0f9fe23 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java @@ -123,11 +123,11 @@ private static RemovalListener buildCacheRemoveCallback( @NonNull Optional context) { return (evt) -> { try { - LOG.info("Cache removal callback triggered, cause: {}", evt.getCause()); + LOG.error("Cache removal callback triggered, cause: {}", evt.getCause()); CompletableFuture future = (CompletableFuture) evt.getValue(); ByteBuffer queryId = Uninterruptibles.getUninterruptibly(future).getId(); - LOG.info("Firing PreparedStatementRemovalEvent for queryId: {}", queryId); + LOG.error("Firing PreparedStatementRemovalEvent for queryId: {}", queryId); context.ifPresent( ctx -> ctx.getEventBus().fire(new PreparedStatementRemovalEvent(queryId))); } catch (Exception e) { @@ -231,9 +231,9 @@ private void invalidationTestInner( ByteBuffer queryId2 = stmt2.getId(); assertThat(getPreparedCacheSize(session)).isEqualTo(2); - LOG.info("Prepared statements in cache:"); - LOG.info(" Statement 1: {} (queryId: {})", preparedStmtQueryType1, stmt1.getId()); - LOG.info(" Statement 2: {} (queryId: {})", preparedStmtQueryType2, stmt2.getId()); + LOG.error("Prepared statements in cache:"); + LOG.error(" Statement 1: {} (queryId: {})", preparedStmtQueryType1, stmt1.getId()); + LOG.error(" Statement 2: {} (queryId: {})", preparedStmtQueryType2, stmt2.getId()); CountDownLatch preparedStmtCacheRemoveLatch = new CountDownLatch(1); CountDownLatch typeChangeEventLatch = new CountDownLatch(expectedChangedTypes.size()); @@ -251,7 +251,7 @@ private void invalidationTestInner( TypeChangeEvent.class, (e) -> { // expect one event per type changed and for every parent type that nests it - LOG.info( + LOG.error( "Received TypeChangeEvent for type: {} (changeType: {})", e.oldType.getName(), e.changeType); @@ -267,7 +267,7 @@ private void invalidationTestInner( .register( PreparedStatementRemovalEvent.class, (e) -> { - LOG.info("Received PreparedStatementRemovalEvent for queryId: {}", e.queryId); + LOG.error("Received PreparedStatementRemovalEvent for queryId: {}", e.queryId); if (!removedQueryIds.compareAndSet(Optional.empty(), Optional.of(e.queryId))) { // store an error if we see multiple cache invalidation events // any non-empty error will fail the test so it's OK to do this multiple times @@ -280,8 +280,8 @@ private void invalidationTestInner( }); // alter test_type_caching_2 to trigger cache invalidation and above events - LOG.info("Executing ALTER TYPE test_type_caching_2 add i blob"); - LOG.info("Expected to invalidate statement 2 (queryId: {}) due to type change", queryId2); + LOG.error("Executing ALTER TYPE test_type_caching_2 add i blob"); + LOG.error("Expected to invalidate statement 2 (queryId: {}) due to type change", queryId2); session.execute("ALTER TYPE test_type_caching_2 add i blob"); // Give a small delay to allow the schema change to propagate before checking agreement From 9ee5a8b8e3687aaf2181b853c813d286fe749da9 Mon Sep 17 00:00:00 2001 From: janehe Date: Wed, 9 Jul 2025 22:44:13 -0700 Subject: [PATCH 20/24] add logging --- .../core/cql/PreparedStatementCachingIT.java | 120 ++++++++++++++++-- 1 file changed, 109 insertions(+), 11 deletions(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java index f21a0f9fe23..a8261b290fc 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java @@ -123,13 +123,30 @@ private static RemovalListener buildCacheRemoveCallback( @NonNull Optional context) { return (evt) -> { try { - LOG.error("Cache removal callback triggered, cause: {}", evt.getCause()); + LOG.error( + "Cache removal callback triggered, cause: {}, key: {}", evt.getCause(), evt.getKey()); CompletableFuture future = (CompletableFuture) evt.getValue(); - ByteBuffer queryId = Uninterruptibles.getUninterruptibly(future).getId(); - LOG.error("Firing PreparedStatementRemovalEvent for queryId: {}", queryId); - context.ifPresent( - ctx -> ctx.getEventBus().fire(new PreparedStatementRemovalEvent(queryId))); + + // Add more detailed logging about the future state + LOG.error( + "Future state - done: {}, cancelled: {}, completedExceptionally: {}", + future.isDone(), + future.isCancelled(), + future.isCompletedExceptionally()); + + if (future.isDone() && !future.isCompletedExceptionally() && !future.isCancelled()) { + ByteBuffer queryId = Uninterruptibles.getUninterruptibly(future).getId(); + LOG.error("Firing PreparedStatementRemovalEvent for queryId: {}", queryId); + context.ifPresent( + ctx -> { + LOG.error("About to fire PreparedStatementRemovalEvent on event bus"); + ctx.getEventBus().fire(new PreparedStatementRemovalEvent(queryId)); + LOG.error("PreparedStatementRemovalEvent fired successfully"); + }); + } else { + LOG.error("Skipping removal event - future not in valid state for extraction"); + } } catch (Exception e) { LOG.error("Unable to register removal handler", e); } @@ -194,6 +211,68 @@ public static SessionBuilder builder() { return new TestSessionBuilder(); } + private void debugCacheInvalidation(CqlSession session, TypeChangeEvent event) { + try { + DefaultDriverContext ctx = (DefaultDriverContext) session.getContext(); + // Get the processor to check cache state + RequestProcessorRegistry registry = ctx.getRequestProcessorRegistry(); + + LOG.error( + "Debug: TypeChangeEvent received for type: {} (changeType: {})", + event.oldType.getName(), + event.changeType); + LOG.error("Debug: Current cache size: {}", getPreparedCacheSize(session)); + + // Force cache cleanup to trigger any pending removals + if (registry != null) { + LOG.error("Debug: Forcing cache cleanup..."); + // We can't directly access the cache from here, but we can log that we're trying + } + } catch (Exception e) { + LOG.error("Debug: Error during cache invalidation debugging", e); + } + } + + private boolean waitForCacheRemovalWithCleanup( + CountDownLatch latch, CqlSession session, long timeout, TimeUnit unit) { + long timeoutNanos = unit.toNanos(timeout); + long startTime = System.nanoTime(); + long remainingNanos = timeoutNanos; + + while (remainingNanos > 0 && latch.getCount() > 0) { + // Wait for a short period + long waitTime = Math.min(remainingNanos, TimeUnit.SECONDS.toNanos(5)); + boolean success = + Uninterruptibles.awaitUninterruptibly(latch, waitTime, TimeUnit.NANOSECONDS); + + if (success) { + LOG.error("Cache removal latch triggered successfully"); + return true; + } + + // If we haven't succeeded yet, try to force cache cleanup + LOG.error( + "Cache removal latch not triggered yet, forcing cleanup. Current cache size: {}", + getPreparedCacheSize(session)); + + try { + // Force garbage collection to help with weak references + System.gc(); + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + + remainingNanos = timeoutNanos - (System.nanoTime() - startTime); + } + + LOG.error( + "Cache removal latch failed to trigger within timeout. Final cache size: {}", + getPreparedCacheSize(session)); + return false; + } + private void invalidationResultSetTest( Consumer setupTestSchema, Set expectedChangedTypes) { invalidationTestInner( @@ -223,7 +302,15 @@ private void invalidationTestInner( try (CqlSession session = sessionWithCacheSizeMetric()) { - assertThat(getPreparedCacheSize(session)).isEqualTo(0); + // Ensure we start with a clean cache + long initialCacheSize = getPreparedCacheSize(session); + LOG.error("Starting test with cache size: {}", initialCacheSize); + assertThat(initialCacheSize).isEqualTo(0); + + // Force garbage collection to ensure clean state + System.gc(); + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + setupTestSchema.accept(session); PreparedStatement stmt1 = session.prepare(preparedStmtQueryType1); @@ -255,6 +342,10 @@ private void invalidationTestInner( "Received TypeChangeEvent for type: {} (changeType: {})", e.oldType.getName(), e.changeType); + + // Add detailed debugging for cache invalidation + debugCacheInvalidation(session, e); + if (Boolean.TRUE.equals( changedTypes.putIfAbsent(e.oldType.getName().toString(), true))) { // store an error if we see duplicate change event @@ -284,18 +375,25 @@ private void invalidationTestInner( LOG.error("Expected to invalidate statement 2 (queryId: {}) due to type change", queryId2); session.execute("ALTER TYPE test_type_caching_2 add i blob"); - // Give a small delay to allow the schema change to propagate before checking agreement - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + // Give a longer delay to allow the schema change to propagate before checking agreement + LOG.error("Waiting for schema change to propagate..."); + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); session.checkSchemaAgreement(); + // Additional delay to allow event processing to complete + LOG.error("Waiting for event processing to complete..."); + Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); + // wait for latches and fail if they don't reach zero before timeout // Use longer timeout for cache removal as it depends on complex event chain - boolean cacheRemovalSuccess = - Uninterruptibles.awaitUninterruptibly( - preparedStmtCacheRemoveLatch, 180, TimeUnit.SECONDS); boolean typeChangeSuccess = Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 60, TimeUnit.SECONDS); + // For cache removal, use a more robust waiting mechanism with periodic cleanup + boolean cacheRemovalSuccess = + waitForCacheRemovalWithCleanup( + preparedStmtCacheRemoveLatch, session, 180, TimeUnit.SECONDS); + // Provide detailed diagnostics if either latch fails if (!cacheRemovalSuccess || !typeChangeSuccess) { String diagnostics = From 3289fda9fb48142ffedf0d0b8f2f4f42c8a0a1a6 Mon Sep 17 00:00:00 2001 From: janehe Date: Thu, 10 Jul 2025 07:27:23 -0700 Subject: [PATCH 21/24] register listener first, create prepare later --- .../core/cql/PreparedStatementCachingIT.java | 54 +++++++++++++------ 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java index a8261b290fc..1551af06466 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java @@ -239,20 +239,34 @@ private boolean waitForCacheRemovalWithCleanup( long startTime = System.nanoTime(); long remainingNanos = timeoutNanos; + LOG.error("Starting cache removal wait, initial latch count: {}", latch.getCount()); + + // First check if the latch is already at 0 + if (latch.getCount() == 0) { + LOG.error("Cache removal latch already at 0, returning success immediately"); + return true; + } + while (remainingNanos > 0 && latch.getCount() > 0) { // Wait for a short period long waitTime = Math.min(remainingNanos, TimeUnit.SECONDS.toNanos(5)); + LOG.error( + "Waiting for cache removal latch, current count: {}, wait time: {}s", + latch.getCount(), + TimeUnit.NANOSECONDS.toSeconds(waitTime)); + boolean success = Uninterruptibles.awaitUninterruptibly(latch, waitTime, TimeUnit.NANOSECONDS); if (success) { - LOG.error("Cache removal latch triggered successfully"); + LOG.error("Cache removal latch triggered successfully, final count: {}", latch.getCount()); return true; } // If we haven't succeeded yet, try to force cache cleanup LOG.error( - "Cache removal latch not triggered yet, forcing cleanup. Current cache size: {}", + "Cache removal latch not triggered yet (count: {}), forcing cleanup. Current cache size: {}", + latch.getCount(), getPreparedCacheSize(session)); try { @@ -268,9 +282,10 @@ private boolean waitForCacheRemovalWithCleanup( } LOG.error( - "Cache removal latch failed to trigger within timeout. Final cache size: {}", + "Cache removal latch failed to trigger within timeout. Final latch count: {}, cache size: {}", + latch.getCount(), getPreparedCacheSize(session)); - return false; + return latch.getCount() == 0; // Return true if latch reached 0 even if await timed out } private void invalidationResultSetTest( @@ -313,15 +328,7 @@ private void invalidationTestInner( setupTestSchema.accept(session); - PreparedStatement stmt1 = session.prepare(preparedStmtQueryType1); - PreparedStatement stmt2 = session.prepare(preparedStmtQueryType2); - ByteBuffer queryId2 = stmt2.getId(); - assertThat(getPreparedCacheSize(session)).isEqualTo(2); - - LOG.error("Prepared statements in cache:"); - LOG.error(" Statement 1: {} (queryId: {})", preparedStmtQueryType1, stmt1.getId()); - LOG.error(" Statement 2: {} (queryId: {})", preparedStmtQueryType2, stmt2.getId()); - + // Set up event handlers BEFORE creating prepared statements to avoid race conditions CountDownLatch preparedStmtCacheRemoveLatch = new CountDownLatch(1); CountDownLatch typeChangeEventLatch = new CountDownLatch(expectedChangedTypes.size()); @@ -333,6 +340,8 @@ private void invalidationTestInner( new AtomicReference<>(Optional.empty()); AtomicReference> removedQueryEventError = new AtomicReference<>(Optional.empty()); + + LOG.error("Registering event handlers before creating prepared statements"); ctx.getEventBus() .register( TypeChangeEvent.class, @@ -367,9 +376,24 @@ private void invalidationTestInner( LOG.warn( "Multiple PreparedStatementRemovalEvents received, ignoring subsequent ones"); } + LOG.error( + "About to countdown preparedStmtCacheRemoveLatch, current count: {}", + preparedStmtCacheRemoveLatch.getCount()); preparedStmtCacheRemoveLatch.countDown(); + LOG.error( + "Countdown completed, new count: {}", preparedStmtCacheRemoveLatch.getCount()); }); + // Now create the prepared statements + PreparedStatement stmt1 = session.prepare(preparedStmtQueryType1); + PreparedStatement stmt2 = session.prepare(preparedStmtQueryType2); + ByteBuffer queryId2 = stmt2.getId(); + assertThat(getPreparedCacheSize(session)).isEqualTo(2); + + LOG.error("Prepared statements in cache:"); + LOG.error(" Statement 1: {} (queryId: {})", preparedStmtQueryType1, stmt1.getId()); + LOG.error(" Statement 2: {} (queryId: {})", preparedStmtQueryType2, stmt2.getId()); + // alter test_type_caching_2 to trigger cache invalidation and above events LOG.error("Executing ALTER TYPE test_type_caching_2 add i blob"); LOG.error("Expected to invalidate statement 2 (queryId: {}) due to type change", queryId2); @@ -380,10 +404,6 @@ private void invalidationTestInner( Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); session.checkSchemaAgreement(); - // Additional delay to allow event processing to complete - LOG.error("Waiting for event processing to complete..."); - Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); - // wait for latches and fail if they don't reach zero before timeout // Use longer timeout for cache removal as it depends on complex event chain boolean typeChangeSuccess = From 98eb7570b80977ae1c6b837f387eb7fae4fdc724 Mon Sep 17 00:00:00 2001 From: janehe Date: Fri, 11 Jul 2025 12:18:15 -0700 Subject: [PATCH 22/24] strong values of cache --- .../core/cql/CqlPrepareAsyncProcessor.java | 13 ++ .../core/cql/PreparedStatementCachingIT.java | 199 ++---------------- 2 files changed, 28 insertions(+), 184 deletions(-) diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareAsyncProcessor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareAsyncProcessor.java index a3d11cff054..4c09852c7e2 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareAsyncProcessor.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareAsyncProcessor.java @@ -83,6 +83,19 @@ protected CqlPrepareAsyncProcessor( }); } + protected CqlPrepareAsyncProcessor( + Optional context, CacheBuilder cacheBuilder) { + this.cache = cacheBuilder.build(); + context.ifPresent( + (ctx) -> { + LOG.info("Adding handler to invalidate cached prepared statements on type changes"); + EventExecutor adminExecutor = ctx.getNettyOptions().adminEventExecutorGroup().next(); + ctx.getEventBus() + .register( + TypeChangeEvent.class, RunOrSchedule.on(adminExecutor, this::onTypeChanged)); + }); + } + private static boolean typeMatches(UserDefinedType oldType, DataType typeToCheck) { switch (typeToCheck.getProtocolCode()) { diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java index 1551af06466..d424613c18b 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java @@ -40,6 +40,7 @@ import com.datastax.oss.driver.internal.core.session.BuiltInRequestProcessors; import com.datastax.oss.driver.internal.core.session.RequestProcessor; import com.datastax.oss.driver.internal.core.session.RequestProcessorRegistry; +import com.datastax.oss.driver.shaded.guava.common.cache.CacheBuilder; import com.datastax.oss.driver.shaded.guava.common.cache.RemovalListener; import com.datastax.oss.driver.shaded.guava.common.util.concurrent.Uninterruptibles; import com.google.common.collect.ImmutableList; @@ -72,8 +73,6 @@ @Category(IsolatedTests.class) public class PreparedStatementCachingIT { - private static final Logger LOG = LoggerFactory.getLogger(PreparedStatementCachingIT.class); - private CustomCcmRule ccmRule = CustomCcmRule.builder().build(); private SessionRule sessionRule = @@ -123,30 +122,11 @@ private static RemovalListener buildCacheRemoveCallback( @NonNull Optional context) { return (evt) -> { try { - LOG.error( - "Cache removal callback triggered, cause: {}, key: {}", evt.getCause(), evt.getKey()); CompletableFuture future = (CompletableFuture) evt.getValue(); - - // Add more detailed logging about the future state - LOG.error( - "Future state - done: {}, cancelled: {}, completedExceptionally: {}", - future.isDone(), - future.isCancelled(), - future.isCompletedExceptionally()); - - if (future.isDone() && !future.isCompletedExceptionally() && !future.isCancelled()) { - ByteBuffer queryId = Uninterruptibles.getUninterruptibly(future).getId(); - LOG.error("Firing PreparedStatementRemovalEvent for queryId: {}", queryId); - context.ifPresent( - ctx -> { - LOG.error("About to fire PreparedStatementRemovalEvent on event bus"); - ctx.getEventBus().fire(new PreparedStatementRemovalEvent(queryId)); - LOG.error("PreparedStatementRemovalEvent fired successfully"); - }); - } else { - LOG.error("Skipping removal event - future not in valid state for extraction"); - } + ByteBuffer queryId = Uninterruptibles.getUninterruptibly(future).getId(); + context.ifPresent( + ctx -> ctx.getEventBus().fire(new PreparedStatementRemovalEvent(queryId))); } catch (Exception e) { LOG.error("Unable to register removal handler", e); } @@ -156,7 +136,8 @@ private static RemovalListener buildCacheRemoveCallback( public TestCqlPrepareAsyncProcessor(@NonNull Optional context) { // Default CqlPrepareAsyncProcessor uses weak values here as well. We avoid doing so // to prevent cache entries from unexpectedly disappearing mid-test. - super(context, builder -> builder.removalListener(buildCacheRemoveCallback(context))); + // TODO: it was still weak value cuz it's only a decorator. + super(context, CacheBuilder.newBuilder().removalListener(buildCacheRemoveCallback(context))); } } @@ -211,83 +192,6 @@ public static SessionBuilder builder() { return new TestSessionBuilder(); } - private void debugCacheInvalidation(CqlSession session, TypeChangeEvent event) { - try { - DefaultDriverContext ctx = (DefaultDriverContext) session.getContext(); - // Get the processor to check cache state - RequestProcessorRegistry registry = ctx.getRequestProcessorRegistry(); - - LOG.error( - "Debug: TypeChangeEvent received for type: {} (changeType: {})", - event.oldType.getName(), - event.changeType); - LOG.error("Debug: Current cache size: {}", getPreparedCacheSize(session)); - - // Force cache cleanup to trigger any pending removals - if (registry != null) { - LOG.error("Debug: Forcing cache cleanup..."); - // We can't directly access the cache from here, but we can log that we're trying - } - } catch (Exception e) { - LOG.error("Debug: Error during cache invalidation debugging", e); - } - } - - private boolean waitForCacheRemovalWithCleanup( - CountDownLatch latch, CqlSession session, long timeout, TimeUnit unit) { - long timeoutNanos = unit.toNanos(timeout); - long startTime = System.nanoTime(); - long remainingNanos = timeoutNanos; - - LOG.error("Starting cache removal wait, initial latch count: {}", latch.getCount()); - - // First check if the latch is already at 0 - if (latch.getCount() == 0) { - LOG.error("Cache removal latch already at 0, returning success immediately"); - return true; - } - - while (remainingNanos > 0 && latch.getCount() > 0) { - // Wait for a short period - long waitTime = Math.min(remainingNanos, TimeUnit.SECONDS.toNanos(5)); - LOG.error( - "Waiting for cache removal latch, current count: {}, wait time: {}s", - latch.getCount(), - TimeUnit.NANOSECONDS.toSeconds(waitTime)); - - boolean success = - Uninterruptibles.awaitUninterruptibly(latch, waitTime, TimeUnit.NANOSECONDS); - - if (success) { - LOG.error("Cache removal latch triggered successfully, final count: {}", latch.getCount()); - return true; - } - - // If we haven't succeeded yet, try to force cache cleanup - LOG.error( - "Cache removal latch not triggered yet (count: {}), forcing cleanup. Current cache size: {}", - latch.getCount(), - getPreparedCacheSize(session)); - - try { - // Force garbage collection to help with weak references - System.gc(); - Thread.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } - - remainingNanos = timeoutNanos - (System.nanoTime() - startTime); - } - - LOG.error( - "Cache removal latch failed to trigger within timeout. Final latch count: {}, cache size: {}", - latch.getCount(), - getPreparedCacheSize(session)); - return latch.getCount() == 0; // Return true if latch reached 0 even if await timed out - } - private void invalidationResultSetTest( Consumer setupTestSchema, Set expectedChangedTypes) { invalidationTestInner( @@ -317,18 +221,13 @@ private void invalidationTestInner( try (CqlSession session = sessionWithCacheSizeMetric()) { - // Ensure we start with a clean cache - long initialCacheSize = getPreparedCacheSize(session); - LOG.error("Starting test with cache size: {}", initialCacheSize); - assertThat(initialCacheSize).isEqualTo(0); - - // Force garbage collection to ensure clean state - System.gc(); - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - + assertThat(getPreparedCacheSize(session)).isEqualTo(0); setupTestSchema.accept(session); - // Set up event handlers BEFORE creating prepared statements to avoid race conditions + session.prepare(preparedStmtQueryType1); + ByteBuffer queryId2 = session.prepare(preparedStmtQueryType2).getId(); + assertThat(getPreparedCacheSize(session)).isEqualTo(2); + CountDownLatch preparedStmtCacheRemoveLatch = new CountDownLatch(1); CountDownLatch typeChangeEventLatch = new CountDownLatch(expectedChangedTypes.size()); @@ -340,21 +239,11 @@ private void invalidationTestInner( new AtomicReference<>(Optional.empty()); AtomicReference> removedQueryEventError = new AtomicReference<>(Optional.empty()); - - LOG.error("Registering event handlers before creating prepared statements"); ctx.getEventBus() .register( TypeChangeEvent.class, (e) -> { // expect one event per type changed and for every parent type that nests it - LOG.error( - "Received TypeChangeEvent for type: {} (changeType: {})", - e.oldType.getName(), - e.changeType); - - // Add detailed debugging for cache invalidation - debugCacheInvalidation(session, e); - if (Boolean.TRUE.equals( changedTypes.putIfAbsent(e.oldType.getName().toString(), true))) { // store an error if we see duplicate change event @@ -367,85 +256,27 @@ private void invalidationTestInner( .register( PreparedStatementRemovalEvent.class, (e) -> { - LOG.error("Received PreparedStatementRemovalEvent for queryId: {}", e.queryId); if (!removedQueryIds.compareAndSet(Optional.empty(), Optional.of(e.queryId))) { // store an error if we see multiple cache invalidation events // any non-empty error will fail the test so it's OK to do this multiple times removedQueryEventError.set( Optional.of("Unable to set reference for PS removal event")); - LOG.warn( - "Multiple PreparedStatementRemovalEvents received, ignoring subsequent ones"); } - LOG.error( - "About to countdown preparedStmtCacheRemoveLatch, current count: {}", - preparedStmtCacheRemoveLatch.getCount()); preparedStmtCacheRemoveLatch.countDown(); - LOG.error( - "Countdown completed, new count: {}", preparedStmtCacheRemoveLatch.getCount()); }); - // Now create the prepared statements - PreparedStatement stmt1 = session.prepare(preparedStmtQueryType1); - PreparedStatement stmt2 = session.prepare(preparedStmtQueryType2); - ByteBuffer queryId2 = stmt2.getId(); - assertThat(getPreparedCacheSize(session)).isEqualTo(2); - - LOG.error("Prepared statements in cache:"); - LOG.error(" Statement 1: {} (queryId: {})", preparedStmtQueryType1, stmt1.getId()); - LOG.error(" Statement 2: {} (queryId: {})", preparedStmtQueryType2, stmt2.getId()); - // alter test_type_caching_2 to trigger cache invalidation and above events - LOG.error("Executing ALTER TYPE test_type_caching_2 add i blob"); - LOG.error("Expected to invalidate statement 2 (queryId: {}) due to type change", queryId2); session.execute("ALTER TYPE test_type_caching_2 add i blob"); - // Give a longer delay to allow the schema change to propagate before checking agreement - LOG.error("Waiting for schema change to propagate..."); - Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); session.checkSchemaAgreement(); // wait for latches and fail if they don't reach zero before timeout - // Use longer timeout for cache removal as it depends on complex event chain - boolean typeChangeSuccess = - Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 60, TimeUnit.SECONDS); - - // For cache removal, use a more robust waiting mechanism with periodic cleanup - boolean cacheRemovalSuccess = - waitForCacheRemovalWithCleanup( - preparedStmtCacheRemoveLatch, session, 180, TimeUnit.SECONDS); - - // Provide detailed diagnostics if either latch fails - if (!cacheRemovalSuccess || !typeChangeSuccess) { - String diagnostics = - String.format( - "Test failure diagnostics:\n" - + " - Cache removal latch success: %s (count: %d)\n" - + " - Type change latch success: %s (count: %d)\n" - + " - Current cache size: %d\n" - + " - Expected changed types: %s\n" - + " - Actual changed types detected: %s\n" - + " - Expected removed query ID: %s\n" - + " - Actual removed query IDs: %s\n" - + " - Type change errors: %s\n" - + " - Removal event errors: %s", - cacheRemovalSuccess, - preparedStmtCacheRemoveLatch.getCount(), - typeChangeSuccess, - typeChangeEventLatch.getCount(), - getPreparedCacheSize(session), - expectedChangedTypes, - changedTypes.keySet(), - queryId2, - removedQueryIds.get(), - typeChangeEventError.get(), - removedQueryEventError.get()); - LOG.error("Prepared statement cache invalidation test failed: {}", diagnostics); - } - - assertThat(cacheRemovalSuccess) + assertThat( + Uninterruptibles.awaitUninterruptibly( + preparedStmtCacheRemoveLatch, 120, TimeUnit.SECONDS)) .withFailMessage("preparedStmtCacheRemoveLatch did not trigger before timeout") .isTrue(); - assertThat(typeChangeSuccess) + assertThat(Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 20, TimeUnit.SECONDS)) .withFailMessage("typeChangeEventLatch did not trigger before timeout") .isTrue(); From 4f22bd6720d64b227eb3b5f40f65921efd09ded9 Mon Sep 17 00:00:00 2001 From: janehe Date: Wed, 16 Jul 2025 01:54:28 -0700 Subject: [PATCH 23/24] CancellationIT use strong values cache without cache removal callback --- .../core/cql/PreparedStatementCachingIT.java | 1 - .../cql/PreparedStatementCancellationIT.java | 77 +++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java index d424613c18b..ea00db7c4a3 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java @@ -136,7 +136,6 @@ private static RemovalListener buildCacheRemoveCallback( public TestCqlPrepareAsyncProcessor(@NonNull Optional context) { // Default CqlPrepareAsyncProcessor uses weak values here as well. We avoid doing so // to prevent cache entries from unexpectedly disappearing mid-test. - // TODO: it was still weak value cuz it's only a decorator. super(context, CacheBuilder.newBuilder().removalListener(buildCacheRemoveCallback(context))); } } diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCancellationIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCancellationIT.java index d7e581e4606..62e78586612 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCancellationIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCancellationIT.java @@ -21,20 +21,34 @@ import static org.junit.Assert.fail; import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.config.DriverConfigLoader; +import com.datastax.oss.driver.api.core.context.DriverContext; import com.datastax.oss.driver.api.core.cql.PrepareRequest; import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.session.ProgrammaticArguments; +import com.datastax.oss.driver.api.core.session.SessionBuilder; import com.datastax.oss.driver.api.testinfra.ccm.CustomCcmRule; import com.datastax.oss.driver.api.testinfra.session.SessionRule; import com.datastax.oss.driver.api.testinfra.session.SessionUtils; import com.datastax.oss.driver.categories.IsolatedTests; import com.datastax.oss.driver.internal.core.context.DefaultDriverContext; import com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor; +import com.datastax.oss.driver.internal.core.cql.CqlPrepareSyncProcessor; +import com.datastax.oss.driver.internal.core.session.BuiltInRequestProcessors; +import com.datastax.oss.driver.internal.core.session.RequestProcessor; +import com.datastax.oss.driver.internal.core.session.RequestProcessorRegistry; import com.datastax.oss.driver.shaded.guava.common.base.Predicates; import com.datastax.oss.driver.shaded.guava.common.cache.Cache; +import com.datastax.oss.driver.shaded.guava.common.cache.CacheBuilder; import com.datastax.oss.driver.shaded.guava.common.collect.Iterables; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -50,6 +64,69 @@ public class PreparedStatementCancellationIT { @Rule public TestRule chain = RuleChain.outerRule(ccmRule).around(sessionRule); + private static class TestCqlPrepareAsyncProcessor extends CqlPrepareAsyncProcessor { + + public TestCqlPrepareAsyncProcessor(@NonNull Optional context) { + // Default CqlPrepareAsyncProcessor uses weak values here as well. We avoid doing so + // to prevent cache entries from unexpectedly disappearing mid-test. + super(context, CacheBuilder.newBuilder()); + } + } + + private static class TestDefaultDriverContext extends DefaultDriverContext { + public TestDefaultDriverContext( + DriverConfigLoader configLoader, ProgrammaticArguments programmaticArguments) { + super(configLoader, programmaticArguments); + } + + @Override + protected RequestProcessorRegistry buildRequestProcessorRegistry() { + // Re-create the processor cache to insert the TestCqlPrepareAsyncProcessor with it's strong + // prepared statement cache, see JAVA-3062 + List> processors = + BuiltInRequestProcessors.createDefaultProcessors(this); + processors.removeIf((processor) -> processor instanceof CqlPrepareAsyncProcessor); + processors.removeIf((processor) -> processor instanceof CqlPrepareSyncProcessor); + CqlPrepareAsyncProcessor asyncProcessor = + new PreparedStatementCancellationIT.TestCqlPrepareAsyncProcessor(Optional.of(this)); + processors.add(2, asyncProcessor); + processors.add(3, new CqlPrepareSyncProcessor(asyncProcessor)); + return new RequestProcessorRegistry( + getSessionName(), processors.toArray(new RequestProcessor[0])); + } + } + + private static class TestSessionBuilder extends SessionBuilder { + + @Override + protected Object wrap(@NonNull CqlSession defaultSession) { + return defaultSession; + } + + @Override + protected DriverContext buildContext( + DriverConfigLoader configLoader, ProgrammaticArguments programmaticArguments) { + return new PreparedStatementCancellationIT.TestDefaultDriverContext( + configLoader, programmaticArguments); + } + } + + @BeforeClass + public static void setupBeforeClass() { + System.setProperty( + SessionUtils.SESSION_BUILDER_CLASS_PROPERTY, + PreparedStatementCancellationIT.class.getName()); + } + + @AfterClass + public static void teardownAfterClass() { + System.clearProperty(SessionUtils.SESSION_BUILDER_CLASS_PROPERTY); + } + + public static SessionBuilder builder() { + return new PreparedStatementCancellationIT.TestSessionBuilder(); + } + @Before public void setup() { From 9ba82036ba059f1ea41057378d8750cc67b9f6a7 Mon Sep 17 00:00:00 2001 From: janehe Date: Wed, 16 Jul 2025 14:13:01 -0700 Subject: [PATCH 24/24] empty