Skip to content

Commit 9acaf3a

Browse files
committed
Added test for segment cleanup and page file re-use.
1 parent e48b32a commit 9acaf3a

File tree

1 file changed

+51
-0
lines changed

1 file changed

+51
-0
lines changed

broker/src/test/java/io/moquette/broker/unsafequeues/QueueTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,57 @@ public void reopenQueueWithFragmentation() throws QueueException, IOException {
562562
assertEquals("(0, 0), (0, 4194304)", segmentRefs);
563563
}
564564

565+
@Test
566+
public void testPageFileReuse() throws QueueException, IOException {
567+
// Use smaller segmants and pages for quicker testing.
568+
final int kb = 1024;
569+
// Pages with only 4 segments, for quicker testing
570+
final int queuePageSize = 16 * kb;
571+
final int queueSegmentSize = 4 * kb;
572+
// write 2 segments, consume one segment, next segment allocated should be one just freed.0
573+
QueuePool queuePool = QueuePool.loadQueues(tempQueueFolder, queuePageSize, queueSegmentSize);
574+
Queue queue1 = queuePool.getOrCreate("test_external_fragmentation");
575+
576+
byte[] bytes = new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n'};
577+
578+
// fill seven segments (almost the full 8 we have)
579+
// This should force two files to open.
580+
for (int i = 0; i < 7; i++) {
581+
queue1.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[i * 2])));
582+
queue1.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[i * 2 + 1])));
583+
}
584+
585+
// Create a new queue
586+
final Queue queue2 = queuePool.getOrCreate("test_external_fragmentation2");
587+
588+
// Write one segment (filling the second page)
589+
queue2.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[0])));
590+
queue2.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[1])));
591+
592+
// Release the first queue
593+
queue1.force();
594+
queue1.close();
595+
596+
// Fill another six segments.
597+
// This should not open more files.
598+
for (int i = 1; i < 7; i++) {
599+
queue2.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[i * 2])));
600+
queue2.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[i * 2 + 1])));
601+
}
602+
603+
queue2.force();
604+
queuePool.close();
605+
606+
// Verify
607+
// checkpoint contains che correct order, (0,0), (0, 4194304)
608+
Properties checkpointProps3 = loadCheckpointFile(tempQueueFolder);
609+
610+
// We should now have segments 6, 5, 4, 3, 2, 1, 8
611+
// or, 2.2, 2.1, 1.4, 1.3, 1.2, 1.1, 2.4
612+
final String segmentRefs = checkpointProps3.getProperty("queues.0.segments");
613+
assertEquals("(1, 4096), (1, 0), (0, 12288), (0, 8192), (0, 4096), (0, 0), (1, 12288)", segmentRefs);
614+
}
615+
565616
private Properties loadCheckpointFile(Path dir) throws IOException {
566617
final Path checkpointPath = dir.resolve("checkpoint.properties");
567618
final FileReader fileReader = new FileReader(checkpointPath.toFile());

0 commit comments

Comments
 (0)