diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java index a2dbf60faa9..dc56172f583 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java @@ -22,6 +22,7 @@ import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -53,8 +54,15 @@ public void setUp() { when(brokerController.getMessageStoreConfig()).thenReturn(new MessageStoreConfig()); when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig()); offsetManager = new RocksDBConsumerOffsetManager(brokerController); + offsetManager.load(); } + @After + public void tearDown() { + if (offsetManager != null) { + offsetManager.stop(); + } + } @Test public void testQueryOffsetForNonLmq() { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java index 30123dc49a5..1e6b8d04578 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.broker.offset; +import java.io.File; import java.io.IOException; import java.nio.file.Paths; import java.util.HashMap; @@ -31,6 +32,7 @@ import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DispatchRequest; import org.apache.rocketmq.store.StoreType; @@ -43,6 +45,7 @@ import org.apache.rocketmq.store.queue.RocksDBConsumeQueueStore; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.awaitility.Awaitility; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -95,6 +98,32 @@ public void init() throws IOException { rocksdbConsumerOffsetManager = new RocksDBConsumerOffsetManager(brokerController); } + @After + public void tearDown() { + if (notToBeExecuted()) { + return; + } + + if (rocksdbConsumerOffsetManager != null) { + rocksdbConsumerOffsetManager.stop(); + } + + if (consumerOffsetManager != null) { + consumerOffsetManager.stop(); + } + + if (defaultMessageStore != null) { + ConsumeQueueStoreInterface cqStore = defaultMessageStore.getQueueStore(); + cqStore.shutdown(); + cqStore.destroy(false); + + defaultMessageStore.shutdown(); + defaultMessageStore.destroy(); + } + + UtilAll.deleteFile(new File(basePath)); + } + @Test public void testTransferOffset() { if (notToBeExecuted()) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java index 4fbec13860b..a809c3f3832 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java @@ -78,6 +78,15 @@ public void destroy() { if (notToBeExecuted()) { return; } + + if (rocksDBSubscriptionGroupManager != null) { + rocksDBSubscriptionGroupManager.stop(); + } + + if (jsonSubscriptionGroupManager != null) { + jsonSubscriptionGroupManager.stop(); + } + Path pathToBeDeleted = Paths.get(basePath); try { @@ -93,9 +102,6 @@ public void destroy() { } catch (IOException e) { // ignore } - if (rocksDBSubscriptionGroupManager != null) { - rocksDBSubscriptionGroupManager.stop(); - } } @@ -282,7 +288,7 @@ public void theSecondTimeLoadJsonSubscriptionGroupManager() { } @Test - public void theSecondTimeLoadRocksdbTopicConfigManager() { + public void theSecondTimeLoadRocksdbSubscriptionGroupManager() { if (notToBeExecuted()) { return; } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java index fa3ef95f55f..64ca96ed8e7 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.broker.topic; +import java.io.File; import java.nio.file.Paths; import java.util.HashMap; import java.util.List; @@ -29,6 +30,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicAttributes; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.attribute.Attribute; import org.apache.rocketmq.common.attribute.BooleanAttribute; import org.apache.rocketmq.common.attribute.CQType; @@ -87,6 +89,8 @@ public void destroy() { if (topicConfigManager != null) { topicConfigManager.stop(); } + + UtilAll.deleteFile(new File(basePath)); } @Test diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java index e925ed4bd8a..b9fc76703f5 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java @@ -79,6 +79,15 @@ public void destroy() { if (notToBeExecuted()) { return; } + + if (rocksdbTopicConfigManager != null) { + rocksdbTopicConfigManager.stop(); + } + + if (jsonTopicConfigManager != null) { + jsonTopicConfigManager.stop(); + } + Path pathToBeDeleted = Paths.get(basePath); try { Files.walk(pathToBeDeleted) @@ -93,9 +102,6 @@ public void destroy() { } catch (IOException e) { // ignore } - if (rocksdbTopicConfigManager != null) { - rocksdbTopicConfigManager.stop(); - } } public void initRocksdbTopicConfigManager() { diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java index b1e12d49468..bcdabf949ab 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java @@ -22,9 +22,12 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.queue.offset.OffsetEntryType; import org.apache.rocketmq.store.rocksdb.ConsumeQueueRocksDBStorage; +import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -62,6 +65,8 @@ public class RocksDBConsumeQueueOffsetTableTest { private static String topicName; + private RocksIterator iterator; + @BeforeClass public static void initDB() throws IOException, RocksDBException { TemporaryFolder tempFolder = new TemporaryFolder(); @@ -98,17 +103,30 @@ public static void initDB() throws IOException, RocksDBException { @AfterClass public static void tearDownDB() throws RocksDBException { - db.closeE(); - RocksDB.destroyDB(dbPath.getAbsolutePath(), new Options()); + if (db != null) { + db.closeE(); + } + + if (dbPath != null) { + RocksDB.destroyDB(dbPath.getAbsolutePath(), new Options()); + UtilAll.deleteFile(dbPath); + } } @Before public void setUp() { - RocksIterator iterator = db.newIterator(); + iterator = db.newIterator(); Mockito.doReturn(iterator).when(rocksDBStorage).seekOffsetCF(); offsetTable = new RocksDBConsumeQueueOffsetTable(consumeQueueTable, rocksDBStorage, messageStore); } + @After + public void tearDown() { + if (iterator != null) { + iterator.close(); + } + } + /** * Verify forEach can expand key-buffer properly and works well for long topic names. *