Skip to content

Commit 98eb757

Browse files
committed
strong values of cache
1 parent 3289fda commit 98eb757

File tree

2 files changed

+28
-184
lines changed

2 files changed

+28
-184
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareAsyncProcessor.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,19 @@ protected CqlPrepareAsyncProcessor(
8383
});
8484
}
8585

86+
protected CqlPrepareAsyncProcessor(
87+
Optional<? extends DefaultDriverContext> context, CacheBuilder<Object, Object> cacheBuilder) {
88+
this.cache = cacheBuilder.build();
89+
context.ifPresent(
90+
(ctx) -> {
91+
LOG.info("Adding handler to invalidate cached prepared statements on type changes");
92+
EventExecutor adminExecutor = ctx.getNettyOptions().adminEventExecutorGroup().next();
93+
ctx.getEventBus()
94+
.register(
95+
TypeChangeEvent.class, RunOrSchedule.on(adminExecutor, this::onTypeChanged));
96+
});
97+
}
98+
8699
private static boolean typeMatches(UserDefinedType oldType, DataType typeToCheck) {
87100

88101
switch (typeToCheck.getProtocolCode()) {

integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java

Lines changed: 15 additions & 184 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.datastax.oss.driver.internal.core.session.BuiltInRequestProcessors;
4141
import com.datastax.oss.driver.internal.core.session.RequestProcessor;
4242
import com.datastax.oss.driver.internal.core.session.RequestProcessorRegistry;
43+
import com.datastax.oss.driver.shaded.guava.common.cache.CacheBuilder;
4344
import com.datastax.oss.driver.shaded.guava.common.cache.RemovalListener;
4445
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.Uninterruptibles;
4546
import com.google.common.collect.ImmutableList;
@@ -72,8 +73,6 @@
7273
@Category(IsolatedTests.class)
7374
public class PreparedStatementCachingIT {
7475

75-
private static final Logger LOG = LoggerFactory.getLogger(PreparedStatementCachingIT.class);
76-
7776
private CustomCcmRule ccmRule = CustomCcmRule.builder().build();
7877

7978
private SessionRule<CqlSession> sessionRule =
@@ -123,30 +122,11 @@ private static RemovalListener<Object, Object> buildCacheRemoveCallback(
123122
@NonNull Optional<DefaultDriverContext> context) {
124123
return (evt) -> {
125124
try {
126-
LOG.error(
127-
"Cache removal callback triggered, cause: {}, key: {}", evt.getCause(), evt.getKey());
128125
CompletableFuture<PreparedStatement> future =
129126
(CompletableFuture<PreparedStatement>) evt.getValue();
130-
131-
// Add more detailed logging about the future state
132-
LOG.error(
133-
"Future state - done: {}, cancelled: {}, completedExceptionally: {}",
134-
future.isDone(),
135-
future.isCancelled(),
136-
future.isCompletedExceptionally());
137-
138-
if (future.isDone() && !future.isCompletedExceptionally() && !future.isCancelled()) {
139-
ByteBuffer queryId = Uninterruptibles.getUninterruptibly(future).getId();
140-
LOG.error("Firing PreparedStatementRemovalEvent for queryId: {}", queryId);
141-
context.ifPresent(
142-
ctx -> {
143-
LOG.error("About to fire PreparedStatementRemovalEvent on event bus");
144-
ctx.getEventBus().fire(new PreparedStatementRemovalEvent(queryId));
145-
LOG.error("PreparedStatementRemovalEvent fired successfully");
146-
});
147-
} else {
148-
LOG.error("Skipping removal event - future not in valid state for extraction");
149-
}
127+
ByteBuffer queryId = Uninterruptibles.getUninterruptibly(future).getId();
128+
context.ifPresent(
129+
ctx -> ctx.getEventBus().fire(new PreparedStatementRemovalEvent(queryId)));
150130
} catch (Exception e) {
151131
LOG.error("Unable to register removal handler", e);
152132
}
@@ -156,7 +136,8 @@ private static RemovalListener<Object, Object> buildCacheRemoveCallback(
156136
public TestCqlPrepareAsyncProcessor(@NonNull Optional<DefaultDriverContext> context) {
157137
// Default CqlPrepareAsyncProcessor uses weak values here as well. We avoid doing so
158138
// to prevent cache entries from unexpectedly disappearing mid-test.
159-
super(context, builder -> builder.removalListener(buildCacheRemoveCallback(context)));
139+
// TODO: it was still weak value cuz it's only a decorator.
140+
super(context, CacheBuilder.newBuilder().removalListener(buildCacheRemoveCallback(context)));
160141
}
161142
}
162143

@@ -211,83 +192,6 @@ public static SessionBuilder builder() {
211192
return new TestSessionBuilder();
212193
}
213194

214-
private void debugCacheInvalidation(CqlSession session, TypeChangeEvent event) {
215-
try {
216-
DefaultDriverContext ctx = (DefaultDriverContext) session.getContext();
217-
// Get the processor to check cache state
218-
RequestProcessorRegistry registry = ctx.getRequestProcessorRegistry();
219-
220-
LOG.error(
221-
"Debug: TypeChangeEvent received for type: {} (changeType: {})",
222-
event.oldType.getName(),
223-
event.changeType);
224-
LOG.error("Debug: Current cache size: {}", getPreparedCacheSize(session));
225-
226-
// Force cache cleanup to trigger any pending removals
227-
if (registry != null) {
228-
LOG.error("Debug: Forcing cache cleanup...");
229-
// We can't directly access the cache from here, but we can log that we're trying
230-
}
231-
} catch (Exception e) {
232-
LOG.error("Debug: Error during cache invalidation debugging", e);
233-
}
234-
}
235-
236-
private boolean waitForCacheRemovalWithCleanup(
237-
CountDownLatch latch, CqlSession session, long timeout, TimeUnit unit) {
238-
long timeoutNanos = unit.toNanos(timeout);
239-
long startTime = System.nanoTime();
240-
long remainingNanos = timeoutNanos;
241-
242-
LOG.error("Starting cache removal wait, initial latch count: {}", latch.getCount());
243-
244-
// First check if the latch is already at 0
245-
if (latch.getCount() == 0) {
246-
LOG.error("Cache removal latch already at 0, returning success immediately");
247-
return true;
248-
}
249-
250-
while (remainingNanos > 0 && latch.getCount() > 0) {
251-
// Wait for a short period
252-
long waitTime = Math.min(remainingNanos, TimeUnit.SECONDS.toNanos(5));
253-
LOG.error(
254-
"Waiting for cache removal latch, current count: {}, wait time: {}s",
255-
latch.getCount(),
256-
TimeUnit.NANOSECONDS.toSeconds(waitTime));
257-
258-
boolean success =
259-
Uninterruptibles.awaitUninterruptibly(latch, waitTime, TimeUnit.NANOSECONDS);
260-
261-
if (success) {
262-
LOG.error("Cache removal latch triggered successfully, final count: {}", latch.getCount());
263-
return true;
264-
}
265-
266-
// If we haven't succeeded yet, try to force cache cleanup
267-
LOG.error(
268-
"Cache removal latch not triggered yet (count: {}), forcing cleanup. Current cache size: {}",
269-
latch.getCount(),
270-
getPreparedCacheSize(session));
271-
272-
try {
273-
// Force garbage collection to help with weak references
274-
System.gc();
275-
Thread.sleep(100);
276-
} catch (InterruptedException e) {
277-
Thread.currentThread().interrupt();
278-
break;
279-
}
280-
281-
remainingNanos = timeoutNanos - (System.nanoTime() - startTime);
282-
}
283-
284-
LOG.error(
285-
"Cache removal latch failed to trigger within timeout. Final latch count: {}, cache size: {}",
286-
latch.getCount(),
287-
getPreparedCacheSize(session));
288-
return latch.getCount() == 0; // Return true if latch reached 0 even if await timed out
289-
}
290-
291195
private void invalidationResultSetTest(
292196
Consumer<CqlSession> setupTestSchema, Set<String> expectedChangedTypes) {
293197
invalidationTestInner(
@@ -317,18 +221,13 @@ private void invalidationTestInner(
317221

318222
try (CqlSession session = sessionWithCacheSizeMetric()) {
319223

320-
// Ensure we start with a clean cache
321-
long initialCacheSize = getPreparedCacheSize(session);
322-
LOG.error("Starting test with cache size: {}", initialCacheSize);
323-
assertThat(initialCacheSize).isEqualTo(0);
324-
325-
// Force garbage collection to ensure clean state
326-
System.gc();
327-
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
328-
224+
assertThat(getPreparedCacheSize(session)).isEqualTo(0);
329225
setupTestSchema.accept(session);
330226

331-
// Set up event handlers BEFORE creating prepared statements to avoid race conditions
227+
session.prepare(preparedStmtQueryType1);
228+
ByteBuffer queryId2 = session.prepare(preparedStmtQueryType2).getId();
229+
assertThat(getPreparedCacheSize(session)).isEqualTo(2);
230+
332231
CountDownLatch preparedStmtCacheRemoveLatch = new CountDownLatch(1);
333232
CountDownLatch typeChangeEventLatch = new CountDownLatch(expectedChangedTypes.size());
334233

@@ -340,21 +239,11 @@ private void invalidationTestInner(
340239
new AtomicReference<>(Optional.empty());
341240
AtomicReference<Optional<String>> removedQueryEventError =
342241
new AtomicReference<>(Optional.empty());
343-
344-
LOG.error("Registering event handlers before creating prepared statements");
345242
ctx.getEventBus()
346243
.register(
347244
TypeChangeEvent.class,
348245
(e) -> {
349246
// expect one event per type changed and for every parent type that nests it
350-
LOG.error(
351-
"Received TypeChangeEvent for type: {} (changeType: {})",
352-
e.oldType.getName(),
353-
e.changeType);
354-
355-
// Add detailed debugging for cache invalidation
356-
debugCacheInvalidation(session, e);
357-
358247
if (Boolean.TRUE.equals(
359248
changedTypes.putIfAbsent(e.oldType.getName().toString(), true))) {
360249
// store an error if we see duplicate change event
@@ -367,85 +256,27 @@ private void invalidationTestInner(
367256
.register(
368257
PreparedStatementRemovalEvent.class,
369258
(e) -> {
370-
LOG.error("Received PreparedStatementRemovalEvent for queryId: {}", e.queryId);
371259
if (!removedQueryIds.compareAndSet(Optional.empty(), Optional.of(e.queryId))) {
372260
// store an error if we see multiple cache invalidation events
373261
// any non-empty error will fail the test so it's OK to do this multiple times
374262
removedQueryEventError.set(
375263
Optional.of("Unable to set reference for PS removal event"));
376-
LOG.warn(
377-
"Multiple PreparedStatementRemovalEvents received, ignoring subsequent ones");
378264
}
379-
LOG.error(
380-
"About to countdown preparedStmtCacheRemoveLatch, current count: {}",
381-
preparedStmtCacheRemoveLatch.getCount());
382265
preparedStmtCacheRemoveLatch.countDown();
383-
LOG.error(
384-
"Countdown completed, new count: {}", preparedStmtCacheRemoveLatch.getCount());
385266
});
386267

387-
// Now create the prepared statements
388-
PreparedStatement stmt1 = session.prepare(preparedStmtQueryType1);
389-
PreparedStatement stmt2 = session.prepare(preparedStmtQueryType2);
390-
ByteBuffer queryId2 = stmt2.getId();
391-
assertThat(getPreparedCacheSize(session)).isEqualTo(2);
392-
393-
LOG.error("Prepared statements in cache:");
394-
LOG.error(" Statement 1: {} (queryId: {})", preparedStmtQueryType1, stmt1.getId());
395-
LOG.error(" Statement 2: {} (queryId: {})", preparedStmtQueryType2, stmt2.getId());
396-
397268
// alter test_type_caching_2 to trigger cache invalidation and above events
398-
LOG.error("Executing ALTER TYPE test_type_caching_2 add i blob");
399-
LOG.error("Expected to invalidate statement 2 (queryId: {}) due to type change", queryId2);
400269
session.execute("ALTER TYPE test_type_caching_2 add i blob");
401270

402-
// Give a longer delay to allow the schema change to propagate before checking agreement
403-
LOG.error("Waiting for schema change to propagate...");
404-
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
405271
session.checkSchemaAgreement();
406272

407273
// wait for latches and fail if they don't reach zero before timeout
408-
// Use longer timeout for cache removal as it depends on complex event chain
409-
boolean typeChangeSuccess =
410-
Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 60, TimeUnit.SECONDS);
411-
412-
// For cache removal, use a more robust waiting mechanism with periodic cleanup
413-
boolean cacheRemovalSuccess =
414-
waitForCacheRemovalWithCleanup(
415-
preparedStmtCacheRemoveLatch, session, 180, TimeUnit.SECONDS);
416-
417-
// Provide detailed diagnostics if either latch fails
418-
if (!cacheRemovalSuccess || !typeChangeSuccess) {
419-
String diagnostics =
420-
String.format(
421-
"Test failure diagnostics:\n"
422-
+ " - Cache removal latch success: %s (count: %d)\n"
423-
+ " - Type change latch success: %s (count: %d)\n"
424-
+ " - Current cache size: %d\n"
425-
+ " - Expected changed types: %s\n"
426-
+ " - Actual changed types detected: %s\n"
427-
+ " - Expected removed query ID: %s\n"
428-
+ " - Actual removed query IDs: %s\n"
429-
+ " - Type change errors: %s\n"
430-
+ " - Removal event errors: %s",
431-
cacheRemovalSuccess,
432-
preparedStmtCacheRemoveLatch.getCount(),
433-
typeChangeSuccess,
434-
typeChangeEventLatch.getCount(),
435-
getPreparedCacheSize(session),
436-
expectedChangedTypes,
437-
changedTypes.keySet(),
438-
queryId2,
439-
removedQueryIds.get(),
440-
typeChangeEventError.get(),
441-
removedQueryEventError.get());
442-
LOG.error("Prepared statement cache invalidation test failed: {}", diagnostics);
443-
}
444-
445-
assertThat(cacheRemovalSuccess)
274+
assertThat(
275+
Uninterruptibles.awaitUninterruptibly(
276+
preparedStmtCacheRemoveLatch, 120, TimeUnit.SECONDS))
446277
.withFailMessage("preparedStmtCacheRemoveLatch did not trigger before timeout")
447278
.isTrue();
448-
assertThat(typeChangeSuccess)
279+
assertThat(Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 20, TimeUnit.SECONDS))
449280
.withFailMessage("typeChangeEventLatch did not trigger before timeout")
450281
.isTrue();
451282

0 commit comments

Comments
 (0)