Skip to content

Commit eb72805

Browse files
committed
refactor to PreparedStatementITBase, use strong values for cache
1 parent c0d2f74 commit eb72805

File tree

3 files changed

+196
-130
lines changed

3 files changed

+196
-130
lines changed

integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java

Lines changed: 1 addition & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -22,56 +22,36 @@
2222
import com.codahale.metrics.Gauge;
2323
import com.datastax.oss.driver.api.core.CqlSession;
2424
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
25-
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
26-
import com.datastax.oss.driver.api.core.context.DriverContext;
27-
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
2825
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
29-
import com.datastax.oss.driver.api.core.session.ProgrammaticArguments;
30-
import com.datastax.oss.driver.api.core.session.SessionBuilder;
3126
import com.datastax.oss.driver.api.testinfra.ccm.CustomCcmRule;
3227
import com.datastax.oss.driver.api.testinfra.ccm.SchemaChangeSynchronizer;
3328
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
3429
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
3530
import com.datastax.oss.driver.categories.IsolatedTests;
3631
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
37-
import com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor;
38-
import com.datastax.oss.driver.internal.core.cql.CqlPrepareSyncProcessor;
3932
import com.datastax.oss.driver.internal.core.metadata.schema.events.TypeChangeEvent;
40-
import com.datastax.oss.driver.internal.core.session.BuiltInRequestProcessors;
41-
import com.datastax.oss.driver.internal.core.session.RequestProcessor;
42-
import com.datastax.oss.driver.internal.core.session.RequestProcessorRegistry;
43-
import com.datastax.oss.driver.shaded.guava.common.cache.CacheBuilder;
44-
import com.datastax.oss.driver.shaded.guava.common.cache.RemovalListener;
4533
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.Uninterruptibles;
4634
import com.google.common.collect.ImmutableList;
4735
import com.google.common.collect.ImmutableSet;
48-
import edu.umd.cs.findbugs.annotations.NonNull;
4936
import java.nio.ByteBuffer;
5037
import java.time.Duration;
51-
import java.util.List;
5238
import java.util.Map;
53-
import java.util.Objects;
5439
import java.util.Optional;
5540
import java.util.Set;
56-
import java.util.concurrent.CompletableFuture;
5741
import java.util.concurrent.ConcurrentHashMap;
5842
import java.util.concurrent.CountDownLatch;
5943
import java.util.concurrent.TimeUnit;
6044
import java.util.concurrent.atomic.AtomicReference;
6145
import java.util.function.Consumer;
62-
import org.junit.AfterClass;
63-
import org.junit.BeforeClass;
6446
import org.junit.Rule;
6547
import org.junit.Test;
6648
import org.junit.experimental.categories.Category;
6749
import org.junit.rules.RuleChain;
6850
import org.junit.rules.TestRule;
69-
import org.slf4j.Logger;
70-
import org.slf4j.LoggerFactory;
7151

7252
// These tests must be isolated because setup modifies SessionUtils.SESSION_BUILDER_CLASS_PROPERTY
7353
@Category(IsolatedTests.class)
74-
public class PreparedStatementCachingIT {
54+
public class PreparedStatementCachingIT extends PreparedStatementITBase {
7555

7656
private CustomCcmRule ccmRule = CustomCcmRule.builder().build();
7757

@@ -86,112 +66,6 @@ public class PreparedStatementCachingIT {
8666

8767
@Rule public TestRule chain = RuleChain.outerRule(ccmRule).around(sessionRule);
8868

89-
private static class PreparedStatementRemovalEvent {
90-
91-
private final ByteBuffer queryId;
92-
93-
public PreparedStatementRemovalEvent(ByteBuffer queryId) {
94-
this.queryId = queryId;
95-
}
96-
97-
@Override
98-
public boolean equals(Object o) {
99-
if (this == o) return true;
100-
if (o == null || !(o instanceof PreparedStatementRemovalEvent)) return false;
101-
PreparedStatementRemovalEvent that = (PreparedStatementRemovalEvent) o;
102-
return Objects.equals(queryId, that.queryId);
103-
}
104-
105-
@Override
106-
public int hashCode() {
107-
return Objects.hash(queryId);
108-
}
109-
110-
@Override
111-
public String toString() {
112-
return "PreparedStatementRemovalEvent{" + "queryId=" + queryId + '}';
113-
}
114-
}
115-
116-
private static class TestCqlPrepareAsyncProcessor extends CqlPrepareAsyncProcessor {
117-
118-
private static final Logger LOG =
119-
LoggerFactory.getLogger(PreparedStatementCachingIT.TestCqlPrepareAsyncProcessor.class);
120-
121-
private static RemovalListener<Object, Object> buildCacheRemoveCallback(
122-
@NonNull Optional<DefaultDriverContext> context) {
123-
return (evt) -> {
124-
try {
125-
CompletableFuture<PreparedStatement> future =
126-
(CompletableFuture<PreparedStatement>) evt.getValue();
127-
ByteBuffer queryId = Uninterruptibles.getUninterruptibly(future).getId();
128-
context.ifPresent(
129-
ctx -> ctx.getEventBus().fire(new PreparedStatementRemovalEvent(queryId)));
130-
} catch (Exception e) {
131-
LOG.error("Unable to register removal handler", e);
132-
}
133-
};
134-
}
135-
136-
public TestCqlPrepareAsyncProcessor(@NonNull Optional<DefaultDriverContext> context) {
137-
// Default CqlPrepareAsyncProcessor uses weak values here as well. We avoid doing so
138-
// to prevent cache entries from unexpectedly disappearing mid-test.
139-
// TODO: it was still weak value cuz it's only a decorator.
140-
super(context, CacheBuilder.newBuilder().removalListener(buildCacheRemoveCallback(context)));
141-
}
142-
}
143-
144-
private static class TestDefaultDriverContext extends DefaultDriverContext {
145-
public TestDefaultDriverContext(
146-
DriverConfigLoader configLoader, ProgrammaticArguments programmaticArguments) {
147-
super(configLoader, programmaticArguments);
148-
}
149-
150-
@Override
151-
protected RequestProcessorRegistry buildRequestProcessorRegistry() {
152-
// Re-create the processor cache to insert the TestCqlPrepareAsyncProcessor with it's strong
153-
// prepared statement cache, see JAVA-3062
154-
List<RequestProcessor<?, ?>> processors =
155-
BuiltInRequestProcessors.createDefaultProcessors(this);
156-
processors.removeIf((processor) -> processor instanceof CqlPrepareAsyncProcessor);
157-
processors.removeIf((processor) -> processor instanceof CqlPrepareSyncProcessor);
158-
CqlPrepareAsyncProcessor asyncProcessor = new TestCqlPrepareAsyncProcessor(Optional.of(this));
159-
processors.add(2, asyncProcessor);
160-
processors.add(3, new CqlPrepareSyncProcessor(asyncProcessor));
161-
return new RequestProcessorRegistry(
162-
getSessionName(), processors.toArray(new RequestProcessor[0]));
163-
}
164-
}
165-
166-
private static class TestSessionBuilder extends SessionBuilder {
167-
168-
@Override
169-
protected Object wrap(@NonNull CqlSession defaultSession) {
170-
return defaultSession;
171-
}
172-
173-
@Override
174-
protected DriverContext buildContext(
175-
DriverConfigLoader configLoader, ProgrammaticArguments programmaticArguments) {
176-
return new TestDefaultDriverContext(configLoader, programmaticArguments);
177-
}
178-
}
179-
180-
@BeforeClass
181-
public static void setup() {
182-
System.setProperty(
183-
SessionUtils.SESSION_BUILDER_CLASS_PROPERTY, PreparedStatementCachingIT.class.getName());
184-
}
185-
186-
@AfterClass
187-
public static void teardown() {
188-
System.clearProperty(SessionUtils.SESSION_BUILDER_CLASS_PROPERTY);
189-
}
190-
191-
public static SessionBuilder builder() {
192-
return new TestSessionBuilder();
193-
}
194-
19569
private void invalidationResultSetTest(
19670
Consumer<CqlSession> setupTestSchema, Set<String> expectedChangedTypes) {
19771
invalidationTestInner(

integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCancellationIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import org.junit.rules.TestRule;
4343

4444
@Category(IsolatedTests.class)
45-
public class PreparedStatementCancellationIT {
45+
public class PreparedStatementCancellationIT extends PreparedStatementITBase {
4646

4747
private CustomCcmRule ccmRule = CustomCcmRule.builder().build();
4848

@@ -51,7 +51,7 @@ public class PreparedStatementCancellationIT {
5151
@Rule public TestRule chain = RuleChain.outerRule(ccmRule).around(sessionRule);
5252

5353
@Before
54-
public void setup() {
54+
public void setupEach() {
5555

5656
CqlSession session = SessionUtils.newSession(ccmRule, sessionRule.keyspace());
5757
session.execute("DROP TABLE IF EXISTS test_table_1");
@@ -63,7 +63,7 @@ public void setup() {
6363
}
6464

6565
@After
66-
public void teardown() {
66+
public void teardownEach() {
6767

6868
CqlSession session = SessionUtils.newSession(ccmRule, sessionRule.keyspace());
6969
session.execute("DROP TABLE test_table_1");
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.datastax.oss.driver.core.cql;
19+
20+
import com.datastax.oss.driver.api.core.CqlSession;
21+
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
22+
import com.datastax.oss.driver.api.core.context.DriverContext;
23+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
24+
import com.datastax.oss.driver.api.core.session.ProgrammaticArguments;
25+
import com.datastax.oss.driver.api.core.session.SessionBuilder;
26+
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
27+
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
28+
import com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor;
29+
import com.datastax.oss.driver.internal.core.cql.CqlPrepareSyncProcessor;
30+
import com.datastax.oss.driver.internal.core.session.BuiltInRequestProcessors;
31+
import com.datastax.oss.driver.internal.core.session.RequestProcessor;
32+
import com.datastax.oss.driver.internal.core.session.RequestProcessorRegistry;
33+
import com.datastax.oss.driver.shaded.guava.common.cache.RemovalListener;
34+
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.Uninterruptibles;
35+
import edu.umd.cs.findbugs.annotations.NonNull;
36+
import java.nio.ByteBuffer;
37+
import java.util.List;
38+
import java.util.Optional;
39+
import java.util.concurrent.CompletableFuture;
40+
import org.junit.AfterClass;
41+
import org.junit.BeforeClass;
42+
import org.slf4j.Logger;
43+
import org.slf4j.LoggerFactory;
44+
45+
/**
46+
* Base class for prepared statement integration tests that provides a customized
47+
* CqlPrepareAsyncProcessor with strong values cache to prevent cache entries from unexpectedly
48+
* disappearing during tests.
49+
*/
50+
public abstract class PreparedStatementITBase {
51+
52+
private static final Logger LOG = LoggerFactory.getLogger(PreparedStatementITBase.class);
53+
54+
/** Event class for prepared statement removal notifications. */
55+
protected static class PreparedStatementRemovalEvent {
56+
57+
public final ByteBuffer queryId;
58+
59+
public PreparedStatementRemovalEvent(ByteBuffer queryId) {
60+
this.queryId = queryId;
61+
}
62+
63+
@Override
64+
public boolean equals(Object o) {
65+
if (this == o) return true;
66+
if (o == null || !(o instanceof PreparedStatementRemovalEvent)) return false;
67+
PreparedStatementRemovalEvent that = (PreparedStatementRemovalEvent) o;
68+
return java.util.Objects.equals(queryId, that.queryId);
69+
}
70+
71+
@Override
72+
public int hashCode() {
73+
return java.util.Objects.hash(queryId);
74+
}
75+
76+
@Override
77+
public String toString() {
78+
return "PreparedStatementRemovalEvent{" + "queryId=" + queryId + '}';
79+
}
80+
}
81+
82+
/**
83+
* Customized CqlPrepareAsyncProcessor that uses strong values cache instead of weak values to
84+
* prevent cache entries from unexpectedly disappearing during tests.
85+
*/
86+
protected static class TestCqlPrepareAsyncProcessor extends CqlPrepareAsyncProcessor {
87+
88+
private static final Logger LOG =
89+
LoggerFactory.getLogger(PreparedStatementITBase.TestCqlPrepareAsyncProcessor.class);
90+
91+
/**
92+
* Creates a removal listener that fires PreparedStatementRemovalEvent when cache entries are
93+
* removed.
94+
*/
95+
private static RemovalListener<Object, Object> buildCacheRemoveCallback(
96+
@NonNull Optional<DefaultDriverContext> context) {
97+
return (evt) -> {
98+
try {
99+
LOG.error(
100+
"Cache removal callback triggered, cause: {}, key: {}", evt.getCause(), evt.getKey());
101+
CompletableFuture<PreparedStatement> future =
102+
(CompletableFuture<PreparedStatement>) evt.getValue();
103+
104+
// Add more detailed logging about the future state
105+
LOG.error(
106+
"Future state - done: {}, cancelled: {}, completedExceptionally: {}",
107+
future.isDone(),
108+
future.isCancelled(),
109+
future.isCompletedExceptionally());
110+
111+
if (future.isDone() && !future.isCompletedExceptionally() && !future.isCancelled()) {
112+
ByteBuffer queryId = Uninterruptibles.getUninterruptibly(future).getId();
113+
LOG.error("Firing PreparedStatementRemovalEvent for queryId: {}", queryId);
114+
context.ifPresent(
115+
ctx -> {
116+
LOG.error("About to fire PreparedStatementRemovalEvent on event bus");
117+
ctx.getEventBus().fire(new PreparedStatementRemovalEvent(queryId));
118+
LOG.error("PreparedStatementRemovalEvent fired successfully");
119+
});
120+
} else {
121+
LOG.error("Skipping removal event - future not in valid state for extraction");
122+
}
123+
} catch (Exception e) {
124+
LOG.error("Unable to register removal handler", e);
125+
}
126+
};
127+
}
128+
129+
public TestCqlPrepareAsyncProcessor(@NonNull Optional<DefaultDriverContext> context) {
130+
// Default CqlPrepareAsyncProcessor uses weak values here as well. We avoid doing so
131+
// to prevent cache entries from unexpectedly disappearing mid-test.
132+
super(context, builder -> builder.removalListener(buildCacheRemoveCallback(context)));
133+
}
134+
}
135+
136+
/** Customized DefaultDriverContext that uses TestCqlPrepareAsyncProcessor. */
137+
protected static class TestDefaultDriverContext extends DefaultDriverContext {
138+
public TestDefaultDriverContext(
139+
DriverConfigLoader configLoader, ProgrammaticArguments programmaticArguments) {
140+
super(configLoader, programmaticArguments);
141+
}
142+
143+
@Override
144+
protected RequestProcessorRegistry buildRequestProcessorRegistry() {
145+
// Re-create the processor cache to insert the TestCqlPrepareAsyncProcessor with it's strong
146+
// prepared statement cache, see JAVA-3062
147+
List<RequestProcessor<?, ?>> processors =
148+
BuiltInRequestProcessors.createDefaultProcessors(this);
149+
processors.removeIf((processor) -> processor instanceof CqlPrepareAsyncProcessor);
150+
processors.removeIf((processor) -> processor instanceof CqlPrepareSyncProcessor);
151+
CqlPrepareAsyncProcessor asyncProcessor = new TestCqlPrepareAsyncProcessor(Optional.of(this));
152+
processors.add(2, asyncProcessor);
153+
processors.add(3, new CqlPrepareSyncProcessor(asyncProcessor));
154+
return new RequestProcessorRegistry(
155+
getSessionName(), processors.toArray(new RequestProcessor[0]));
156+
}
157+
}
158+
159+
/** Customized SessionBuilder that uses TestDefaultDriverContext. */
160+
protected static class TestSessionBuilder extends SessionBuilder {
161+
162+
@Override
163+
protected Object wrap(@NonNull CqlSession defaultSession) {
164+
return defaultSession;
165+
}
166+
167+
@Override
168+
protected DriverContext buildContext(
169+
DriverConfigLoader configLoader, ProgrammaticArguments programmaticArguments) {
170+
return new TestDefaultDriverContext(configLoader, programmaticArguments);
171+
}
172+
}
173+
174+
@BeforeClass
175+
public static void setup() {
176+
System.setProperty(
177+
SessionUtils.SESSION_BUILDER_CLASS_PROPERTY, PreparedStatementITBase.class.getName());
178+
}
179+
180+
@AfterClass
181+
public static void teardown() {
182+
System.clearProperty(SessionUtils.SESSION_BUILDER_CLASS_PROPERTY);
183+
}
184+
185+
/**
186+
* Factory method for creating the custom session builder. This method is called by SessionUtils
187+
* via reflection.
188+
*/
189+
public static SessionBuilder builder() {
190+
return new TestSessionBuilder();
191+
}
192+
}

0 commit comments

Comments
 (0)