Skip to content

Commit 018268e

Browse files
WIP
1 parent ff01a9e commit 018268e

File tree

4 files changed

+19
-51
lines changed

4 files changed

+19
-51
lines changed

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java

Lines changed: 18 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@
7676
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
7777
import org.apache.ignite.internal.util.typedef.F;
7878
import org.apache.ignite.internal.util.typedef.T2;
79-
import org.apache.ignite.internal.util.typedef.T3;
8079
import org.apache.ignite.internal.util.typedef.X;
8180
import org.apache.ignite.internal.util.typedef.internal.LT;
8281
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -559,7 +558,7 @@ else if (state == DISCONNECTED) {
559558
* @throws IgniteSpiException If failed.
560559
* @see TcpDiscoverySpi#joinTimeout
561560
*/
562-
@Nullable private T2<SocketStream, Boolean> joinTopology(
561+
@Nullable private SocketStream joinTopology(
563562
InetSocketAddress prevAddr,
564563
long timeout,
565564
@Nullable Runnable beforeEachSleep,
@@ -605,13 +604,13 @@ else if (state == DISCONNECTED) {
605604

606605
Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
607606

608-
T2<Boolean, T3<SocketStream, Integer, Boolean>> waitAndRes = sendJoinRequests(prevAddr != null, addrs);
607+
T2<Boolean, T2<SocketStream, Integer>> waitAndRes = sendJoinRequests(prevAddr != null, addrs);
609608

610609
boolean wait = waitAndRes.get1();
611-
T3<SocketStream, Integer, Boolean> res = waitAndRes.get2();
610+
T2<SocketStream, Integer> res = waitAndRes.get2();
612611

613612
if (res != null)
614-
return new T2<>(waitAndRes.get2().get1(), waitAndRes.get2().get3());
613+
return waitAndRes.get2().get1();
615614

616615
if (timeout > 0 && U.millisSinceNanos(startNanos) > timeout)
617616
return null;
@@ -633,15 +632,15 @@ else if (addrs.isEmpty()) {
633632
}
634633

635634
/** */
636-
private T2<Boolean, T3<SocketStream, Integer, Boolean>> sendJoinRequests(
635+
private T2<Boolean, T2<SocketStream, Integer>> sendJoinRequests(
637636
boolean recon,
638637
Collection<InetSocketAddress> addrs
639638
) throws InterruptedException {
640639
for (InetSocketAddress addr : addrs) {
641640
if (Thread.currentThread().isInterrupted())
642641
throw new InterruptedException();
643642

644-
T3<SocketStream, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);
643+
T2<SocketStream, Integer> sockAndRes = sendJoinRequest(recon, addr);
645644

646645
if (sockAndRes == null)
647646
continue;
@@ -695,7 +694,7 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
695694
* @param addr Address.
696695
* @return Socket, connect response and client acknowledge support flag.
697696
*/
698-
@Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon,
697+
@Nullable private T2<SocketStream, Integer> sendJoinRequest(boolean recon,
699698
InetSocketAddress addr) throws InterruptedException {
700699
assert addr != null;
701700

@@ -746,7 +745,7 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
746745
if (log.isInfoEnabled())
747746
log.info("Reconnecting to the addresses of a proper DC [addrs=" + res.redirectAddresses() + ']');
748747

749-
T2<Boolean, T3<SocketStream, Integer, Boolean>> redirectedRes = sendJoinRequests(recon, res.redirectAddresses());
748+
T2<Boolean, T2<SocketStream, Integer>> redirectedRes = sendJoinRequests(recon, res.redirectAddresses());
750749

751750
return redirectedRes.get2();
752751
}
@@ -810,9 +809,7 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
810809
log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr +
811810
", rmtNodeId=" + rmtNodeId + ']');
812811

813-
return new T3<>(new SocketStream(sock),
814-
spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)),
815-
res.clientAck());
812+
return new T2<>(new SocketStream(sock), spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)));
816813
}
817814
catch (IOException | IgniteCheckedException e) {
818815
U.closeQuiet(sock);
@@ -1283,9 +1280,6 @@ private class SocketWriter extends IgniteSpiThread {
12831280
/** */
12841281
private TcpDiscoveryIoSession ses;
12851282

1286-
/** */
1287-
private boolean clientAck;
1288-
12891283
/** */
12901284
private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
12911285

@@ -1344,16 +1338,13 @@ private void forceLeave() throws InterruptedException {
13441338

13451339
/**
13461340
* @param sock Socket.
1347-
* @param clientAck {@code True} is server supports client message acknowlede.
13481341
*/
1349-
private void setSocket(Socket sock, boolean clientAck) {
1342+
private void setSocket(Socket sock) {
13501343
synchronized (mux) {
13511344
this.sock = sock;
13521345

13531346
ses = createSession(sock);
13541347

1355-
this.clientAck = clientAck;
1356-
13571348
unackedMsg = null;
13581349

13591350
mux.notifyAll();
@@ -1438,7 +1429,7 @@ void ackReceived(TcpDiscoveryClientAckResponse res) {
14381429
for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sndMsgLsnrs)
14391430
msgLsnr.apply(msg);
14401431

1441-
boolean ack = clientAck && !(msg instanceof TcpDiscoveryPingResponse);
1432+
boolean ack = !(msg instanceof TcpDiscoveryPingResponse);
14421433

14431434
try {
14441435
if (ack) {
@@ -1546,9 +1537,6 @@ private class Reconnector extends IgniteSpiThread {
15461537
/** */
15471538
private volatile SocketStream sockStream;
15481539

1549-
/** */
1550-
private boolean clientAck;
1551-
15521540
/** */
15531541
private final boolean join;
15541542

@@ -1593,7 +1581,7 @@ public void cancel() {
15931581

15941582
try {
15951583
while (true) {
1596-
T2<SocketStream, Boolean> joinRes = joinTopology(prevAddr, timeout, null, null);
1584+
SocketStream joinRes = joinTopology(prevAddr, timeout, null, null);
15971585

15981586
if (joinRes == null) {
15991587
if (join) {
@@ -1608,8 +1596,7 @@ public void cancel() {
16081596
return;
16091597
}
16101598

1611-
sockStream = joinRes.get1();
1612-
clientAck = joinRes.get2();
1599+
sockStream = joinRes;
16131600

16141601
Socket sock = sockStream.socket();
16151602
TcpDiscoveryIoSession ses = createSession(sock);
@@ -2104,7 +2091,7 @@ private void tryJoin() throws InterruptedException {
21042091

21052092
joinCnt++;
21062093

2107-
T2<SocketStream, Boolean> joinRes;
2094+
SocketStream joinRes;
21082095

21092096
try {
21102097
joinRes = joinTopology(null, spi.joinTimeout,
@@ -2137,9 +2124,9 @@ private void tryJoin() throws InterruptedException {
21372124
return;
21382125
}
21392126

2140-
currSock = joinRes.get1();
2127+
currSock = joinRes;
21412128

2142-
sockWriter.setSocket(joinRes.get1().socket(), joinRes.get2());
2129+
sockWriter.setSocket(joinRes.socket());
21432130

21442131
if (spi.joinTimeout > 0) {
21452132
final int joinCnt0 = joinCnt;
@@ -2149,7 +2136,7 @@ private void tryJoin() throws InterruptedException {
21492136
}, spi.joinTimeout, MILLISECONDS);
21502137
}
21512138

2152-
sockReader.setSocket(joinRes.get1(), locNode.clientRouterNodeId());
2139+
sockReader.setSocket(joinRes, locNode.clientRouterNodeId());
21532140
}
21542141

21552142
/**
@@ -2549,7 +2536,7 @@ private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage ms
25492536

25502537
currSock = reconnector.sockStream;
25512538

2552-
sockWriter.setSocket(currSock.socket(), reconnector.clientAck);
2539+
sockWriter.setSocket(currSock.socket());
25532540
sockReader.setSocket(currSock, locNode.clientRouterNodeId());
25542541

25552542
reconnector = null;

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6836,8 +6836,6 @@ else if (log.isDebugEnabled())
68366836
new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder());
68376837

68386838
if (req.client()) {
6839-
res.clientAck(true);
6840-
68416839
if (req.dcId() != null && !Objects.equals(req.dcId(), locNode.dataCenterId())) {
68426840
List<TcpDiscoveryNode> dcNodes = ring.serverNodes().stream()
68436841
.filter(node -> node.dataCenterId() != null && node.dataCenterId().equals(req.dcId()))

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,7 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
5050
protected static final int CHANGE_TOPOLOGY_FLAG_POS = 3;
5151

5252
/** */
53-
protected static final int CLIENT_ACK_FLAG_POS = 4;
54-
55-
/** */
56-
protected static final int FORCE_FAIL_FLAG_POS = 8;
53+
protected static final int FORCE_FAIL_FLAG_POS = 4;
5754

5855
/** Sender of the message (transient). */
5956
private transient UUID sndNodeId;

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -85,20 +85,6 @@ public void order(long order) {
8585
this.order = order;
8686
}
8787

88-
/**
89-
* @return {@code True} if server supports client message acknowledge.
90-
*/
91-
public boolean clientAck() {
92-
return getFlag(CLIENT_ACK_FLAG_POS);
93-
}
94-
95-
/**
96-
* @param clientAck {@code True} if server supports client message acknowledge.
97-
*/
98-
public void clientAck(boolean clientAck) {
99-
setFlag(CLIENT_ACK_FLAG_POS, clientAck);
100-
}
101-
10288
/** @return Socket addresses list for redirect. */
10389
public Collection<InetSocketAddress> redirectAddresses() {
10490
return redirectAddresses;

0 commit comments

Comments
 (0)