Skip to content

Commit bf87669

Browse files
committed
WIP
1 parent d45d2be commit bf87669

File tree

34 files changed

+538
-232
lines changed

34 files changed

+538
-232
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.managers.discovery;
19+
20+
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
21+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
22+
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
23+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
24+
25+
/** Message factory for discovery messages. */
26+
public class DiscoveryMessageFactory implements MessageFactoryProvider {
27+
/** {@inheritDoc} */
28+
@Override public void registerAll(MessageFactory factory) {
29+
factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer());
30+
}
31+
}

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import org.apache.ignite.internal.cluster.NodeOrderComparator;
7676
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
7777
import org.apache.ignite.internal.managers.GridManagerAdapter;
78+
import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
7879
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
7980
import org.apache.ignite.internal.managers.systemview.walker.ClusterNodeViewWalker;
8081
import org.apache.ignite.internal.managers.systemview.walker.NodeAttributeViewWalker;
@@ -121,6 +122,8 @@
121122
import org.apache.ignite.lang.IgniteProductVersion;
122123
import org.apache.ignite.lang.IgniteUuid;
123124
import org.apache.ignite.marshaller.Marshaller;
125+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
126+
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
124127
import org.apache.ignite.plugin.security.SecurityCredentials;
125128
import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
126129
import org.apache.ignite.spi.IgniteSpiException;
@@ -347,6 +350,11 @@ public GridDiscoveryManager(GridKernalContext ctx) {
347350
DiscoverySpi spi = getSpi();
348351

349352
spi.setNodeAttributes(ctx.nodeAttributes(), VER);
353+
354+
MessageFactory msgFactory = new IgniteMessageFactoryImpl(
355+
new MessageFactoryProvider[] {new DiscoveryMessageFactory()});
356+
357+
spi.setMessageFactory(msgFactory);
350358
}
351359

352360
/**

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopDiscoverySpi.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.ignite.IgniteException;
2525
import org.apache.ignite.cluster.ClusterNode;
2626
import org.apache.ignite.lang.IgniteProductVersion;
27+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2728
import org.apache.ignite.spi.IgniteSpiAdapter;
2829
import org.apache.ignite.spi.IgniteSpiException;
2930
import org.apache.ignite.spi.IgniteSpiNoop;
@@ -70,6 +71,11 @@ public class StandaloneNoopDiscoverySpi extends IgniteSpiAdapter implements Disc
7071

7172
}
7273

74+
/** {@inheritDoc} */
75+
@Override public void setMessageFactory(MessageFactory msgFactory) {
76+
77+
}
78+
7379
/** {@inheritDoc} */
7480
@Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
7581

modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.ignite.IgniteException;
2525
import org.apache.ignite.cluster.ClusterNode;
2626
import org.apache.ignite.lang.IgniteProductVersion;
27+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2728
import org.apache.ignite.spi.IgniteSpi;
2829
import org.apache.ignite.spi.IgniteSpiException;
2930
import org.jetbrains.annotations.Nullable;
@@ -94,6 +95,13 @@ public interface DiscoverySpi extends IgniteSpi {
9495
*/
9596
public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver);
9697

98+
/**
99+
* Sets {@link MessageFactory} used for serializing discovery messages.
100+
*
101+
* @param msgFactory Message factory.
102+
*/
103+
public void setMessageFactory(MessageFactory msgFactory);
104+
97105
/**
98106
* Sets a listener for discovery events. Refer to
99107
* {@link org.apache.ignite.events.DiscoveryEvent} for a set of all possible

modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.ignite.lang.IgniteFuture;
3535
import org.apache.ignite.lang.IgniteProductVersion;
3636
import org.apache.ignite.marshaller.Marshaller;
37+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
3738
import org.apache.ignite.plugin.security.SecurityCredentials;
3839
import org.apache.ignite.spi.IgniteSpiAdapter;
3940
import org.apache.ignite.spi.IgniteSpiContext;
@@ -128,6 +129,11 @@ public class IsolatedDiscoverySpi extends IgniteSpiAdapter implements IgniteDisc
128129
locNode = new IsolatedNode(ignite.configuration().getNodeId(), attrs, ver);
129130
}
130131

132+
/** {@inheritDoc} */
133+
@Override public void setMessageFactory(MessageFactory msgFactory) {
134+
// No-op.
135+
}
136+
131137
/** {@inheritDoc} */
132138
@Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
133139
this.lsnr = lsnr;

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.io.IOException;
2222
import java.io.InputStream;
2323
import java.io.InterruptedIOException;
24-
import java.io.OutputStream;
2524
import java.io.StreamCorruptedException;
2625
import java.net.InetSocketAddress;
2726
import java.net.Socket;
@@ -719,23 +718,23 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
719718
boolean openSock = false;
720719

721720
Socket sock = null;
722-
OutputStream out;
723721

724722
try {
725723
long tsNanos = System.nanoTime();
726724

727725
sock = spi.openSocket(addr, timeoutHelper);
728-
out = spi.socketStream(sock);
726+
727+
TcpDiscoveryIoSession ses = new TcpDiscoveryIoSession(sock, spi);
729728

730729
openSock = true;
731730

732731
TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);
733732

734733
req.client(true);
735734

736-
spi.writeToSocket(sock, out, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
735+
spi.writeToSocket(ses, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
737736

738-
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
737+
TcpDiscoveryHandshakeResponse res = spi.readMessage(ses, ackTimeout0);
739738

740739
UUID rmtNodeId = res.creatorNodeId();
741740

@@ -788,7 +787,7 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
788787
if (msg instanceof TraceableMessage)
789788
tracing.messages().beforeSend((TraceableMessage)msg);
790789

791-
spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
790+
spi.writeToSocket(ses, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
792791

793792
spi.stats.onMessageSent(msg, U.millisSinceNanos(tsNanos));
794793

@@ -1173,14 +1172,13 @@ private void forceStopRead() throws InterruptedException {
11731172
}
11741173

11751174
Socket sock = sockStream.socket();
1175+
TcpDiscoveryIoSession ses = new TcpDiscoveryIoSession(sock, spi);
11761176

11771177
U.enhanceThreadName(U.id8(rmtNodeId)
11781178
+ ' ' + sockStream.sock.getInetAddress().getHostAddress()
11791179
+ ":" + sockStream.sock.getPort());
11801180

11811181
try {
1182-
InputStream in = sockStream.stream();
1183-
11841182
assert sock.getKeepAlive() && sock.getTcpNoDelay() : "Socket wasn't configured properly:" +
11851183
" KeepAlive " + sock.getKeepAlive() +
11861184
" TcpNoDelay " + sock.getTcpNoDelay();
@@ -1189,7 +1187,7 @@ private void forceStopRead() throws InterruptedException {
11891187
TcpDiscoveryAbstractMessage msg;
11901188

11911189
try {
1192-
msg = U.unmarshal(spi.marshaller(), in, U.resolveClassLoader(spi.ignite().configuration()));
1190+
msg = spi.readMessage(ses, sock.getSoTimeout());
11931191
}
11941192
catch (IgniteCheckedException e) {
11951193
if (log.isDebugEnabled())
@@ -1266,6 +1264,9 @@ private class SocketWriter extends IgniteSpiThread {
12661264
/** */
12671265
private Socket sock;
12681266

1267+
/** */
1268+
private TcpDiscoveryIoSession ses;
1269+
12691270
/** */
12701271
private boolean clientAck;
12711272

@@ -1333,6 +1334,8 @@ private void setSocket(Socket sock, boolean clientAck) {
13331334
synchronized (mux) {
13341335
this.sock = sock;
13351336

1337+
ses = new TcpDiscoveryIoSession(sock, spi);
1338+
13361339
this.clientAck = clientAck;
13371340

13381341
unackedMsg = null;
@@ -1387,11 +1390,7 @@ void ackReceived(TcpDiscoveryClientAckResponse res) {
13871390
msg.client(true);
13881391

13891392
try {
1390-
spi.writeToSocket(
1391-
sock,
1392-
spi.socketStream(sock),
1393-
msg,
1394-
sockTimeout);
1393+
spi.writeToSocket(ses, msg, sockTimeout);
13951394
}
13961395
catch (IOException | IgniteCheckedException e) {
13971396
if (log.isDebugEnabled()) {
@@ -1434,11 +1433,7 @@ void ackReceived(TcpDiscoveryClientAckResponse res) {
14341433
}
14351434
}
14361435

1437-
spi.writeToSocket(
1438-
sock,
1439-
spi.socketStream(sock),
1440-
msg,
1441-
sockTimeout);
1436+
spi.writeToSocket(ses, msg, sockTimeout);
14421437

14431438
IgniteUuid latencyCheckId = msg instanceof TcpDiscoveryRingLatencyCheckMessage ?
14441439
msg.id() : null;
@@ -1601,6 +1596,7 @@ public void cancel() {
16011596
clientAck = joinRes.get2();
16021597

16031598
Socket sock = sockStream.socket();
1599+
TcpDiscoveryIoSession ses = new TcpDiscoveryIoSession(sock, spi);
16041600

16051601
if (isInterrupted())
16061602
throw new InterruptedException();
@@ -1612,17 +1608,14 @@ public void cancel() {
16121608

16131609
sock.setSoTimeout((int)spi.netTimeout);
16141610

1615-
InputStream in = sockStream.stream();
1616-
16171611
assert sock.getKeepAlive() && sock.getTcpNoDelay() : "Socket wasn't configured properly:" +
16181612
" KeepAlive " + sock.getKeepAlive() +
16191613
" TcpNoDelay " + sock.getTcpNoDelay();
16201614

16211615
List<TcpDiscoveryAbstractMessage> msgs = null;
16221616

16231617
while (!isInterrupted()) {
1624-
TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in,
1625-
U.resolveClassLoader(spi.ignite().configuration()));
1618+
TcpDiscoveryAbstractMessage msg = spi.readMessage(ses, sock.getSoTimeout());
16261619

16271620
if (msg instanceof TcpDiscoveryClientReconnectMessage) {
16281621
TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;

0 commit comments

Comments
 (0)