diff --git a/joyqueue-distribution/joyqueue-distribution-server/bin/console-archive-log.sh b/joyqueue-distribution/joyqueue-distribution-server/bin/console-archive-log.sh new file mode 100755 index 000000000..6389d117e --- /dev/null +++ b/joyqueue-distribution/joyqueue-distribution-server/bin/console-archive-log.sh @@ -0,0 +1,19 @@ +#!/bin/sh +# +# Copyright 2019 The JoyQueue Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +exec $(dirname $0)/run-class.sh org.joyqueue.tools.archive.ConsumeArchieveLogReader "$@" \ No newline at end of file diff --git a/joyqueue-server/joyqueue-archive/joyqueue-archive-api/src/main/java/org/joyqueue/server/archive/store/api/ArchiveStore.java b/joyqueue-server/joyqueue-archive/joyqueue-archive-api/src/main/java/org/joyqueue/server/archive/store/api/ArchiveStore.java index 281539a56..ab92990c5 100644 --- a/joyqueue-server/joyqueue-archive/joyqueue-archive-api/src/main/java/org/joyqueue/server/archive/store/api/ArchiveStore.java +++ b/joyqueue-server/joyqueue-archive/joyqueue-archive-api/src/main/java/org/joyqueue/server/archive/store/api/ArchiveStore.java @@ -63,7 +63,7 @@ public interface ArchiveStore extends LifeCycle { * * @param topic * @param partition - * @return + * @return current archived position * @throws JoyQueueException */ Long getPosition(String topic, short partition) throws JoyQueueException; @@ -78,7 +78,7 @@ public interface ArchiveStore extends LifeCycle { * 查看发送日志 * * @param query - * @return + * @return send logs * @throws JoyQueueException */ List scanSendLog(Query query) throws JoyQueueException; @@ -87,7 +87,7 @@ public interface ArchiveStore extends LifeCycle { * 查看一条发送日志 * * @param query - * @return + * @return send log * @throws JoyQueueException */ SendLog getOneSendLog(Query query) throws JoyQueueException; @@ -97,7 +97,7 @@ public interface ArchiveStore extends LifeCycle { * * @param messageId * @param count - * @return + * @return consume logs * @throws JoyQueueException */ List scanConsumeLog(String messageId,Integer count) throws JoyQueueException; diff --git a/joyqueue-server/joyqueue-archive/joyqueue-archive-hbase/src/main/java/org/joyqueue/server/archive/store/HBaseStore.java b/joyqueue-server/joyqueue-archive/joyqueue-archive-hbase/src/main/java/org/joyqueue/server/archive/store/HBaseStore.java index fa9cc9d88..56d930f4f 100644 --- a/joyqueue-server/joyqueue-archive/joyqueue-archive-hbase/src/main/java/org/joyqueue/server/archive/store/HBaseStore.java +++ b/joyqueue-server/joyqueue-archive/joyqueue-archive-hbase/src/main/java/org/joyqueue/server/archive/store/HBaseStore.java @@ -443,7 +443,7 @@ public List scanConsumeLog(String messageId, Integer count) throws J logger.error("hBaseClient is null,archive no service"); throw new JoyQueueException(JoyQueueCode.CN_SERVICE_NOT_AVAILABLE, "hBaseClient is null"); } - // 查询消费日志(rowkey=messageId+appId) + // 查询消费日志(rowkey = messageId+appId) List logList = new LinkedList<>(); // 查询发送日志(rowkey=topicId+sendTime+businessId) try { diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveSerializer.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveSerializer.java index 334ab0394..3346d8e2b 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveSerializer.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ArchiveSerializer.java @@ -91,6 +91,17 @@ private static int consumeLogSize(ConsumeLog consumeLog) { return size; } + /** + * length + consumeLog bytes + **/ + public static ConsumeLog tryRead(ByteBuffer buffer){ + int len= buffer.getInt(); + if(len<=0||buffer.remaining() convert(Connection connection, MessageLocation[] locati ConsumeLog log = new ConsumeLog(); byte[] bytesMsgId = buildMessageId(location); + log.setBytesMessageId(bytesMsgId); log.setApp(connection.getApp()); @@ -265,7 +266,7 @@ private List convert(Connection connection, MessageLocation[] locati } /** - * 构造消息Id + * 构造消息Id md5 定长 * * @param location 应答位置信息 * @return @@ -293,7 +294,7 @@ private synchronized void appendLog(ByteBuffer buffer) { /** * 本地日志日志文件存储 */ - static class ArchiveMappedFileRepository implements Closeable { + public static class ArchiveMappedFileRepository implements Closeable { // 消费归档文件本地根存储路径 private String baseDir; @@ -322,7 +323,7 @@ static class ArchiveMappedFileRepository implements Closeable { // 单个归档文件的大小 private final long pageSize = 1024 * 1024 * 16; // 16M - ArchiveMappedFileRepository(String baseDir) { + public ArchiveMappedFileRepository(String baseDir) { this.baseDir = baseDir; recover(); } diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ProduceArchiveService.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ProduceArchiveService.java index b598a1102..3f79b64c1 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ProduceArchiveService.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/archive/ProduceArchiveService.java @@ -134,7 +134,7 @@ protected void validate() throws Exception { this.updateItemThread = LoopThread.builder() .sleepTime(1000 * 10, 1000 * 10) .name("UpdateArchiveItem-Thread") - .onException(e -> logger.warn("Exception:", e)) + .onException(e -> logger.warn("Produce archive service update item thread Exception:", e)) .doWork(() -> { // 更新item列表 updateArchiveItem(); @@ -145,7 +145,7 @@ protected void validate() throws Exception { this.readMsgThread = LoopThread.builder() .sleepTime(0, 10) .name("ReadArchiveMsg-Thread") - .onException(e -> logger.warn("Exception:", e)) + .onException(e -> logger.warn("Produce archive service read message thread Exception:", e)) .doWork(() -> { // 消费接口读取消息,放入队列 readArchiveMsg(); diff --git a/joyqueue-server/joyqueue-server-runtime/src/main/java/org/joyqueue/tools/archive/ConsumeArchiveLogReader.java b/joyqueue-server/joyqueue-server-runtime/src/main/java/org/joyqueue/tools/archive/ConsumeArchiveLogReader.java new file mode 100644 index 000000000..34898e81f --- /dev/null +++ b/joyqueue-server/joyqueue-server-runtime/src/main/java/org/joyqueue/tools/archive/ConsumeArchiveLogReader.java @@ -0,0 +1,186 @@ +/** + * Copyright 2019 The JoyQueue Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.joyqueue.tools.archive; + +import org.joyqueue.broker.archive.ArchiveSerializer; +import org.joyqueue.server.archive.store.HBaseSerializer; +import org.joyqueue.server.archive.store.model.ConsumeLog; +import org.joyqueue.toolkit.network.IpUtil; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; + +/** + * + * Consume archive log reader + * + **/ +public class ConsumeArchiveLogReader { + + private static final byte RECORD_START_FLAG= Byte.MIN_VALUE; + private static final byte RECORD_END_FLAG= Byte.MAX_VALUE; + // use larger buffer size than consume log file + private static final int bufferSize=32*1024*1024; + private ByteBuffer buf= ByteBuffer.allocate(bufferSize); + private static final DateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + public static void main(String[] args){ + if(args.length < 5){ + throw new IllegalArgumentException("args too less(base path,start time,end time,message id)," + + "such as /archive 1596084370000 1596123970000 ssss F7C0D91A52D59F4E1B05C8273EF41B69"); + } + String basePath=args[0]; + long startLastModifiedTimeMs=Long.parseLong(args[1]); + long endLastModifiedTimeMs=Long.parseLong(args[2]); + String topic=args[3]; + // message id md5 hex + String messageId=args[4]; + System.out.println(String.format("on %s,Scan for topic:%s,message id %s in %s-%s",basePath,topic,messageId, + sdf.format(new Date(startLastModifiedTimeMs)), + sdf.format(new Date(endLastModifiedTimeMs)))); + File consumeLogParentFile=new File(basePath); + if(!consumeLogParentFile.isDirectory()){ + throw new IllegalArgumentException(String.format("%s is not directory",consumeLogParentFile)); + } + File[] allConsumeLogs=consumeLogParentFile.listFiles(); + ConsumeArchiveLogReader reader=new ConsumeArchiveLogReader(messageId); + Arrays.sort(allConsumeLogs, Comparator.comparingLong(reader::FileToLong)); + // never read last file + for(int i=0;istartLastModifiedTimeMs&&log.lastModified() consumeLogs = reader.readConsumeLogFromFile(log); + reader.onConsumeLogEvent(consumeLogs); + }catch (IOException e){ + System.out.println("io exception:"+e); + break; + } + } + } + } + private String messageId; + // message id md5 bytes + private byte[] byteMessageId; + public ConsumeArchiveLogReader(String messageId){ + this.messageId=messageId; + this.byteMessageId= HBaseSerializer.hexStrToByteArray(messageId); + } + public long FileToLong(File file){ + return Long.parseLong(file.getName()); + } + + /** + * Consume message log and + **/ + public void onConsumeLogEvent(List consumeLogs){ + for(ConsumeLog log:consumeLogs){ + // consume log don't contain topic + if(Arrays.equals(log.getBytesMessageId(),byteMessageId)){ + System.out.println(formatConsumeLog(log)); + } + } + } + + /** + * Format consume log + **/ + public String formatConsumeLog(ConsumeLog log){ + StringBuilder clientIp = new StringBuilder(); + IpUtil.toAddress(log.getClientIp(), clientIp); + return String.format("Consume event found, %s %s %s %s",messageId,log.getApp(),clientIp,sdf.format(new Date(log.getConsumeTime()))); + } + + + /** + * Read consume log from log file + **/ + public List readConsumeLogFromFile(File consumeLog) throws IOException { + if(!consumeLog.exists()){ + return null; + } + List consumeLogs=new ArrayList<>(); + RandomAccessFile randomAccessFile=null; + FileChannel fileChannel=null; + try { + buf.clear(); + randomAccessFile= new RandomAccessFile(consumeLog, "r"); + fileChannel= randomAccessFile.getChannel(); + while (true) { + try { + //if(readBuffer.) + int readSize = fileChannel.read(buf); + if (readSize > 0) { + buf.flip(); + // prepare for read + consumeLogs.addAll(readConsumeLog(consumeLog.getName(),buf)); + // process + if (buf.hasRemaining()) { + buf.compact(); + } + buf.flip(); + } else { + break; + } + } catch (IOException e) { + System.out.println("read exception:" + e); + break; + }catch (IllegalStateException e){ + System.out.println("on file: "+consumeLog.getName() + ", " + e); + break; + } + } + }finally { + if(fileChannel!=null) { + fileChannel.close(); + } + if(randomAccessFile!=null) { + randomAccessFile.close(); + } + } + System.out.println("read consume log on "+consumeLog.getName()+" finished!"); + return consumeLogs; + } + + /** + * Read consume log from buffer + **/ + public List readConsumeLog(String logName,ByteBuffer buffer){ + List consumeLogs=new ArrayList<>(); + while(buffer.hasRemaining()){ + byte flag= buffer.get(); + if(flag == RECORD_START_FLAG) { + ConsumeLog log = ArchiveSerializer.tryRead(buffer); + if (log != null) { + consumeLogs.add(log); + } + }else if(flag == RECORD_END_FLAG){ + System.out.println(String.format("Meet %s end flag",logName)); + break; + }else{ + // reset + buffer.position(buffer.position()-1); + throw new IllegalStateException(String.format("Bad format on %s",logName)); + } + } + return consumeLogs; + } + +}