Skip to content

Commit eac8eee

Browse files
committed
Review fixes
1 parent 83c8073 commit eac8eee

File tree

7 files changed

+280
-213
lines changed

7 files changed

+280
-213
lines changed

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import org.apache.ignite.internal.codegen.MissingMappingResponseMessageSerializer;
5050
import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
5151
import org.apache.ignite.internal.codegen.SessionChannelMessageSerializer;
52-
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
5352
import org.apache.ignite.internal.codegen.TcpInverseConnectionResponseMessageSerializer;
5453
import org.apache.ignite.internal.codegen.TxLockSerializer;
5554
import org.apache.ignite.internal.codegen.TxLocksRequestSerializer;
@@ -200,7 +199,6 @@
200199
import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage;
201200
import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
202201
import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
203-
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
204202

205203
/**
206204
* Message factory implementation.
@@ -381,8 +379,5 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
381379
// [2048..2053] - Snapshots
382380
// [-42..-37] - former hadoop.
383381
// [64..71] - former IGFS.
384-
385-
// Discovery messages.
386-
factory.register((short)-1000, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer());
387382
}
388383
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.managers.discovery;
19+
20+
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
21+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
22+
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
23+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
24+
25+
/** */
26+
public class DiscoveryMessageFactory implements MessageFactoryProvider {
27+
/** {@inheritDoc} */
28+
@Override public void registerAll(MessageFactory factory) {
29+
// Discovery messages.
30+
factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer());
31+
}
32+
}

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import org.apache.ignite.internal.cluster.NodeOrderComparator;
7676
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
7777
import org.apache.ignite.internal.managers.GridManagerAdapter;
78+
import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
7879
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
7980
import org.apache.ignite.internal.managers.systemview.walker.ClusterNodeViewWalker;
8081
import org.apache.ignite.internal.managers.systemview.walker.NodeAttributeViewWalker;
@@ -121,6 +122,8 @@
121122
import org.apache.ignite.lang.IgniteProductVersion;
122123
import org.apache.ignite.lang.IgniteUuid;
123124
import org.apache.ignite.marshaller.Marshaller;
125+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
126+
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
124127
import org.apache.ignite.plugin.security.SecurityCredentials;
125128
import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
126129
import org.apache.ignite.spi.IgniteSpiException;
@@ -345,7 +348,11 @@ public GridDiscoveryManager(GridKernalContext ctx) {
345348
DiscoverySpi spi = getSpi();
346349

347350
spi.setNodeAttributes(ctx.nodeAttributes(), VER);
348-
spi.setMessageFactory(ctx.io().messageFactory());
351+
352+
MessageFactory msgFactory = new IgniteMessageFactoryImpl(
353+
new MessageFactoryProvider[] {new DiscoveryMessageFactory()});
354+
355+
spi.setMessageFactory(msgFactory);
349356
}
350357

351358
/**

modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public class IsolatedDiscoverySpi extends IgniteSpiAdapter implements IgniteDisc
132132

133133
/** {@inheritDoc} */
134134
@Override public void setMessageFactory(MessageFactory msgFactory) {
135-
// Np-op.
135+
// No-op.
136136
}
137137

138138
/** {@inheritDoc} */
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.spi.discovery.tcp;
19+
20+
import java.io.EOFException;
21+
import java.io.IOException;
22+
import java.io.InputStream;
23+
import java.io.OutputStream;
24+
import java.io.StreamCorruptedException;
25+
import java.net.Socket;
26+
import java.net.SocketException;
27+
import java.nio.ByteBuffer;
28+
import org.apache.ignite.Ignite;
29+
import org.apache.ignite.IgniteCheckedException;
30+
import org.apache.ignite.internal.direct.DirectMessageReader;
31+
import org.apache.ignite.internal.direct.DirectMessageWriter;
32+
import org.apache.ignite.internal.util.typedef.internal.U;
33+
import org.apache.ignite.marshaller.Marshaller;
34+
import org.apache.ignite.plugin.extensions.communication.Message;
35+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
36+
import org.apache.ignite.plugin.extensions.communication.MessageReader;
37+
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
38+
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
39+
40+
/**
41+
* TODO
42+
*/
43+
public class DiscoveryIOSession {
44+
/** */
45+
private static final byte[] JAVA_SERIALIZATION = new byte[] { 0 };
46+
47+
/** */
48+
private static final byte[] MESSAGE_SERIALIZATION = new byte[] { 1 };
49+
50+
/** */
51+
private final Socket sock;
52+
53+
/** */
54+
private final MessageWriter msgWriter;
55+
56+
/** */
57+
private final MessageReader msgReader;
58+
59+
/** */
60+
private final ByteBuffer buf;
61+
62+
/** */
63+
private final MessageFactory msgFactory;
64+
65+
/** */
66+
DiscoveryIOSession(MessageFactory msgFactory, Socket sock) throws SocketException {
67+
this.msgFactory = msgFactory;
68+
this.sock = sock;
69+
70+
msgWriter = new DirectMessageWriter(msgFactory);
71+
msgReader = new DirectMessageReader(msgFactory);
72+
73+
buf = ByteBuffer.allocate(sock.getSendBufferSize());
74+
}
75+
76+
/** */
77+
void writeMessage(Object msg, OutputStream out, Marshaller marshaller) throws IgniteCheckedException {
78+
try {
79+
if (!(msg instanceof Message)) {
80+
out.write(JAVA_SERIALIZATION);
81+
82+
U.marshal(marshaller, msg, out);
83+
84+
return;
85+
}
86+
87+
out.write(MESSAGE_SERIALIZATION);
88+
89+
Message m = (Message)msg;
90+
MessageSerializer msgSer = msgFactory.serializer(m.directType());
91+
msgWriter.reset();
92+
93+
boolean finish;
94+
95+
do {
96+
buf.clear();
97+
98+
finish = msgSer.writeTo(m, buf, msgWriter);
99+
100+
out.write(buf.array(), 0, buf.position());
101+
}
102+
while (!finish);
103+
104+
out.flush();
105+
}
106+
catch (Exception e) {
107+
// Keep logic similar to `U.marshal(...)`.
108+
if (e instanceof IgniteCheckedException)
109+
throw (IgniteCheckedException)e;
110+
111+
throw new IgniteCheckedException(e);
112+
}
113+
}
114+
115+
/** */
116+
<T> T readMessage(InputStream in, Marshaller marshaller, Ignite ign) throws IgniteCheckedException {
117+
try {
118+
buf.clear();
119+
120+
readNBytes(in, 1);
121+
122+
byte serMode = buf.array()[0];
123+
124+
if (JAVA_SERIALIZATION[0] == serMode)
125+
return U.unmarshal(marshaller, in, U.resolveClassLoader(ign.configuration()));
126+
127+
else if (MESSAGE_SERIALIZATION[0] == serMode) {
128+
readNBytes(in, 2);
129+
130+
msgReader.reset();
131+
msgReader.setBuffer(buf);
132+
133+
short msgType = msgReader.readShort();
134+
135+
MessageSerializer msgSer = msgFactory.serializer(msgType);
136+
Message msg = msgFactory.create(msgType);
137+
138+
boolean finish;
139+
140+
do {
141+
readNBytes(in, 1);
142+
143+
buf.position(0);
144+
buf.limit(1);
145+
146+
finish = msgSer.readFrom(msg, buf, msgReader);
147+
} while (!finish);
148+
149+
return (T)msg;
150+
}
151+
152+
detectSslAlert(serMode, in);
153+
154+
throw new EOFException();
155+
}
156+
catch (Exception e) {
157+
// Keep logic similar to `U.marshal(...)`.
158+
if (e instanceof IgniteCheckedException)
159+
throw (IgniteCheckedException)e;
160+
161+
throw new IgniteCheckedException(e);
162+
}
163+
}
164+
165+
/** */
166+
public Socket socket() {
167+
return sock;
168+
}
169+
170+
/** */
171+
private void readNBytes(InputStream in, int nbytes) throws IOException {
172+
if (in.readNBytes(buf.array(), 0, nbytes) < nbytes)
173+
throw new EOFException();
174+
}
175+
176+
/**
177+
* Checks wheter input stream contains SSL alert.
178+
* See handling {@code StreamCorruptedException} in {@link #readMessage(Socket, InputStream, long)}.
179+
* Keeps logic similar to {@link java.io.ObjectInputStream#readStreamHeader}.
180+
*/
181+
private void detectSslAlert(byte firstByte, InputStream in) throws IOException {
182+
byte[] hdr = new byte[4];
183+
hdr[0] = firstByte;
184+
int read = in.readNBytes(hdr, 1, 3);
185+
186+
if (read < 3)
187+
throw new EOFException();
188+
189+
String hex = String.format("%02x%02x%02x%02x", hdr[0], hdr[1], hdr[2], hdr[3]);
190+
191+
if (hex.matches("15....00"))
192+
throw new StreamCorruptedException("invalid stream header: " + hex);
193+
}
194+
}

0 commit comments

Comments
 (0)