Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class MessageStoreConfig {
private boolean timerEnableCheckMetrics = true;
private boolean timerInterceptDelayLevel = false;
private int timerMaxDelaySec = 3600 * 24 * 3;
private boolean timerWheelSnapshotFlush = false;
private boolean timerWheelEnable = true;

/**
Expand All @@ -103,6 +104,7 @@ public class MessageStoreConfig {

private int timerMetricSmallThreshold = 1000000;
private int timerProgressLogIntervalMs = 10 * 1000;
private int timerWheelSnapshotIntervalMs = 10 * 1000;

// default, defaultRocksDB
@ImportantField
Expand Down Expand Up @@ -1747,6 +1749,14 @@ public boolean isTimerWarmEnable() {
return timerWarmEnable;
}

public boolean isTimerWheelSnapshotFlush() {
return timerWheelSnapshotFlush;
}

public void setTimerWheelSnapshotFlush(boolean timerWheelSnapshotFlush) {
this.timerWheelSnapshotFlush = timerWheelSnapshotFlush;
}

public boolean isTimerWheelEnable() {
return timerWheelEnable;
}
Expand Down Expand Up @@ -1792,6 +1802,14 @@ public int getTimerProgressLogIntervalMs() {
return timerProgressLogIntervalMs;
}

public int getTimerWheelSnapshotIntervalMs() {
return timerWheelSnapshotIntervalMs;
}

public void setTimerWheelSnapshotIntervalMs(int timerWheelSnapshotIntervalMs) {
this.timerWheelSnapshotIntervalMs = timerWheelSnapshotIntervalMs;
}

public void setTimerProgressLogIntervalMs(final int timerProgressLogIntervalMs) {
this.timerProgressLogIntervalMs = timerProgressLogIntervalMs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.RunningFlags;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.MappedFileQueue;
import org.apache.rocketmq.store.SelectMappedBufferResult;
Expand All @@ -45,8 +46,12 @@ public class TimerLog {
private final int fileSize;

public TimerLog(final String storePath, final int fileSize) {
this(storePath, fileSize, null, false);
}

public TimerLog(final String storePath, final int fileSize, RunningFlags runningFlags, boolean writeWithoutMmap) {
this.fileSize = fileSize;
this.mappedFileQueue = new MappedFileQueue(storePath, fileSize, null);
this.mappedFileQueue = new MappedFileQueue(storePath, fileSize, null, runningFlags, writeWithoutMmap);
}

public boolean load() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.rocketmq.store.timer;

import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
import io.opentelemetry.api.common.Attributes;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -40,7 +41,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import io.opentelemetry.api.common.Attributes;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.rocketmq.common.ServiceThread;
Expand All @@ -61,6 +61,7 @@
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.RunningFlags;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
Expand Down Expand Up @@ -160,6 +161,8 @@ public class TimerMessageStore {
private final BrokerStatsManager brokerStatsManager;
private Function<MessageExtBrokerInner, PutMessageResult> escapeBridgeHook;

private final Object lockWhenFlush = new Object();

public TimerMessageStore(final MessageStore messageStore, final MessageStoreConfig storeConfig,
TimerCheckpoint timerCheckpoint, TimerMetrics timerMetrics,
final BrokerStatsManager brokerStatsManager) throws IOException {
Expand All @@ -172,9 +175,29 @@ public TimerMessageStore(final MessageStore messageStore, final MessageStoreConf

// TimerWheel contains the fixed number of slots regardless of precision.
this.slotsTotal = TIMER_WHEEL_TTL_DAY * DAY_SECS;

String timerWheelPath = getTimerWheelPath(storeConfig.getStorePathRootDir());
long snapOffset = -1;
if (storeConfig.isTimerWheelSnapshotFlush()) {
snapOffset = TimerWheel.getMaxSnapshotFlag(timerWheelPath);
if (snapOffset > 0) {
// correct recover offset
timerCheckpoint.setLastTimerLogFlushPos(snapOffset);
LOGGER.info("found timerWheel snapshot offset {}", snapOffset);
} else {
LOGGER.info("not found timerWheel snapshot", snapOffset);
}
}

RunningFlags runningFlags = null;
if (storeConfig.isEnableRunningFlagsInFlush() && messageStore != null) {
runningFlags = messageStore.getRunningFlags();
}

this.timerWheel = new TimerWheel(
getTimerWheelPath(storeConfig.getStorePathRootDir()), this.slotsTotal, precisionMs);
this.timerLog = new TimerLog(getTimerLogPath(storeConfig.getStorePathRootDir()), timerLogFileSize);
timerWheelPath, this.slotsTotal, precisionMs, snapOffset);
this.timerLog = new TimerLog(getTimerLogPath(storeConfig.getStorePathRootDir()), timerLogFileSize,
runningFlags, storeConfig.isWriteWithoutMmap());
this.timerMetrics = timerMetrics;
this.timerCheckpoint = timerCheckpoint;
this.lastBrokerRole = storeConfig.getBrokerRole();
Expand Down Expand Up @@ -617,7 +640,6 @@ public void shutdown() {
}
}


protected void maybeMoveWriteTime() {
if (currWriteTimeMs < formatTimeMs(System.currentTimeMillis())) {
currWriteTimeMs = formatTimeMs(System.currentTimeMillis());
Expand Down Expand Up @@ -959,7 +981,7 @@ public void checkDequeueLatch(CountDownLatch latch, long delayedTime) throws Exc
LOGGER.info("Not Running dequeue, skip checkDequeueLatch for delayedTime:{}", delayedTime);
break;
}

if (dequeuePutQueue.size() > 0
|| !checkStateForGetMessages(AbstractStateService.WAITING)
|| !checkStateForPutMessages(AbstractStateService.WAITING)) {
Expand Down Expand Up @@ -1478,7 +1500,13 @@ protected void fetchAndPutTimerRequest() throws Exception {
CountDownLatch latch = new CountDownLatch(trs.size());
for (TimerRequest req : trs) {
req.setLatch(latch);
this.putMessageToTimerWheel(req);
if (storeConfig.isTimerWheelSnapshotFlush()) {
synchronized (lockWhenFlush) {
this.putMessageToTimerWheel(req);
}
} else {
this.putMessageToTimerWheel(req);
}
}
checkDequeueLatch(latch, -1);
boolean allSuccess = trs.stream().allMatch(TimerRequest::isSucc);
Expand Down Expand Up @@ -1790,7 +1818,8 @@ public boolean needDelete(int magic) {
public class TimerFlushService extends ServiceThread {
private final SimpleDateFormat sdf = new SimpleDateFormat("MM-dd HH:mm:ss");

@Override public String getServiceName() {
@Override
public String getServiceName() {
String brokerIdentifier = "";
if (TimerMessageStore.this.messageStore instanceof DefaultMessageStore && ((DefaultMessageStore) TimerMessageStore.this.messageStore).getBrokerConfig().isInBrokerContainer()) {
brokerIdentifier = ((DefaultMessageStore) TimerMessageStore.this.messageStore).getBrokerConfig().getIdentifier();
Expand All @@ -1805,33 +1834,55 @@ private String format(long time) {
@Override
public void run() {
TimerMessageStore.LOGGER.info(this.getServiceName() + " service start");
long start = System.currentTimeMillis();
while (!this.isStopped()) {
try {
prepareTimerCheckPoint();
timerLog.getMappedFileQueue().flush(0);
timerWheel.flush();
timerCheckpoint.flush();
if (System.currentTimeMillis() - start > storeConfig.getTimerProgressLogIntervalMs()) {
start = System.currentTimeMillis();
long tmpQueueOffset = currQueueOffset;
ConsumeQueueInterface cq = messageStore.getConsumeQueue(TIMER_TOPIC, 0);
long maxOffsetInQueue = cq == null ? 0 : cq.getMaxOffsetInQueue();
TimerMessageStore.LOGGER.info("[{}]Timer progress-check commitRead:[{}] currRead:[{}] currWrite:[{}] readBehind:{} currReadOffset:{} offsetBehind:{} behindMaster:{} " +
"enqPutQueue:{} deqGetQueue:{} deqPutQueue:{} allCongestNum:{} enqExpiredStoreTime:{}",
storeConfig.getBrokerRole(),
format(commitReadTimeMs), format(currReadTimeMs), format(currWriteTimeMs), getDequeueBehind(),
tmpQueueOffset, maxOffsetInQueue - tmpQueueOffset, timerCheckpoint.getMasterTimerQueueOffset() - tmpQueueOffset,
enqueuePutQueue.size(), dequeueGetQueue.size(), dequeuePutQueue.size(), getAllCongestNum(), format(lastEnqueueButExpiredStoreTime));
}
timerMetrics.persist();
waitForRunning(storeConfig.getTimerFlushIntervalMs());
this.flush();
} catch (Throwable e) {
TimerMessageStore.LOGGER.error("Error occurred in " + getServiceName(), e);
}
try {
waitForRunning(storeConfig.getTimerFlushIntervalMs());
} catch (Throwable e) {
// ignore interrupt
}
}
TimerMessageStore.LOGGER.info(this.getServiceName() + " service end");
}

long start = System.currentTimeMillis();
long lastSnapshotTime = System.currentTimeMillis();

public void flush() throws IOException {
if (storeConfig.isTimerWheelSnapshotFlush()) {
synchronized (lockWhenFlush) {
prepareTimerCheckPoint();
timerLog.getMappedFileQueue().flush(0);
if (System.currentTimeMillis() - lastSnapshotTime > storeConfig.getTimerWheelSnapshotIntervalMs()) {
lastSnapshotTime = System.currentTimeMillis();
timerWheel.backup(timerLog.getMappedFileQueue().getFlushedWhere());
}
timerCheckpoint.flush();
}
} else {
prepareTimerCheckPoint();
timerLog.getMappedFileQueue().flush(0);
timerWheel.flush();
timerCheckpoint.flush();
}
if (System.currentTimeMillis() - start > storeConfig.getTimerProgressLogIntervalMs()) {
start = System.currentTimeMillis();
long tmpQueueOffset = currQueueOffset;
ConsumeQueueInterface cq = messageStore.getConsumeQueue(TIMER_TOPIC, 0);
long maxOffsetInQueue = cq == null ? 0 : cq.getMaxOffsetInQueue();
TimerMessageStore.LOGGER.info("[{}]Timer progress-check commitRead:[{}] currRead:[{}] currWrite:[{}] readBehind:{} currReadOffset:{} offsetBehind:{} behindMaster:{} " +
"enqPutQueue:{} deqGetQueue:{} deqPutQueue:{} allCongestNum:{} enqExpiredStoreTime:{}",
storeConfig.getBrokerRole(),
format(commitReadTimeMs), format(currReadTimeMs), format(currWriteTimeMs), getDequeueBehind(),
tmpQueueOffset, maxOffsetInQueue - tmpQueueOffset, timerCheckpoint.getMasterTimerQueueOffset() - tmpQueueOffset,
enqueuePutQueue.size(), dequeueGetQueue.size(), dequeuePutQueue.size(), getAllCongestNum(), format(lastEnqueueButExpiredStoreTime));
}
timerMetrics.persist();
}
}

public long getAllCongestNum() {
Expand Down Expand Up @@ -2023,4 +2074,8 @@ public TimerCheckpoint getTimerCheckpoint() {
public static String buildDeleteKey(String realTopic, String uniqueKey) {
return realTopic + "+" + uniqueKey;
}

public TimerFlushService getTimerFlushService() {
return timerFlushService;
}
}
Loading
Loading