Skip to content

Commit 95a3850

Browse files
author
刘立家
committed
[Fix](bdb) Fix feeder handshake error
1 parent 00dd1a3 commit 95a3850

File tree

2 files changed

+33
-1
lines changed

2 files changed

+33
-1
lines changed

src/main/java/com/sleepycat/je/rep/impl/node/Feeder.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.lang.Thread.UncaughtExceptionHandler;
1818
import java.nio.ByteBuffer;
1919
import java.nio.channels.Channel;
20+
import java.util.concurrent.TimeUnit;
2021
import java.util.concurrent.atomic.AtomicBoolean;
2122
import java.util.logging.Level;
2223
import java.util.logging.Logger;
@@ -1753,4 +1754,35 @@ public void makeSecurityCheckResponse(String err)
17531754

17541755
throw new ReplicationSecurityException(err, replica, null);
17551756
}
1757+
1758+
/**
1759+
* Check whether the channel is available by monitoring the change of lastHeartbeatTime,
1760+
* the loop will break when:
1761+
* 1. feeder is shutdown
1762+
* 2. channel is not open
1763+
* 3. lastHeartbeatTime changed
1764+
* 4. FEEDER_TIMEOUT reached
1765+
*/
1766+
public boolean isChannelAvailable() {
1767+
long baseTime = this.lastHeartbeatTime;
1768+
long startNs = System.nanoTime();
1769+
long timeoutNs = repNode.getConfigManager().getDuration(RepParams.FEEDER_TIMEOUT) * 1000000L;
1770+
while (System.nanoTime() - startNs < timeoutNs) {
1771+
if (shutdown.get() || !feederReplicaChannel.isOpen()) {
1772+
return false;
1773+
}
1774+
1775+
if (baseTime < lastHeartbeatTime) {
1776+
return true;
1777+
}
1778+
1779+
try {
1780+
TimeUnit.MILLISECONDS.sleep(500);
1781+
} catch (InterruptedException ie) {
1782+
Thread.currentThread().interrupt();
1783+
}
1784+
}
1785+
1786+
return false;
1787+
}
17561788
}

src/main/java/com/sleepycat/je/rep/stream/FeederReplicaHandshake.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ private Protocol negotiateProtocol()
611611
* issue a shutdown to the feeder explicitly.
612612
*/
613613
if (dup != null && dup.getChannel() != null &&
614-
!dup.getChannel().isOpen() && !dup.isShutdown()) {
614+
!dup.isChannelAvailable()) {
615615
dup.shutdown(new IOException("Feeder's channel for node " +
616616
replicaNameIdPair +
617617
" is already closed"));

0 commit comments

Comments
 (0)