Skip to content

Commit f9f0574

Browse files
authored
Fix frozen IoTConsensus sync lag (apache#15023)
* Fix frozen IoTConsensus sync lag * spotless
1 parent d412a20 commit f9f0574

File tree

3 files changed

+29
-2
lines changed

3 files changed

+29
-2
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/IWALBuffer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.storageengine.dataregion.wal.buffer;
2121

2222
import java.util.concurrent.TimeUnit;
23+
import java.util.function.Predicate;
2324

2425
/**
2526
* This class serializes and flushes {@link WALEntry}. If search is enabled, the order of search
@@ -52,6 +53,15 @@ public interface IWALBuffer extends AutoCloseable {
5253
*/
5354
void waitForFlush() throws InterruptedException;
5455

56+
/**
57+
* Wait for next flush operation done, if the predicate == true after entering a locked
58+
* environment. Otherwise, return directly.
59+
*
60+
* @param waitPredicate the condition which should be satisfied before waiting.
61+
* @throws InterruptedException when interrupted by the flush thread
62+
*/
63+
public void waitForFlush(Predicate<WALBuffer> waitPredicate) throws InterruptedException;
64+
5565
/**
5666
* Wait for next flush operation done.
5767
*

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import java.util.concurrent.locks.Condition;
6565
import java.util.concurrent.locks.Lock;
6666
import java.util.concurrent.locks.ReentrantLock;
67+
import java.util.function.Predicate;
6768

6869
import static org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode.DEFAULT_SEARCH_INDEX;
6970

@@ -666,6 +667,18 @@ public void waitForFlush() throws InterruptedException {
666667
}
667668
}
668669

670+
@Override
671+
public void waitForFlush(Predicate<WALBuffer> waitPredicate) throws InterruptedException {
672+
buffersLock.lock();
673+
try {
674+
if (waitPredicate.test(this)) {
675+
idleBufferReadyCondition.await();
676+
}
677+
} finally {
678+
buffersLock.unlock();
679+
}
680+
}
681+
669682
@Override
670683
public boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException {
671684
buffersLock.lock();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -826,20 +826,24 @@ public IndexedConsensusRequest next() {
826826
@Override
827827
public void waitForNextReady() throws InterruptedException {
828828
boolean walFileRolled = false;
829+
long bufferLastSearchIndex = 0;
829830
while (!hasNext()) {
830831
if (!walFileRolled) {
831832
boolean timeout =
832833
!buffer.waitForFlush(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
833834
if (timeout) {
835+
bufferLastSearchIndex = buffer.getCurrentSearchIndex();
834836
logger.info(
835837
"timeout when waiting for next WAL entry ready, execute rollWALFile. Current search index in wal buffer is {}, and next target index is {}",
836-
buffer.getCurrentSearchIndex(),
838+
bufferLastSearchIndex,
837839
nextSearchIndex);
838840
rollWALFile();
839841
walFileRolled = true;
840842
}
841843
} else {
842-
buffer.waitForFlush();
844+
// only wait when the search index of the buffer remains the same as the previous check
845+
long finalBufferLastSearchIndex = bufferLastSearchIndex;
846+
buffer.waitForFlush(buf -> buf.getCurrentSearchIndex() == finalBufferLastSearchIndex);
843847
}
844848
}
845849
}

0 commit comments

Comments
 (0)