From 32253e72b006cb4326f802d2f312856d4395c001 Mon Sep 17 00:00:00 2001 From: shixiaoxiao Date: Tue, 23 Sep 2025 21:58:02 +0800 Subject: [PATCH] fix: Broker startup failed when the consumequeue last mappfile was empty --- .../apache/rocketmq/store/ConsumeQueue.java | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 02f90cef1df..614c96154f4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -565,18 +565,24 @@ public void correctMinOffset(long minCommitLogOffset) { SelectMappedBufferResult lastRecord = null; try { int maxReadablePosition = lastMappedFile.getReadPosition(); - lastRecord = lastMappedFile.selectMappedBuffer(maxReadablePosition - ConsumeQueue.CQ_STORE_UNIT_SIZE, - ConsumeQueue.CQ_STORE_UNIT_SIZE); - if (null != lastRecord) { - ByteBuffer buffer = lastRecord.getByteBuffer(); - long commitLogOffset = buffer.getLong(); - if (commitLogOffset < minCommitLogOffset) { - // Keep the largest known consume offset, even if this consume-queue contains no valid entries at - // all. Let minLogicOffset point to a future slot. - this.minLogicOffset = lastMappedFile.getFileFromOffset() + maxReadablePosition; - log.info("ConsumeQueue[topic={}, queue-id={}] contains no valid entries. Min-offset is assigned as: {}.", - topic, queueId, getMinOffsetInQueue()); - return; + /* + if maxReadablePosition is less than ConsumeQueue.CQ_STORE_UNIT_SIZE, + it means the last file is empty and we need skip it otherwise selectMappedBuffer will throw IllegalArgumentException + */ + if (maxReadablePosition >= ConsumeQueue.CQ_STORE_UNIT_SIZE) { + lastRecord = lastMappedFile.selectMappedBuffer(maxReadablePosition - ConsumeQueue.CQ_STORE_UNIT_SIZE, + ConsumeQueue.CQ_STORE_UNIT_SIZE); + if (null != lastRecord) { + ByteBuffer buffer = lastRecord.getByteBuffer(); + long commitLogOffset = buffer.getLong(); + if (commitLogOffset < minCommitLogOffset) { + // Keep the largest known consume offset, even if this consume-queue contains no valid entries at + // all. Let minLogicOffset point to a future slot. + this.minLogicOffset = lastMappedFile.getFileFromOffset() + maxReadablePosition; + log.info("ConsumeQueue[topic={}, queue-id={}] contains no valid entries. Min-offset is assigned as: {}.", + topic, queueId, getMinOffsetInQueue()); + return; + } } } } finally {