Skip to content

Commit e923f92

Browse files
authored
IGNITE-27810 Use MessageSerializer for TcpDiscoveryJoinRequestMessage (#12724)
1 parent 6ab4f70 commit e923f92

File tree

4 files changed

+69
-4
lines changed

4 files changed

+69
-4
lines changed

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@
5353
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequestSerializer;
5454
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
5555
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponseSerializer;
56+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
57+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessageSerializer;
5658
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
5759
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessageSerializer;
5860
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
@@ -110,5 +112,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
110112
factory.register((short)17, TcpDiscoveryNodeFailedMessage::new, new TcpDiscoveryNodeFailedMessageSerializer());
111113
factory.register((short)18, TcpDiscoveryStatusCheckMessage::new, new TcpDiscoveryStatusCheckMessageSerializer());
112114
factory.register((short)19, TcpDiscoveryNodeAddFinishedMessage::new, new TcpDiscoveryNodeAddFinishedMessageSerializer());
115+
factory.register((short)20, TcpDiscoveryJoinRequestMessage::new, new TcpDiscoveryJoinRequestMessageSerializer());
113116
}
114117
}

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,8 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
787787

788788
TcpDiscoveryJoinRequestMessage joinReqMsg = new TcpDiscoveryJoinRequestMessage(node, discoveryData);
789789

790+
joinReqMsg.prepareMarshal(spi.marshaller());
791+
790792
TcpDiscoveryNode nodef = node;
791793

792794
joinReqMsg.spanContainer().span(

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1123,6 +1123,8 @@ private void joinTopology() throws IgniteSpiException {
11231123

11241124
TcpDiscoveryJoinRequestMessage joinReqMsg = new TcpDiscoveryJoinRequestMessage(locNode, discoveryData);
11251125

1126+
joinReqMsg.prepareMarshal(spi.marshaller());
1127+
11261128
joinReqMsg.spanContainer().span(
11271129
tracing.create(TraceableMessagesTable.traceName(joinReqMsg.getClass()))
11281130
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> locNode.id().toString())
@@ -3305,6 +3307,9 @@ private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) {
33053307
if (msg instanceof TraceableMessage)
33063308
tracing.messages().beforeSend((TraceableMessage)msg);
33073309

3310+
if (msg instanceof TcpDiscoveryJoinRequestMessage)
3311+
((TcpDiscoveryJoinRequestMessage)msg).prepareMarshal(spi.marshaller());
3312+
33083313
sendMessageToClients(msg);
33093314

33103315
List<TcpDiscoveryNode> failedNodes;
@@ -6955,6 +6960,8 @@ else if (e.hasCause(ObjectStreamException.class) ||
69556960
else if (msg instanceof TcpDiscoveryJoinRequestMessage) {
69566961
TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
69576962

6963+
req.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));
6964+
69586965
// Current node holds connection with the node that is joining the cluster. Therefore, it can
69596966
// save certificates with which the connection was established to joining node attributes.
69606967
if (spi.nodeAuth != null && nodeId.equals(req.node().id()))

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,13 @@
1717

1818
package org.apache.ignite.spi.discovery.tcp.messages;
1919

20+
import org.apache.ignite.IgniteCheckedException;
21+
import org.apache.ignite.IgniteException;
22+
import org.apache.ignite.internal.Order;
2023
import org.apache.ignite.internal.util.typedef.internal.S;
24+
import org.apache.ignite.internal.util.typedef.internal.U;
25+
import org.apache.ignite.marshaller.Marshaller;
26+
import org.apache.ignite.plugin.extensions.communication.Message;
2127
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
2228
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
2329

@@ -27,15 +33,26 @@
2733
* Initial message sent by a node that wants to enter topology.
2834
* Sent to random node during SPI start. Then forwarded directly to coordinator.
2935
*/
30-
public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractTraceableMessage {
36+
public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractTraceableMessage implements Message {
3137
/** */
3238
private static final long serialVersionUID = 0L;
3339

3440
/** New node that wants to join the topology. */
35-
private final TcpDiscoveryNode node;
41+
private TcpDiscoveryNode node;
42+
43+
/** Serialized {@link #node}. */
44+
// TODO Remove the field after completing https://issues.apache.org/jira/browse/IGNITE-27899.
45+
@Order(6)
46+
byte[] nodeBytes;
3647

3748
/** Discovery data container. */
38-
private final DiscoveryDataPacket dataPacket;
49+
@Order(7)
50+
DiscoveryDataPacket dataPacket;
51+
52+
/** Constructor. */
53+
public TcpDiscoveryJoinRequestMessage() {
54+
// No-op.
55+
}
3956

4057
/**
4158
* Constructor.
@@ -65,7 +82,7 @@ public DiscoveryDataPacket gridDiscoveryData() {
6582
}
6683

6784
/**
68-
* @return {@code true} flag.
85+
* @return Responded flag.
6986
*/
7087
public boolean responded() {
7188
return getFlag(RESPONDED_FLAG_POS);
@@ -78,6 +95,37 @@ public void responded(boolean responded) {
7895
setFlag(RESPONDED_FLAG_POS, responded);
7996
}
8097

98+
/**
99+
* @param marsh Marshaller.
100+
*/
101+
public void prepareMarshal(Marshaller marsh) {
102+
if (node != null && nodeBytes == null) {
103+
try {
104+
nodeBytes = U.marshal(marsh, node);
105+
}
106+
catch (IgniteCheckedException e) {
107+
throw new IgniteException("Failed to marshal TcpDiscoveryNode object", e);
108+
}
109+
}
110+
}
111+
112+
/**
113+
* @param marsh Marshaller.
114+
* @param clsLdr Class loader.
115+
*/
116+
public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) {
117+
if (nodeBytes != null && node == null) {
118+
try {
119+
node = U.unmarshal(marsh, nodeBytes, clsLdr);
120+
121+
nodeBytes = null;
122+
}
123+
catch (IgniteCheckedException e) {
124+
throw new IgniteException("Failed to unmarshal TcpDiscoveryNode object", e);
125+
}
126+
}
127+
}
128+
81129
/** {@inheritDoc} */
82130
@Override public boolean equals(Object obj) {
83131
// NOTE!
@@ -95,4 +143,9 @@ public void responded(boolean responded) {
95143
@Override public String toString() {
96144
return S.toString(TcpDiscoveryJoinRequestMessage.class, this, "super", super.toString());
97145
}
146+
147+
/** {@inheritDoc} */
148+
@Override public short directType() {
149+
return 20;
150+
}
98151
}

0 commit comments

Comments
 (0)