Skip to content

Commit 1f1c2e1

Browse files
authored
IGNITE-27415 Use MessageSerializer for TcpDiscoveryAuthFailedMessage (#12605)
1 parent 07307c9 commit 1f1c2e1

File tree

7 files changed

+132
-78
lines changed

7 files changed

+132
-78
lines changed

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.ignite.internal.managers.discovery;
1919

2020
import org.apache.ignite.internal.codegen.InetAddressMessageSerializer;
21+
import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer;
22+
import org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer;
2123
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
2224
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer;
2325
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer;
@@ -32,6 +34,8 @@
3234
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
3335
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
3436
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
37+
import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
38+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
3539
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
3640
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
3741
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
@@ -48,6 +52,7 @@
4852
public class DiscoveryMessageFactory implements MessageFactoryProvider {
4953
/** {@inheritDoc} */
5054
@Override public void registerAll(MessageFactory factory) {
55+
factory.register((short)-101, InetSocketAddressMessage::new, new InetSocketAddressMessageSerializer());
5156
factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer());
5257

5358
factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer());
@@ -61,5 +66,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
6166
factory.register((short)8, TcpDiscoveryHandshakeRequest::new, new TcpDiscoveryHandshakeRequestSerializer());
6267
factory.register((short)9, TcpDiscoveryDiscardMessage::new, new TcpDiscoveryDiscardMessageSerializer());
6368
factory.register((short)10, TcpDiscoveryHandshakeResponse::new, new TcpDiscoveryHandshakeResponseSerializer());
69+
factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new TcpDiscoveryAuthFailedMessageSerializer());
6470
}
6571
}

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3340,7 +3340,7 @@ else if (msg instanceof TcpDiscoveryAuthFailedMessage)
33403340
*/
33413341
private void processAuthFailedMessage(TcpDiscoveryAuthFailedMessage authFailedMsg) {
33423342
try {
3343-
sendDirectlyToClient(authFailedMsg.getTargetNodeId(), authFailedMsg);
3343+
sendDirectlyToClient(authFailedMsg.targetNodeId(), authFailedMsg);
33443344
}
33453345
catch (IgniteSpiException ex) {
33463346
log.warning(
@@ -7174,7 +7174,7 @@ else if (msg instanceof TcpDiscoveryAuthFailedMessage) {
71747174
mux.notifyAll();
71757175
}
71767176
else {
7177-
UUID targetNode = ((TcpDiscoveryAuthFailedMessage)msg).getTargetNodeId();
7177+
UUID targetNode = ((TcpDiscoveryAuthFailedMessage)msg).targetNodeId();
71787178

71797179
if (targetNode == null || targetNode.equals(locNodeId)) {
71807180
if (log.isDebugEnabled())

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2019,7 +2019,7 @@ protected IgniteSpiException authenticationFailedError(TcpDiscoveryAuthFailedMes
20192019
assert msg != null;
20202020

20212021
return new IgniteSpiException(new IgniteAuthenticationException("Authentication failed [nodeId=" +
2022-
msg.creatorNodeId() + ", addr=" + msg.address().getHostAddress() + ']'));
2022+
msg.creatorNodeId() + ", addr=" + msg.creatorAddress().getHostAddress() + ']'));
20232023
}
20242024

20252025
/**

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetAddressMessage.java

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.net.InetAddress;
2121
import java.net.UnknownHostException;
22+
import org.apache.ignite.IgniteException;
2223
import org.apache.ignite.internal.Order;
2324
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
2425
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -31,29 +32,17 @@ public class InetAddressMessage implements Message {
3132
private String hostName;
3233

3334
/** */
34-
@Order(1)
35-
private int port;
36-
37-
/** */
38-
@Order(value = 2, method = "addressBytes")
35+
@Order(value = 1, method = "addressBytes")
3936
private byte[] addrBytes;
4037

41-
/**
42-
* Default constructor for {@link DiscoveryMessageFactory}.
43-
*/
38+
/** Default constructor for {@link DiscoveryMessageFactory}. */
4439
public InetAddressMessage() {
4540
// No-op.
4641
}
4742

48-
/**
49-
* Constructor.
50-
*
51-
* @param addr Address.
52-
* @param port Port.
53-
*/
54-
public InetAddressMessage(InetAddress addr, int port) {
43+
/** @param addr Address. */
44+
public InetAddressMessage(InetAddress addr) {
5545
hostName = addr.getHostName();
56-
this.port = port;
5746
addrBytes = addr.getAddress();
5847
}
5948

@@ -67,14 +56,14 @@ public void addressBytes(byte[] addrBytes) {
6756
this.addrBytes = addrBytes;
6857
}
6958

70-
/** @return port. */
71-
public int port() {
72-
return port;
73-
}
74-
75-
/** @param port port. */
76-
public void port(int port) {
77-
this.port = port;
59+
/** @return {@link InetAddress#getByAddress(String, byte[])} */
60+
public InetAddress address() {
61+
try {
62+
return addrBytes == null ? null : InetAddress.getByAddress(hostName, addrBytes);
63+
}
64+
catch (UnknownHostException e) {
65+
throw new IgniteException("Failed to read host address.", e);
66+
}
7867
}
7968

8069
/** @return Host name. */
@@ -87,11 +76,6 @@ public void hostName(String hostName) {
8776
this.hostName = hostName;
8877
}
8978

90-
/** @return {@link InetAddress#getByAddress(String, byte[])} */
91-
public InetAddress address() throws UnknownHostException {
92-
return addrBytes == null ? null : InetAddress.getByAddress(hostName, addrBytes);
93-
}
94-
9579
/** {@inheritDoc} */
9680
@Override public short directType() {
9781
return -100;
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.spi.discovery.tcp.messages;
19+
20+
import java.net.InetAddress;
21+
import org.apache.ignite.internal.Order;
22+
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
23+
import org.apache.ignite.internal.util.typedef.internal.S;
24+
25+
/** Socket address utility container message. Is not a pure {@link TcpDiscoveryAbstractMessage}. */
26+
public class InetSocketAddressMessage extends InetAddressMessage {
27+
/** */
28+
@Order(2)
29+
private int port;
30+
31+
/**
32+
* Default constructor for {@link DiscoveryMessageFactory}.
33+
*/
34+
public InetSocketAddressMessage() {
35+
// No-op.
36+
}
37+
38+
/**
39+
* Constructor.
40+
*
41+
* @param addr Address.
42+
* @param port Port.
43+
*/
44+
public InetSocketAddressMessage(InetAddress addr, int port) {
45+
super(addr);
46+
47+
this.port = port;
48+
}
49+
50+
/** @return Port. */
51+
public int port() {
52+
return port;
53+
}
54+
55+
/** @param port Port. */
56+
public void port(int port) {
57+
this.port = port;
58+
}
59+
60+
/** {@inheritDoc} */
61+
@Override public short directType() {
62+
return -101;
63+
}
64+
65+
/** {@inheritDoc} */
66+
@Override public String toString() {
67+
return S.toString(InetSocketAddressMessage.class, this);
68+
}
69+
}

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAuthFailedMessage.java

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,71 +17,75 @@
1717

1818
package org.apache.ignite.spi.discovery.tcp.messages;
1919

20-
import java.io.IOException;
21-
import java.io.ObjectInputStream;
22-
import java.io.ObjectOutputStream;
2320
import java.net.InetAddress;
2421
import java.util.UUID;
22+
import org.apache.ignite.internal.Order;
23+
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
2524
import org.apache.ignite.internal.util.typedef.internal.S;
26-
import org.apache.ignite.internal.util.typedef.internal.U;
25+
import org.apache.ignite.plugin.extensions.communication.Message;
2726

2827
/**
29-
* Message telling joining node that its authentication failed on coordinator.
28+
* Message telling joining node that its authentication failed.
3029
*/
31-
public class TcpDiscoveryAuthFailedMessage extends TcpDiscoveryAbstractMessage {
30+
public class TcpDiscoveryAuthFailedMessage extends TcpDiscoveryAbstractMessage implements Message {
3231
/** */
3332
private static final long serialVersionUID = 0L;
3433

35-
/** Coordinator address. */
36-
private transient InetAddress addr;
34+
/** Creator address. */
35+
@Order(value = 5, method = "creatorAddressMessage")
36+
private InetAddressMessage creatorAddrMsg;
3737

3838
/** Node id for which authentication was failed. */
39+
@Order(6)
3940
private UUID targetNodeId;
4041

42+
/** Default constructor for {@link DiscoveryMessageFactory}. */
43+
public TcpDiscoveryAuthFailedMessage() {
44+
// No-op.
45+
}
46+
4147
/**
4248
* Constructor.
4349
*
4450
* @param creatorNodeId Creator node ID.
45-
* @param addr Coordinator address.
51+
* @param creatorAddr Creator address.
4652
* @param targetNodeId Node for which authentication was failed.
4753
*/
48-
public TcpDiscoveryAuthFailedMessage(UUID creatorNodeId, InetAddress addr, UUID targetNodeId) {
54+
public TcpDiscoveryAuthFailedMessage(UUID creatorNodeId, InetAddress creatorAddr, UUID targetNodeId) {
4955
super(creatorNodeId);
5056

51-
this.addr = addr;
57+
this.creatorAddrMsg = new InetAddressMessage(creatorAddr);
5258
this.targetNodeId = targetNodeId;
5359
}
5460

55-
/**
56-
* @return Node for which authentication was failed.
57-
*/
58-
public UUID getTargetNodeId() {
61+
/** @return Node for which authentication was failed. */
62+
public UUID targetNodeId() {
5963
return targetNodeId;
6064
}
6165

62-
/**
63-
* @return Coordinator address.
64-
*/
65-
public InetAddress address() {
66-
return addr;
66+
/** @param targetNodeId Node for which authentication was failed. */
67+
public void targetNodeId(UUID targetNodeId) {
68+
this.targetNodeId = targetNodeId;
6769
}
6870

69-
/**
70-
* Serialize this message.
71-
*/
72-
private void writeObject(ObjectOutputStream out) throws IOException {
73-
out.defaultWriteObject();
71+
/** @return Creator address message. */
72+
public InetAddressMessage creatorAddressMessage() {
73+
return creatorAddrMsg;
74+
}
7475

75-
U.writeByteArray(out, addr.getAddress());
76+
/** @param addr Creator address message. */
77+
public void creatorAddressMessage(InetAddressMessage addr) {
78+
this.creatorAddrMsg = addr;
7679
}
7780

78-
/**
79-
* Deserialize this message.
80-
*/
81-
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
82-
in.defaultReadObject();
81+
/** @return Creator address. */
82+
public InetAddress creatorAddress() {
83+
return creatorAddrMsg.address();
84+
}
8385

84-
addr = InetAddress.getByAddress(U.readByteArray(in));
86+
/** {@inheritDoc} */
87+
@Override public short directType() {
88+
return 11;
8589
}
8690

8791
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@
1818
package org.apache.ignite.spi.discovery.tcp.messages;
1919

2020
import java.net.InetSocketAddress;
21-
import java.net.UnknownHostException;
2221
import java.util.Collection;
2322
import java.util.UUID;
24-
import org.apache.ignite.IgniteException;
2523
import org.apache.ignite.internal.Order;
2624
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
2725
import org.apache.ignite.internal.util.typedef.F;
@@ -46,7 +44,7 @@ public class TcpDiscoveryHandshakeResponse extends TcpDiscoveryAbstractMessage i
4644

4745
/** Redirect addresses messages serialization holder. */
4846
@Order(value = 7, method = "redirectAddressesMessages")
49-
private @Nullable Collection<InetAddressMessage> redirectAddrsMsgs;
47+
private @Nullable Collection<InetSocketAddressMessage> redirectAddrsMsgs;
5048

5149
/**
5250
* Default constructor for {@link DiscoveryMessageFactory}.
@@ -109,30 +107,23 @@ public void order(long order) {
109107
public @Nullable Collection<InetSocketAddress> redirectAddresses() {
110108
return F.isEmpty(redirectAddrsMsgs)
111109
? null
112-
: F.transform(redirectAddrsMsgs, msg -> {
113-
try {
114-
return new InetSocketAddress(msg.address(), msg.port());
115-
}
116-
catch (UnknownHostException e) {
117-
throw new IgniteException("Failed to read host address.", e);
118-
}
119-
});
110+
: F.transform(redirectAddrsMsgs, msg -> new InetSocketAddress(msg.address(), msg.port()));
120111
}
121112

122113
/** @param sockAddrs Socket addresses list for redirect. */
123114
public void redirectAddresses(@Nullable Collection<InetSocketAddress> sockAddrs) {
124115
redirectAddrsMsgs = sockAddrs == null
125116
? null
126-
: F.viewReadOnly(sockAddrs, addr -> new InetAddressMessage(addr.getAddress(), addr.getPort()));
117+
: F.viewReadOnly(sockAddrs, addr -> new InetSocketAddressMessage(addr.getAddress(), addr.getPort()));
127118
}
128119

129120
/** @return Collection of {@link InetAddressMessage}. */
130-
public @Nullable Collection<InetAddressMessage> redirectAddressesMessages() {
121+
public @Nullable Collection<InetSocketAddressMessage> redirectAddressesMessages() {
131122
return redirectAddrsMsgs;
132123
}
133124

134125
/** @param redirectAddrsMsgs Collection of {@link InetAddressMessage}. */
135-
public void redirectAddressesMessages(@Nullable Collection<InetAddressMessage> redirectAddrsMsgs) {
126+
public void redirectAddressesMessages(@Nullable Collection<InetSocketAddressMessage> redirectAddrsMsgs) {
136127
this.redirectAddrsMsgs = redirectAddrsMsgs;
137128
}
138129

0 commit comments

Comments
 (0)