Skip to content

Commit 56cc51c

Browse files
WIP
1 parent f6aee60 commit 56cc51c

File tree

9 files changed

+131
-211
lines changed

9 files changed

+131
-211
lines changed

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java

Lines changed: 57 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import java.util.Map;
3939
import java.util.NavigableMap;
4040
import java.util.NavigableSet;
41-
import java.util.Objects;
4241
import java.util.Queue;
4342
import java.util.TreeMap;
4443
import java.util.TreeSet;
@@ -112,8 +111,6 @@
112111
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage;
113112
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
114113
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
115-
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryInfoRequest;
116-
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryInfoResponse;
117114
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
118115
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
119116
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
@@ -609,52 +606,13 @@ else if (state == DISCONNECTED) {
609606
Collections.swap(addrs, idx, 0);
610607
}
611608

612-
Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
609+
T2<Boolean, T3<SocketStream, Integer, Boolean>> waitAndRes = sendJoinRequests(prevAddr != null, addrs);
613610

614-
boolean wait = false;
611+
boolean wait = waitAndRes.get1();
612+
T3<SocketStream, Integer, Boolean> res = waitAndRes.get2();
615613

616-
for (int i = addrs.size() - 1; i >= 0; i--) {
617-
if (Thread.currentThread().isInterrupted())
618-
throw new InterruptedException();
619-
620-
InetSocketAddress addr = addrs.get(i);
621-
622-
boolean recon = prevAddr != null;
623-
624-
T3<SocketStream, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);
625-
626-
if (sockAndRes == null) {
627-
addrs.remove(i);
628-
629-
continue;
630-
}
631-
632-
assert sockAndRes.get1() != null && sockAndRes.get2() != null : sockAndRes;
633-
634-
Socket sock = sockAndRes.get1().socket();
635-
636-
if (log.isDebugEnabled())
637-
log.debug("Received response to join request [addr=" + addr + ", res=" + sockAndRes.get2() + ']');
638-
639-
switch (sockAndRes.get2()) {
640-
case RES_OK:
641-
return new T2<>(sockAndRes.get1(), sockAndRes.get3());
642-
643-
case RES_CONTINUE_JOIN:
644-
case RES_WAIT:
645-
wait = true;
646-
647-
U.closeQuiet(sock);
648-
649-
break;
650-
651-
default:
652-
if (log.isDebugEnabled())
653-
log.debug("Received unexpected response to join request: " + sockAndRes.get2());
654-
655-
U.closeQuiet(sock);
656-
}
657-
}
614+
if (res != null)
615+
return new T2<>(waitAndRes.get2().get1(), waitAndRes.get2().get3());
658616

659617
if (timeout > 0 && U.millisSinceNanos(startNanos) > timeout)
660618
return null;
@@ -668,13 +626,54 @@ else if (state == DISCONNECTED) {
668626
else if (addrs.isEmpty()) {
669627
LT.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " +
670628
"every " + spi.getReconnectDelay() + " ms; change 'reconnectDelay' to configure the frequency " +
671-
"of retries): " + toOrderedList(addrs0), true);
629+
"of retries): " + toOrderedList(addrs), true);
672630

673631
sleepEx(spi.getReconnectDelay(), beforeEachSleep, afterEachSleep);
674632
}
675633
}
676634
}
677635

636+
private T2<Boolean, T3<SocketStream, Integer, Boolean>> sendJoinRequests(
637+
boolean recon,
638+
Collection<InetSocketAddress> addrs
639+
) throws InterruptedException {
640+
for (InetSocketAddress addr : addrs) {
641+
if (Thread.currentThread().isInterrupted())
642+
throw new InterruptedException();
643+
644+
T3<SocketStream, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);
645+
646+
if (sockAndRes == null)
647+
continue;
648+
649+
assert sockAndRes.get1() != null && sockAndRes.get2() != null : sockAndRes;
650+
651+
Socket sock = sockAndRes.get1().socket();
652+
653+
if (log.isDebugEnabled())
654+
log.debug("Received response to join request [addr=" + addr + ", res=" + sockAndRes.get2() + ']');
655+
656+
switch (sockAndRes.get2()) {
657+
case RES_OK:
658+
return new T2<>(false, sockAndRes);
659+
660+
case RES_CONTINUE_JOIN:
661+
case RES_WAIT:
662+
U.closeQuiet(sock);
663+
664+
return new T2<>(true, null);
665+
666+
default:
667+
if (log.isDebugEnabled())
668+
log.debug("Received unexpected response to join request: " + sockAndRes.get2());
669+
670+
U.closeQuiet(sock);
671+
}
672+
}
673+
674+
return new T2<>(false, null);
675+
}
676+
678677
/** */
679678
private static void sleepEx(long millis, Runnable before, Runnable after) throws InterruptedException {
680679
if (before != null)
@@ -695,7 +694,7 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
695694
* @return Socket, connect response and client acknowledge support flag.
696695
*/
697696
@Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon,
698-
InetSocketAddress addr) {
697+
InetSocketAddress addr) throws InterruptedException {
699698
assert addr != null;
700699

701700
if (log.isDebugEnabled())
@@ -730,28 +729,23 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
730729
sock = spi.openSocket(addr, timeoutHelper);
731730
out = spi.socketStream(sock);
732731

733-
openSock = true;
734-
735-
TcpDiscoveryInfoRequest infoReq = new TcpDiscoveryInfoRequest(locNodeId);
736-
737-
spi.writeToSocket(sock, out, infoReq, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
738-
739-
TcpDiscoveryInfoResponse infoRes = spi.readMessage(sock, null, ackTimeout0);
740-
741-
if (locNode.dataCenterId() != null && !Objects.equals(locNode.dataCenterId(), infoRes.node().dataCenterId()))
742-
return null;
743-
744-
sock = spi.openSocket(addr, timeoutHelper);
745-
out = spi.socketStream(sock);
746-
747732
TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);
748733

749734
req.client(true);
735+
req.dcId(locNode.dataCenterId());
750736

751737
spi.writeToSocket(sock, out, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
752738

753739
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
754740

741+
if (res.redirectAddresses() != null) {
742+
T2<Boolean, T3<SocketStream, Integer, Boolean>> redirectedRes = sendJoinRequests(recon, res.redirectAddresses());
743+
744+
U.closeQuiet(sock);
745+
746+
return redirectedRes.get2();
747+
}
748+
755749
UUID rmtNodeId = res.creatorNodeId();
756750

757751
assert rmtNodeId != null;

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.NavigableMap;
4747
import java.util.NoSuchElementException;
4848
import java.util.Objects;
49+
import java.util.Optional;
4950
import java.util.Queue;
5051
import java.util.Set;
5152
import java.util.SortedMap;
@@ -145,8 +146,6 @@
145146
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage;
146147
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
147148
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
148-
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryInfoRequest;
149-
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryInfoResponse;
150149
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
151150
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
152151
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
@@ -6829,35 +6828,6 @@ else if (log.isDebugEnabled())
68296828

68306829
return;
68316830
}
6832-
else if (msg instanceof TcpDiscoveryInfoRequest) {
6833-
if (!spi.isNodeStopping0()) {
6834-
TcpDiscoveryInfoResponse res = new TcpDiscoveryInfoResponse(locNodeId);
6835-
6836-
if (log.isInfoEnabled()) {
6837-
log.info("Received info request from the remote node " +
6838-
"[rmtNodeId=" + msg.creatorNodeId() +
6839-
", rmtAddr=" + rmtAddr + ", rmtPort=" + sock.getPort() + "]");
6840-
}
6841-
6842-
res.node(locNode);
6843-
6844-
IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true);
6845-
6846-
spi.writeToSocket(sock, spi.socketStream(sock), res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
6847-
6848-
if (!(sock instanceof SSLSocket))
6849-
sock.shutdownOutput();
6850-
6851-
if (log.isInfoEnabled()) {
6852-
log.info("Finished writing info response " + "[rmtNodeId=" + msg.creatorNodeId() +
6853-
", rmtAddr=" + rmtAddr + ", rmtPort=" + sock.getPort() + "]");
6854-
}
6855-
}
6856-
else if (log.isDebugEnabled())
6857-
log.debug("Ignore info request, node is stopping.");
6858-
6859-
return;
6860-
}
68616831

68626832
// Handshake.
68636833
TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg;
@@ -6874,8 +6844,23 @@ else if (log.isDebugEnabled())
68746844
TcpDiscoveryHandshakeResponse res =
68756845
new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder());
68766846

6877-
if (req.client())
6847+
if (req.client()) {
68786848
res.clientAck(true);
6849+
6850+
if (req.dcId() != null && !Objects.equals(req.dcId(), locNode.dataCenterId())) {
6851+
Optional<TcpDiscoveryNode> dcNode = ring.serverNodes().stream()
6852+
.filter(node -> node.dataCenterId() != null && node.dataCenterId().equals(req.dcId()))
6853+
.findAny();
6854+
6855+
if (dcNode.isPresent()) {
6856+
res.redirectAddresses(dcNode.get().socketAddresses());
6857+
6858+
spi.writeToSocket(sock, spi.socketStream(sock), res, spi.getEffectiveSocketTimeout(srvSock));
6859+
6860+
return;
6861+
}
6862+
}
6863+
}
68796864
else if (req.changeTopology()) {
68806865
// Node cannot connect to it's next (for local node it's previous).
68816866
// Need to check connectivity to it.

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ public class TcpDiscoveryHandshakeRequest extends TcpDiscoveryAbstractMessage {
3030
/** */
3131
private UUID prevNodeId;
3232

33+
/** */
34+
private String dcId;
35+
3336
/**
3437
* Constructor.
3538
*
@@ -69,6 +72,16 @@ public void changeTopology(UUID prevNodeId) {
6972
this.prevNodeId = prevNodeId;
7073
}
7174

75+
/** @return DataCenter id.*/
76+
public String dcId() {
77+
return dcId;
78+
}
79+
80+
/** @param dcId DataCenter id.*/
81+
public void dcId(String dcId) {
82+
this.dcId = dcId;
83+
}
84+
7285
/** {@inheritDoc} */
7386
@Override public String toString() {
7487
return S.toString(TcpDiscoveryHandshakeRequest.class, this, "super", super.toString(),

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.ignite.spi.discovery.tcp.messages;
1919

20+
import java.net.InetSocketAddress;
21+
import java.util.Collection;
2022
import java.util.UUID;
2123
import org.apache.ignite.internal.util.typedef.internal.S;
2224

@@ -30,6 +32,9 @@ public class TcpDiscoveryHandshakeResponse extends TcpDiscoveryAbstractMessage {
3032
/** */
3133
private long order;
3234

35+
/** Redirect addresses. */
36+
private Collection<InetSocketAddress> redirectAddresses;
37+
3338
/**
3439
* Constructor.
3540
*
@@ -94,6 +99,16 @@ public void clientAck(boolean clientAck) {
9499
setFlag(CLIENT_ACK_FLAG_POS, clientAck);
95100
}
96101

102+
/** @return Socket addresses list for redirect.*/
103+
public Collection<InetSocketAddress> redirectAddresses() {
104+
return redirectAddresses;
105+
}
106+
107+
/** @param socketAddresses Socket addresses list for redirect. */
108+
public void redirectAddresses(Collection<InetSocketAddress> socketAddresses) {
109+
this.redirectAddresses = socketAddresses;
110+
}
111+
97112
/** {@inheritDoc} */
98113
@Override public String toString() {
99114
return S.toString(TcpDiscoveryHandshakeResponse.class, this, "super", super.toString(),

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryInfoRequest.java

Lines changed: 0 additions & 41 deletions
This file was deleted.

0 commit comments

Comments
 (0)