diff --git a/joyqueue-common/joyqueue-model/src/main/java/org/joyqueue/domain/Topic.java b/joyqueue-common/joyqueue-model/src/main/java/org/joyqueue/domain/Topic.java index cf63a8b22..1dae65b8a 100644 --- a/joyqueue-common/joyqueue-model/src/main/java/org/joyqueue/domain/Topic.java +++ b/joyqueue-common/joyqueue-model/src/main/java/org/joyqueue/domain/Topic.java @@ -121,6 +121,8 @@ public String toString() { public static class TopicPolicy implements Serializable { private Long storeMaxTime; private Boolean storeCleanKeepUnconsumed; + private Integer produceArchiveTps = -1; + private Integer consumeArchiveTps = -1; private Map params; public Long getStoreMaxTime() { @@ -139,6 +141,22 @@ public Boolean getStoreCleanKeepUnconsumed() { return storeCleanKeepUnconsumed; } + public Integer getProduceArchiveTps() { + return produceArchiveTps; + } + + public void setProduceArchiveTps(Integer produceArchiveTps) { + this.produceArchiveTps = produceArchiveTps; + } + + public Integer getConsumeArchiveTps() { + return consumeArchiveTps; + } + + public void setConsumeArchiveTps(Integer consumeArchiveTps) { + this.consumeArchiveTps = consumeArchiveTps; + } + public void setParams(Map params) { this.params = params; } diff --git a/joyqueue-common/joyqueue-toolkit/src/main/java/org/joyqueue/toolkit/serialize/AbstractSerializer.java b/joyqueue-common/joyqueue-toolkit/src/main/java/org/joyqueue/toolkit/serialize/AbstractSerializer.java index 38dfc0e23..92ceb261f 100644 --- a/joyqueue-common/joyqueue-toolkit/src/main/java/org/joyqueue/toolkit/serialize/AbstractSerializer.java +++ b/joyqueue-common/joyqueue-toolkit/src/main/java/org/joyqueue/toolkit/serialize/AbstractSerializer.java @@ -229,6 +229,18 @@ public static Map toStringMap(final String text) throws IOExcept return new HashMap(properties); } + public static String readString(final byte[] bytes) { + return bytes == null ? null : readString(bytes, 0, bytes.length); + } + + public static String readString(byte[] bytes, int offset, int length) { + if (bytes == null) { + return null; + } else { + return length == 0 ? "" : new String(bytes, offset, length, Charset.forName("UTF-8")); + } + } + /** * 读取字符串,字符长度<=255 * diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-service/pom.xml b/joyqueue-console/joyqueue-data/joyqueue-data-service/pom.xml index 18c2b09a8..aff86852f 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-service/pom.xml +++ b/joyqueue-console/joyqueue-data/joyqueue-data-service/pom.xml @@ -112,9 +112,13 @@ commons-beanutils + commons-net + commons-net + + org.joyqueue joyqueue-nsr-core @@ -124,10 +128,10 @@ joyqueue-archive-api - + org.joyqueue joyqueue-client-all diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/nsr/impl/ConsumerNameServerServiceImpl.java b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/nsr/impl/ConsumerNameServerServiceImpl.java index e1dd9b1a2..60c39ec39 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/nsr/impl/ConsumerNameServerServiceImpl.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/nsr/impl/ConsumerNameServerServiceImpl.java @@ -17,6 +17,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import org.apache.commons.lang3.StringUtils; import org.joyqueue.convert.CodeConverter; import org.joyqueue.convert.NsrConsumerConverter; import org.joyqueue.domain.ClientType; @@ -26,7 +27,6 @@ import org.joyqueue.nsr.ConsumerNameServerService; import org.joyqueue.nsr.NameServerBase; import org.joyqueue.nsr.model.ConsumerQuery; -import org.apache.commons.lang.StringUtils; import org.springframework.stereotype.Service; import java.util.List; diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/ArchiveServiceImpl.java b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/ArchiveServiceImpl.java index 7911d5d26..d2f2b859d 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/ArchiveServiceImpl.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/ArchiveServiceImpl.java @@ -24,10 +24,10 @@ import org.joyqueue.model.domain.User; import org.joyqueue.model.query.QApplication; import org.joyqueue.model.query.QArchive; -import org.joyqueue.server.archive.store.QueryCondition; import org.joyqueue.server.archive.store.api.ArchiveStore; import org.joyqueue.server.archive.store.model.ConsumeLog; import org.joyqueue.server.archive.store.model.SendLog; +import org.joyqueue.server.archive.store.query.QueryCondition; import org.joyqueue.service.ApplicationService; import org.joyqueue.service.ArchiveService; import org.joyqueue.service.TopicService; diff --git a/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/archive/ArchiveCommand.java b/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/archive/ArchiveCommand.java index 05ebb2ca1..98486e26e 100644 --- a/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/archive/ArchiveCommand.java +++ b/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/archive/ArchiveCommand.java @@ -17,11 +17,11 @@ import com.alibaba.fastjson.JSON; import org.joyqueue.broker.archive.ArchiveUtils; -import org.joyqueue.broker.buffer.Serializer; +import org.joyqueue.util.serializer.Serializer; import org.joyqueue.exception.ServiceException; import org.joyqueue.handler.error.ErrorCode; import org.joyqueue.message.SourceType; -import org.joyqueue.server.archive.store.HBaseSerializer; +import org.joyqueue.server.archive.store.utils.ArchiveSerializer; import org.joyqueue.server.retry.model.RetryMessageModel; import org.joyqueue.handler.Constants; import org.joyqueue.service.MessagePreviewService; @@ -45,7 +45,6 @@ import com.jd.laf.web.vertx.response.Responses; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; -import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; @@ -200,10 +199,10 @@ public BrokerMessage filterBrokerMessage(BrokerMessage brokerMessage, SendLog se } for(BrokerMessage m:msgs){ String msgId=ArchiveUtils.messageId(brokerMessage.getTopic(),m.getPartition(),m.getMsgIndexNo()); - byte[] msgIdMd5Bytes=HBaseSerializer.md5(msgId,null); + byte[] msgIdMd5Bytes= ArchiveSerializer.md5(msgId,null); if(logger.isDebugEnabled()) { logger.debug("current message business id {},message id {},md5 length {},base 64 bytes {},hex {}", m.getBusinessId(), msgId, msgIdMd5Bytes.length, - Base64.getEncoder().encodeToString(msgIdMd5Bytes), HBaseSerializer.byteArrayToHexStr(msgIdMd5Bytes)); + Base64.getEncoder().encodeToString(msgIdMd5Bytes), ArchiveSerializer.byteArrayToHexStr(msgIdMd5Bytes)); } if(Arrays.equals(msgIdMd5Bytes,sendLog.getBytesMessageId())){ return m; @@ -221,7 +220,7 @@ public String preview(BrokerMessage brokerMessage,String messageType){ return messagePreviewService.preview(messageType, brokerMessage.getDecompressedBody()); } catch (Throwable e) { logger.error("parse error",e); - return Bytes.toString(brokerMessage.getDecompressedBody()); + return Serializer.readString(brokerMessage.getDecompressedBody()); } } diff --git a/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/broker/BrokerCommand.java b/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/broker/BrokerCommand.java index 8bd6b2490..f324a2178 100644 --- a/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/broker/BrokerCommand.java +++ b/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/broker/BrokerCommand.java @@ -21,6 +21,7 @@ import com.jd.laf.web.vertx.annotation.QueryParam; import com.jd.laf.web.vertx.response.Response; import com.jd.laf.web.vertx.response.Responses; +import org.apache.commons.net.telnet.TelnetClient; import org.joyqueue.handler.annotation.PageQuery; import org.joyqueue.handler.error.ConfigException; import org.joyqueue.handler.routing.command.NsrCommandSupport; @@ -29,7 +30,6 @@ import org.joyqueue.model.domain.Broker; import org.joyqueue.model.query.QBroker; import org.joyqueue.service.BrokerService; -import org.apache.commons.net.telnet.TelnetClient; import static org.joyqueue.handler.Constants.ID; diff --git a/joyqueue-server/joyqueue-archive/joyqueue-archive-hbase/src/main/java/org/joyqueue/server/archive/store/QueryCondition.java b/joyqueue-server/joyqueue-archive/joyqueue-archive-api/src/main/java/org/joyqueue/server/archive/store/query/QueryCondition.java similarity index 75% rename from joyqueue-server/joyqueue-archive/joyqueue-archive-hbase/src/main/java/org/joyqueue/server/archive/store/QueryCondition.java rename to joyqueue-server/joyqueue-archive/joyqueue-archive-api/src/main/java/org/joyqueue/server/archive/store/query/QueryCondition.java index 8ed2a420a..4e8e99927 100644 --- a/joyqueue-server/joyqueue-archive/joyqueue-archive-hbase/src/main/java/org/joyqueue/server/archive/store/QueryCondition.java +++ b/joyqueue-server/joyqueue-archive/joyqueue-archive-api/src/main/java/org/joyqueue/server/archive/store/query/QueryCondition.java @@ -13,9 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.joyqueue.server.archive.store; +package org.joyqueue.server.archive.store.query; import org.joyqueue.server.archive.store.model.Query; +import org.joyqueue.server.archive.store.utils.ArchiveSerializer; + +import java.util.Arrays; /** * Created by chengzhiliang on 2018/12/4. @@ -72,10 +75,21 @@ public byte[] getStartRowKeyByteArr() { } public void setStartRowKeyByteArr(String startRowKeyByteArr) { - byte[] bytes = HBaseSerializer.hexStrToByteArray(startRowKeyByteArr); + byte[] bytes = ArchiveSerializer.hexStrToByteArray(startRowKeyByteArr); this.startRowKeyByteArr = bytes; } + @Override + public String toString() { + return "QueryCondition{" + + "startRowKey=" + startRowKey + + ", stopRowKey=" + stopRowKey + + ", count=" + count + + ", rowKey=" + rowKey + + ", startRowKeyByteArr=" + Arrays.toString(startRowKeyByteArr) + + '}'; + } + /** * 查询RowKey */ @@ -117,5 +131,14 @@ public void setMessageId(String messageId) { this.messageId = messageId; } + @Override + public String toString() { + return "RowKey{" + + "topic='" + topic + '\'' + + ", time=" + time + + ", businessId='" + businessId + '\'' + + ", messageId='" + messageId + '\'' + + '}'; + } } } diff --git a/joyqueue-server/joyqueue-archive/joyqueue-archive-hbase/src/main/java/org/joyqueue/server/archive/store/HBaseSerializer.java b/joyqueue-server/joyqueue-archive/joyqueue-archive-api/src/main/java/org/joyqueue/server/archive/store/utils/ArchiveSerializer.java similarity index 92% rename from joyqueue-server/joyqueue-archive/joyqueue-archive-hbase/src/main/java/org/joyqueue/server/archive/store/HBaseSerializer.java rename to joyqueue-server/joyqueue-archive/joyqueue-archive-api/src/main/java/org/joyqueue/server/archive/store/utils/ArchiveSerializer.java index d70ca96e6..a4da2c6dd 100644 --- a/joyqueue-server/joyqueue-archive/joyqueue-archive-hbase/src/main/java/org/joyqueue/server/archive/store/HBaseSerializer.java +++ b/joyqueue-server/joyqueue-archive/joyqueue-archive-api/src/main/java/org/joyqueue/server/archive/store/utils/ArchiveSerializer.java @@ -1,32 +1,15 @@ -/** - * Copyright 2019 The JoyQueue Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.joyqueue.server.archive.store; +package org.joyqueue.server.archive.store.utils; import org.joyqueue.server.archive.store.model.ConsumeLog; import org.joyqueue.server.archive.store.model.SendLog; import org.joyqueue.toolkit.lang.Pair; import org.joyqueue.toolkit.security.Md5; + import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.security.GeneralSecurityException; -/** - * Created by chengzhiliang on 2018/12/12. - */ -public class HBaseSerializer { +public class ArchiveSerializer { public static ConsumeLog readConsumeLog(Pair pair) { ConsumeLog log = new ConsumeLog(); @@ -247,7 +230,7 @@ public static SendLog readSendLog4BizId(Pair pair) { * **/ public static byte[] md5(String content,byte[] key) throws GeneralSecurityException { - return Md5.INSTANCE.encrypt(content.getBytes(Charset.forName("utf-8")), key); + return Md5.INSTANCE.encrypt(content.getBytes(Charset.forName("utf-8")), key); } public static String byteArrayToHexStr(byte[] byteArray) { @@ -278,4 +261,23 @@ public static byte[] hexStrToByteArray(String str) { } return byteArray; } + + public static byte[] reverse(byte[] byteArray) { + if (byteArray == null || byteArray.length == 0) { + return byteArray; + } + byte[] reverseArray = new byte[byteArray.length]; + for (int i = 0; i < byteArray.length; i++) { + reverseArray[i] = byteArray[byteArray.length - i - 1]; + } + return reverseArray; + } + + public static byte[] reverse(ByteBuffer buffer) { + return reverse(buffer.array()); + } + + public static String reverse(String reverseStr) { + return new StringBuffer(reverseStr).reverse().toString(); + } } diff --git a/joyqueue-server/joyqueue-archive/joyqueue-archive-hbase/src/main/java/org/joyqueue/server/archive/store/HBaseStore.java b/joyqueue-server/joyqueue-archive/joyqueue-archive-hbase/src/main/java/org/joyqueue/server/archive/store/HBaseStore.java index fa9cc9d88..2d56baafa 100644 --- a/joyqueue-server/joyqueue-archive/joyqueue-archive-hbase/src/main/java/org/joyqueue/server/archive/store/HBaseStore.java +++ b/joyqueue-server/joyqueue-archive/joyqueue-archive-hbase/src/main/java/org/joyqueue/server/archive/store/HBaseStore.java @@ -22,6 +22,8 @@ import org.joyqueue.server.archive.store.api.ArchiveStore; import org.joyqueue.monitor.PointTracer; import org.joyqueue.server.archive.store.model.*; +import org.joyqueue.server.archive.store.query.QueryCondition; +import org.joyqueue.server.archive.store.utils.ArchiveSerializer; import org.joyqueue.toolkit.lang.Pair; import org.joyqueue.toolkit.network.IpUtil; import org.joyqueue.toolkit.security.Md5; @@ -118,7 +120,7 @@ public void putConsumeLog(List consumeLogList, PointTracer tracer) t int appId = topicAppMapping.getAppId(app); consumeLog.setAppId(appId); - Pair pair = HBaseSerializer.convertConsumeLogToKVBytes(consumeLog); + Pair pair = ArchiveSerializer.convertConsumeLogToKVBytes(consumeLog); logList.add(pair); } @@ -145,10 +147,10 @@ public void putSendLog(List sendLogList, PointTracer tracer) throws Joy log.setTopicId(topicId); log.setAppId(appId); - Pair pair = HBaseSerializer.convertSendLogToKVBytes(log); + Pair pair = ArchiveSerializer.convertSendLogToKVBytes(log); logList.add(pair); - Pair pair4BizId = HBaseSerializer.convertSendLogToKVBytes4BizId(log); + Pair pair4BizId = ArchiveSerializer.convertSendLogToKVBytes4BizId(log); logList.add(pair4BizId); } @@ -233,13 +235,13 @@ public List scanSendLog(Query query) throws JoyQueueException { for (Pair pair : scan) { SendLog log; if (hasBizId) { - log = HBaseSerializer.readSendLog4BizId(pair); + log = ArchiveSerializer.readSendLog4BizId(pair); } else { - log = HBaseSerializer.readSendLog(pair); + log = ArchiveSerializer.readSendLog(pair); } log.setClientIpStr(toIpString(log.getClientIp())); - log.setRowKeyStart(HBaseSerializer.byteArrayToHexStr(pair.getKey())); + log.setRowKeyStart(ArchiveSerializer.byteArrayToHexStr(pair.getKey())); String topicName = topicAppMapping.getTopicName(log.getTopicId()); log.setTopic(topicName); @@ -412,13 +414,13 @@ public SendLog getOneSendLog(Query query) throws JoyQueueException { allocate.putInt(topicAppMapping.getTopicId(rowKey.getTopic())); allocate.putLong(rowKey.getTime()); allocate.put(Md5.INSTANCE.encrypt(rowKey.getBusinessId().getBytes(Charset.forName("utf-8")), null)); - allocate.put(HBaseSerializer.hexStrToByteArray(rowKey.getMessageId())); + allocate.put(ArchiveSerializer.hexStrToByteArray(rowKey.getMessageId())); // rowKey byte[] bytesRowKey = allocate.array(); Pair bytes = hBaseClient.getKV(namespace, sendLogTable, cf, col, bytesRowKey); - SendLog log = HBaseSerializer.readSendLog(bytes); + SendLog log = ArchiveSerializer.readSendLog(bytes); StringBuilder clientIp = new StringBuilder(); IpUtil.toAddress(log.getClientIp(), clientIp); @@ -452,7 +454,7 @@ public List scanConsumeLog(String messageId, Integer count) throws J scanParameters.setCf(cf); scanParameters.setCol(col); - byte[] messageIdBytes = HBaseSerializer.hexStrToByteArray(messageId); + byte[] messageIdBytes = ArchiveSerializer.hexStrToByteArray(messageId); scanParameters.setStartRowKey(messageIdBytes); ByteBuffer bytebuffer = ByteBuffer.allocate(messageIdBytes.length + 1); @@ -465,8 +467,8 @@ public List scanConsumeLog(String messageId, Integer count) throws J List> scan = hBaseClient.scan(namespace, scanParameters); for (Pair pair : scan) { - ConsumeLog log = HBaseSerializer.readConsumeLog(pair); - log.setMessageId(HBaseSerializer.byteArrayToHexStr(log.getBytesMessageId())); + ConsumeLog log = ArchiveSerializer.readConsumeLog(pair); + log.setMessageId(ArchiveSerializer.byteArrayToHexStr(log.getBytesMessageId())); StringBuilder clientIp = new StringBuilder(); IpUtil.toAddress(log.getClientIp(), clientIp); diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveConfig.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveConfig.java index 3e6854900..52b4d807b 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveConfig.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveConfig.java @@ -15,6 +15,7 @@ */ package org.joyqueue.broker.archive; +import org.apache.commons.lang3.StringUtils; import org.joyqueue.config.BrokerConfigKey; import org.joyqueue.toolkit.config.Property; import org.joyqueue.toolkit.config.PropertySupplier; @@ -25,6 +26,9 @@ * Created by chengzhiliang on 2018/12/6. */ public class ArchiveConfig { + public static final String LOG_DETAIL_PRODUCE_PREFIX = "produce."; + public static final String LOG_DETAIL_CONSUME_PREFIX = "consume."; + private static final String ARCHIVE_PATH ="/archive/"; private PropertySupplier propertySupplier; private String archivePath; @@ -60,6 +64,13 @@ public void setPath(String path) { } } + public boolean getLogDetail(String archiveType, String brokerId) { + return (boolean) PropertySupplier.getValue(propertySupplier, + ArchiveConfigKey.ARCHIVE_TRACE_LOG.getName() + archiveType + brokerId, + ArchiveConfigKey.ARCHIVE_TRACE_LOG.getType(), + ArchiveConfigKey.ARCHIVE_TRACE_LOG.getValue()); + } + public int getConsumeBatchNum() { return PropertySupplier.getValue(propertySupplier, ArchiveConfigKey.CONSUME_BATCH_NUM); } @@ -91,6 +102,10 @@ public String getNamespace() { return PropertySupplier.getValue(propertySupplier, ArchiveConfigKey.ARCHIVE_STORE_NAMESPACE); } + public int getStoreFialedRetryCount() { + return PropertySupplier.getValue(propertySupplier, ArchiveConfigKey.ARCHIVE_STORE_RETRY_COUNT); + } + public String getTracerType() { return PropertySupplier.getValue(propertySupplier, BrokerConfigKey.TRACER_TYPE); } @@ -102,4 +117,40 @@ public boolean isReamingEnable() { public boolean isBacklogEnable() { return PropertySupplier.getValue(propertySupplier, ArchiveConfigKey.ARCHIVE_BACKLOG_ENABLE); } + + public int getLogRetainDuration() { + return PropertySupplier.getValue(propertySupplier, ArchiveConfigKey.ARCHIVE_LOG_RETAIN_DURATION); + } + + public int getProduceArchiveRate() { + return propertySupplier.getValue(ArchiveConfigKey.ARCHIVE_PRODUCE_RATE); + } + + public int getConsumeArchiveRate() { + return propertySupplier.getValue(ArchiveConfigKey.ARCHIVE_CONSUME_RATE); + } + + public int getProduceArchiveRate(String topic, String app) { + if (StringUtils.isEmpty(app)) { + return PropertySupplier.getValue(propertySupplier,ArchiveConfigKey.ARCHIVE_PRODUCE_RATE_PREFIX.getName() + String.format("%s",topic), + ArchiveConfigKey.ARCHIVE_PRODUCE_RATE_PREFIX.getType(), ArchiveConfigKey.ARCHIVE_PRODUCE_RATE_PREFIX.getValue() + ); + } else { + return PropertySupplier.getValue(propertySupplier,ArchiveConfigKey.ARCHIVE_PRODUCE_RATE_PREFIX.getName() + String.format("%s.%s",topic,app), + ArchiveConfigKey.ARCHIVE_PRODUCE_RATE_PREFIX.getType(), ArchiveConfigKey.ARCHIVE_PRODUCE_RATE_PREFIX.getValue() + ); + } + } + + public int getConsumeArchiveRate(String topic, String app) { + if (StringUtils.isEmpty(app)) { + return PropertySupplier.getValue(propertySupplier,ArchiveConfigKey.ARCHIVE_CONSUME_RATE_PREFIX.getName() + String.format("%s",topic), + ArchiveConfigKey.ARCHIVE_CONSUME_RATE_PREFIX.getType(), ArchiveConfigKey.ARCHIVE_CONSUME_RATE_PREFIX.getValue() + ); + } else { + return PropertySupplier.getValue(propertySupplier,ArchiveConfigKey.ARCHIVE_CONSUME_RATE_PREFIX.getName() + String.format("%s.%s",topic,app), + ArchiveConfigKey.ARCHIVE_CONSUME_RATE_PREFIX.getType(), ArchiveConfigKey.ARCHIVE_CONSUME_RATE_PREFIX.getValue() + ); + } + } } diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveConfigKey.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveConfigKey.java index 370a43811..0a05a9799 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveConfigKey.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveConfigKey.java @@ -25,15 +25,21 @@ public enum ArchiveConfigKey implements PropertyDef { CONSUME_BATCH_NUM("archive.consume.batch.num", 1000, Type.INT), CONSUME_WRITE_DELAY("archive.consume.write.delay", 1, Type.INT), - PRODUCE_BATCH_NUM("archive.produce.batch.num", 10, Type.INT), - LOG_QUEUE_SIZE("archive.send.log.queue.size", 1000, Type.INT), - WRITE_THREAD_NUM("archive.thread.num", 5, Type.INT), + PRODUCE_BATCH_NUM("archive.produce.batch.num", 50, Type.INT), + LOG_QUEUE_SIZE("archive.send.log.queue.size", 10000, Type.INT), + WRITE_THREAD_NUM("archive.thread.num", 10, Type.INT), ARCHIVE_SWITCH("archive.switch", false, Type.BOOLEAN), ARCHIVE_THREAD_POOL_QUEUE_SIZE("archive.thread.pool.queue.size", 10, Type.INT), ARCHIVE_STORE_NAMESPACE("archive.store.namespace", "joyqueue", Type.STRING), - ARCHIVE_REAMING_ENABLE("archive.reaming.enable", false, Type.BOOLEAN), + ARCHIVE_STORE_RETRY_COUNT("archive.store.retry.count", 3, Type.INT), + ARCHIVE_REAMING_ENABLE("archive.reaming.enable", true, Type.BOOLEAN), ARCHIVE_BACKLOG_ENABLE("archive.backlog.enable", false, Type.BOOLEAN), - + ARCHIVE_TRACE_LOG("archive.trace.log.", false, Type.BOOLEAN), + ARCHIVE_LOG_RETAIN_DURATION("archive.log.retain.duration", 24, Type.INT), + ARCHIVE_PRODUCE_RATE("archive.rate.produce", -1, Type.INT), + ARCHIVE_PRODUCE_RATE_PREFIX("archive.rate.produce.", -1, Type.INT), + ARCHIVE_CONSUME_RATE("archive.rate.consume", -1, Type.INT), + ARCHIVE_CONSUME_RATE_PREFIX("archive.rate.consume.", -1, Type.INT), ; private String name; diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveManager.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveManager.java index 1b2bbfd99..e558c3367 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveManager.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveManager.java @@ -16,6 +16,7 @@ package org.joyqueue.broker.archive; import org.joyqueue.broker.BrokerContext; +import org.joyqueue.broker.limit.SubscribeRateLimiter; import org.joyqueue.toolkit.lang.Close; import org.joyqueue.toolkit.service.Service; import org.slf4j.Logger; @@ -41,6 +42,9 @@ public class ArchiveManager extends Service { // 归档配置 private ArchiveConfig archiveConfig; + // 归档限流器 + private SubscribeRateLimiter rateLimiterManager; + public ArchiveManager(BrokerContext context) { this.context = context; } @@ -73,11 +77,14 @@ protected void validate() throws Exception { if (archiveConfig == null) { archiveConfig = new ArchiveConfig(context == null ? null : context.getPropertySupplier()); } + if (rateLimiterManager == null) { + rateLimiterManager = new ArchiveRateLimiterManager(context); + } if (sendArchiveService == null) { - this.sendArchiveService = new ProduceArchiveService(archiveConfig, context.getClusterManager(), context.getConsume(), context.getMessageConvertSupport()); + this.sendArchiveService = new ProduceArchiveService(archiveConfig, context, rateLimiterManager); } if (consumeArchiveService == null) { - this.consumeArchiveService = new ConsumeArchiveService(archiveConfig, context.getClusterManager()); + this.consumeArchiveService = new ConsumeArchiveService(archiveConfig, context, rateLimiterManager); } } @@ -134,4 +141,12 @@ public Map getSendBacklogNumByTopic() { } return sendArchiveService.getArchivePosition(); } + + public ArchiveConfig getArchiveConfig() { + return archiveConfig; + } + + public void setArchiveConfig(ArchiveConfig archiveConfig) { + this.archiveConfig = archiveConfig; + } } diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveRateLimiterManager.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveRateLimiterManager.java new file mode 100644 index 000000000..61663e0e2 --- /dev/null +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveRateLimiterManager.java @@ -0,0 +1,151 @@ +package org.joyqueue.broker.archive; + +import org.apache.commons.lang3.StringUtils; +import org.joyqueue.broker.BrokerContext; +import org.joyqueue.broker.cluster.ClusterManager; +import org.joyqueue.broker.limit.RateLimiter; +import org.joyqueue.broker.limit.config.LimiterConfig; +import org.joyqueue.broker.limit.support.AbstractSubscribeRateLimiterManager; +import org.joyqueue.domain.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + +/** + * @author majun8 + */ +public class ArchiveRateLimiterManager extends AbstractSubscribeRateLimiterManager { + protected static final Logger logger = LoggerFactory.getLogger(ArchiveRateLimiterManager.class); + + private static final int DEFAULT_LIMIT_RATE = -1; + + private ClusterManager clusterManager; + private ArchiveConfig archiveConfig; + + public ArchiveRateLimiterManager(BrokerContext context) { + super(context); + this.clusterManager = context.getClusterManager(); + this.archiveConfig = context.getArchiveManager().getArchiveConfig(); + } + + public int defaultProducerLimitRate(String topic, String app) { + int archiveRate = archiveConfig.getProduceArchiveRate(topic, app); + if(archiveRate <= 0) { + // get broker level retry rate + archiveRate = archiveConfig.getProduceArchiveRate(); + } + return archiveRate; + } + + public int defaultConsumerLimitRate(String topic, String app) { + int archiveRate = archiveConfig.getConsumeArchiveRate(topic, app); + if(archiveRate <= 0) { + // get broker level retry rate + archiveRate = archiveConfig.getConsumeArchiveRate(); + } + return archiveRate; + } + + @Override + public LimiterConfig getLimiterConfig(String topic, String app, Subscription.Type subscribe) { + TopicConfig topicConfig = clusterManager.getTopicConfig(TopicName.parse(topic)); + switch (subscribe) { + case PRODUCTION: + if (topicConfig != null) { + Topic.TopicPolicy policy = topicConfig.getPolicy(); + Integer tps = DEFAULT_LIMIT_RATE; + if (policy != null && policy.getProduceArchiveTps() != null) { + tps = policy.getProduceArchiveTps(); + } else { + tps = defaultProducerLimitRate(topic, app); + } + if (tps <= 0) { + tps = Integer.MAX_VALUE; + } + return new LimiterConfig(tps, -1); + } + break; + case CONSUMPTION: + if (topicConfig != null) { + Topic.TopicPolicy policy = topicConfig.getPolicy(); + Integer tps = DEFAULT_LIMIT_RATE; + if (policy != null && policy.getConsumeArchiveTps() != null) { + tps = policy.getConsumeArchiveTps(); + } else { + tps = defaultConsumerLimitRate(topic, app); + } + if (tps <= 0) { + tps = Integer.MAX_VALUE; + } + return new LimiterConfig(tps, -1); + } + break; + } + logger.error("unsupported limit type, topic: {}, app: {}, type: {}", topic, app, subscribe.name()); + return null; + } + + @Override + public void cleanRateLimiter(Config config) { + String configKey = config.getKey(); + if (StringUtils.isBlank(configKey)) { + return; + } + + if (StringUtils.equals(configKey, ArchiveConfigKey.ARCHIVE_PRODUCE_RATE.getName())) { + for (Map.Entry> topic : subscribeRateLimiters.entrySet()) { + Iterator> subLimiters = topic.getValue().entrySet().iterator(); + while (subLimiters.hasNext()) { + Map.Entry subLimiter = subLimiters.next(); + String subscribe = subLimiter.getKey(); + if (StringUtils.contains(subscribe, Subscription.Type.PRODUCTION.name() + SPLIT)) { + subLimiters.remove(); + } + } + } + } else if (StringUtils.startsWith(configKey, ArchiveConfigKey.ARCHIVE_PRODUCE_RATE_PREFIX.getName())) { + String[] keys = StringUtils.split(configKey, "\\."); + if (keys != null) { + if (keys.length == 4) { + // topic prefix + String topic = keys[3]; + if (topic != null) { + cleanRateLimiter(topic, null, Subscription.Type.PRODUCTION); + } + } else if (keys.length == 5) { + // topic & app prefix + String topic = keys[3]; + String app = keys[4]; + if (topic != null && app != null) { + cleanRateLimiter(topic, app, Subscription.Type.PRODUCTION); + } + } + } + } + + if (StringUtils.equals(configKey, ArchiveConfigKey.ARCHIVE_CONSUME_RATE.getName())) { + for (Map.Entry> topic : subscribeRateLimiters.entrySet()) { + Iterator> subLimiters = topic.getValue().entrySet().iterator(); + while (subLimiters.hasNext()) { + Map.Entry subLimiter = subLimiters.next(); + String subscribe = subLimiter.getKey(); + if (StringUtils.contains(subscribe, Subscription.Type.CONSUMPTION.name() + SPLIT)) { + subLimiters.remove(); + } + } + } + } else if (StringUtils.startsWith(configKey, ArchiveConfigKey.ARCHIVE_CONSUME_RATE_PREFIX.getName())) { + String[] keys = StringUtils.split(configKey, "\\."); + if (keys != null && keys.length == 5) { + String topic = keys[3]; + String app = keys[4]; + if (topic != null && app != null) { + cleanRateLimiter(topic, app, Subscription.Type.CONSUMPTION); + } + } + } + } +} diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ConsumeArchiveService.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ConsumeArchiveService.java index fd34895ee..44f7bb076 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ConsumeArchiveService.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ConsumeArchiveService.java @@ -18,11 +18,16 @@ import com.google.common.base.Preconditions; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.FileUtils; +import org.joyqueue.broker.BrokerContext; import org.joyqueue.broker.Plugins; import org.joyqueue.broker.cluster.ClusterManager; +import org.joyqueue.broker.limit.RateLimiter; +import org.joyqueue.broker.limit.SubscribeRateLimiter; +import org.joyqueue.domain.Subscription; import org.joyqueue.exception.JoyQueueException; import org.joyqueue.message.MessageLocation; import org.joyqueue.monitor.PointTracer; +import org.joyqueue.monitor.TraceStat; import org.joyqueue.network.session.Connection; import org.joyqueue.server.archive.store.api.ArchiveStore; import org.joyqueue.server.archive.store.model.ConsumeLog; @@ -42,11 +47,7 @@ import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.security.GeneralSecurityException; -import java.util.Arrays; -import java.util.Comparator; -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -66,6 +67,7 @@ public class ConsumeArchiveService extends Service { private ClusterManager clusterManager; // 归档日志 private ArchiveConfig archiveConfig; + private BrokerContext brokerContext; // 统计当前读取的字节数,用于异常回滚 private AtomicInteger readByteCounter; @@ -78,11 +80,21 @@ public class ConsumeArchiveService extends Service { private PointTracer tracer; + // 归档限流器 + private SubscribeRateLimiter rateLimiterManager; + public ConsumeArchiveService(ArchiveConfig archiveConfig, ClusterManager clusterManager) { this.clusterManager = clusterManager; this.archiveConfig = archiveConfig; } + public ConsumeArchiveService(ArchiveConfig archiveConfig, BrokerContext brokerContext, SubscribeRateLimiter rateLimiter) { + this.archiveConfig = archiveConfig; + this.brokerContext = brokerContext; + this.clusterManager = brokerContext.getClusterManager(); + this.rateLimiterManager = rateLimiter; + } + @Override protected void validate() throws Exception { super.validate(); @@ -101,18 +113,23 @@ protected void validate() throws Exception { this.readConsumeLogThread = LoopThread.builder() .sleepTime(1, 10) .name("ReadAndPutHBase-ConsumeLog-Thread") + .daemon(true) .onException(e -> { - logger.error(e.getMessage(), e); + logger.error("ReadAndPutHBase-readAndWrite error, happened consume log [{}], error position [{}], error length [{}], exception {}, {}", + repository.rFile.getName(), repository.rMap, readByteCounter.get(), e.getMessage(), e); repository.rollBack(readByteCounter.get()); - logger.info("finish rollback."); + logger.info("Consume-archive: finish rollback consume log [{}], rollback position [{}], rollback length [{}].", + repository.rFile.getName(), repository.rMap, readByteCounter.get()); + readByteCounter.set(0); }) .doWork(this::readAndWrite) .build(); this.cleanConsumeLogFileThread = LoopThread.builder() - .sleepTime(1000 * 10, 1000 * 10) + .sleepTime(1000, 1000 * 10) .name("CleanArchiveFile-ConsumeLog-Thread") - .onException(e -> logger.error(e.getMessage(), e)) + .daemon(true) + .onException(e -> logger.error("CleanArchiveFile-cleanAndRollWriteFile error: {}, {}", e.getMessage(), e)) .doWork(this::cleanAndRollWriteFile) .build(); } @@ -124,6 +141,7 @@ protected void doStart() throws Exception { archiveStore.start(); readConsumeLogThread.start(); cleanConsumeLogFileThread.start(); + logger.info("Consume-archive: service started."); } @@ -134,12 +152,13 @@ protected void doStop() { Close.close(cleanConsumeLogFileThread); Close.close(repository); Close.close(archiveStore); + logger.info("Consume-archive: service stopped."); } /** * 读本地文件写归档存储服务 */ - private void readAndWrite() throws JoyQueueException, InterruptedException { + public void readAndWrite() throws JoyQueueException, InterruptedException { // 读信息,一次读指定条数 int readBatchSize; int batchSize=archiveConfig.getConsumeBatchNum(); @@ -149,12 +168,28 @@ private void readAndWrite() throws JoyQueueException, InterruptedException { if (readBatchSize > 0) { long startTime = SystemClock.now(); - // 调用存储接口写数据 - archiveStore.putConsumeLog(list, tracer); + int count = archiveConfig.getStoreFialedRetryCount(); + do { + try { + // 调用存储接口写数据 + archiveStore.putConsumeLog(list, tracer); + break; + } catch (JoyQueueException e) { + logger.error(String.format( + "Consume-archive: store failed for consume logs, exception size: %s, root cause: %s, cause stack: %s", + list.size(), e.getMessage(), e.getCause()), e); + if (--count == 0) { + throw e; + } + Thread.sleep(new Random().nextInt(count) * 1000); + } + } while (count > 0); long endTime = SystemClock.now(); - logger.debug("Write consumeLogs size:{} to archive store, and elapsed time {}ms", list.size(), endTime - startTime); + if (archiveConfig.getLogDetail(ArchiveConfig.LOG_DETAIL_CONSUME_PREFIX, clusterManager.getBrokerId().toString())) { + logger.info("Consume-archive: write consumeLogs size: {} to archive store, and elapsed time {}ms", list.size(), endTime - startTime); + } int consumeWriteDelay = archiveConfig.getConsumeWriteDelay(); if (consumeWriteDelay > 0) { @@ -162,10 +197,12 @@ private void readAndWrite() throws JoyQueueException, InterruptedException { } } else { - if (repository.rFile != null && repository.rMap != null) { - logger.debug("read file name {}, read position {}", repository.rFile.getName(), repository.rMap.toString()); - } else { - logger.debug("read file is null."); + if (archiveConfig.getLogDetail(ArchiveConfig.LOG_DETAIL_CONSUME_PREFIX, clusterManager.getBrokerId().toString())) { + if (repository.rFile != null && repository.rMap != null) { + logger.info("Consume-archive: read consume log file {}, read position {}", repository.rFile.getName(), repository.rMap.toString()); + } else { + logger.info("Consume-archive: read consume log file is null."); + } } break; } @@ -175,7 +212,7 @@ private void readAndWrite() throws JoyQueueException, InterruptedException { private void cleanAndRollWriteFile() { // 删除已归档文件 repository.delArchivedFile(); - // 5分钟滚动生成一个新的写文件,旧文件可归档 + // 1天滚动生成一个新的写文件,旧文件可归档 repository.tryFinishCurWriteFile(); } @@ -190,6 +227,9 @@ private List readConsumeLog(int count) { readByteCounter.set(0); List list = new LinkedList<>(); + if (archiveConfig.getLogDetail(ArchiveConfig.LOG_DETAIL_CONSUME_PREFIX, clusterManager.getBrokerId().toString())) { + logger.info("Consume-archive: begin to read consume log batch: {}", count); + } for (int i = 0; i < count; i++) { byte[] bytes = repository.readOne(); // 读到结尾会返回byte[0] @@ -202,6 +242,9 @@ private List readConsumeLog(int count) { break; } } + if (archiveConfig.getLogDetail(ArchiveConfig.LOG_DETAIL_CONSUME_PREFIX, clusterManager.getBrokerId().toString())) { + logger.info("Consume-archive: end to read consume log size: {}", list.size()); + } return list; } @@ -227,16 +270,38 @@ public long getRemainConsumeLogFileNum() { public void appendConsumeLog(Connection connection, MessageLocation[] locations) throws JoyQueueException { if (!isStarted()) { // 没有启动消费归档服务,添加消费日志 - logger.debug("ConsumeArchiveService not be started."); + logger.warn("ConsumeArchiveService not be started."); return; } - List logList = convert(connection, locations); - logList.stream().forEach(log -> { - // 序列化 - ByteBuffer buffer = ArchiveSerializer.write(log); - appendLog(buffer); - ArchiveSerializer.release(buffer); - }); + TraceStat stat = tracer.begin("org.joyqueue.server.archive.consume.appendConsumeLog"); + if (locations != null && locations.length > 0) { + if (checkRateLimitAvailable(connection, locations)) { + List logList = convert(connection, locations); + logList.forEach(log -> { + // 序列化 + ByteBuffer buffer = ArchiveSerializer.write(log); + appendLog(buffer); + ArchiveSerializer.release(buffer); + }); + } else { + TraceStat limitBroker = tracer.begin("archive.consume.rate.limited"); + TraceStat limitTopic = tracer.begin(String.format("archive.consume.rate.limited.%s.%s", locations[0].getTopic(), connection.getApp())); + if (archiveConfig.getLogDetail(ArchiveConfig.LOG_DETAIL_CONSUME_PREFIX, clusterManager.getBrokerId().toString())) { + logger.warn("Consume-archive: trigger rate limited topic: {}, app: {}", locations[0].getTopic(), connection.getApp()); + } + tracer.end(limitBroker); + tracer.end(limitTopic); + } + } + tracer.end(stat); + } + + private boolean checkRateLimitAvailable(Connection connection, MessageLocation[] locations) { + RateLimiter rateLimiter = rateLimiterManager.getOrCreate(locations[0].getTopic(), connection.getApp(), Subscription.Type.CONSUMPTION); + if (rateLimiter == null || rateLimiter.tryAcquireTps(locations.length)) { + return true; + } + return false; } /** @@ -276,7 +341,7 @@ private byte[] buildMessageId(MessageLocation location) { try { messageIdBytes = Md5.INSTANCE.encrypt(messageId.getBytes(), null); } catch (GeneralSecurityException e) { - logger.error("topic:{}, partition:{}, index:{}, exception:{}", location.getTopic(), location.getPartition(), location.getIndex(), e); + logger.error("Consume-archive: build consume log messageId error, topic:{}, partition:{}, index:{}, exception:{}", location.getTopic(), location.getPartition(), location.getIndex(), e); } return messageIdBytes; } @@ -293,7 +358,7 @@ private synchronized void appendLog(ByteBuffer buffer) { /** * 本地日志日志文件存储 */ - static class ArchiveMappedFileRepository implements Closeable { + class ArchiveMappedFileRepository implements Closeable { // 消费归档文件本地根存储路径 private String baseDir; @@ -309,6 +374,7 @@ static class ArchiveMappedFileRepository implements Closeable { // 读文件 private File rFile; + private File previousCloseReadFile; // 读文件的Mapped private MappedByteBuffer rMap; // 随机读文件 @@ -365,14 +431,14 @@ private void recover() { * @param buffer */ public synchronized void append(ByteBuffer buffer) { - position += buffer.limit(); + //position += buffer.limit(); // 首次创建文件 if (rwMap == null) { newMappedRWFile(); // may notify reader position = 0; append(buffer); - } else if (1 + position >= pageSize) { + } else if ((position + 1 + buffer.limit()) >= pageSize) { // 一个文件结束时(1个字节记录开始记录 + 记录长度) 小于 文件长度 rwMap.put(Byte.MAX_VALUE); rwMap = null; @@ -380,15 +446,15 @@ public synchronized void append(ByteBuffer buffer) { } else { // buffer 为空时直接返回 if (buffer.limit() == 0) { - logger.debug("append buffer limit is zero."); + logger.warn("Consume-archive: append buffer limit is zero."); return; } // 先写一个开始标记 rwMap.put(Byte.MIN_VALUE); - // 记录一个标示位占用长度 - position += 1; // 写入记录内容 rwMap.put(buffer); + // 记录一个标示位占用长度 + position += 1 + buffer.limit(); } } @@ -413,7 +479,7 @@ public void newMappedRWFile() { rwFileChannel = rwRaf.getChannel(); rwMap = rwFileChannel.map(FileChannel.MapMode.READ_WRITE, 0, pageSize); } catch (Exception ex) { - logger.error("create and mapped file error.", ex); + logger.error("Consume-archive: create and mapped file error.", ex); } } @@ -423,7 +489,6 @@ public void newMappedRWFile() { */ private void mappedReadOnlyFile() { try { - closeCurrentReadFile(); rRaf = new RandomAccessFile(rFile, "r"); rFileChannel = rRaf.getChannel(); rMap = rFileChannel.map(FileChannel.MapMode.READ_ONLY, 0, pageSize); @@ -446,21 +511,21 @@ public byte[] readOne() { } } // 检查一条消费日志开始标记 - if (checkStartFlag(rMap)) { + if (checkPositionReadable(rMap)) { int msgLen = rMap.getInt(); byte[] bytes = new byte[msgLen]; rMap.get(bytes); return bytes; } else if (checkFileEndFlag(rMap)) { - logger.debug("Finish reading the file {}.{}", rFile, rMap.toString()); + if (archiveConfig.getLogDetail(ArchiveConfig.LOG_DETAIL_CONSUME_PREFIX, clusterManager.getBrokerId().toString())) { + logger.info("Consume-archive: finish read consume log file: {}, position: {}", rFile, rMap.toString()); + } - // 换文件 - if (nextFile() != null) { - // 映射新文件 - mappedReadOnlyFile(); - // 继续读取消息 - return readOne(); + try { + closeCurrentReadFile(); + } catch (IOException e) { + logger.error("Consume-archive: close current consume log archive file: {}, error: {}", rFile, e); } } return new byte[0]; @@ -482,6 +547,20 @@ private boolean checkStartFlag(MappedByteBuffer rMap) { return false; } + private boolean checkPositionReadable(MappedByteBuffer rMap) { + if (rwFile == null || !rwFile.exists()) { + return checkStartFlag(rMap); + } + if (rwFile.getName().equals(rFile.getName())) { + if (rMap.position() < position) { + return checkStartFlag(rMap); + } else { + return false; + } + } + return checkStartFlag(rMap); + } + /** * 检查是否到文件结束 * @@ -504,10 +583,16 @@ private boolean checkFileEndFlag(MappedByteBuffer rMap) { * * @param interval */ - private void rollBack(int interval) { + public void rollBack(int interval) { if (rMap != null) { int position = rMap.position(); int newPosition = position - interval; + if (ConsumeArchiveService.this.archiveConfig.getLogDetail( + ArchiveConfig.LOG_DETAIL_CONSUME_PREFIX, + ConsumeArchiveService.this.clusterManager.getBrokerId().toString())) { + logger.info("Consume-archive: read consume log rollback, position: [{}], offset: [{}], reset position: [{}]", + position, interval, newPosition); + } rMap.position(newPosition); } } @@ -522,7 +607,7 @@ public void close() { closeCurrentReadFile(); closeCurrentWriteFile(); } catch (IOException e) { - logger.error("delete read file error.", e); + logger.error("Consume-archive: close consume log read&write files error: {}", e); } } @@ -536,6 +621,10 @@ public void closeCurrentReadFile() throws IOException { if (rRaf != null) { rRaf.close(); } + if (rMap != null) { + rMap = null; + } + previousCloseReadFile = rFile; } /** @@ -551,6 +640,9 @@ public void closeCurrentWriteFile() throws IOException { if (rwRaf != null) { rwRaf.close(); } + if (rwMap != null) { + rwMap = null; + } } @@ -562,31 +654,33 @@ public void closeCurrentWriteFile() throws IOException { private File nextFile() { File file = new File(baseDir); String[] list = file.list(); - if (list == null || list.length == 1) { - logger.debug("only one write file."); - // 归档文件目录下的没有文件,或者文件数等于1,则表示没有可归档文件,返回null + if (list == null) { + if (archiveConfig.getLogDetail(ArchiveConfig.LOG_DETAIL_CONSUME_PREFIX, clusterManager.getBrokerId().toString())) { + logger.info("Consume-archive: find no consume log files"); + } return null; } - logger.debug("archive file list {}", Arrays.toString(list)); + if (archiveConfig.getLogDetail(ArchiveConfig.LOG_DETAIL_CONSUME_PREFIX, clusterManager.getBrokerId().toString())) { + logger.info("Consume-archive: find consume log file list: {}", Arrays.toString(list)); + } - final String concurrentFileName = rFile == null ? "" : rFile.getName(); - List sorted = Arrays.asList(list).stream().filter(name -> name.compareTo(concurrentFileName) > 0) + final String previousCloseReadFileName = previousCloseReadFile == null ? "" : previousCloseReadFile.getName(); + List sorted = Arrays.stream(list).filter(name -> name.compareTo(previousCloseReadFileName) > 0) .sorted(Comparator.naturalOrder()).collect(Collectors.toList()); - if (sorted.size() > 1) { - // 未归档文件数大于1,获取第1个未归档文件 + if (sorted.size() > 0) { + // 只要有一个比上一次关闭的文件新(正常情况下新生成的) 或者就只有一个文件(该文件可能因为broker重启后重新打开) String fileName = sorted.get(0); File tempFile = new File(baseDir + fileName); rFile = tempFile; - logger.debug("current read consume event file {}",tempFile); + if (archiveConfig.getLogDetail(ArchiveConfig.LOG_DETAIL_CONSUME_PREFIX, clusterManager.getBrokerId().toString())) { + logger.info("Consume-archive: find earliest consume log file: {}",tempFile); + } return rFile; - } else { - logger.debug("only one write file."); } return null; - } /** @@ -600,9 +694,12 @@ private File LastFile() { if (list == null) { return null; } - Optional first = Arrays.asList(list).stream().sorted(Comparator.reverseOrder()).findFirst(); - if (first.isPresent()) { - String fileName = first.get(); + Optional last = Arrays.stream(list) + .map(name -> new File(baseDir + name)) + .filter(f -> !f.isDirectory()) + .map(File::getName).max(Comparator.naturalOrder()); + if (last.isPresent()) { + String fileName = last.get(); File tempFile = new File(baseDir + fileName); rwFile = tempFile; return rwFile; @@ -617,9 +714,7 @@ private void delArchivedFile() { // 遍历删除已归档文件 List archivedFileList = getArchivedFileList(); if (CollectionUtils.isNotEmpty(archivedFileList)) { - archivedFileList.stream().forEach(fileName -> { - new File(baseDir + fileName).delete(); - }); + archivedFileList.forEach(fileName -> new File(baseDir + fileName).delete()); } } @@ -629,19 +724,22 @@ private void delArchivedFile() { * @return */ private List getArchivedFileList() { - // 没有调用到readOne方法,不会初始化rFile,防止空指针,加一下判断 - if (rFile == null) { - logger.debug("Can not get archive file list cause by consume archive read file have no init."); - return null; - } - File file = new File(baseDir); String[] list = file.list(); if (list == null) { return null; } + + // 没有调用到readOne方法,不会初始化rFile,防止空指针,加一下判断 + if (rFile == null) { + if (archiveConfig.getLogDetail(ArchiveConfig.LOG_DETAIL_CONSUME_PREFIX, clusterManager.getBrokerId().toString())) { + logger.info("Consume-archive: there is no archive consume log file list cause current broker is not open archive flag."); + } + return null; + } + // 返回已完成归档的文件集合 - return Arrays.asList(list).stream().filter(name -> name.compareTo(rFile.getName()) < 0).collect(Collectors.toList()); + return Arrays.stream(list).filter(name -> name.compareTo(rFile.getName()) < 0).collect(Collectors.toList()); } /** @@ -695,13 +793,13 @@ public synchronized void tryFinishCurWriteFile() { // 5分钟滚动生成一个新的写文件 String name = rwFile.getName(); long now = SystemClock.now(); - // position > 0 说明有归档记录写入文件,并且5分钟没有写满 - if (position > 0 && now - Long.parseLong(name) >= 1000 * 60 * 1) { + // position > 0 说明有归档记录写入文件,并且1天没有写满 + if (position > 0 && now - Long.parseLong(name) >= archiveConfig.getLogRetainDuration() * 1000 * 60 * 60) { // 直接将当前写文件的位置设置为文件大小,下次一次append的时候会新建一个文件继续写 position = pageSize; append(ByteBuffer.wrap(new byte[0])); - logger.info("reset write file {} position {} to pageSize.", rwFile.getName(), rwMap.toString()); + logger.info("Consume-archive: reset write file {} position {} to pageSize.", rwFile.getName(), rwMap.toString()); } } } diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ProduceArchiveService.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ProduceArchiveService.java index b598a1102..7d8dcff94 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ProduceArchiveService.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ProduceArchiveService.java @@ -17,18 +17,23 @@ import com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; +import org.joyqueue.broker.BrokerContext; import org.joyqueue.broker.Plugins; import org.joyqueue.broker.buffer.Serializer; import org.joyqueue.broker.cluster.ClusterManager; import org.joyqueue.broker.consumer.Consume; import org.joyqueue.broker.consumer.MessageConvertSupport; import org.joyqueue.broker.consumer.model.PullResult; +import org.joyqueue.broker.limit.RateLimiter; +import org.joyqueue.broker.limit.SubscribeRateLimiter; +import org.joyqueue.domain.Subscription; import org.joyqueue.domain.TopicConfig; import org.joyqueue.domain.TopicName; import org.joyqueue.exception.JoyQueueException; import org.joyqueue.message.BrokerMessage; import org.joyqueue.message.SourceType; import org.joyqueue.monitor.PointTracer; +import org.joyqueue.monitor.TraceStat; import org.joyqueue.network.session.Consumer; import org.joyqueue.server.archive.store.api.ArchiveStore; import org.joyqueue.server.archive.store.model.AchivePosition; @@ -76,6 +81,7 @@ public class ProduceArchiveService extends Service { private int batchNum = 1000; // 集群管理 private ClusterManager clusterManager; + private BrokerContext brokerContext; // 消费管理 private Consume consume; // 发送归档任务池 @@ -108,6 +114,9 @@ public class ProduceArchiveService extends Service { private ArchiveConfig archiveConfig; private MessageConvertSupport messageConvertSupport; + // 归档限流器 + private SubscribeRateLimiter rateLimiterManager; + public ProduceArchiveService(ArchiveConfig archiveConfig, ClusterManager clusterManager, Consume consume, MessageConvertSupport messageConvertSupport) { this.clusterManager = clusterManager; this.consume = consume; @@ -115,6 +124,15 @@ public ProduceArchiveService(ArchiveConfig archiveConfig, ClusterManager cluster this.messageConvertSupport = messageConvertSupport; } + public ProduceArchiveService(ArchiveConfig archiveConfig, BrokerContext brokerContext, SubscribeRateLimiter rateLimiter) { + this.archiveConfig = archiveConfig; + this.brokerContext = brokerContext; + this.clusterManager = brokerContext.getClusterManager(); + this.consume = brokerContext.getConsume(); + this.messageConvertSupport = brokerContext.getMessageConvertSupport(); + this.rateLimiterManager = rateLimiter; + } + @Override protected void validate() throws Exception { super.validate(); @@ -134,7 +152,8 @@ protected void validate() throws Exception { this.updateItemThread = LoopThread.builder() .sleepTime(1000 * 10, 1000 * 10) .name("UpdateArchiveItem-Thread") - .onException(e -> logger.warn("Exception:", e)) + .daemon(true) + .onException(e -> logger.error("UpdateArchiveItem-Thread error: {}", e)) .doWork(() -> { // 更新item列表 updateArchiveItem(); @@ -145,7 +164,8 @@ protected void validate() throws Exception { this.readMsgThread = LoopThread.builder() .sleepTime(0, 10) .name("ReadArchiveMsg-Thread") - .onException(e -> logger.warn("Exception:", e)) + .daemon(true) + .onException(e -> logger.error("ReadArchiveMsg-Thread error: {}", e)) .doWork(() -> { // 消费接口读取消息,放入队列 readArchiveMsg(); @@ -154,13 +174,12 @@ protected void validate() throws Exception { this.writeMsgThread = LoopThread.builder() .sleepTime(10, 10) .name("WriteArchiveMsg-Thread") - .onException(e -> logger.warn("Exception:", e)) + .daemon(true) + .onException(e -> logger.error("WriteArchiveMsg-Thread error: {}", e)) .doWork(() -> { // 队列读取消息,放入归档存储 write2Store(); }).build(); - - } @Override @@ -172,7 +191,7 @@ protected void doStart() throws Exception { updateItemThread.start(); readMsgThread.start(); writeMsgThread.start(); - logger.info("produce archive archiveService started."); + logger.info("Produce-archive: service started."); } @Override @@ -183,7 +202,7 @@ protected void doStop() { Close.close(writeMsgThread); Close.close(executorService); Close.close(archiveStore); - logger.info("produce archive archiveService stopped."); + logger.info("Produce-archive: service stopped."); } /** @@ -192,13 +211,15 @@ protected void doStop() { private void updateArchiveItem() throws JoyQueueException { List list = new ArrayList<>(); List topics = clusterManager.getTopics(); - topics.stream().forEach(topicConfig -> { + topics.forEach(topicConfig -> { TopicName name = topicConfig.getName(); // 检查是否开启发送归档 if (clusterManager.checkArchiveable(name)) { - logger.info("Topic:{} send archive is enable.", name.getFullName()); + if (archiveConfig.getLogDetail(ArchiveConfig.LOG_DETAIL_PRODUCE_PREFIX, clusterManager.getBrokerId().toString())) { + logger.info("Produce-archive: topic [{}] archive is enable.", name.getFullName()); + } List partitionSet = clusterManager.getLocalPartitions(topicConfig); - partitionSet.stream().forEach(partition -> { + partitionSet.forEach(partition -> { list.add(new SendArchiveItem(name.getFullName(), partition)); }); } @@ -207,7 +228,7 @@ private void updateArchiveItem() throws JoyQueueException { itemList.addAndUpdate(list); long count=updateArchiveMetadataCounter.getAndIncrement(); if(count% ARCHIVE_METADATA_MOD ==0){ - logger.info("Add or Update archive item ping,size {}",list.size()); + logger.info("Produce-archive: add or update archive item,size {}",list.size()); } } @@ -221,15 +242,23 @@ private void readArchiveMsg() throws Exception { if (isPause(item.getTopic(), item.getPartition())) { continue; } + if (!checkRateLimitAvailable(item.topic)) { + TraceStat limitBroker = tracer.begin("archive.produce.rate.limited"); + TraceStat limitTopic = tracer.begin(String.format("archive.produce.rate.limited.%s", item.topic)); + if (archiveConfig.getLogDetail(ArchiveConfig.LOG_DETAIL_PRODUCE_PREFIX, clusterManager.getBrokerId().toString())) { + logger.warn("Produce-archive: trigger rate limited topic: {}", item); + } + tracer.end(limitBroker); + tracer.end(limitTopic); + continue; + } PullResult pullResult; long readIndex = item.getReadIndex(); try { pullResult = consume.getMessage(item.topic, item.partition, readIndex, batchNum); } catch (Throwable th) { - if (logger.isDebugEnabled()) { - logger.debug("read message from topic:" + item.topic + " partition:" + item.partition - + " index:" + item.getReadIndex() + " error.", th); - } + logger.error("Produce-archive: read journal message from topic: [{}] partition: [{}] index: [{}] error: {}, {}", + item.topic, item.partition, item.getReadIndex(), th.getMessage(), th.getCause()); if (th.getCause() instanceof PositionUnderflowException) { // 如果读取位置小于存储索引的最小位置,将位置重置为可读到的最小位置 @@ -237,8 +266,9 @@ private void readArchiveMsg() throws Exception { long minIndex = consume.getMinIndex(new Consumer(item.topic, ""), item.partition); item.setReadIndex(minIndex); - logger.debug("repair read message position SendArchiveItem info:[{}], currentIndex:[{}]", item, minIndex); + logger.error("Produce-archive: repair read journal message position SendArchiveItem info:[{}], currentIndex-min:[{}], exception: {}", item, minIndex, th.getCause()); + archiveStore.putPosition(new AchivePosition(item.topic, item.partition, minIndex)); } if (th.getCause() instanceof PositionOverflowException) { @@ -247,8 +277,9 @@ private void readArchiveMsg() throws Exception { long maxIndex = consume.getMaxIndex(new Consumer(item.topic, ""), item.partition); item.setReadIndex(maxIndex); - logger.debug("repair read message position SendArchiveItem info:[{}], currentIndex:[{}]", item, maxIndex); + logger.error("Produce-archive: repair read journal message position SendArchiveItem info:[{}], currentIndex-max:[{}], exception: {}", item, maxIndex, th.getCause()); + archiveStore.putPosition(new AchivePosition(item.topic, item.partition, maxIndex)); } // 报错暂停一会 @@ -269,8 +300,8 @@ private void readArchiveMsg() throws Exception { item.setReadIndex(readIndex + size, readIndex); // 计数 counter += messageSize; - if (logger.isDebugEnabled()) { - logger.debug("produce archive: {} messages put into the archive queue.", size); + if (archiveConfig.getLogDetail(ArchiveConfig.LOG_DETAIL_PRODUCE_PREFIX, clusterManager.getBrokerId().toString())) { + logger.info("Produce-archive: {} messages put into the archive queue.", size); } } } @@ -279,6 +310,14 @@ private void readArchiveMsg() throws Exception { } } + private boolean checkRateLimitAvailable(String topic) { + RateLimiter rateLimiter = rateLimiterManager.getOrCreate(topic, Subscription.Type.PRODUCTION); + if (rateLimiter == null || rateLimiter.tryAcquireTps(batchNum)) { + return true; + } + return false; + } + /** * @param topic * @param partition @@ -373,6 +412,7 @@ private SendLog convert(BrokerMessage brokerMessage, ByteBuffer buffer) { private void write2Store() throws InterruptedException { int readBatchSize; do { + batchNum = archiveConfig.getProduceBatchNum(); List sendLogs = new ArrayList<>(batchNum); for (int i = 0; i < batchNum; i++) { SendLog sendLog = archiveQueue.poll(); @@ -388,10 +428,13 @@ private void write2Store() throws InterruptedException { try { // 写入存储 archiveStore.putSendLog(sendLogs, tracer); - logger.debug("Write sendLogs size:{} to archive store.", sendLogs.size()); + if (archiveConfig.getLogDetail(ArchiveConfig.LOG_DETAIL_PRODUCE_PREFIX, clusterManager.getBrokerId().toString())) { + logger.info("Produce-archive: write sendLogs size:{} to archive store. sample log: {}", sendLogs.size(), sendLogs.get(0)); + } // 写入计数(用于归档位置) writeCounter(sendLogs); } catch (JoyQueueException e) { + logger.error("Produce-archive: write sendLogs error: {}", e); // 写入存储失败 hasStoreError.set(true); // 回滚读取位置 @@ -645,8 +688,11 @@ public List getAll() { public void remove(SendArchiveItem item) throws JoyQueueException { // 移除列表 cpList.remove(item); - // clean archive position from store - archiveStore.cleanPosition(item.getTopic(),item.getPartition()); + // only archive disable will clean position store, not include raft node + if (!clusterManager.checkArchiveable(TopicName.parse(item.getTopic()))) { + logger.info("Produce-archive: topic [{}] archive is disable on clean.", item); + archiveStore.cleanPosition(item.getTopic(),item.getPartition()); + } } /** @@ -663,9 +709,9 @@ public void addAndUpdate(List newItemList) throws JoyQueueExcep if (!newItemList.contains(item)) { try { remove(item); - logger.info("Clean up archive item,topic {},partition {}",item.getTopic(),item.getPartition()); + logger.info("Produce-archive: clean up archive item, topic [{}], partition [{}]",item.getTopic(),item.getPartition()); }catch (JoyQueueException e){ - logger.info("remove archive item exception",e); + logger.error("Produce-archive: clean up archive item error: {}", e); } } }); @@ -674,15 +720,19 @@ public void addAndUpdate(List newItemList) throws JoyQueueExcep // 列表中不包含则添加 if (!cpList.contains(item)) { Long index = archiveStore.getPosition(item.topic, item.partition); + Consumer consumer=new Consumer(); + consumer.setTopic(item.getTopic()); if (index == null) { - // 从当前的 max index 开始归档 - Consumer consumer=new Consumer(); - // fullName - consumer.setTopic(item.getTopic()); index = consume.getMaxIndex(consumer,item.getPartition()); - logger.info("New archive item,topic {},partition {},init from local store max index {}",item.getTopic(),item.getPartition(),item.getReadIndex()); + archiveStore.putPosition(new AchivePosition(item.getTopic(), item.getPartition(), index)); + logger.info("Produce-archive: new archive item, topic [{}], partition [{}], init from local store max index {}",item.getTopic(),item.getPartition(),item.getReadIndex()); }else{ - logger.info("New archive item,topic {},partition {},recover from archive store,index {}",item.getTopic(),item.getPartition(),index); + Long minIndex = consume.getMinIndex(consumer, item.getPartition()); + if (index < minIndex) { + index = minIndex; + archiveStore.putPosition(new AchivePosition(item.getTopic(), item.getPartition(), index)); + } + logger.info("Produce-archive: new archive item, topic [{}], partition [{}], recover from archive position store index {}",item.getTopic(),item.getPartition(),index); } item.setReadIndex(index); cpList.add(item); diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/consumer/PartitionConsumption.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/consumer/PartitionConsumption.java index 4b6dd5c6d..ba33c1d13 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/consumer/PartitionConsumption.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/consumer/PartitionConsumption.java @@ -213,7 +213,7 @@ protected PullResult getMsgByPartitionAndIndex(Consumer consumer, int group, sho PullResult pullResult = new PullResult(consumer, (short) -1, new ArrayList<>(0)); try { PullResult readResult = getMsgByPartitionAndIndex(consumer.getTopic(), group, partition, index, count); - if (readResult.getBuffers() == null) { + if (readResult.getBuffers() == null || readResult.getBuffers().size() == 0) { // 没有拉到消息直接返回 return pullResult; } @@ -249,9 +249,17 @@ protected PullResult getMsgByPartitionAndIndex(Consumer consumer, int group, sho protected PullResult getMsgByPartitionAndIndex(String topic, int group, short partition, long index, int count) throws JoyQueueException, IOException { long startTime = System.nanoTime(); - PullResult result = new PullResult(topic, null, partition, null); + PullResult result = new PullResult(topic, null, partition, new ArrayList<>(0)); PartitionGroupStore store = storeService.getStore(topic, group); + if (index == store.getRightIndex(partition)) { + return result; + } + + /*if (index < store.getLeftIndex(partition) || index >= store.getRightIndex(partition)) { + return result; + }*/ + ReadResult readRst = store.read(partition, index, count, Long.MAX_VALUE); if (readRst.getCode() == JoyQueueCode.SUCCESS) { diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/limit/SubscribeRateLimiter.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/limit/SubscribeRateLimiter.java new file mode 100644 index 000000000..61789dcec --- /dev/null +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/limit/SubscribeRateLimiter.java @@ -0,0 +1,20 @@ +package org.joyqueue.broker.limit; + +import org.joyqueue.domain.Subscription; +import org.joyqueue.event.MetaEvent; +import org.joyqueue.toolkit.concurrent.EventListener; + +/** + * Subscribe limiter interface + * + **/ +public interface SubscribeRateLimiter extends EventListener { + + /** + * Get or create a rate limiter according to subscribe type + * @return null indicate no limit + **/ + RateLimiter getOrCreate(String topic, String app, Subscription.Type subscribe); + + RateLimiter getOrCreate(String topic, Subscription.Type subscribe); +} diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/limit/config/LimiterConfig.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/limit/config/LimiterConfig.java index 005c9e3dd..23130f26e 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/limit/config/LimiterConfig.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/limit/config/LimiterConfig.java @@ -50,4 +50,12 @@ public int getTraffic() { public void setTraffic(int traffic) { this.traffic = traffic; } + + @Override + public String toString() { + return "LimiterConfig{" + + "tps=" + tps + + ", traffic=" + traffic + + '}'; + } } \ No newline at end of file diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/limit/support/AbstractSubscribeRateLimiterManager.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/limit/support/AbstractSubscribeRateLimiterManager.java new file mode 100644 index 000000000..f7749f265 --- /dev/null +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/limit/support/AbstractSubscribeRateLimiterManager.java @@ -0,0 +1,138 @@ +package org.joyqueue.broker.limit.support; + +import com.google.common.collect.Maps; +import org.joyqueue.broker.BrokerContext; +import org.joyqueue.broker.cluster.ClusterManager; +import org.joyqueue.broker.limit.RateLimiter; +import org.joyqueue.broker.limit.SubscribeRateLimiter; +import org.joyqueue.broker.limit.config.LimiterConfig; +import org.joyqueue.domain.Config; +import org.joyqueue.domain.Subscription; +import org.joyqueue.event.MetaEvent; +import org.joyqueue.nsr.event.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * @author majun8 + */ +public abstract class AbstractSubscribeRateLimiterManager implements SubscribeRateLimiter { + protected static final Logger logger = LoggerFactory.getLogger(AbstractSubscribeRateLimiterManager.class); + + protected static final String SPLIT = "."; + + protected ClusterManager clusterManager; + protected ConcurrentMap> subscribeRateLimiters = Maps.newConcurrentMap(); + + public AbstractSubscribeRateLimiterManager(BrokerContext context) { + this.clusterManager = context.getClusterManager(); + this.clusterManager.addListener(this); + } + + public RateLimiter getOrCreate(String topic, String app, Subscription.Type subscribe) { + ConcurrentMap topicRateLimiters = subscribeRateLimiters.get(topic); + if (topicRateLimiters == null) { + topicRateLimiters = new ConcurrentHashMap<>(); + ConcurrentMap old = subscribeRateLimiters.putIfAbsent(topic, topicRateLimiters); + if (old != null) { + topicRateLimiters = old; + } + } + String subscribeName = app == null ? subscribe.name() + SPLIT : subscribe.name() + SPLIT + app; + RateLimiter subscribeRateLimiter = topicRateLimiters.get(subscribeName); + if (subscribeRateLimiter == null) { + LimiterConfig config = getLimiterConfig(topic, app, subscribe); + subscribeRateLimiter = new DefaultRateLimiter(config.getTps()); + RateLimiter oldRateLimiter = topicRateLimiters.putIfAbsent(subscribeName, subscribeRateLimiter); + if (oldRateLimiter != null) { + subscribeRateLimiter = oldRateLimiter; + } else { + logger.info("Archive rate limiter for {},{},{},{}", topic, app, subscribe.name(), config); + } + } + return subscribeRateLimiter; + } + + public RateLimiter getOrCreate(String topic, Subscription.Type subscribe) { + return getOrCreate(topic, null, subscribe); + } + + public abstract LimiterConfig getLimiterConfig(String topic, String app, Subscription.Type subscribe); + + @Override + public void onEvent(MetaEvent event) { + switch (event.getEventType()) { + case ADD_CONFIG: { + AddConfigEvent addConfigEvent = (AddConfigEvent) event; + Config config = addConfigEvent.getConfig(); + cleanRateLimiter(config); + break; + } + case UPDATE_CONFIG: { + UpdateConfigEvent updateConfigEvent = (UpdateConfigEvent) event; + Config config = updateConfigEvent.getNewConfig(); + cleanRateLimiter(config); + break; + } + case REMOVE_CONFIG: { + RemoveConfigEvent removeConfigEvent = (RemoveConfigEvent) event; + Config config = removeConfigEvent.getConfig(); + cleanRateLimiter(config); + break; + } + case ADD_TOPIC: + AddTopicEvent addTopicEvent = (AddTopicEvent) event; + cleanRateLimiter(addTopicEvent.getTopic().getName().getFullName(), null, null); + break; + case UPDATE_TOPIC: + UpdateTopicEvent updateTopicEvent = (UpdateTopicEvent) event; + cleanRateLimiter(updateTopicEvent.getNewTopic().getName().getFullName(), null, null); + break; + case REMOVE_TOPIC: + RemoveTopicEvent removeTopicEvent = (RemoveTopicEvent) event; + cleanRateLimiter(removeTopicEvent.getTopic().getName().getFullName(), null, null); + break; + case UPDATE_PRODUCER: + UpdateProducerEvent updateProducerEvent = (UpdateProducerEvent) event; + cleanRateLimiter(updateProducerEvent.getTopic().getFullName(), + updateProducerEvent.getNewProducer().getApp(), + Subscription.Type.PRODUCTION); + break; + case REMOVE_PRODUCER: + RemoveProducerEvent removeProducerEvent = (RemoveProducerEvent) event; + cleanRateLimiter(removeProducerEvent.getTopic().getFullName(), + removeProducerEvent.getProducer().getApp(), + Subscription.Type.PRODUCTION); + break; + case UPDATE_CONSUMER: + UpdateConsumerEvent updateConsumerEvent = (UpdateConsumerEvent) event; + cleanRateLimiter(updateConsumerEvent.getTopic().getFullName(), + updateConsumerEvent.getNewConsumer().getApp(), + Subscription.Type.CONSUMPTION); + break; + case REMOVE_CONSUMER: + RemoveConsumerEvent removeConsumerEvent = (RemoveConsumerEvent) event; + cleanRateLimiter(removeConsumerEvent.getTopic().getFullName(), + removeConsumerEvent.getConsumer().getApp(), + Subscription.Type.CONSUMPTION); + break; + } + } + + public abstract void cleanRateLimiter(Config config); + + public void cleanRateLimiter(String topic, String app, Subscription.Type subscribe) { + if (app == null) { + subscribeRateLimiters.remove(topic); + } else { + Map rateLimiters = subscribeRateLimiters.get(topic); + if (rateLimiters != null) { + rateLimiters.remove(subscribe.name() + SPLIT + app); + } + } + } +} diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/monitor/service/support/DefaultArchiveMonitorService.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/monitor/service/support/DefaultArchiveMonitorService.java index e2b7eb509..497f3a940 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/monitor/service/support/DefaultArchiveMonitorService.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/monitor/service/support/DefaultArchiveMonitorService.java @@ -53,7 +53,7 @@ public ArchiveMonitorInfo getArchiveMonitorInfo() { long sendBackLogNum = getSendBackLogNum(); ArchiveMonitorInfo info = new ArchiveMonitorInfo(); info.setConsumeBacklog(consumeBacklogNum); - info.setConsumeBacklog(sendBackLogNum); + info.setProduceBacklog(sendBackLogNum); info.setTopicProduceBacklog(archiveManager.getSendBacklogNumByTopic()); return info; diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/retry/BrokerRetryManager.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/retry/BrokerRetryManager.java index bdcbfa353..01bb4efb7 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/retry/BrokerRetryManager.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/retry/BrokerRetryManager.java @@ -24,11 +24,13 @@ import org.joyqueue.broker.Plugins; import org.joyqueue.broker.cluster.ClusterManager; import org.joyqueue.broker.limit.RateLimiter; +import org.joyqueue.broker.limit.SubscribeRateLimiter; import org.joyqueue.broker.monitor.BrokerMonitor; import org.joyqueue.broker.network.support.BrokerTransportClientFactory; import org.joyqueue.config.BrokerConfigKey; import org.joyqueue.domain.Broker; import org.joyqueue.domain.Consumer; +import org.joyqueue.domain.Subscription; import org.joyqueue.domain.TopicName; import org.joyqueue.event.EventType; import org.joyqueue.event.MetaEvent; @@ -85,7 +87,7 @@ public class BrokerRetryManager extends Service implements MessageRetry, B // 集群管理 private ClusterManager clusterManager; private PropertySupplier propertySupplier; - private RetryRateLimiter rateLimiterManager; + private SubscribeRateLimiter rateLimiterManager; private BrokerMonitor brokerMonitor; private PointTracer tracer; @@ -219,6 +221,9 @@ public void addRetry(List retryMessageModelList) throws JoyQu throw e; } }else{ + TraceStat limit = tracer.begin("BrokerRetryManager.rate.limited"); + logger.warn("Broker retry message limited, limit consumers: {}", consumers); + tracer.end(limit); throw new JoyQueueException(JoyQueueCode.RETRY_TOKEN_LIMIT); } } @@ -231,7 +236,7 @@ public void addRetry(List retryMessageModelList) throws JoyQu **/ public boolean retryTokenAvailable(Set consumers){ for(Joint consumer:consumers) { - RateLimiter rateLimiter= rateLimiterManager.getOrCreate(consumer.getTopic(),consumer.getApp()); + RateLimiter rateLimiter= rateLimiterManager.getOrCreate(consumer.getTopic(),consumer.getApp(), Subscription.Type.CONSUMPTION); if(rateLimiter==null||rateLimiter.tryAcquireTps()){ return true; } diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/retry/BrokerRetryRateLimiterManager.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/retry/BrokerRetryRateLimiterManager.java index db220cca7..9a1825c13 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/retry/BrokerRetryRateLimiterManager.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/retry/BrokerRetryRateLimiterManager.java @@ -1,152 +1,76 @@ package org.joyqueue.broker.retry; -import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.joyqueue.broker.BrokerContext; -import org.joyqueue.broker.cluster.ClusterManager; import org.joyqueue.broker.consumer.ConsumeConfig; import org.joyqueue.broker.consumer.ConsumeConfigKey; -import org.joyqueue.broker.limit.RateLimiter; -import org.joyqueue.broker.limit.support.DefaultRateLimiter; +import org.joyqueue.broker.limit.config.LimiterConfig; +import org.joyqueue.broker.limit.support.AbstractSubscribeRateLimiterManager; import org.joyqueue.domain.Config; -import org.joyqueue.event.MetaEvent; -import org.joyqueue.nsr.event.RemoveConfigEvent; -import org.joyqueue.nsr.event.RemoveConsumerEvent; -import org.joyqueue.nsr.event.RemoveTopicEvent; -import org.joyqueue.nsr.event.UpdateConfigEvent; +import org.joyqueue.domain.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - /** * Consumer retry rate limiter manager * **/ -public class BrokerRetryRateLimiterManager implements RetryRateLimiter{ +public class BrokerRetryRateLimiterManager extends AbstractSubscribeRateLimiterManager { protected static final Logger LOG = LoggerFactory.getLogger(BrokerRetryRateLimiterManager.class); - private ClusterManager clusterManager; + private ConsumeConfig consumeConfig; - private ConcurrentMap> retryRateLimiters = Maps.newConcurrentMap(); public BrokerRetryRateLimiterManager(BrokerContext context){ - this.clusterManager = context.getClusterManager(); - this.clusterManager.addListener(this); + super(context); this.consumeConfig=new ConsumeConfig(context != null ? context.getPropertySupplier() : null); } + @Override - public RateLimiter getOrCreate(String topic, String app) { - ConcurrentMap topicRateLimiters=retryRateLimiters.get(topic); - if(topicRateLimiters==null){ - topicRateLimiters=new ConcurrentHashMap(); - ConcurrentMap old=retryRateLimiters.putIfAbsent(topic,topicRateLimiters); - if(old!=null){ - topicRateLimiters=old; + public LimiterConfig getLimiterConfig(String topic, String app, Subscription.Type subscribe) { + if (subscribe == Subscription.Type.CONSUMPTION) { + int tps = defaultConsumerLimitRate(topic, app); + if (tps <= 0) { + tps = Integer.MAX_VALUE; } + return new LimiterConfig(tps, -1); } - RateLimiter consumerRetryRateLimiter=topicRateLimiters.get(app); - if(consumerRetryRateLimiter==null){ - int tps=consumerRetryRate(topic,app); - if(tps>0) { // ulimit - consumerRetryRateLimiter = new DefaultRateLimiter(tps); - RateLimiter oldRateLimiter = topicRateLimiters.putIfAbsent(app, consumerRetryRateLimiter); - if (oldRateLimiter != null) { - consumerRetryRateLimiter = oldRateLimiter; - }else{ - LOG.info("New rate limiter for {},{},{}",topic,app,tps); - } - } - } - return consumerRetryRateLimiter; + return null; } + public int defaultProducerLimitRate(String topic, String app) { + return 0; + } - /** - * Mix broker level consume retry rate(default ulimit) with consumer level config - * priority: - * 1. broker consumer level - * 2. broker level - * 3. consumer config level(not support) - * @return -1 indicate ulimit if above all not configure - * - **/ - public int consumerRetryRate(String topic,String app){ - int retryRate=consumeConfig.getRetryRate(topic,app); - if(retryRate<=0){ + public int defaultConsumerLimitRate(String topic, String app) { + int retryRate = consumeConfig.getRetryRate(topic, app); + if (retryRate <= 0) { // get broker level retry rate - retryRate=consumeConfig.getRetryRate(); + retryRate = consumeConfig.getRetryRate(); } return retryRate; } - - /** - * Clean up current rate limiter if consumer metadata has any changes - **/ - @Override - public void onEvent(MetaEvent event) { - switch (event.getEventType()) { - case UPDATE_CONFIG: { - UpdateConfigEvent updateConfigEvent = (UpdateConfigEvent) event; - Config config = updateConfigEvent.getNewConfig(); - cleanRateLimiter(config); - break; - } - case REMOVE_CONFIG: { - RemoveConfigEvent removeConfigEvent = (RemoveConfigEvent) event; - Config config=removeConfigEvent.getConfig(); - cleanRateLimiter(config); - break; - } - case REMOVE_TOPIC: - RemoveTopicEvent topicEvent = (RemoveTopicEvent) event; - cleanRateLimiter(topicEvent.getTopic().getName().getFullName(),null); - break; - case REMOVE_CONSUMER: - RemoveConsumerEvent removeConsumerEvent = (RemoveConsumerEvent) event; - cleanRateLimiter(removeConsumerEvent.getTopic().getFullName(), removeConsumerEvent.getConsumer().getApp()); - break; - } - } - /** * @param config consumer config * **/ - public void cleanRateLimiter(Config config){ - String configKey=config.getKey(); + public void cleanRateLimiter(Config config) { + String configKey = config.getKey(); if (StringUtils.isBlank(configKey)) { return; } - if (configKey.equals(ConsumeConfigKey.RETRY_RATE.getName())) { - retryRateLimiters.clear(); - } else if (configKey.startsWith(ConsumeConfigKey.RETRY_RATE_PREFIX.getName())) { - String[] keys=configKey.split("\\."); - if(keys.length == 4){ - String topic=keys[2]; - String app=keys[3]; - if(topic!=null&&app!=null) { - cleanRateLimiter(topic, app); + if (StringUtils.equals(configKey, ConsumeConfigKey.RETRY_RATE.getName())) { + subscribeRateLimiters.clear(); + } else if (StringUtils.startsWith(configKey, ConsumeConfigKey.RETRY_RATE_PREFIX.getName())) { + String[] keys = StringUtils.split(configKey, "\\."); + if (keys.length == 4) { + String topic = keys[2]; + String app = keys[3]; + if (topic != null && app != null) { + cleanRateLimiter(topic, app, Subscription.Type.CONSUMPTION); } } } } - - /** - * Clean rate limiter of consumer - **/ - public void cleanRateLimiter(String topic,String app) { - if (app == null) { - retryRateLimiters.remove(topic); - }else { - Map rateLimiters = retryRateLimiters.get(topic); - if (rateLimiters != null) { - rateLimiters.remove(app); - } - - } - } } diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/retry/RetryRateLimiter.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/retry/RetryRateLimiter.java deleted file mode 100644 index 8b96e144e..000000000 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/retry/RetryRateLimiter.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.joyqueue.broker.retry; - -import org.joyqueue.broker.limit.RateLimiter; -import org.joyqueue.event.MetaEvent; -import org.joyqueue.toolkit.concurrent.EventListener; - -/** - * Retry rate limiter interface - * - **/ -public interface RetryRateLimiter extends EventListener { - - /** - * Get or create a rate limiter - * @return null indicate no limit - **/ - RateLimiter getOrCreate(String topic, String app); -} diff --git a/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/archive/ArchiveSerializerTest.java b/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/archive/ArchiveSerializerTest.java index 3cfb93d18..1658712d5 100644 --- a/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/archive/ArchiveSerializerTest.java +++ b/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/archive/ArchiveSerializerTest.java @@ -15,10 +15,10 @@ */ package org.joyqueue.broker.archive; -import org.joyqueue.broker.archive.ArchiveSerializer; import org.joyqueue.server.archive.store.model.ConsumeLog; import org.joyqueue.toolkit.time.SystemClock; import org.apache.commons.lang.builder.ToStringBuilder; +import org.junit.Assert; import org.junit.Test; import java.nio.ByteBuffer; @@ -62,4 +62,22 @@ public void read() { System.out.println(ToStringBuilder.reflectionToString(log)); } + + @Test + public void reverseStringTest() { + String test = "abc"; + Assert.assertEquals("cba", + org.joyqueue.server.archive.store.utils.ArchiveSerializer.reverse(test)); + } + + @Test + public void reverseBytesTest() { + String test = "abc"; + byte[] bytes = test.getBytes(); + Assert.assertEquals(new String(bytes), + new String(org.joyqueue.server.archive.store.utils.ArchiveSerializer.reverse( + org.joyqueue.server.archive.store.utils.ArchiveSerializer.reverse(bytes) + )) + ); + } } \ No newline at end of file diff --git a/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/archive/ConsumeArchiveServiceTest.java b/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/archive/ConsumeArchiveServiceTest.java index d869dc5e8..a364b1aee 100644 --- a/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/archive/ConsumeArchiveServiceTest.java +++ b/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/archive/ConsumeArchiveServiceTest.java @@ -15,29 +15,64 @@ */ package org.joyqueue.broker.archive; -import org.joyqueue.broker.archive.ArchiveSerializer; -import org.joyqueue.broker.archive.ConsumeArchiveService; +import org.joyqueue.broker.BrokerContext; +import org.joyqueue.broker.archive.store.hbase.MockArchiveStore; +import org.joyqueue.broker.cluster.ClusterManager; +import org.joyqueue.broker.config.Configuration; +import org.joyqueue.domain.Broker; +import org.joyqueue.exception.JoyQueueException; +import org.joyqueue.server.archive.store.api.ArchiveStore; import org.joyqueue.server.archive.store.model.ConsumeLog; +import org.joyqueue.toolkit.concurrent.LoopThread; +import org.joyqueue.toolkit.config.Property; +import org.joyqueue.toolkit.config.PropertySupplier; import org.joyqueue.toolkit.time.SystemClock; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.lang.reflect.Field; import java.nio.ByteBuffer; -import java.util.Arrays; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; /** * Created by chengzhiliang on 2018/12/19. */ public class ConsumeArchiveServiceTest { - final int writeRecordNum = 1000000; + final int writeRecordNum = 900000; + private AtomicInteger readByteCounter = new AtomicInteger(0); + private ClusterManager clusterManager; + private ArchiveConfig archiveConfig; + private ConsumeArchiveService consumeService; + + @Before + public void init() throws NoSuchFieldException, IllegalAccessException { + PropertySupplier propertySupplier = new Configuration(); + ((Configuration) propertySupplier).addProperty(Property.APPLICATION_DATA_PATH, "/Users/majun8"); + BrokerContext brokerContext = new BrokerContext().propertySupplier(propertySupplier); + clusterManager = new ClusterManager(null, null, null, brokerContext); + Broker broker = new Broker(); + broker.setId(123); + Field brokerField = clusterManager.getClass().getDeclaredField("broker"); + brokerField.setAccessible(true); + brokerField.set(clusterManager, broker); + + archiveConfig = new ArchiveConfig(propertySupplier); + consumeService = new ConsumeArchiveService(archiveConfig, clusterManager); + } @Test public void writeConsumeLog() throws InterruptedException { String testPath = getTestPath(); delTestFolder(testPath); - ConsumeArchiveService.ArchiveMappedFileRepository archiveMappedFileRepository = new ConsumeArchiveService.ArchiveMappedFileRepository(testPath); + ConsumeArchiveService.ArchiveMappedFileRepository archiveMappedFileRepository = consumeService.new ArchiveMappedFileRepository(testPath); String[] appArr = {"test", "app_test", "app_app_test"}; for (int i = 0; i < writeRecordNum; i++) { ConsumeLog consumeLog = new ConsumeLog(); @@ -59,11 +94,32 @@ public void writeConsumeLog() throws InterruptedException { Thread.sleep(100); } + //@Test + public void multiWriteConsumeLogTest() { + String testPath = getTestPath(); + ConsumeArchiveService.ArchiveMappedFileRepository archiveMappedFileRepository = consumeService.new ArchiveMappedFileRepository(testPath); + String[] appArr = {"test", "app_test", "app_app_test"}; + for (int i = 0; i < 10; i++) { + new Thread(() -> { + for (int i1 = 0; i1 < writeRecordNum; i1++) { + ConsumeLog consumeLog = new ConsumeLog(); + consumeLog.setBrokerId(i1); + consumeLog.setBytesMessageId(new byte[16]); + consumeLog.setClientIp(new byte[16]); + consumeLog.setConsumeTime(System.currentTimeMillis()); + consumeLog.setApp(appArr[i1 % 3]); + ByteBuffer buffer = ArchiveSerializer.write(consumeLog); + archiveMappedFileRepository.append(buffer); + } + }, "write-thread-" + i).start(); + } + } + @Test public void writeConsumeLog2() throws InterruptedException { String testPath = getTestPath(); delTestFolder(testPath); - ConsumeArchiveService.ArchiveMappedFileRepository archiveMappedFileRepository = new ConsumeArchiveService.ArchiveMappedFileRepository(testPath); + ConsumeArchiveService.ArchiveMappedFileRepository archiveMappedFileRepository = consumeService.new ArchiveMappedFileRepository(testPath); String[] appArr = {"test", "app_test", "app_app_test"}; for (int i = 0; i < 100; i++) { ConsumeLog consumeLog = new ConsumeLog(); @@ -105,17 +161,26 @@ public void writeConsumeLog2() throws InterruptedException { } } - @Test + //@Test public void readConsumeLog() throws InterruptedException { writeConsumeLog(); String testPath = getTestPath(); - ConsumeArchiveService.ArchiveMappedFileRepository archiveMappedFileRepository = new ConsumeArchiveService.ArchiveMappedFileRepository(testPath); + ConsumeArchiveService.ArchiveMappedFileRepository archiveMappedFileRepository = consumeService.new ArchiveMappedFileRepository(testPath); + int brokerid = 0; for (int i = 0; i < writeRecordNum; i++) { byte[] bytes = archiveMappedFileRepository.readOne(); if (bytes.length > 0) { + brokerid = i; ConsumeLog read = ArchiveSerializer.read(ByteBuffer.wrap(bytes)); - Assert.assertEquals(i, read.getBrokerId()); + try { + Assert.assertEquals(brokerid, read.getBrokerId()); + } catch (Throwable e) { + System.out.println("index: " + i + " brokerid: "+ read.getBrokerId()); + throw e; + } + } else { + brokerid -= 1; } } } @@ -129,6 +194,7 @@ private void delTestFolder(String testPath) { File file = new File(testPath); if (file.isDirectory()) { File[] files = file.listFiles(); + assert files != null; if (files.length > 0) { Arrays.stream(files).forEach(file1 -> file1.delete()); } @@ -149,4 +215,262 @@ private String getTestPath() { return userPath + childPath; } + private String getTestFile() { + return ""; + } + + //@Test + public void testConsumeArchiveService() throws NoSuchFieldException, IllegalAccessException { + ArchiveStore archiveStore = new MockArchiveStore(); + Field archiveField = consumeService.getClass().getDeclaredField("archiveStore"); + archiveField.setAccessible(true); + archiveField.set(consumeService, archiveStore); + + ConsumeArchiveService.ArchiveMappedFileRepository repository = consumeService.new ArchiveMappedFileRepository(archiveConfig.getArchivePath()); + Field repositoryField = consumeService.getClass().getDeclaredField("repository"); + repositoryField.setAccessible(true); + repositoryField.set(consumeService, repository); + + + Field counterField = consumeService.getClass().getDeclaredField("readByteCounter"); + counterField.setAccessible(true); + counterField.set(consumeService, readByteCounter); + + LoopThread readConsumeLogThread = LoopThread.builder() + .sleepTime(1, 10) + .name("ReadAndPutHBase-ConsumeLog-Thread") + .onException(e -> { + System.out.println(e.getMessage()); + repository.rollBack(readByteCounter.get()); + System.out.println("finish rollback."); + }) + .doWork(this::wrapReadAndWrite) + .build(); + Field readThread = consumeService.getClass().getDeclaredField("readConsumeLogThread"); + readThread.setAccessible(true); + readThread.set(consumeService, readConsumeLogThread); + + readConsumeLogThread.start(); + } + + private void wrapReadAndWrite() throws InterruptedException, JoyQueueException { + consumeService.readAndWrite(); + } + + @Test + public void testArchive() { + ConsumeArchiveService.ArchiveMappedFileRepository archiveMappedFileRepository = + consumeService.new ArchiveMappedFileRepository(getTestPath()); + + try { + readBatch(archiveMappedFileRepository); + } catch (Exception e) { + archiveMappedFileRepository.rollBack(readByteCounter.get()); + try { + readBatch(archiveMappedFileRepository); + } catch (Exception e1) { + e1.printStackTrace(); + } + } + System.out.println("Finish"); + } + + private void readBatch(ConsumeArchiveService.ArchiveMappedFileRepository repository) throws Exception { + int readBatchSize; + int batchSize = 1000; + int exception = 0; + do { + List list = readConsumeLog(repository, batchSize); + readBatchSize = list.size(); + if (readBatchSize > 0) { + long startTime = SystemClock.now(); + + // 调用存储接口写数据 + System.out.println("Store batch: " + batchSize); + /*if (++exception == 3) { + throw new Exception("Exception for store"); + }*/ + + long endTime = SystemClock.now(); + + } else { + break; + } + } while (readBatchSize == batchSize); + } + + private List readConsumeLog(ConsumeArchiveService.ArchiveMappedFileRepository repository, int count) { + // 每次读取之前清零 + readByteCounter.set(0); + + List list = new LinkedList<>(); + for (int i = 0; i < count; i++) { + byte[] bytes = repository.readOne(); + // 读到结尾会返回byte[0] + if (bytes.length > 0) { + // 1个字节开始符号,4个字节int类型长度信息 + readByteCounter.addAndGet(1 + 4 + bytes.length); + // 反序列花并放入集合 + list.add(ArchiveSerializer.read(ByteBuffer.wrap(bytes))); + } else { + break; + } + } + return list; + } + + //@Test + public void testAsyncRead() throws IOException { + String path = getTestPath(); + String fileName = getTestFile(); + File rFile = new File(path + fileName); + MappedByteBuffer rMap = new RandomAccessFile(rFile, "r").getChannel().map(FileChannel.MapMode.READ_ONLY, 0, 1024); + rMap.position(100); + int i100 = rMap.getInt(); + System.out.println(i100); + print(rMap); + int iUnknown = rMap.getInt(); + System.out.println(iUnknown); + print(rMap); + byte[] bytes = new byte[iUnknown]; + rMap.get(bytes); + System.out.println(bytes); + print(rMap); + } + + //@Test + public void testMappedFileRollback() throws IOException { + String path = getTestPath(); + String fileName = getTestFile(); + File file = new File(path + fileName); + MappedByteBuffer map = new RandomAccessFile(file, "r").getChannel().map(FileChannel.MapMode.READ_ONLY, 0, 1024 * 1024 * 16);; + print(map); + map.position(6815718); + boolean find = false; + do { + find = rollback(map); + + if (find) { + print(map); + int msgLen = map.getInt(); + byte[] bytes = new byte[msgLen]; + map.get(bytes); + ConsumeLog log = ArchiveSerializer.read(ByteBuffer.wrap(bytes)); + System.out.println(log); + } + } while (!find); + } + + private boolean rollback(MappedByteBuffer map) { + if (map.get() != Byte.MIN_VALUE) { + map.position(map.position() - 2); + return false; + } else { + return true; + } + } + + //@Test + public void testMappedFileRW() throws IOException { + String pathR = getTestPath(); + String fileNameR = getTestFile(); + File fileR = new File(pathR + fileNameR); + MappedByteBuffer mapR = new RandomAccessFile(fileR, "r").getChannel().map(FileChannel.MapMode.READ_ONLY, 0, 1024 * 1024 * 16);; + print(mapR); + String pathW = getTestPath(); + String fileNameW = getTestFile(); + File fileW = new File(pathW + fileNameW); + MappedByteBuffer mapW = new RandomAccessFile(fileW, "r").getChannel().map(FileChannel.MapMode.READ_ONLY, 0, 1024 * 1024 * 16);; + print(mapW); + + boolean isExit = true; + do { + if (checkPositionReadable(fileW, fileR, mapR, 5181218)) { + int msgLen = mapR.getInt(); + byte[] bytes = new byte[msgLen]; + mapR.get(bytes); + print(mapR); + } else if (checkFileEndFlag(mapR)) { + isExit = false; + System.out.println("File end, lenght: " + mapR); + break; + } + } while (isExit); + } + + //@Test + public void testMappedFileRead() throws IOException { + AtomicInteger counter = new AtomicInteger(0); + String path = getTestPath(); + String fileName = getTestFile(); + File file = new File(path + fileName); + MappedByteBuffer map = new RandomAccessFile(file, "r").getChannel().map(FileChannel.MapMode.READ_ONLY, 0, 1024 * 1024 * 16);; + print(map); + //map.position(5705701); + map.position(2768394); + print(map); + boolean isExit = true; + int exception = 3; + int error = 0; + do { + counter.set(0); + for (int i = 0; i < 1000; i++) { + if (checkStartFlag(map)) { + int msgLen = map.getInt(); + byte[] bytes = new byte[msgLen]; + map.get(bytes); + //print(map); + counter.addAndGet(1 + 4 + bytes.length); + } else if (checkFileEndFlag(map)) { + isExit = false; + System.out.println("File end, lenght: " + counter.get()); + break; + } + } + if (++error % exception == 0) { + print(map); + map.position(map.position() - counter.get()); + } + } while (isExit); + } + + private boolean checkStartFlag(MappedByteBuffer rMap) { + if (rMap.position() + 1 < 1024 * 1024 * 16) { + if (rMap.get() == Byte.MIN_VALUE) { + return true; + } else { + rMap.position(rMap.position() - 1); + } + } + return false; + } + + private boolean checkFileEndFlag(MappedByteBuffer rMap) { + if (rMap.position() + 1 <= 1024 * 1024 * 16) { + if (rMap.get() == Byte.MAX_VALUE) { + rMap.position(rMap.position() - 1); + return true; + } else { + rMap.position(rMap.position() - 1); + } + } + return false; + } + + private boolean checkPositionReadable(File rwFile, File rFile, MappedByteBuffer rMap, int position) { + if (rwFile.getName().equals(rFile.getName())) { + if (rMap.position() <= position) { + return checkStartFlag(rMap); + } else { + return false; + } + } + return checkStartFlag(rMap); + } + + private static void print(ByteBuffer buffer) { + System.out.printf("position: %d, limit: %d, capacity: %d\n", + buffer.position(), buffer.limit(), buffer.capacity()); + } + } \ No newline at end of file diff --git a/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/archive/store/hbase/MockArchiveStore.java b/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/archive/store/hbase/MockArchiveStore.java new file mode 100644 index 000000000..4d5564e4a --- /dev/null +++ b/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/archive/store/hbase/MockArchiveStore.java @@ -0,0 +1,80 @@ +package org.joyqueue.broker.archive.store.hbase; + +import org.joyqueue.exception.JoyQueueCode; +import org.joyqueue.exception.JoyQueueException; +import org.joyqueue.monitor.PointTracer; +import org.joyqueue.server.archive.store.api.ArchiveStore; +import org.joyqueue.server.archive.store.model.AchivePosition; +import org.joyqueue.server.archive.store.model.ConsumeLog; +import org.joyqueue.server.archive.store.model.Query; +import org.joyqueue.server.archive.store.model.SendLog; + +import java.util.List; + +public class MockArchiveStore implements ArchiveStore { + + int exception = 0; + int exceptionCount = 3; + + @Override + public void putConsumeLog(List consumeLogs, PointTracer tracer) throws JoyQueueException { + if (++exception % 3 == 0) { + throw new JoyQueueException("Exception for store", JoyQueueCode.SE_IO_ERROR.getCode()); + } + } + + @Override + public void putSendLog(List sendLogs, PointTracer tracer) throws JoyQueueException { + + } + + @Override + public void putPosition(AchivePosition achivePosition) throws JoyQueueException { + + } + + @Override + public Long getPosition(String topic, short partition) throws JoyQueueException { + return null; + } + + @Override + public void cleanPosition(String topic, short partition) throws JoyQueueException { + + } + + @Override + public List scanSendLog(Query query) throws JoyQueueException { + return null; + } + + @Override + public SendLog getOneSendLog(Query query) throws JoyQueueException { + return null; + } + + @Override + public List scanConsumeLog(String messageId, Integer count) throws JoyQueueException { + return null; + } + + @Override + public void setNameSpace(String nameSpace) { + + } + + @Override + public void start() throws Exception { + + } + + @Override + public void stop() { + + } + + @Override + public boolean isStarted() { + return false; + } +} diff --git a/pom.xml b/pom.xml index 7f5e11a07..0e3e4cd39 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,7 @@ 3.2.2 4.1 1.4 + 3.1 4.5.12 4.1.4 2.9.10.4 @@ -560,6 +561,11 @@ commons-beanutils ${commons-beanutils.version} + + commons-net + commons-net + ${commons-net.version} + commons-logging commons-logging