Skip to content

Commit 6e44048

Browse files
author
刘立家
committed
[Fix](bdb) Fix feeder handshake error
1 parent 00dd1a3 commit 6e44048

File tree

2 files changed

+34
-1
lines changed

2 files changed

+34
-1
lines changed

src/main/java/com/sleepycat/je/rep/impl/node/Feeder.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.lang.Thread.UncaughtExceptionHandler;
1818
import java.nio.ByteBuffer;
1919
import java.nio.channels.Channel;
20+
import java.util.concurrent.TimeUnit;
2021
import java.util.concurrent.atomic.AtomicBoolean;
2122
import java.util.logging.Level;
2223
import java.util.logging.Logger;
@@ -1753,4 +1754,36 @@ public void makeSecurityCheckResponse(String err)
17531754

17541755
throw new ReplicationSecurityException(err, replica, null);
17551756
}
1757+
1758+
/**
1759+
* Refer to https://github.com/StarRocks/starrocks-bdb-je/pull/17
1760+
* Check whether the channel is available by monitoring the change of lastHeartbeatTime,
1761+
* the loop will break when:
1762+
* 1. feeder is shutdown
1763+
* 2. channel is not open
1764+
* 3. lastHeartbeatTime changed
1765+
* 4. FEEDER_TIMEOUT reached
1766+
*/
1767+
public boolean isChannelAvailable() {
1768+
long baseTime = this.lastHeartbeatTime;
1769+
long startNs = System.nanoTime();
1770+
long timeoutNs = repNode.getConfigManager().getDuration(RepParams.FEEDER_TIMEOUT) * 1000000L;
1771+
while (System.nanoTime() - startNs < timeoutNs) {
1772+
if (shutdown.get() || !feederReplicaChannel.isOpen()) {
1773+
return false;
1774+
}
1775+
1776+
if (baseTime < lastHeartbeatTime) {
1777+
return true;
1778+
}
1779+
1780+
try {
1781+
TimeUnit.MILLISECONDS.sleep(500);
1782+
} catch (InterruptedException ie) {
1783+
Thread.currentThread().interrupt();
1784+
}
1785+
}
1786+
1787+
return false;
1788+
}
17561789
}

src/main/java/com/sleepycat/je/rep/stream/FeederReplicaHandshake.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ private Protocol negotiateProtocol()
611611
* issue a shutdown to the feeder explicitly.
612612
*/
613613
if (dup != null && dup.getChannel() != null &&
614-
!dup.getChannel().isOpen() && !dup.isShutdown()) {
614+
!dup.isChannelAvailable()) {
615615
dup.shutdown(new IOException("Feeder's channel for node " +
616616
replicaNameIdPair +
617617
" is already closed"));

0 commit comments

Comments
 (0)