diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 298e2390864..df694cad6cb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -242,7 +242,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); protected final BrokerController brokerController; protected Set configBlackList = new HashSet<>(); - private final ExecutorService asyncExecuteWorker = new ThreadPoolExecutor(0, 4, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); + private final ExecutorService asyncExecuteWorker = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, + new SynchronousQueue<>()); + public AdminBrokerProcessor(final BrokerController brokerController) { this.brokerController = brokerController; @@ -778,6 +780,25 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, } } + final Boolean syncDelete = requestHeader.getSyncDelete(); + if (Boolean.TRUE.equals(syncDelete)) { + return doDeleteTopic(topic, true); + } else { + asyncExecuteWorker.execute(() -> { + try { + doDeleteTopic(topic, syncDelete); + } catch (Exception e) { + LOGGER.error(String.format("delete topic %s failed for ", topic), e); + } + }); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + } + + private RemotingCommand doDeleteTopic(String topic,boolean isSyncDelete) { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); List topicsToClean = new ArrayList<>(); topicsToClean.add(topic); @@ -798,7 +819,10 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, try { for (String topicToClean : topicsToClean) { // delete topic - deleteTopicInBroker(topicToClean); + deleteTopicInBroker(topicToClean,isSyncDelete); + } + if (!isSyncDelete) { + batchSyncMetaData(); } } catch (Throwable t) { return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); @@ -808,15 +832,21 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, return response; } - private void deleteTopicInBroker(String topic) { - this.brokerController.getTopicConfigManager().deleteTopicConfig(topic); - this.brokerController.getTopicQueueMappingManager().delete(topic); + private void deleteTopicInBroker(String topic, boolean isSyncDelete) { + this.brokerController.getTopicConfigManager().deleteTopicConfig(topic, isSyncDelete); + this.brokerController.getTopicQueueMappingManager().delete(topic, isSyncDelete); this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic); this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic); this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(topic)); this.brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics().removeTimingCount(topic); } + private void batchSyncMetaData() { + this.brokerController.getTopicConfigManager().persist(); + this.brokerController.getTopicQueueMappingManager().persist(); + this.brokerController.getConsumerOffsetManager().persist(); + } + private RemotingCommand getUnknownCmdResponse(ChannelHandlerContext ctx, RemotingCommand request) { String error = " request type " + request.getCode() + " not supported"; final RemotingCommand response = diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index ed46dfdc49c..46095ce4993 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -595,17 +595,23 @@ public boolean isOrderTopic(final String topic) { } } - public void deleteTopicConfig(final String topic) { + public void deleteTopicConfig(final String topic, boolean isSync) { TopicConfig old = removeTopicConfig(topic); if (old != null) { log.info("delete topic config OK, topic: {}", old); updateDataVersion(); - this.persist(); + if (isSync) { + this.persist(); + } } else { log.warn("delete topic config failed, topic: {} not exists", topic); } } + public void deleteTopicConfig(final String topic) { + deleteTopicConfig(topic, true); + } + public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() { TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index 4b0714decb6..6bd696f08df 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -136,17 +136,23 @@ public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean f } - public void delete(final String topic) { + public void delete(final String topic, boolean isSync) { TopicQueueMappingDetail old = this.topicQueueMappingTable.remove(topic); if (old != null) { log.info("delete topic queue mapping OK, static topic queue mapping: {}", old); this.dataVersion.nextVersion(); - this.persist(); + if (isSync) { + this.persist(); + } } else { log.warn("delete topic queue mapping failed, static topic: {} not exists", topic); } } + public void delete(final String topic) { + delete(topic,true); + } + public TopicQueueMappingDetail getTopicQueueMapping(String topic) { return topicQueueMappingTable.get(topic); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteTopicRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteTopicRequestHeader.java index ea66ed94c7e..4f2738fc573 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteTopicRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteTopicRequestHeader.java @@ -34,6 +34,7 @@ public class DeleteTopicRequestHeader extends TopicRequestHeader { @CFNotNull @RocketMQResource(ResourceType.TOPIC) private String topic; + private Boolean syncDelete = true; @Override public void checkFields() throws RemotingCommandException { @@ -46,4 +47,12 @@ public String getTopic() { public void setTopic(String topic) { this.topic = topic; } + + public Boolean getSyncDelete() { + return syncDelete; + } + + public void setSyncDelete(Boolean syncDelete) { + this.syncDelete = syncDelete; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 4d13acf225d..b736123642e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -1374,7 +1374,7 @@ public long now() { * dispatched to consume queue. */ @Override - public int deleteTopics(final Set deleteTopics) { + public synchronized int deleteTopics(final Set deleteTopics) { if (deleteTopics == null || deleteTopics.isEmpty()) { return 0; } diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index c272a302344..27852f84d22 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -16,7 +16,8 @@ */ package org.apache.rocketmq.store.stats; -import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -140,7 +141,7 @@ public class BrokerStatsManager { private ScheduledExecutorService accountExecutor; private ScheduledExecutorService cleanResourceExecutor; - private final HashMap statsTable = new HashMap<>(); + private final Map statsTable = new ConcurrentHashMap<>(); private final String clusterName; private final boolean enableQueueStat; private MomentStatsItemSet momentStatsItemSetFallSize;