Skip to content

Commit 811f1cb

Browse files
LucasMLK and claude committed
fix(channel): Add deterministic duplicate connection tie-breaking
When two nodes connect to each other simultaneously, both detect a "duplicate connection". Previously each side resolved this first-come-first-served, which could result in both nodes closing each other's connections.

This fix implements deterministic tie-breaking based on NodeId:
- The node with the smaller NodeId prefers OUTBOUND connections
- The node with the larger NodeId prefers INBOUND connections
- Both sides make the SAME decision, keeping the SAME connection

Changes:
- Add localNodeId field to cache the local node identifier
- Initialize localNodeId in start() from nodeKey
- Implement deterministic tie-breaking in onChannelActive()
- Add 9 unit tests covering all scenarios

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 8daa404 commit 811f1cb

File tree

2 files changed

+574
-10
lines changed

2 files changed

+574
-10
lines changed

src/main/java/io/xdag/p2p/channel/ChannelManager.java

Lines changed: 94 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import io.netty.channel.ChannelFuture;
2929
import io.netty.channel.ChannelHandlerContext;
3030
import io.netty.channel.ChannelFutureListener;
31+
import io.xdag.crypto.encoding.Base58;
32+
import io.xdag.crypto.keys.AddressUtils;
3133
import io.xdag.p2p.PeerClient;
3234
import io.xdag.p2p.config.P2pConfig;
3335
import io.xdag.p2p.discover.Node;
@@ -75,6 +77,9 @@ public class ChannelManager {
7577
private final AtomicInteger activePeersCount = new AtomicInteger(0);
7678
private final AtomicInteger connectingPeersCount = new AtomicInteger(0);
7779

80+
// Cached local NodeId for deterministic duplicate connection resolution
81+
private volatile String localNodeId;
82+
7883
private final ScheduledExecutorService poolLoopExecutor =
7984
Executors.newSingleThreadScheduledExecutor(BasicThreadFactory.builder().namingPattern("p2p-pool-%d").build());
8085
private final ScheduledExecutorService disconnectExecutor =
@@ -88,6 +93,15 @@ public ChannelManager(P2pConfig config, NodeManager nodeManager) {
8893

8994
public void start(PeerClient peerClient) {
9095
this.peerClient = peerClient;
96+
97+
// Initialize local NodeId for deterministic duplicate connection resolution
98+
if (config.getNodeKey() != null) {
99+
this.localNodeId = Base58.encodeCheck(AddressUtils.toBytesAddress(config.getNodeKey().getPublicKey()));
100+
log.info("ChannelManager started with localNodeId: {}", localNodeId);
101+
} else {
102+
log.warn("No nodeKey configured - duplicate connection resolution may not work correctly");
103+
}
104+
91105
poolLoopExecutor.scheduleWithFixedDelay(this::connectLoop, 3, 5, TimeUnit.SECONDS);
92106

93107
if (config.isDisconnectionPolicyEnable()) {
@@ -412,14 +426,14 @@ private void connectLoop() {
412426

413427
/**
414428
* Check if we already have an active connection to the target address.
415-
*
429+
* <p>
416430
* This method performs a comprehensive check to prevent duplicate connection attempts:
417431
* 1. Checks if there's an exact address match in channels Map
418432
* 2. Checks all connectedNodeIds to see if any Channel is connected to the same IP:Port
419433
* 3. For local testing (loopback addresses), checks if we have any active connection to the same IP,
420434
* since inbound connections use different ports than the target listening port
421435
* 4. Verifies that the Channel's underlying Netty channel is actually active
422-
*
436+
* <p>
423437
* This is particularly useful when both nodes have each other in their whitelist,
424438
* preventing unnecessary reconnection attempts after recentConnections cache expires.
425439
*
@@ -505,15 +519,15 @@ public ChannelFuture connectAsync(Node node, boolean isDiscovery) {
505519

506520
// CRITICAL: Check if we already have an active connection to this address
507521
// This prevents wasting resources on duplicate handshakes
508-
if (address != null && hasActiveConnectionTo(address)) {
522+
if (hasActiveConnectionTo(address)) {
509523
log.debug("Skipped connection to {} - already have active connection", address);
510524
return null; // Don't even attempt the connection
511525
}
512526

513527
if (address != null) {
514528
recentConnections.put(address, System.currentTimeMillis());
515529
}
516-
return peerClient.connect(node, (ChannelFutureListener) future -> {
530+
return peerClient.connect(node, future -> {
517531
if (!future.isSuccess()) {
518532
log.warn("Connect to peer {} fail, cause:{}", node.getPreferInetSocketAddress(),
519533
future.cause() != null ? future.cause().getMessage() : "unknown");
@@ -547,12 +561,82 @@ public void onChannelActive(Channel channel) {
547561

548562
// Check if existing channel is actually active
549563
if (isNettyChannelActive(existingChannel)) {
550-
// Existing channel is active, reject the new one
551-
log.warn("Duplicate connection to NodeId {}. Existing: {}, New: {}. Closing new connection.",
552-
nodeId, existingChannel.getRemoteAddress(), channel.getRemoteAddress());
553-
shouldClose[0] = true;
554-
result[0] = existingChannel;
555-
return existingChannel; // Keep existing
564+
// DUPLICATE DETECTION: Both channels are active
565+
// Use deterministic tie-breaking to ensure both sides make the same decision
566+
//
567+
// Algorithm:
568+
// - Compare local NodeId with remote NodeId
569+
// - Node with smaller NodeId prefers OUTBOUND (isActive=true) connections
570+
// - Node with larger NodeId prefers INBOUND (isActive=false) connections
571+
//
572+
// Example (Node1=ABC, Node2=XYZ, ABC < XYZ):
573+
// - Node1 sees duplicate: localId(ABC) < remoteId(XYZ) → prefers outbound → keeps its outbound
574+
// - Node2 sees duplicate: localId(XYZ) > remoteId(ABC) → prefers inbound → keeps its inbound, which is Node1's outbound
575+
// - Both nodes keep the SAME connection!
576+
577+
Channel channelToKeep;
578+
Channel channelToClose;
579+
580+
if (localNodeId != null) {
581+
boolean preferOutbound = localNodeId.compareTo(nodeId) < 0;
582+
boolean newIsOutbound = channel.isActive();
583+
boolean existingIsOutbound = existingChannel.isActive();
584+
585+
log.info("Duplicate connection to NodeId {}. Deterministic resolution: localId={}, remoteId={}, preferOutbound={}",
586+
nodeId, localNodeId.substring(0, 8) + "...", nodeId.substring(0, 8) + "...", preferOutbound);
587+
log.info(" Existing: {} (outbound={}), New: {} (outbound={})",
588+
existingChannel.getRemoteAddress(), existingIsOutbound,
589+
channel.getRemoteAddress(), newIsOutbound);
590+
591+
if (preferOutbound) {
592+
// Local node has smaller ID → prefer outbound connection
593+
if (newIsOutbound && !existingIsOutbound) {
594+
// New is outbound, existing is inbound → keep new
595+
channelToKeep = channel;
596+
channelToClose = existingChannel;
597+
} else if (!newIsOutbound && existingIsOutbound) {
598+
// New is inbound, existing is outbound → keep existing
599+
channelToKeep = existingChannel;
600+
channelToClose = channel;
601+
} else {
602+
// Both same direction → keep existing (first-come-first-served for same direction)
603+
channelToKeep = existingChannel;
604+
channelToClose = channel;
605+
}
606+
} else {
607+
// Local node has larger ID → prefer inbound connection
608+
if (!newIsOutbound && existingIsOutbound) {
609+
// New is inbound, existing is outbound → keep new
610+
channelToKeep = channel;
611+
channelToClose = existingChannel;
612+
} else if (newIsOutbound && !existingIsOutbound) {
613+
// New is outbound, existing is inbound → keep existing
614+
channelToKeep = existingChannel;
615+
channelToClose = channel;
616+
} else {
617+
// Both same direction → keep existing (first-come-first-served for same direction)
618+
channelToKeep = existingChannel;
619+
channelToClose = channel;
620+
}
621+
}
622+
} else {
623+
// No local NodeId configured, fall back to first-come-first-served
624+
log.warn("No localNodeId configured, using first-come-first-served for duplicate resolution");
625+
channelToKeep = existingChannel;
626+
channelToClose = channel;
627+
}
628+
629+
if (channelToClose == channel) {
630+
log.info(" → Closing NEW connection, keeping existing");
631+
shouldClose[0] = true;
632+
result[0] = existingChannel;
633+
return existingChannel;
634+
} else {
635+
log.info(" → Closing EXISTING connection, keeping new");
636+
cleanupStaleChannel(existingChannel);
637+
result[0] = channel;
638+
return channel;
639+
}
556640
} else {
557641
// Existing channel is stale, replace it
558642
log.info("Replacing stale connection for NodeId {}. Old: {}, New: {}",

0 commit comments

Comments
 (0)