Skip to content

Commit 539e804

Browse files
authored
RATIS-2234. Remove lock race between heartbeat and append log channels (#1205)
1 parent 053683f commit 539e804

File tree

2 files changed

+16
-1
lines changed

2 files changed

+16
-1
lines changed

ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.ratis.util;
1919

20+
import java.util.Objects;
2021
import java.util.concurrent.atomic.AtomicBoolean;
2122
import java.util.concurrent.locks.Lock;
2223

@@ -45,6 +46,13 @@ public static AutoCloseableLock acquire(final Lock lock, Runnable preUnlock) {
4546
return new AutoCloseableLock(lock, preUnlock);
4647
}
4748

49+
public static AutoCloseableLock tryAcquire(final Lock lock, Runnable preUnlock, TimeDuration timeout)
50+
throws InterruptedException {
51+
Objects.requireNonNull(timeout, "timeout == null");
52+
final boolean locked = lock.tryLock(timeout.getDuration(), timeout.getUnit());
53+
return locked? new AutoCloseableLock(lock, preUnlock): null;
54+
}
55+
4856
private final Lock underlying;
4957
private final AtomicBoolean closed = new AtomicBoolean(false);
5058
private final Runnable preUnlock;

ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public boolean isOpened() {
122122

123123
@Override
124124
public boolean updateCommitIndex(long majorityIndex, long currentTerm, boolean isLeader) {
125-
try(AutoCloseableLock writeLock = writeLock()) {
125+
try(AutoCloseableLock writeLock = tryWriteLock(TimeDuration.ONE_SECOND)) {
126126
final long oldCommittedIndex = getLastCommittedIndex();
127127
final long newCommitIndex = Math.min(majorityIndex, getFlushIndex());
128128
if (oldCommittedIndex < newCommitIndex) {
@@ -136,6 +136,9 @@ public boolean updateCommitIndex(long majorityIndex, long currentTerm, boolean i
136136
return commitIndex.updateIncreasingly(newCommitIndex, traceIndexChange);
137137
}
138138
}
139+
} catch (InterruptedException e) {
140+
LOG.warn("{}: Interrupted to updateCommitIndex: majorityIndex={}, currentTerm={}, isLeader={}",
141+
getName(), majorityIndex, currentTerm, isLeader, e);
139142
}
140143
return false;
141144
}
@@ -389,6 +392,10 @@ public AutoCloseableLock writeLock() {
389392
return AutoCloseableLock.acquire(lock.writeLock());
390393
}
391394

395+
public AutoCloseableLock tryWriteLock(TimeDuration timeout) throws InterruptedException {
396+
return AutoCloseableLock.tryAcquire(lock.writeLock(), null, timeout);
397+
}
398+
392399
public boolean hasWriteLock() {
393400
return this.lock.isWriteLockedByCurrentThread();
394401
}

0 commit comments

Comments
 (0)