Skip to content

Commit 86d4313

Browse files
IGNITE-26582 Thick client optimizations for MultiDC (#12504)
1 parent a9a1fec commit 86d4313

File tree

9 files changed

+281
-91
lines changed

9 files changed

+281
-91
lines changed

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java

Lines changed: 74 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@
7676
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
7777
import org.apache.ignite.internal.util.typedef.F;
7878
import org.apache.ignite.internal.util.typedef.T2;
79-
import org.apache.ignite.internal.util.typedef.T3;
8079
import org.apache.ignite.internal.util.typedef.X;
8180
import org.apache.ignite.internal.util.typedef.internal.LT;
8281
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -559,7 +558,7 @@ else if (state == DISCONNECTED) {
559558
* @throws IgniteSpiException If failed.
560559
* @see TcpDiscoverySpi#joinTimeout
561560
*/
562-
@Nullable private T2<SocketStream, Boolean> joinTopology(
561+
@Nullable private SocketStream joinTopology(
563562
InetSocketAddress prevAddr,
564563
long timeout,
565564
@Nullable Runnable beforeEachSleep,
@@ -605,50 +604,13 @@ else if (state == DISCONNECTED) {
605604

606605
Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
607606

608-
boolean wait = false;
607+
T2<Boolean, T2<SocketStream, Integer>> waitAndRes = sendJoinRequests(prevAddr != null, addrs);
609608

610-
for (int i = addrs.size() - 1; i >= 0; i--) {
611-
if (Thread.currentThread().isInterrupted())
612-
throw new InterruptedException();
609+
boolean wait = waitAndRes.get1();
610+
T2<SocketStream, Integer> res = waitAndRes.get2();
613611

614-
InetSocketAddress addr = addrs.get(i);
615-
616-
boolean recon = prevAddr != null;
617-
618-
T3<SocketStream, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);
619-
620-
if (sockAndRes == null) {
621-
addrs.remove(i);
622-
623-
continue;
624-
}
625-
626-
assert sockAndRes.get1() != null && sockAndRes.get2() != null : sockAndRes;
627-
628-
Socket sock = sockAndRes.get1().socket();
629-
630-
if (log.isDebugEnabled())
631-
log.debug("Received response to join request [addr=" + addr + ", res=" + sockAndRes.get2() + ']');
632-
633-
switch (sockAndRes.get2()) {
634-
case RES_OK:
635-
return new T2<>(sockAndRes.get1(), sockAndRes.get3());
636-
637-
case RES_CONTINUE_JOIN:
638-
case RES_WAIT:
639-
wait = true;
640-
641-
U.closeQuiet(sock);
642-
643-
break;
644-
645-
default:
646-
if (log.isDebugEnabled())
647-
log.debug("Received unexpected response to join request: " + sockAndRes.get2());
648-
649-
U.closeQuiet(sock);
650-
}
651-
}
612+
if (res != null)
613+
return res.get1();
652614

653615
if (timeout > 0 && U.millisSinceNanos(startNanos) > timeout)
654616
return null;
@@ -669,6 +631,50 @@ else if (addrs.isEmpty()) {
669631
}
670632
}
671633

634+
/** */
635+
private T2<Boolean, T2<SocketStream, Integer>> sendJoinRequests(
636+
boolean recon,
637+
Collection<InetSocketAddress> addrs
638+
) throws InterruptedException {
639+
for (InetSocketAddress addr : addrs) {
640+
if (Thread.currentThread().isInterrupted())
641+
throw new InterruptedException();
642+
643+
T2<SocketStream, Integer> sockAndRes = sendJoinRequest(recon, addr);
644+
645+
if (sockAndRes == null)
646+
continue;
647+
648+
assert sockAndRes.get1() != null && sockAndRes.get2() != null : sockAndRes;
649+
650+
Socket sock = sockAndRes.get1().socket();
651+
652+
if (log.isDebugEnabled())
653+
log.debug("Received response to join request [addr=" + addr + ", res=" + sockAndRes.get2() + ']');
654+
655+
switch (sockAndRes.get2()) {
656+
case RES_OK:
657+
return new T2<>(false, sockAndRes);
658+
659+
case RES_CONTINUE_JOIN:
660+
case RES_WAIT:
661+
U.closeQuiet(sock);
662+
663+
return new T2<>(true, null);
664+
665+
default:
666+
if (log.isDebugEnabled())
667+
log.debug("Received unexpected response to join request: " + sockAndRes.get2());
668+
669+
U.closeQuiet(sock);
670+
}
671+
}
672+
673+
addrs.clear();
674+
675+
return new T2<>(false, null);
676+
}
677+
672678
/** */
673679
private static void sleepEx(long millis, Runnable before, Runnable after) throws InterruptedException {
674680
if (before != null)
@@ -688,8 +694,8 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
688694
* @param addr Address.
689695
* @return Socket, connect response and client acknowledge support flag.
690696
*/
691-
@Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon,
692-
InetSocketAddress addr) {
697+
@Nullable private T2<SocketStream, Integer> sendJoinRequest(boolean recon,
698+
InetSocketAddress addr) throws InterruptedException {
693699
assert addr != null;
694700

695701
if (log.isDebugEnabled())
@@ -724,16 +730,26 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
724730

725731
TcpDiscoveryIoSession ses = createSession(sock);
726732

727-
openSock = true;
728-
729733
TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);
730734

731735
req.client(true);
736+
req.dcId(locNode.dataCenterId());
732737

733738
spi.writeMessage(ses, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
734739

735740
TcpDiscoveryHandshakeResponse res = spi.readMessage(ses, ackTimeout0);
736741

742+
if (res.redirectAddresses() != null) {
743+
U.closeQuiet(sock);
744+
745+
if (log.isInfoEnabled())
746+
log.info("Reconnecting to the addresses of a proper DC [addrs=" + res.redirectAddresses() + ']');
747+
748+
T2<Boolean, T2<SocketStream, Integer>> redirectedRes = sendJoinRequests(recon, res.redirectAddresses());
749+
750+
return redirectedRes.get2();
751+
}
752+
737753
UUID rmtNodeId = res.creatorNodeId();
738754

739755
assert rmtNodeId != null;
@@ -793,9 +809,7 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
793809
log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr +
794810
", rmtNodeId=" + rmtNodeId + ']');
795811

796-
return new T3<>(new SocketStream(sock),
797-
spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)),
798-
res.clientAck());
812+
return new T2<>(new SocketStream(sock), spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)));
799813
}
800814
catch (IOException | IgniteCheckedException e) {
801815
U.closeQuiet(sock);
@@ -1266,9 +1280,6 @@ private class SocketWriter extends IgniteSpiThread {
12661280
/** */
12671281
private TcpDiscoveryIoSession ses;
12681282

1269-
/** */
1270-
private boolean clientAck;
1271-
12721283
/** */
12731284
private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
12741285

@@ -1327,16 +1338,13 @@ private void forceLeave() throws InterruptedException {
13271338

13281339
/**
13291340
* @param sock Socket.
1330-
* @param clientAck {@code True} is server supports client message acknowlede.
13311341
*/
1332-
private void setSocket(Socket sock, boolean clientAck) {
1342+
private void setSocket(Socket sock) {
13331343
synchronized (mux) {
13341344
this.sock = sock;
13351345

13361346
ses = createSession(sock);
13371347

1338-
this.clientAck = clientAck;
1339-
13401348
unackedMsg = null;
13411349

13421350
mux.notifyAll();
@@ -1421,7 +1429,7 @@ void ackReceived(TcpDiscoveryClientAckResponse res) {
14211429
for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sndMsgLsnrs)
14221430
msgLsnr.apply(msg);
14231431

1424-
boolean ack = clientAck && !(msg instanceof TcpDiscoveryPingResponse);
1432+
boolean ack = !(msg instanceof TcpDiscoveryPingResponse);
14251433

14261434
try {
14271435
if (ack) {
@@ -1529,9 +1537,6 @@ private class Reconnector extends IgniteSpiThread {
15291537
/** */
15301538
private volatile SocketStream sockStream;
15311539

1532-
/** */
1533-
private boolean clientAck;
1534-
15351540
/** */
15361541
private final boolean join;
15371542

@@ -1576,7 +1581,7 @@ public void cancel() {
15761581

15771582
try {
15781583
while (true) {
1579-
T2<SocketStream, Boolean> joinRes = joinTopology(prevAddr, timeout, null, null);
1584+
SocketStream joinRes = joinTopology(prevAddr, timeout, null, null);
15801585

15811586
if (joinRes == null) {
15821587
if (join) {
@@ -1591,8 +1596,7 @@ public void cancel() {
15911596
return;
15921597
}
15931598

1594-
sockStream = joinRes.get1();
1595-
clientAck = joinRes.get2();
1599+
sockStream = joinRes;
15961600

15971601
Socket sock = sockStream.socket();
15981602
TcpDiscoveryIoSession ses = createSession(sock);
@@ -2087,7 +2091,7 @@ private void tryJoin() throws InterruptedException {
20872091

20882092
joinCnt++;
20892093

2090-
T2<SocketStream, Boolean> joinRes;
2094+
SocketStream joinRes;
20912095

20922096
try {
20932097
joinRes = joinTopology(null, spi.joinTimeout,
@@ -2120,9 +2124,9 @@ private void tryJoin() throws InterruptedException {
21202124
return;
21212125
}
21222126

2123-
currSock = joinRes.get1();
2127+
currSock = joinRes;
21242128

2125-
sockWriter.setSocket(joinRes.get1().socket(), joinRes.get2());
2129+
sockWriter.setSocket(joinRes.socket());
21262130

21272131
if (spi.joinTimeout > 0) {
21282132
final int joinCnt0 = joinCnt;
@@ -2132,7 +2136,7 @@ private void tryJoin() throws InterruptedException {
21322136
}, spi.joinTimeout, MILLISECONDS);
21332137
}
21342138

2135-
sockReader.setSocket(joinRes.get1(), locNode.clientRouterNodeId());
2139+
sockReader.setSocket(joinRes, locNode.clientRouterNodeId());
21362140
}
21372141

21382142
/**
@@ -2532,7 +2536,7 @@ private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage ms
25322536

25332537
currSock = reconnector.sockStream;
25342538

2535-
sockWriter.setSocket(currSock.socket(), reconnector.clientAck);
2539+
sockWriter.setSocket(currSock.socket());
25362540
sockReader.setSocket(currSock, locNode.clientRouterNodeId());
25372541

25382542
reconnector = null;

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@
159159
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
160160
import org.jetbrains.annotations.Nullable;
161161

162+
import static java.util.stream.Collectors.collectingAndThen;
163+
import static java.util.stream.Collectors.toList;
162164
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
163165
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE;
164166
import static org.apache.ignite.IgniteSystemProperties.IGNITE_NODE_IDS_HISTORY_SIZE;
@@ -6833,8 +6835,35 @@ else if (log.isDebugEnabled())
68336835
TcpDiscoveryHandshakeResponse res =
68346836
new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder());
68356837

6836-
if (req.client())
6837-
res.clientAck(true);
6838+
if (req.client()) {
6839+
if (req.dcId() != null && !Objects.equals(req.dcId(), locNode.dataCenterId())) {
6840+
List<TcpDiscoveryNode> dcNodes = ring.serverNodes().stream()
6841+
.filter(TcpDiscoveryNode::visible)
6842+
.filter(node -> node.dataCenterId() != null && node.dataCenterId().equals(req.dcId()))
6843+
.collect(
6844+
collectingAndThen(
6845+
toList(),
6846+
l -> {
6847+
Collections.shuffle(l);
6848+
return l;
6849+
}
6850+
));
6851+
6852+
if (!dcNodes.isEmpty()) {
6853+
Collection<InetSocketAddress> addrs = new ArrayList<>(dcNodes.size());
6854+
6855+
for (TcpDiscoveryNode dcNode : dcNodes) {
6856+
addrs.addAll(dcNode.socketAddresses());
6857+
}
6858+
6859+
res.redirectAddresses(addrs);
6860+
6861+
spi.writeMessage(ses, res, spi.getEffectiveSocketTimeout(srvSock));
6862+
6863+
return;
6864+
}
6865+
}
6866+
}
68386867
else if (req.changeTopology()) {
68396868
// Node cannot connect to it's next (for local node it's previous).
68406869
// Need to check connectivity to it.

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,7 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
5050
protected static final int CHANGE_TOPOLOGY_FLAG_POS = 3;
5151

5252
/** */
53-
protected static final int CLIENT_ACK_FLAG_POS = 4;
54-
55-
/** */
56-
protected static final int FORCE_FAIL_FLAG_POS = 8;
53+
protected static final int FORCE_FAIL_FLAG_POS = 4;
5754

5855
/** Sender of the message (transient). */
5956
private transient UUID sndNodeId;

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.UUID;
2121
import org.apache.ignite.internal.util.typedef.internal.S;
22+
import org.jetbrains.annotations.Nullable;
2223

2324
/**
2425
* Handshake request.
@@ -30,6 +31,9 @@ public class TcpDiscoveryHandshakeRequest extends TcpDiscoveryAbstractMessage {
3031
/** */
3132
private UUID prevNodeId;
3233

34+
/** */
35+
private String dcId;
36+
3337
/**
3438
* Constructor.
3539
*
@@ -69,6 +73,16 @@ public void changeTopology(UUID prevNodeId) {
6973
this.prevNodeId = prevNodeId;
7074
}
7175

76+
/** @return DataCenter id. */
77+
@Nullable public String dcId() {
78+
return dcId;
79+
}
80+
81+
/** @param dcId DataCenter id. */
82+
public void dcId(String dcId) {
83+
this.dcId = dcId;
84+
}
85+
7286
/** {@inheritDoc} */
7387
@Override public String toString() {
7488
return S.toString(TcpDiscoveryHandshakeRequest.class, this, "super", super.toString(),

0 commit comments

Comments
 (0)