Skip to content

Commit b8082e9

Browse files
authored
IGNITE-27316 Use MessageSerializer for TcpDiscoveryHandshakeResponse (#12572)
1 parent 044f62b commit b8082e9

File tree

5 files changed

+186
-20
lines changed

5 files changed

+186
-20
lines changed

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,28 @@
1717

1818
package org.apache.ignite.internal.managers.discovery;
1919

20+
import org.apache.ignite.internal.codegen.InetAddressMessageSerializer;
2021
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
2122
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer;
2223
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer;
2324
import org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer;
2425
import org.apache.ignite.internal.codegen.TcpDiscoveryDiscardMessageSerializer;
2526
import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer;
27+
import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer;
2628
import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer;
2729
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
2830
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
2931
import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer;
3032
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
3133
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
34+
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
3235
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
3336
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
3437
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
3538
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
3639
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDiscardMessage;
3740
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
41+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
3842
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
3943
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
4044
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
@@ -44,6 +48,8 @@
4448
public class DiscoveryMessageFactory implements MessageFactoryProvider {
4549
/** {@inheritDoc} */
4650
@Override public void registerAll(MessageFactory factory) {
51+
factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer());
52+
4753
factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer());
4854
factory.register((short)1, TcpDiscoveryPingRequest::new, new TcpDiscoveryPingRequestSerializer());
4955
factory.register((short)2, TcpDiscoveryPingResponse::new, new TcpDiscoveryPingResponseSerializer());
@@ -54,5 +60,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
5460
factory.register((short)7, TcpDiscoveryRingLatencyCheckMessage::new, new TcpDiscoveryRingLatencyCheckMessageSerializer());
5561
factory.register((short)8, TcpDiscoveryHandshakeRequest::new, new TcpDiscoveryHandshakeRequestSerializer());
5662
factory.register((short)9, TcpDiscoveryDiscardMessage::new, new TcpDiscoveryDiscardMessageSerializer());
63+
factory.register((short)10, TcpDiscoveryHandshakeResponse::new, new TcpDiscoveryHandshakeResponseSerializer());
5764
}
5865
}

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,8 @@ else if (state == DISCONNECTED) {
606606

607607
T2<Boolean, T2<SocketStream, Integer>> waitAndRes = sendJoinRequests(prevAddr != null, addrs);
608608

609+
addrs.clear();
610+
609611
boolean wait = waitAndRes.get1();
610612
T2<SocketStream, Integer> res = waitAndRes.get2();
611613

@@ -670,8 +672,6 @@ private T2<Boolean, T2<SocketStream, Integer>> sendJoinRequests(
670672
}
671673
}
672674

673-
addrs.clear();
674-
675675
return new T2<>(false, null);
676676
}
677677

@@ -739,13 +739,16 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
739739

740740
TcpDiscoveryHandshakeResponse res = spi.readMessage(ses, ackTimeout0);
741741

742-
if (res.redirectAddresses() != null) {
742+
// Convert the addresses once.
743+
Collection<InetSocketAddress> redirectAddrs = res.redirectAddresses();
744+
745+
if (redirectAddrs != null) {
743746
U.closeQuiet(sock);
744747

745748
if (log.isInfoEnabled())
746-
log.info("Reconnecting to the addresses of a proper DC [addrs=" + res.redirectAddresses() + ']');
749+
log.info("Reconnecting to the addresses of a proper DC [addrs=" + redirectAddrs + ']');
747750

748-
T2<Boolean, T2<SocketStream, Integer>> redirectedRes = sendJoinRequests(recon, res.redirectAddresses());
751+
T2<Boolean, T2<SocketStream, Integer>> redirectedRes = sendJoinRequests(recon, redirectAddrs);
749752

750753
return redirectedRes.get2();
751754
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.spi.discovery.tcp.messages;
19+
20+
import java.net.InetAddress;
21+
import java.net.UnknownHostException;
22+
import org.apache.ignite.internal.Order;
23+
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
24+
import org.apache.ignite.internal.util.typedef.internal.S;
25+
import org.apache.ignite.plugin.extensions.communication.Message;
26+
27+
/** Address utility container message. Is not a pure {@link TcpDiscoveryAbstractMessage}. */
28+
public class InetAddressMessage implements Message {
29+
/** */
30+
@Order(0)
31+
private String hostName;
32+
33+
/** */
34+
@Order(1)
35+
private int port;
36+
37+
/** */
38+
@Order(value = 2, method = "addressBytes")
39+
private byte[] addrBytes;
40+
41+
/**
42+
* Default constructor for {@link DiscoveryMessageFactory}.
43+
*/
44+
public InetAddressMessage() {
45+
// No-op.
46+
}
47+
48+
/**
49+
* Constructor.
50+
*
51+
* @param addr Address.
52+
* @param port Port.
53+
*/
54+
public InetAddressMessage(InetAddress addr, int port) {
55+
hostName = addr.getHostName();
56+
this.port = port;
57+
addrBytes = addr.getAddress();
58+
}
59+
60+
/** @return {@link InetAddress#getAddress()} */
61+
public byte[] addressBytes() {
62+
return addrBytes;
63+
}
64+
65+
/** @param addrBytes {@link InetAddress#getAddress()} */
66+
public void addressBytes(byte[] addrBytes) {
67+
this.addrBytes = addrBytes;
68+
}
69+
70+
/** @return port. */
71+
public int port() {
72+
return port;
73+
}
74+
75+
/** @param port port. */
76+
public void port(int port) {
77+
this.port = port;
78+
}
79+
80+
/** @return Host name. */
81+
public String hostName() {
82+
return hostName;
83+
}
84+
85+
/** @param hostName Host name. */
86+
public void hostName(String hostName) {
87+
this.hostName = hostName;
88+
}
89+
90+
/** @return {@link InetAddress#getByAddress(String, byte[])} */
91+
public InetAddress address() throws UnknownHostException {
92+
return addrBytes == null ? null : InetAddress.getByAddress(hostName, addrBytes);
93+
}
94+
95+
/** {@inheritDoc} */
96+
@Override public short directType() {
97+
return -100;
98+
}
99+
100+
/** {@inheritDoc} */
101+
@Override public String toString() {
102+
return S.toString(InetAddressMessage.class, this);
103+
}
104+
}

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
4545
/** */
4646
protected static final int CLIENT_RECON_SUCCESS_FLAG_POS = 2;
4747

48-
/** */
49-
protected static final int CHANGE_TOPOLOGY_FLAG_POS = 3;
50-
5148
/** */
5249
protected static final int FORCE_FAIL_FLAG_POS = 4;
5350

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java

Lines changed: 67 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,42 @@
1818
package org.apache.ignite.spi.discovery.tcp.messages;
1919

2020
import java.net.InetSocketAddress;
21+
import java.net.UnknownHostException;
2122
import java.util.Collection;
2223
import java.util.UUID;
24+
import org.apache.ignite.IgniteException;
25+
import org.apache.ignite.internal.Order;
26+
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
27+
import org.apache.ignite.internal.util.typedef.F;
2328
import org.apache.ignite.internal.util.typedef.internal.S;
29+
import org.apache.ignite.plugin.extensions.communication.Message;
2430
import org.jetbrains.annotations.Nullable;
2531

2632
/**
2733
* Handshake response.
2834
*/
29-
public class TcpDiscoveryHandshakeResponse extends TcpDiscoveryAbstractMessage {
35+
public class TcpDiscoveryHandshakeResponse extends TcpDiscoveryAbstractMessage implements Message {
3036
/** */
3137
private static final long serialVersionUID = 0L;
3238

3339
/** */
40+
@Order(5)
3441
private long order;
3542

36-
/** Redirect addresses. */
37-
private Collection<InetSocketAddress> redirectAddresses;
43+
/** */
44+
@Order(value = 6, method = "previousNodeAliveFlag")
45+
private boolean prevNodeAliveFlag;
46+
47+
/** Redirect addresses messages serialization holder. */
48+
@Order(value = 7, method = "redirectAddressesMessages")
49+
private @Nullable Collection<InetAddressMessage> redirectAddrsMsgs;
50+
51+
/**
52+
* Default constructor for {@link DiscoveryMessageFactory}.
53+
*/
54+
public TcpDiscoveryHandshakeResponse() {
55+
// No-op.
56+
}
3857

3958
/**
4059
* Constructor.
@@ -55,17 +74,17 @@ public TcpDiscoveryHandshakeResponse(UUID creatorNodeId, long locNodeOrder) {
5574
* @return previous node alive flag.
5675
*/
5776
public boolean previousNodeAlive() {
58-
return getFlag(CHANGE_TOPOLOGY_FLAG_POS);
77+
return prevNodeAliveFlag;
5978
}
6079

6180
/**
6281
* Sets topology change flag.<br>
6382
* {@code True} means node has connectivity to it's previous node in a ring.
6483
*
65-
* @param prevNodeAlive previous node alive flag.
84+
* @param prevNodeAliveFlag previous node alive flag.
6685
*/
67-
public void previousNodeAlive(boolean prevNodeAlive) {
68-
setFlag(CHANGE_TOPOLOGY_FLAG_POS, prevNodeAlive);
86+
public void previousNodeAlive(boolean prevNodeAliveFlag) {
87+
this.prevNodeAliveFlag = prevNodeAliveFlag;
6988
}
7089

7190
/**
@@ -87,13 +106,49 @@ public void order(long order) {
87106
}
88107

89108
/** @return Socket addresses list for redirect. */
90-
@Nullable public Collection<InetSocketAddress> redirectAddresses() {
91-
return redirectAddresses;
109+
public @Nullable Collection<InetSocketAddress> redirectAddresses() {
110+
return F.isEmpty(redirectAddrsMsgs)
111+
? null
112+
: F.transform(redirectAddrsMsgs, msg -> {
113+
try {
114+
return new InetSocketAddress(msg.address(), msg.port());
115+
}
116+
catch (UnknownHostException e) {
117+
throw new IgniteException("Failed to read host address.", e);
118+
}
119+
});
120+
}
121+
122+
/** @param sockAddrs Socket addresses list for redirect. */
123+
public void redirectAddresses(@Nullable Collection<InetSocketAddress> sockAddrs) {
124+
redirectAddrsMsgs = sockAddrs == null
125+
? null
126+
: F.viewReadOnly(sockAddrs, addr -> new InetAddressMessage(addr.getAddress(), addr.getPort()));
92127
}
93128

94-
/** @param socketAddresses Socket addresses list for redirect. */
95-
public void redirectAddresses(Collection<InetSocketAddress> socketAddresses) {
96-
this.redirectAddresses = socketAddresses;
129+
/** @return Collection of {@link InetAddressMessage}. */
130+
public @Nullable Collection<InetAddressMessage> redirectAddressesMessages() {
131+
return redirectAddrsMsgs;
132+
}
133+
134+
/** @param redirectAddrsMsgs Collection of {@link InetAddressMessage}. */
135+
public void redirectAddressesMessages(@Nullable Collection<InetAddressMessage> redirectAddrsMsgs) {
136+
this.redirectAddrsMsgs = redirectAddrsMsgs;
137+
}
138+
139+
/** @return Previous node aliveness flag. */
140+
public boolean previousNodeAliveFlag() {
141+
return prevNodeAliveFlag;
142+
}
143+
144+
/** @param prevNodeAliveFlag Previous node aliveness flag. */
145+
public void previousNodeAliveFlag(boolean prevNodeAliveFlag) {
146+
this.prevNodeAliveFlag = prevNodeAliveFlag;
147+
}
148+
149+
/** {@inheritDoc} */
150+
@Override public short directType() {
151+
return 10;
97152
}
98153

99154
/** {@inheritDoc} */

0 commit comments

Comments
 (0)