Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.19-SNAPSHOT
[fix] Fixed SegmentedQueues not being cleaned up on session purge. (#833)

Version 0.18:
[fix] Update Netty to 4.1.116 and H2 2.1.214
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ public boolean isEmpty() {
}
}

/**
* Close the Queue and release all resources.
*/
public void close() {
queuePool.purgeQueue(name);
headSegment = null;
tailSegment = null;
}

/**
* Read next message or return null if the queue has no data.
* */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,24 @@ public Queue getOrCreate(String queueName) throws QueueException {
}
}

void purgeQueue(String name) {
final QueueName queueName = new QueueName(name);
final LinkedList<SegmentRef> segmentRefs = queueSegments.remove(queueName);
SegmentRef segmentRef = segmentRefs.pollLast();
segmentsAllocationLock.lock();
LOG.debug("Purging segments for queue {}", queueName);
try {
while (segmentRef != null) {
LOG.debug("Purging segment {} from queue {}", segmentRef, queueName);
recycledSegments.add(segmentRef);
segmentRef = segmentRefs.pollLast();
}
} finally {
segmentsAllocationLock.unlock();
}
queues.remove(queueName);
}

/**
* Free mapped files
* */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,6 @@ public boolean isEmpty() {
@Override
public void closeAndPurge() {
closed = true;
segmentedQueue.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,57 @@ public void reopenQueueWithFragmentation() throws QueueException, IOException {
assertEquals("(0, 0), (0, 4194304)", segmentRefs);
}

@Test
public void testPageFileReuse() throws QueueException, IOException {
// Use smaller segmants and pages for quicker testing.
final int kb = 1024;
// Pages with only 4 segments, for quicker testing
final int queuePageSize = 16 * kb;
final int queueSegmentSize = 4 * kb;
// write 2 segments, consume one segment, next segment allocated should be one just freed.0
QueuePool queuePool = QueuePool.loadQueues(tempQueueFolder, queuePageSize, queueSegmentSize);
Queue queue1 = queuePool.getOrCreate("test_external_fragmentation");

byte[] bytes = new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n'};

// fill seven segments (almost the full 8 we have)
// This should force two files to open.
for (int i = 0; i < 7; i++) {
queue1.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[i * 2])));
queue1.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[i * 2 + 1])));
}

// Create a new queue
final Queue queue2 = queuePool.getOrCreate("test_external_fragmentation2");

// Write one segment (filling the second page)
queue2.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[0])));
queue2.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[1])));

// Release the first queue
queue1.force();
queue1.close();

// Fill another six segments.
// This should not open more files.
for (int i = 1; i < 7; i++) {
queue2.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[i * 2])));
queue2.enqueue(ByteBuffer.wrap(generatePayload(queueSegmentSize / 2 - LENGTH_HEADER_SIZE, bytes[i * 2 + 1])));
}

queue2.force();
queuePool.close();

// Verify
// checkpoint contains che correct order, (0,0), (0, 4194304)
Properties checkpointProps3 = loadCheckpointFile(tempQueueFolder);

// We should now have segments 6, 5, 4, 3, 2, 1, 8
// or, 2.2, 2.1, 1.4, 1.3, 1.2, 1.1, 2.4
final String segmentRefs = checkpointProps3.getProperty("queues.0.segments");
assertEquals("(1, 4096), (1, 0), (0, 12288), (0, 8192), (0, 4096), (0, 0), (1, 12288)", segmentRefs);
}

private Properties loadCheckpointFile(Path dir) throws IOException {
final Path checkpointPath = dir.resolve("checkpoint.properties");
final FileReader fileReader = new FileReader(checkpointPath.toFile());
Expand Down
Loading