diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java index 8a5c29e6bc5..66e14fcc05d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java @@ -181,7 +181,7 @@ public void putMessagePositionInfoWrapper(DispatchRequest dispatchRequest) { @Override public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, BoundaryType boundaryType) { - ConsumeQueueInterface logic = findOrCreateConsumeQueue(topic, queueId); + ConsumeQueueInterface logic = getConsumeQueue(topic, queueId); if (logic != null) { long resultOffset = logic.getOffsetInQueueByTime(timestamp, boundaryType); // Make sure the result offset is in valid range. @@ -193,7 +193,14 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, Bo } private FileQueueLifeCycle getLifeCycle(String topic, int queueId) { - return findOrCreateConsumeQueue(topic, queueId); + return getLifeCycle(topic, queueId, true); + } + + private FileQueueLifeCycle getLifeCycle(String topic, int queueId, boolean create) { + if (create) { + return findOrCreateConsumeQueue(topic, queueId); + } + return getConsumeQueue(topic, queueId); } public boolean load(ConsumeQueueInterface consumeQueue) { @@ -297,12 +304,12 @@ public long getMaxPhyOffsetInConsumeQueue() { @Override public long getMinOffsetInQueue(String topic, int queueId) { - ConsumeQueueInterface logic = findOrCreateConsumeQueue(topic, queueId); + ConsumeQueueInterface logic = getConsumeQueue(topic, queueId); if (logic != null) { return logic.getMinOffsetInQueue(); } - return -1; + return -1L; } public void checkSelf(ConsumeQueueInterface consumeQueue) { @@ -320,7 +327,10 @@ public void checkSelf() { } public boolean flush(ConsumeQueueInterface consumeQueue, int flushLeastPages) { - FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); + FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId(), false); + if (fileQueueLifeCycle == null) { + return false; + } return fileQueueLifeCycle.flush(flushLeastPages); } @@ -334,17 +344,26 @@ public void flush() throws StoreException { @Override public void destroy(ConsumeQueueInterface consumeQueue) { - FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); + FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId(), false); + if (fileQueueLifeCycle == null) { + return; + } fileQueueLifeCycle.destroy(); } public int deleteExpiredFile(ConsumeQueueInterface consumeQueue, long minCommitLogPos) { - FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); + FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId(), false); + if (fileQueueLifeCycle == null) { + return 0; + } return fileQueueLifeCycle.deleteExpiredFile(minCommitLogPos); } public void truncateDirtyLogicFiles(ConsumeQueueInterface consumeQueue, long phyOffset) { - FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); + FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId(), false); + if (fileQueueLifeCycle == null) { + return; + } fileQueueLifeCycle.truncateDirtyLogicFiles(phyOffset); } @@ -360,12 +379,19 @@ public void cleanSwappedMap(ConsumeQueueInterface consumeQueue, long forceCleanS } public boolean isFirstFileAvailable(ConsumeQueueInterface consumeQueue) { - FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); + FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId(), false); + if (fileQueueLifeCycle == null) { + return false; + } + return fileQueueLifeCycle.isFirstFileAvailable(); } public boolean isFirstFileExist(ConsumeQueueInterface consumeQueue) { - FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId()); + FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId(), false); + if (fileQueueLifeCycle == null) { + return false; + } return fileQueueLifeCycle.isFirstFileExist(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java index cf511b1bccc..1212e7319c5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java @@ -551,6 +551,7 @@ public ConsumeQueueInterface findOrCreateConsumeQueue(String topic, int queueId) @Override public ConsumeQueueInterface getConsumeQueue(String topic, int queueId) { + // since rocksdb cq use lazy loading, we need to create it if not exist return findOrCreateConsumeQueue(topic, queueId); } diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java index 2ca21b265ef..71197b7cd53 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java @@ -130,9 +130,9 @@ public void testIterator() throws Exception { //The initial min max offset, before and after the creation of consume queue Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(topic, queueId)); - Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, queueId)); + Assert.assertEquals(-1, messageStore.getMinOffsetInQueue(topic, queueId)); - ConsumeQueueInterface consumeQueue = messageStore.getConsumeQueue(topic, queueId); + ConsumeQueueInterface consumeQueue = messageStore.findConsumeQueue(topic, queueId); Assert.assertEquals(CQType.SimpleCQ, consumeQueue.getCQType()); Assert.assertEquals(0, consumeQueue.getMaxOffsetInQueue()); Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue()); @@ -149,7 +149,7 @@ public void testIterator() throws Exception { checkCQ(consumeQueue, msgNum, msgSize); CombineConsumeQueueStore combineConsumeQueueStore = (CombineConsumeQueueStore) messageStore.getQueueStore(); - ConsumeQueueInterface rocksDBConsumeQueue = combineConsumeQueueStore.getRocksDBConsumeQueueStore().getConsumeQueue(topic, queueId); + ConsumeQueueInterface rocksDBConsumeQueue = combineConsumeQueueStore.getRocksDBConsumeQueueStore().findOrCreateConsumeQueue(topic, queueId); Assert.assertEquals(CQType.RocksDBCQ, rocksDBConsumeQueue.getCQType()); Assert.assertEquals(msgNum, rocksDBConsumeQueue.getMaxOffsetInQueue()); checkCQ(rocksDBConsumeQueue, msgNum, msgSize); diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java index cae009ab66e..b91ad4c9850 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java @@ -139,9 +139,9 @@ public void testIterator() throws Exception { String topic = UUID.randomUUID().toString(); //The initial min max offset, before and after the creation of consume queue Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(topic, 0)); - Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0)); + Assert.assertEquals(-1, messageStore.getMinOffsetInQueue(topic, 0)); - ConsumeQueueInterface consumeQueue = messageStore.getConsumeQueue(topic, 0); + ConsumeQueueInterface consumeQueue = messageStore.findConsumeQueue(topic, 0); Assert.assertEquals(CQType.SimpleCQ, consumeQueue.getCQType()); Assert.assertEquals(0, consumeQueue.getMaxOffsetInQueue()); Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue()); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java index 38176c83ede..ae092521bce 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java @@ -172,9 +172,9 @@ public void testBatchSend_SysOuterBatch() throws Exception { Assert.assertEquals(8, brokerController1.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums()); Assert.assertEquals(8, brokerController2.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums()); Assert.assertEquals(8, brokerController3.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums()); - Assert.assertEquals(0, brokerController1.getMessageStore().getMinOffsetInQueue(batchTopic, 0)); - Assert.assertEquals(0, brokerController2.getMessageStore().getMinOffsetInQueue(batchTopic, 0)); - Assert.assertEquals(0, brokerController3.getMessageStore().getMinOffsetInQueue(batchTopic, 0)); + Assert.assertEquals(-1, brokerController1.getMessageStore().getMinOffsetInQueue(batchTopic, 0)); + Assert.assertEquals(-1, brokerController2.getMessageStore().getMinOffsetInQueue(batchTopic, 0)); + Assert.assertEquals(-1, brokerController3.getMessageStore().getMinOffsetInQueue(batchTopic, 0)); Assert.assertEquals(0, brokerController1.getMessageStore().getMaxOffsetInQueue(batchTopic, 0)); Assert.assertEquals(0, brokerController2.getMessageStore().getMaxOffsetInQueue(batchTopic, 0)); Assert.assertEquals(0, brokerController3.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));