22
22
import java .nio .file .Files ;
23
23
import java .util .ArrayList ;
24
24
import java .util .List ;
25
+ import java .util .Objects ;
25
26
import java .util .concurrent .CountDownLatch ;
26
27
import java .util .concurrent .Semaphore ;
27
28
import java .util .concurrent .TimeUnit ;
28
29
import java .util .concurrent .atomic .AtomicInteger ;
29
30
import java .util .function .Supplier ;
30
31
31
32
import org .apache .cassandra .config .DatabaseDescriptor ;
33
+ import org .apache .cassandra .inject .Injections ;
34
+ import org .apache .cassandra .inject .InvokePointBuilder ;
32
35
import org .apache .cassandra .io .util .File ;
33
36
import org .junit .After ;
34
37
import org .junit .Before ;
37
40
38
41
import net .openhft .chronicle .queue .ChronicleQueue ;
39
42
import net .openhft .chronicle .queue .ExcerptTailer ;
40
- import net .openhft .chronicle .queue .RollCycles ;
43
+ import net .openhft .chronicle .queue .rollcycles . TestRollCycles ;
41
44
import net .openhft .chronicle .queue .impl .single .SingleChronicleQueueBuilder ;
42
45
import net .openhft .chronicle .wire .WireOut ;
43
46
import org .apache .cassandra .Util ;
44
47
48
+ import static org .apache .cassandra .Util .spinAssert ;
49
+ import static org .assertj .core .api .Assertions .assertThat ;
45
50
import static org .apache .cassandra .utils .Clock .Global .nanoTime ;
46
51
import static org .junit .Assert .assertEquals ;
47
52
import static org .junit .Assert .assertFalse ;
48
53
import static org .junit .Assert .assertNotEquals ;
54
+ import static org .junit .Assert .assertNotSame ;
49
55
import static org .junit .Assert .assertTrue ;
50
56
import static org .junit .Assert .fail ;
51
57
@@ -57,11 +63,16 @@ public static Path tempDir() throws Exception
57
63
}
58
64
59
65
private static final String testString = "ry@nlikestheyankees" ;
60
- private static final String testString2 = testString + "1" ;
66
+ private static final String testString2 = testString + '1' ;
61
67
62
68
private BinLog binLog ;
63
69
private Path path ;
64
70
71
+ static
72
+ {
73
+ DatabaseDescriptor .daemonInitialization (); // needed for Injections to work
74
+ }
75
+
65
76
@ BeforeClass
66
77
public static void setup ()
67
78
{
@@ -74,7 +85,7 @@ public void setUp() throws Exception
74
85
{
75
86
path = tempDir ();
76
87
binLog = new BinLog .Builder ().path (path )
77
- .rollCycle (RollCycles .TEST_SECONDLY .toString ())
88
+ .rollCycle (TestRollCycles .TEST_SECONDLY .toString ())
78
89
.maxQueueWeight (10 )
79
90
.maxLogSize (1024 * 1024 * 128 )
80
91
.blocking (false )
@@ -109,13 +120,13 @@ public void testConstructorNullRollCycle() throws Exception
109
120
@ Test (expected = IllegalArgumentException .class )
110
121
public void testConstructorZeroWeight () throws Exception
111
122
{
112
- new BinLog .Builder ().path (tempDir ()).rollCycle (RollCycles .TEST_SECONDLY .toString ()).maxQueueWeight (0 ).build (false );
123
+ new BinLog .Builder ().path (tempDir ()).rollCycle (TestRollCycles .TEST_SECONDLY .toString ()).maxQueueWeight (0 ).build (false );
113
124
}
114
125
115
126
@ Test (expected = IllegalArgumentException .class )
116
127
public void testConstructorLogSize () throws Exception
117
128
{
118
- new BinLog .Builder ().path (tempDir ()).rollCycle (RollCycles .TEST_SECONDLY .toString ()).maxLogSize (0 ).build (false );
129
+ new BinLog .Builder ().path (tempDir ()).rollCycle (TestRollCycles .TEST_SECONDLY .toString ()).maxLogSize (0 ).build (false );
119
130
}
120
131
121
132
/**
@@ -166,7 +177,7 @@ public void writeMarshallablePayload(WireOut wire)
166
177
});
167
178
t .start ();
168
179
t .join (60 * 1000 );
169
- assertEquals ("BinLog should not take more than 1 minute to stop" , t . getState (), Thread .State .TERMINATED );
180
+ assertEquals ("BinLog should not take more than 1 minute to stop" , Thread .State .TERMINATED , t . getState () );
170
181
171
182
Util .spinAssertEquals (2 , releaseCount ::get , 60 );
172
183
Util .spinAssertEquals (Thread .State .TERMINATED , binLog .binLogThread ::getState , 60 );
@@ -294,7 +305,7 @@ public void writeMarshallablePayload(WireOut wire)
294
305
t .start ();
295
306
Thread .sleep (500 );
296
307
//If the thread is not terminated then it is probably blocked on the queue
297
- assertTrue ( t . getState () != Thread .State .TERMINATED );
308
+ assertNotSame ( Thread .State .TERMINATED , t . getState () );
298
309
}
299
310
finally
300
311
{
@@ -382,7 +393,7 @@ public void writeMarshallablePayload(WireOut wire)
382
393
public void testCleanupOnOversize () throws Exception
383
394
{
384
395
tearDown ();
385
- binLog = new BinLog .Builder ().path (path ).rollCycle (RollCycles .TEST_SECONDLY .toString ()).maxQueueWeight (1 ).maxLogSize (10000 ).blocking (false ).build (false );
396
+ binLog = new BinLog .Builder ().path (path ).rollCycle (TestRollCycles .TEST_SECONDLY .toString ()).maxQueueWeight (1 ).maxLogSize (10000 ).blocking (false ).build (false );
386
397
for (int ii = 0 ; ii < 5 ; ii ++)
387
398
{
388
399
binLog .put (record (String .valueOf (ii )));
@@ -422,48 +433,56 @@ public void testPutAfterStop() throws Exception
422
433
423
434
/**
424
435
* Test for a bug where files were deleted but the space was not reclaimed when tracking so
425
- * all log segemnts were incorrectly deleted when rolled.
426
- *
427
- * Due to some internal state in ChronicleQueue this test is occasionally
428
- * flaky when run in the suite with testPut or testOffer.
436
+ * all log segments were incorrectly deleted when rolled.
429
437
*/
430
438
@ Test
431
- public void testTruncationReleasesLogSpace () throws Exception
439
+ public void testTruncationReleasesLogSpace () throws Throwable
432
440
{
433
- Util .flakyTest (this ::flakyTestTruncationReleasesLogSpace , 2 , "Fails occasionally due to Chronicle internal state, see CASSANDRA-16526" );
434
- }
435
-
436
-
437
- private void flakyTestTruncationReleasesLogSpace ()
438
- {
439
- StringBuilder sb = new StringBuilder ();
440
441
try
441
442
{
442
- for (int ii = 0 ; ii < 1024 * 1024 * 2 ; ii ++)
443
+ Injections .Counter deletionCounter = Injections .newCounter ("binlogSegmentDeletion" )
444
+ .add (InvokePointBuilder .newInvokePoint ()
445
+ .onClass ("org.apache.cassandra.utils.binlog.DeletingArchiver" )
446
+ .onMethod ("onReleased" )
447
+ .atExit ())
448
+ .build ();
449
+ Injections .inject (deletionCounter );
450
+ deletionCounter .reset ();
451
+ deletionCounter .enable ();
452
+
453
+ String queryString = "a" .repeat (1024 * 1024 * 2 );
454
+
455
+ int maxFileCount = 0 ;
456
+
457
+ // This should fill up the log so when it rolls in the future it will always delete the rolled segment;
458
+ // Make sure we don't delete more than one segment.
459
+ // This loop should complete in less than 3 seconds as we're rolling the log every second.
460
+ long startTime = System .currentTimeMillis ();
461
+ while (deletionCounter .get () <= 1 && System .currentTimeMillis () - startTime < 10 * 1000 )
443
462
{
444
- sb .append ('a' );
445
- }
463
+ binLog .put (record (queryString ));
446
464
447
- String queryString = sb .toString ();
465
+ int fileCount = getFileCount (path );
466
+ assertThat (fileCount ).isGreaterThanOrEqualTo (maxFileCount - 1 );
467
+ assertThat (fileCount ).isLessThanOrEqualTo (3 );
468
+ maxFileCount = Math .max (maxFileCount , fileCount );
448
469
449
- //This should fill up the log so when it rolls in the future it will always delete the rolled segment;
450
- for (int ii = 0 ; ii < 129 ; ii ++)
451
- {
452
- binLog .put (record (queryString ));
470
+ spinAssert (() -> assertThat (readBinLogRecords (path ).size ()).isGreaterThanOrEqualTo (1 ), 10 );
453
471
}
454
472
455
- for (int ii = 0 ; ii < 2 ; ii ++)
456
- {
457
- Thread .sleep (2000 );
458
- binLog .put (record (queryString ));
459
- }
473
+ assertThat (deletionCounter .get ()).isGreaterThan (0 );
474
+ assertThat (getFileCount (path )).isGreaterThanOrEqualTo (maxFileCount - 1 );
475
+ spinAssert (() -> assertThat (readBinLogRecords (path ).size ()).isGreaterThanOrEqualTo (1 ), 10 );
460
476
}
461
477
catch (InterruptedException e )
462
478
{
463
479
throw new RuntimeException (e );
464
480
}
481
+ }
465
482
466
- Util .spinAssertEquals (2 , () -> readBinLogRecords (path ).size (), 60 );
483
+ static int getFileCount (Path path )
484
+ {
485
+ return Objects .requireNonNull (path .toFile ().listFiles ()).length ;
467
486
}
468
487
469
488
static BinLog .ReleaseableWriteMarshallable record (String text )
@@ -495,7 +514,7 @@ public void writeMarshallablePayload(WireOut wire)
495
514
List <String > readBinLogRecords (Path path )
496
515
{
497
516
List <String > records = new ArrayList <String >();
498
- try (ChronicleQueue queue = SingleChronicleQueueBuilder .single (path .toFile ()).rollCycle (RollCycles .TEST_SECONDLY ).build ())
517
+ try (ChronicleQueue queue = SingleChronicleQueueBuilder .single (path .toFile ()).rollCycle (TestRollCycles .TEST_SECONDLY ).build ())
499
518
{
500
519
ExcerptTailer tailer = queue .createTailer ();
501
520
while (true )
0 commit comments