@@ -123,13 +123,30 @@ private static RemovalListener<Object, Object> buildCacheRemoveCallback(
123
123
@ NonNull Optional <DefaultDriverContext > context ) {
124
124
return (evt ) -> {
125
125
try {
126
- LOG .error ("Cache removal callback triggered, cause: {}" , evt .getCause ());
126
+ LOG .error (
127
+ "Cache removal callback triggered, cause: {}, key: {}" , evt .getCause (), evt .getKey ());
127
128
CompletableFuture <PreparedStatement > future =
128
129
(CompletableFuture <PreparedStatement >) evt .getValue ();
129
- ByteBuffer queryId = Uninterruptibles .getUninterruptibly (future ).getId ();
130
- LOG .error ("Firing PreparedStatementRemovalEvent for queryId: {}" , queryId );
131
- context .ifPresent (
132
- ctx -> ctx .getEventBus ().fire (new PreparedStatementRemovalEvent (queryId )));
130
+
131
+ // Add more detailed logging about the future state
132
+ LOG .error (
133
+ "Future state - done: {}, cancelled: {}, completedExceptionally: {}" ,
134
+ future .isDone (),
135
+ future .isCancelled (),
136
+ future .isCompletedExceptionally ());
137
+
138
+ if (future .isDone () && !future .isCompletedExceptionally () && !future .isCancelled ()) {
139
+ ByteBuffer queryId = Uninterruptibles .getUninterruptibly (future ).getId ();
140
+ LOG .error ("Firing PreparedStatementRemovalEvent for queryId: {}" , queryId );
141
+ context .ifPresent (
142
+ ctx -> {
143
+ LOG .error ("About to fire PreparedStatementRemovalEvent on event bus" );
144
+ ctx .getEventBus ().fire (new PreparedStatementRemovalEvent (queryId ));
145
+ LOG .error ("PreparedStatementRemovalEvent fired successfully" );
146
+ });
147
+ } else {
148
+ LOG .error ("Skipping removal event - future not in valid state for extraction" );
149
+ }
133
150
} catch (Exception e ) {
134
151
LOG .error ("Unable to register removal handler" , e );
135
152
}
@@ -194,6 +211,68 @@ public static SessionBuilder builder() {
194
211
return new TestSessionBuilder ();
195
212
}
196
213
214
+ private void debugCacheInvalidation (CqlSession session , TypeChangeEvent event ) {
215
+ try {
216
+ DefaultDriverContext ctx = (DefaultDriverContext ) session .getContext ();
217
+ // Get the processor to check cache state
218
+ RequestProcessorRegistry registry = ctx .getRequestProcessorRegistry ();
219
+
220
+ LOG .error (
221
+ "Debug: TypeChangeEvent received for type: {} (changeType: {})" ,
222
+ event .oldType .getName (),
223
+ event .changeType );
224
+ LOG .error ("Debug: Current cache size: {}" , getPreparedCacheSize (session ));
225
+
226
+ // Force cache cleanup to trigger any pending removals
227
+ if (registry != null ) {
228
+ LOG .error ("Debug: Forcing cache cleanup..." );
229
+ // We can't directly access the cache from here, but we can log that we're trying
230
+ }
231
+ } catch (Exception e ) {
232
+ LOG .error ("Debug: Error during cache invalidation debugging" , e );
233
+ }
234
+ }
235
+
236
+ private boolean waitForCacheRemovalWithCleanup (
237
+ CountDownLatch latch , CqlSession session , long timeout , TimeUnit unit ) {
238
+ long timeoutNanos = unit .toNanos (timeout );
239
+ long startTime = System .nanoTime ();
240
+ long remainingNanos = timeoutNanos ;
241
+
242
+ while (remainingNanos > 0 && latch .getCount () > 0 ) {
243
+ // Wait for a short period
244
+ long waitTime = Math .min (remainingNanos , TimeUnit .SECONDS .toNanos (5 ));
245
+ boolean success =
246
+ Uninterruptibles .awaitUninterruptibly (latch , waitTime , TimeUnit .NANOSECONDS );
247
+
248
+ if (success ) {
249
+ LOG .error ("Cache removal latch triggered successfully" );
250
+ return true ;
251
+ }
252
+
253
+ // If we haven't succeeded yet, try to force cache cleanup
254
+ LOG .error (
255
+ "Cache removal latch not triggered yet, forcing cleanup. Current cache size: {}" ,
256
+ getPreparedCacheSize (session ));
257
+
258
+ try {
259
+ // Force garbage collection to help with weak references
260
+ System .gc ();
261
+ Thread .sleep (100 );
262
+ } catch (InterruptedException e ) {
263
+ Thread .currentThread ().interrupt ();
264
+ break ;
265
+ }
266
+
267
+ remainingNanos = timeoutNanos - (System .nanoTime () - startTime );
268
+ }
269
+
270
+ LOG .error (
271
+ "Cache removal latch failed to trigger within timeout. Final cache size: {}" ,
272
+ getPreparedCacheSize (session ));
273
+ return false ;
274
+ }
275
+
197
276
private void invalidationResultSetTest (
198
277
Consumer <CqlSession > setupTestSchema , Set <String > expectedChangedTypes ) {
199
278
invalidationTestInner (
@@ -223,7 +302,15 @@ private void invalidationTestInner(
223
302
224
303
try (CqlSession session = sessionWithCacheSizeMetric ()) {
225
304
226
- assertThat (getPreparedCacheSize (session )).isEqualTo (0 );
305
+ // Ensure we start with a clean cache
306
+ long initialCacheSize = getPreparedCacheSize (session );
307
+ LOG .error ("Starting test with cache size: {}" , initialCacheSize );
308
+ assertThat (initialCacheSize ).isEqualTo (0 );
309
+
310
+ // Force garbage collection to ensure clean state
311
+ System .gc ();
312
+ Uninterruptibles .sleepUninterruptibly (100 , TimeUnit .MILLISECONDS );
313
+
227
314
setupTestSchema .accept (session );
228
315
229
316
PreparedStatement stmt1 = session .prepare (preparedStmtQueryType1 );
@@ -255,6 +342,10 @@ private void invalidationTestInner(
255
342
"Received TypeChangeEvent for type: {} (changeType: {})" ,
256
343
e .oldType .getName (),
257
344
e .changeType );
345
+
346
+ // Add detailed debugging for cache invalidation
347
+ debugCacheInvalidation (session , e );
348
+
258
349
if (Boolean .TRUE .equals (
259
350
changedTypes .putIfAbsent (e .oldType .getName ().toString (), true ))) {
260
351
// store an error if we see duplicate change event
@@ -284,18 +375,25 @@ private void invalidationTestInner(
284
375
LOG .error ("Expected to invalidate statement 2 (queryId: {}) due to type change" , queryId2 );
285
376
session .execute ("ALTER TYPE test_type_caching_2 add i blob" );
286
377
287
- // Give a small delay to allow the schema change to propagate before checking agreement
288
- Uninterruptibles .sleepUninterruptibly (100 , TimeUnit .MILLISECONDS );
378
+ // Give a longer delay to allow the schema change to propagate before checking agreement
379
+ LOG .error ("Waiting for schema change to propagate..." );
380
+ Uninterruptibles .sleepUninterruptibly (500 , TimeUnit .MILLISECONDS );
289
381
session .checkSchemaAgreement ();
290
382
383
+ // Additional delay to allow event processing to complete
384
+ LOG .error ("Waiting for event processing to complete..." );
385
+ Uninterruptibles .sleepUninterruptibly (1000 , TimeUnit .MILLISECONDS );
386
+
291
387
// wait for latches and fail if they don't reach zero before timeout
292
388
// Use longer timeout for cache removal as it depends on complex event chain
293
- boolean cacheRemovalSuccess =
294
- Uninterruptibles .awaitUninterruptibly (
295
- preparedStmtCacheRemoveLatch , 180 , TimeUnit .SECONDS );
296
389
boolean typeChangeSuccess =
297
390
Uninterruptibles .awaitUninterruptibly (typeChangeEventLatch , 60 , TimeUnit .SECONDS );
298
391
392
+ // For cache removal, use a more robust waiting mechanism with periodic cleanup
393
+ boolean cacheRemovalSuccess =
394
+ waitForCacheRemovalWithCleanup (
395
+ preparedStmtCacheRemoveLatch , session , 180 , TimeUnit .SECONDS );
396
+
299
397
// Provide detailed diagnostics if either latch fails
300
398
if (!cacheRemovalSuccess || !typeChangeSuccess ) {
301
399
String diagnostics =
0 commit comments