diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 2e72f9e6f28..999826b2a8e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -80,6 +80,7 @@ public class MessageStoreConfig { private boolean timerEnableCheckMetrics = true; private boolean timerInterceptDelayLevel = false; private int timerMaxDelaySec = 3600 * 24 * 3; + private boolean timerWheelSnapshotFlush = false; private boolean timerWheelEnable = true; /** @@ -103,6 +104,7 @@ public class MessageStoreConfig { private int timerMetricSmallThreshold = 1000000; private int timerProgressLogIntervalMs = 10 * 1000; + private int timerWheelSnapshotIntervalMs = 10 * 1000; // default, defaultRocksDB @ImportantField @@ -1747,6 +1749,14 @@ public boolean isTimerWarmEnable() { return timerWarmEnable; } + public boolean isTimerWheelSnapshotFlush() { + return timerWheelSnapshotFlush; + } + + public void setTimerWheelSnapshotFlush(boolean timerWheelSnapshotFlush) { + this.timerWheelSnapshotFlush = timerWheelSnapshotFlush; + } + public boolean isTimerWheelEnable() { return timerWheelEnable; } @@ -1792,6 +1802,14 @@ public int getTimerProgressLogIntervalMs() { return timerProgressLogIntervalMs; } + public int getTimerWheelSnapshotIntervalMs() { + return timerWheelSnapshotIntervalMs; + } + + public void setTimerWheelSnapshotIntervalMs(int timerWheelSnapshotIntervalMs) { + this.timerWheelSnapshotIntervalMs = timerWheelSnapshotIntervalMs; + } + public void setTimerProgressLogIntervalMs(final int timerProgressLogIntervalMs) { this.timerProgressLogIntervalMs = timerProgressLogIntervalMs; } diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerLog.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerLog.java index 01b56ee449b..689f1d792f0 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerLog.java @@ -19,6 +19,7 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.store.RunningFlags; import org.apache.rocketmq.store.logfile.MappedFile; import org.apache.rocketmq.store.MappedFileQueue; import org.apache.rocketmq.store.SelectMappedBufferResult; @@ -45,8 +46,12 @@ public class TimerLog { private final int fileSize; public TimerLog(final String storePath, final int fileSize) { + this(storePath, fileSize, null, false); + } + + public TimerLog(final String storePath, final int fileSize, RunningFlags runningFlags, boolean writeWithoutMmap) { this.fileSize = fileSize; - this.mappedFileQueue = new MappedFileQueue(storePath, fileSize, null); + this.mappedFileQueue = new MappedFileQueue(storePath, fileSize, null, runningFlags, writeWithoutMmap); } public boolean load() { diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index 8b995fbd709..6e40b5c7f21 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.store.timer; import com.conversantmedia.util.concurrent.DisruptorBlockingQueue; +import io.opentelemetry.api.common.Attributes; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -40,7 +41,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; -import io.opentelemetry.api.common.Attributes; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.rocketmq.common.ServiceThread; @@ -61,6 +61,7 @@ import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.RunningFlags; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.MessageStoreConfig; @@ -160,6 +161,8 @@ public class TimerMessageStore { private final BrokerStatsManager brokerStatsManager; private Function escapeBridgeHook; + private final Object lockWhenFlush = new Object(); + public TimerMessageStore(final MessageStore messageStore, final MessageStoreConfig storeConfig, TimerCheckpoint timerCheckpoint, TimerMetrics timerMetrics, final BrokerStatsManager brokerStatsManager) throws IOException { @@ -172,9 +175,29 @@ public TimerMessageStore(final MessageStore messageStore, final MessageStoreConf // TimerWheel contains the fixed number of slots regardless of precision. this.slotsTotal = TIMER_WHEEL_TTL_DAY * DAY_SECS; + + String timerWheelPath = getTimerWheelPath(storeConfig.getStorePathRootDir()); + long snapOffset = -1; + if (storeConfig.isTimerWheelSnapshotFlush()) { + snapOffset = TimerWheel.getMaxSnapshotFlag(timerWheelPath); + if (snapOffset > 0) { + // correct recover offset + timerCheckpoint.setLastTimerLogFlushPos(snapOffset); + LOGGER.info("found timerWheel snapshot offset {}", snapOffset); + } else { + LOGGER.info("not found timerWheel snapshot", snapOffset); + } + } + + RunningFlags runningFlags = null; + if (storeConfig.isEnableRunningFlagsInFlush() && messageStore != null) { + runningFlags = messageStore.getRunningFlags(); + } + this.timerWheel = new TimerWheel( - getTimerWheelPath(storeConfig.getStorePathRootDir()), this.slotsTotal, precisionMs); - this.timerLog = new TimerLog(getTimerLogPath(storeConfig.getStorePathRootDir()), timerLogFileSize); + timerWheelPath, this.slotsTotal, precisionMs, snapOffset); + this.timerLog = new TimerLog(getTimerLogPath(storeConfig.getStorePathRootDir()), timerLogFileSize, + runningFlags, storeConfig.isWriteWithoutMmap()); this.timerMetrics = timerMetrics; this.timerCheckpoint = timerCheckpoint; this.lastBrokerRole = storeConfig.getBrokerRole(); @@ -617,7 +640,6 @@ public void shutdown() { } } - protected void maybeMoveWriteTime() { if (currWriteTimeMs < formatTimeMs(System.currentTimeMillis())) { currWriteTimeMs = formatTimeMs(System.currentTimeMillis()); @@ -959,7 +981,7 @@ public void checkDequeueLatch(CountDownLatch latch, long delayedTime) throws Exc LOGGER.info("Not Running dequeue, skip checkDequeueLatch for delayedTime:{}", delayedTime); break; } - + if (dequeuePutQueue.size() > 0 || !checkStateForGetMessages(AbstractStateService.WAITING) || !checkStateForPutMessages(AbstractStateService.WAITING)) { @@ -1478,7 +1500,13 @@ protected void fetchAndPutTimerRequest() throws Exception { CountDownLatch latch = new CountDownLatch(trs.size()); for (TimerRequest req : trs) { req.setLatch(latch); - this.putMessageToTimerWheel(req); + if (storeConfig.isTimerWheelSnapshotFlush()) { + synchronized (lockWhenFlush) { + this.putMessageToTimerWheel(req); + } + } else { + this.putMessageToTimerWheel(req); + } } checkDequeueLatch(latch, -1); boolean allSuccess = trs.stream().allMatch(TimerRequest::isSucc); @@ -1790,7 +1818,8 @@ public boolean needDelete(int magic) { public class TimerFlushService extends ServiceThread { private final SimpleDateFormat sdf = new SimpleDateFormat("MM-dd HH:mm:ss"); - @Override public String getServiceName() { + @Override + public String getServiceName() { String brokerIdentifier = ""; if (TimerMessageStore.this.messageStore instanceof DefaultMessageStore && ((DefaultMessageStore) TimerMessageStore.this.messageStore).getBrokerConfig().isInBrokerContainer()) { brokerIdentifier = ((DefaultMessageStore) TimerMessageStore.this.messageStore).getBrokerConfig().getIdentifier(); @@ -1805,33 +1834,55 @@ private String format(long time) { @Override public void run() { TimerMessageStore.LOGGER.info(this.getServiceName() + " service start"); - long start = System.currentTimeMillis(); while (!this.isStopped()) { try { - prepareTimerCheckPoint(); - timerLog.getMappedFileQueue().flush(0); - timerWheel.flush(); - timerCheckpoint.flush(); - if (System.currentTimeMillis() - start > storeConfig.getTimerProgressLogIntervalMs()) { - start = System.currentTimeMillis(); - long tmpQueueOffset = currQueueOffset; - ConsumeQueueInterface cq = messageStore.getConsumeQueue(TIMER_TOPIC, 0); - long maxOffsetInQueue = cq == null ? 0 : cq.getMaxOffsetInQueue(); - TimerMessageStore.LOGGER.info("[{}]Timer progress-check commitRead:[{}] currRead:[{}] currWrite:[{}] readBehind:{} currReadOffset:{} offsetBehind:{} behindMaster:{} " + - "enqPutQueue:{} deqGetQueue:{} deqPutQueue:{} allCongestNum:{} enqExpiredStoreTime:{}", - storeConfig.getBrokerRole(), - format(commitReadTimeMs), format(currReadTimeMs), format(currWriteTimeMs), getDequeueBehind(), - tmpQueueOffset, maxOffsetInQueue - tmpQueueOffset, timerCheckpoint.getMasterTimerQueueOffset() - tmpQueueOffset, - enqueuePutQueue.size(), dequeueGetQueue.size(), dequeuePutQueue.size(), getAllCongestNum(), format(lastEnqueueButExpiredStoreTime)); - } - timerMetrics.persist(); - waitForRunning(storeConfig.getTimerFlushIntervalMs()); + this.flush(); } catch (Throwable e) { TimerMessageStore.LOGGER.error("Error occurred in " + getServiceName(), e); } + try { + waitForRunning(storeConfig.getTimerFlushIntervalMs()); + } catch (Throwable e) { + // ignore interrupt + } } TimerMessageStore.LOGGER.info(this.getServiceName() + " service end"); } + + long start = System.currentTimeMillis(); + long lastSnapshotTime = System.currentTimeMillis(); + + public void flush() throws IOException { + if (storeConfig.isTimerWheelSnapshotFlush()) { + synchronized (lockWhenFlush) { + prepareTimerCheckPoint(); + timerLog.getMappedFileQueue().flush(0); + if (System.currentTimeMillis() - lastSnapshotTime > storeConfig.getTimerWheelSnapshotIntervalMs()) { + lastSnapshotTime = System.currentTimeMillis(); + timerWheel.backup(timerLog.getMappedFileQueue().getFlushedWhere()); + } + timerCheckpoint.flush(); + } + } else { + prepareTimerCheckPoint(); + timerLog.getMappedFileQueue().flush(0); + timerWheel.flush(); + timerCheckpoint.flush(); + } + if (System.currentTimeMillis() - start > storeConfig.getTimerProgressLogIntervalMs()) { + start = System.currentTimeMillis(); + long tmpQueueOffset = currQueueOffset; + ConsumeQueueInterface cq = messageStore.getConsumeQueue(TIMER_TOPIC, 0); + long maxOffsetInQueue = cq == null ? 0 : cq.getMaxOffsetInQueue(); + TimerMessageStore.LOGGER.info("[{}]Timer progress-check commitRead:[{}] currRead:[{}] currWrite:[{}] readBehind:{} currReadOffset:{} offsetBehind:{} behindMaster:{} " + + "enqPutQueue:{} deqGetQueue:{} deqPutQueue:{} allCongestNum:{} enqExpiredStoreTime:{}", + storeConfig.getBrokerRole(), + format(commitReadTimeMs), format(currReadTimeMs), format(currWriteTimeMs), getDequeueBehind(), + tmpQueueOffset, maxOffsetInQueue - tmpQueueOffset, timerCheckpoint.getMasterTimerQueueOffset() - tmpQueueOffset, + enqueuePutQueue.size(), dequeueGetQueue.size(), dequeuePutQueue.size(), getAllCongestNum(), format(lastEnqueueButExpiredStoreTime)); + } + timerMetrics.persist(); + } } public long getAllCongestNum() { @@ -2023,4 +2074,8 @@ public TimerCheckpoint getTimerCheckpoint() { public static String buildDeleteKey(String realTopic, String uniqueKey) { return realTopic + "+" + uniqueKey; } + + public TimerFlushService getTimerFlushService() { + return timerFlushService; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java index 6c7d1645925..261d8f6a3db 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java @@ -16,6 +16,12 @@ */ package org.apache.rocketmq.store.timer; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -32,13 +38,12 @@ public class TimerWheel { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + public static final String TIMER_WHEEL_FILE_NAME = "timerwheel"; public static final int BLANK = -1, IGNORE = -2; public final int slotsTotal; public final int precisionMs; - private String fileName; - private final RandomAccessFile randomAccessFile; - private final FileChannel fileChannel; - private final MappedByteBuffer mappedByteBuffer; + private final String fileName; + private MappedByteBuffer mappedByteBuffer; private final ByteBuffer byteBuffer; private final ThreadLocal localBuffer = new ThreadLocal() { @Override @@ -48,34 +53,47 @@ protected ByteBuffer initialValue() { }; private final int wheelLength; + private long snapOffset; + public TimerWheel(String fileName, int slotsTotal, int precisionMs) throws IOException { + this(fileName, slotsTotal, precisionMs, -1); + } + public TimerWheel(String fileName, int slotsTotal, int precisionMs, long snapOffset) throws IOException { this.slotsTotal = slotsTotal; this.precisionMs = precisionMs; this.fileName = fileName; this.wheelLength = this.slotsTotal * 2 * Slot.SIZE; + this.snapOffset = snapOffset; - File file = new File(fileName); + String finalFileName = selectSnapshotByFlag(snapOffset); + File file = new File(finalFileName); UtilAll.ensureDirOK(file.getParent()); + RandomAccessFile randomAccessFile = null; try { - randomAccessFile = new RandomAccessFile(this.fileName, "rw"); + randomAccessFile = new RandomAccessFile(finalFileName, "rw"); if (file.exists() && randomAccessFile.length() != 0 && randomAccessFile.length() != wheelLength) { throw new RuntimeException(String.format("Timer wheel length:%d != expected:%s", randomAccessFile.length(), wheelLength)); } randomAccessFile.setLength(wheelLength); - fileChannel = randomAccessFile.getChannel(); - mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, wheelLength); - assert wheelLength == mappedByteBuffer.remaining(); + if (snapOffset < 0) { + mappedByteBuffer = randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, wheelLength); + assert wheelLength == mappedByteBuffer.remaining(); + } this.byteBuffer = ByteBuffer.allocateDirect(wheelLength); - this.byteBuffer.put(mappedByteBuffer); + this.byteBuffer.put(Files.readAllBytes(file.toPath())); } catch (FileNotFoundException e) { - log.error("create file channel " + this.fileName + " Failed. ", e); + log.error("create file channel " + finalFileName + " Failed. ", e); throw e; } catch (IOException e) { - log.error("map file " + this.fileName + " Failed. ", e); + log.error("map file " + finalFileName + " Failed. ", e); throw e; + } finally { + if (randomAccessFile != null) { + randomAccessFile.close(); + } } } @@ -96,15 +114,12 @@ public void shutdown(boolean flush) { UtilAll.cleanBuffer(this.mappedByteBuffer); UtilAll.cleanBuffer(this.byteBuffer); localBuffer.remove(); - - try { - this.fileChannel.close(); - } catch (Throwable t) { - log.error("Shutdown error in timer wheel", t); - } } public void flush() { + if (mappedByteBuffer == null) { + return; + } ByteBuffer bf = localBuffer.get(); bf.position(0); bf.limit(wheelLength); @@ -118,6 +133,131 @@ public void flush() { this.mappedByteBuffer.force(); } + /** + * Perform backup operation. + *

+ * Select snapshot file based on the provided flag, write current buffer content to a temporary file, + * then rename the temporary file to the formal snapshot file. If rename fails, delete the temporary file. + * Finally clean up expired snapshot files. + * + * @param flushWhere Flag used to select snapshot file. + * @throws IOException If I/O error occurs during backup process. + */ + public void backup(long flushWhere) throws IOException { + // Get current local buffer and position it to the beginning + ByteBuffer bf = localBuffer.get(); + bf.position(0); + bf.limit(wheelLength); + + // Select snapshot file name based on flag + String fileName = selectSnapshotByFlag(flushWhere); + File bakFile = new File(fileName); + // Create or open temporary file for snapshot, ready for writing + File tmpFile = new File(fileName + ".tmp"); + // Delete if exists first + Files.deleteIfExists(tmpFile.toPath()); + try (RandomAccessFile randomAccessFile = new RandomAccessFile(tmpFile, "rw")) { + try (FileChannel fileChannel = randomAccessFile.getChannel()) { + fileChannel.write(bf); + fileChannel.force(true); + } + } + + if (tmpFile.exists()) { + // atomic move + Files.move(tmpFile.toPath(), bakFile.toPath(), StandardCopyOption.ATOMIC_MOVE); + + // sync the directory, ensure that the bak file is visible + MixAll.fsyncDirectory(Paths.get(bakFile.getParent())); + } + cleanExpiredSnapshot(); // Clean up expired snapshot files + } + + /** + * Select snapshot file name based on flag. + * + * @param flag Flag used to select or identify snapshot file. + * @return Name of the snapshot file. + */ + private String selectSnapshotByFlag(long flag) { + if (flag < 0) { + return this.fileName; // If flag is less than 0, return default file name + } + return this.fileName + "." + flag; // Otherwise, return file name with flag suffix + } + + /** + * Clean up expired snapshot files. + *

+ * This method will find and delete all snapshot files with flags smaller than the specified value + * under the current file name, keeping the two snapshot files with the largest flags. + */ + public void cleanExpiredSnapshot() { + File dir = new File(this.fileName).getParentFile(); + File[] files = dir.listFiles(); + if (files == null) { + return; + } + + // Collect all snapshot files and their flags + List snapshotFiles = new ArrayList<>(); + for (File file : files) { + String fileName = file.getName(); + if (fileName.startsWith(TIMER_WHEEL_FILE_NAME + ".")) { + long flag = UtilAll.asLong(fileName.substring(TIMER_WHEEL_FILE_NAME.length() + 1), -1); + if (flag >= 0) { + snapshotFiles.add(new FileWithFlag(file, flag)); + } + } + } + + // Sort by flag in descending order + snapshotFiles.sort((a, b) -> Long.compare(b.flag, a.flag)); + + // Delete all files except the first two + for (int i = 2; i < snapshotFiles.size(); i++) { + UtilAll.deleteFile(snapshotFiles.get(i).file); + } + } + + /** + * Get the maximum flag from existing snapshot files. + * + * @return The maximum flag value, or -1 if no snapshot files exist + */ + public static long getMaxSnapshotFlag(String timerWheelPath) { + File dir = new File(timerWheelPath).getParentFile(); + File[] files = dir.listFiles(); + if (files == null) { + return -1; + } + + long maxFlag = -1; + for (File file : files) { + String fileName = file.getName(); + if (fileName.startsWith(TIMER_WHEEL_FILE_NAME + ".")) { + long flag = UtilAll.asLong(fileName.substring(TIMER_WHEEL_FILE_NAME.length() + 1), -1); + if (flag > maxFlag) { + maxFlag = flag; + } + } + } + return maxFlag; + } + + /** + * Wrapper class for file and flag + */ + private static class FileWithFlag { + final File file; + final long flag; + + FileWithFlag(File file, long flag) { + this.file = file; + this.flag = flag; + } + } + public Slot getSlot(long timeMs) { Slot slot = getRawSlot(timeMs); if (slot.timeMs != timeMs / precisionMs * precisionMs) { @@ -145,6 +285,7 @@ public void putSlot(long timeMs, long firstPos, long lastPos) { localBuffer.get().putLong(firstPos); localBuffer.get().putLong(lastPos); } + public void putSlot(long timeMs, long firstPos, long lastPos, int num, int magic) { localBuffer.get().position(getSlotIndex(timeMs) * Slot.SIZE); localBuffer.get().putLong(timeMs / precisionMs); @@ -212,4 +353,8 @@ public long getAllNum(long timeStartMs) { } return allNum; } + + public String getFileName() { + return fileName; + } }