@@ -239,20 +239,34 @@ private boolean waitForCacheRemovalWithCleanup(
239
239
long startTime = System .nanoTime ();
240
240
long remainingNanos = timeoutNanos ;
241
241
242
+ LOG .error ("Starting cache removal wait, initial latch count: {}" , latch .getCount ());
243
+
244
+ // First check if the latch is already at 0
245
+ if (latch .getCount () == 0 ) {
246
+ LOG .error ("Cache removal latch already at 0, returning success immediately" );
247
+ return true ;
248
+ }
249
+
242
250
while (remainingNanos > 0 && latch .getCount () > 0 ) {
243
251
// Wait for a short period
244
252
long waitTime = Math .min (remainingNanos , TimeUnit .SECONDS .toNanos (5 ));
253
+ LOG .error (
254
+ "Waiting for cache removal latch, current count: {}, wait time: {}s" ,
255
+ latch .getCount (),
256
+ TimeUnit .NANOSECONDS .toSeconds (waitTime ));
257
+
245
258
boolean success =
246
259
Uninterruptibles .awaitUninterruptibly (latch , waitTime , TimeUnit .NANOSECONDS );
247
260
248
261
if (success ) {
249
- LOG .error ("Cache removal latch triggered successfully" );
262
+ LOG .error ("Cache removal latch triggered successfully, final count: {}" , latch . getCount () );
250
263
return true ;
251
264
}
252
265
253
266
// If we haven't succeeded yet, try to force cache cleanup
254
267
LOG .error (
255
- "Cache removal latch not triggered yet, forcing cleanup. Current cache size: {}" ,
268
+ "Cache removal latch not triggered yet (count: {}), forcing cleanup. Current cache size: {}" ,
269
+ latch .getCount (),
256
270
getPreparedCacheSize (session ));
257
271
258
272
try {
@@ -268,9 +282,10 @@ private boolean waitForCacheRemovalWithCleanup(
268
282
}
269
283
270
284
LOG .error (
271
- "Cache removal latch failed to trigger within timeout. Final cache size: {}" ,
285
+ "Cache removal latch failed to trigger within timeout. Final latch count: {}, cache size: {}" ,
286
+ latch .getCount (),
272
287
getPreparedCacheSize (session ));
273
- return false ;
288
+ return latch . getCount () == 0 ; // Return true if latch reached 0 even if await timed out
274
289
}
275
290
276
291
private void invalidationResultSetTest (
@@ -313,15 +328,7 @@ private void invalidationTestInner(
313
328
314
329
setupTestSchema .accept (session );
315
330
316
- PreparedStatement stmt1 = session .prepare (preparedStmtQueryType1 );
317
- PreparedStatement stmt2 = session .prepare (preparedStmtQueryType2 );
318
- ByteBuffer queryId2 = stmt2 .getId ();
319
- assertThat (getPreparedCacheSize (session )).isEqualTo (2 );
320
-
321
- LOG .error ("Prepared statements in cache:" );
322
- LOG .error (" Statement 1: {} (queryId: {})" , preparedStmtQueryType1 , stmt1 .getId ());
323
- LOG .error (" Statement 2: {} (queryId: {})" , preparedStmtQueryType2 , stmt2 .getId ());
324
-
331
+ // Set up event handlers BEFORE creating prepared statements to avoid race conditions
325
332
CountDownLatch preparedStmtCacheRemoveLatch = new CountDownLatch (1 );
326
333
CountDownLatch typeChangeEventLatch = new CountDownLatch (expectedChangedTypes .size ());
327
334
@@ -333,6 +340,8 @@ private void invalidationTestInner(
333
340
new AtomicReference <>(Optional .empty ());
334
341
AtomicReference <Optional <String >> removedQueryEventError =
335
342
new AtomicReference <>(Optional .empty ());
343
+
344
+ LOG .error ("Registering event handlers before creating prepared statements" );
336
345
ctx .getEventBus ()
337
346
.register (
338
347
TypeChangeEvent .class ,
@@ -367,9 +376,24 @@ private void invalidationTestInner(
367
376
LOG .warn (
368
377
"Multiple PreparedStatementRemovalEvents received, ignoring subsequent ones" );
369
378
}
379
+ LOG .error (
380
+ "About to countdown preparedStmtCacheRemoveLatch, current count: {}" ,
381
+ preparedStmtCacheRemoveLatch .getCount ());
370
382
preparedStmtCacheRemoveLatch .countDown ();
383
+ LOG .error (
384
+ "Countdown completed, new count: {}" , preparedStmtCacheRemoveLatch .getCount ());
371
385
});
372
386
387
+ // Now create the prepared statements
388
+ PreparedStatement stmt1 = session .prepare (preparedStmtQueryType1 );
389
+ PreparedStatement stmt2 = session .prepare (preparedStmtQueryType2 );
390
+ ByteBuffer queryId2 = stmt2 .getId ();
391
+ assertThat (getPreparedCacheSize (session )).isEqualTo (2 );
392
+
393
+ LOG .error ("Prepared statements in cache:" );
394
+ LOG .error (" Statement 1: {} (queryId: {})" , preparedStmtQueryType1 , stmt1 .getId ());
395
+ LOG .error (" Statement 2: {} (queryId: {})" , preparedStmtQueryType2 , stmt2 .getId ());
396
+
373
397
// alter test_type_caching_2 to trigger cache invalidation and above events
374
398
LOG .error ("Executing ALTER TYPE test_type_caching_2 add i blob" );
375
399
LOG .error ("Expected to invalidate statement 2 (queryId: {}) due to type change" , queryId2 );
@@ -380,10 +404,6 @@ private void invalidationTestInner(
380
404
Uninterruptibles .sleepUninterruptibly (500 , TimeUnit .MILLISECONDS );
381
405
session .checkSchemaAgreement ();
382
406
383
- // Additional delay to allow event processing to complete
384
- LOG .error ("Waiting for event processing to complete..." );
385
- Uninterruptibles .sleepUninterruptibly (1000 , TimeUnit .MILLISECONDS );
386
-
387
407
// wait for latches and fail if they don't reach zero before timeout
388
408
// Use longer timeout for cache removal as it depends on complex event chain
389
409
boolean typeChangeSuccess =
0 commit comments