Skip to content

Commit 66cd822

Browse files
committed
RATIS-2234. Remove lock race between heartbeat and append log channels (#1205)
1 parent 00a8a1d commit 66cd822

File tree

2 files changed

+16
-1
lines changed

2 files changed

+16
-1
lines changed

ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.ratis.util;
1919

20+
import java.util.Objects;
2021
import java.util.concurrent.atomic.AtomicBoolean;
2122
import java.util.concurrent.locks.Lock;
2223

@@ -45,6 +46,13 @@ public static AutoCloseableLock acquire(final Lock lock, Runnable preUnlock) {
4546
return new AutoCloseableLock(lock, preUnlock);
4647
}
4748

49+
public static AutoCloseableLock tryAcquire(final Lock lock, Runnable preUnlock, TimeDuration timeout)
50+
throws InterruptedException {
51+
Objects.requireNonNull(timeout, "timeout == null");
52+
final boolean locked = lock.tryLock(timeout.getDuration(), timeout.getUnit());
53+
return locked? new AutoCloseableLock(lock, preUnlock): null;
54+
}
55+
4856
private final Lock underlying;
4957
private final AtomicBoolean closed = new AtomicBoolean(false);
5058
private final Runnable preUnlock;

ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public boolean isOpened() {
119119

120120
@Override
121121
public boolean updateCommitIndex(long majorityIndex, long currentTerm, boolean isLeader) {
122-
try(AutoCloseableLock writeLock = writeLock()) {
122+
try(AutoCloseableLock writeLock = tryWriteLock(TimeDuration.ONE_SECOND)) {
123123
final long oldCommittedIndex = getLastCommittedIndex();
124124
final long newCommitIndex = Math.min(majorityIndex, getFlushIndex());
125125
if (oldCommittedIndex < newCommitIndex) {
@@ -133,6 +133,9 @@ public boolean updateCommitIndex(long majorityIndex, long currentTerm, boolean i
133133
return commitIndex.updateIncreasingly(newCommitIndex, traceIndexChange);
134134
}
135135
}
136+
} catch (InterruptedException e) {
137+
LOG.warn("{}: Interrupted to updateCommitIndex: majorityIndex={}, currentTerm={}, isLeader={}",
138+
getName(), majorityIndex, currentTerm, isLeader, e);
136139
}
137140
return false;
138141
}
@@ -375,6 +378,10 @@ public AutoCloseableLock writeLock() {
375378
return AutoCloseableLock.acquire(lock.writeLock());
376379
}
377380

381+
public AutoCloseableLock tryWriteLock(TimeDuration timeout) throws InterruptedException {
382+
return AutoCloseableLock.tryAcquire(lock.writeLock(), null, timeout);
383+
}
384+
378385
public boolean hasWriteLock() {
379386
return this.lock.isWriteLockedByCurrentThread();
380387
}

0 commit comments

Comments
 (0)