Skip to content

Commit a1abd2c

Browse files
authored
IGNITE-26111 TcpDiscoverySpi uses MessageSerializer (#12243)
1 parent 898d7be commit a1abd2c

31 files changed

+537
-243
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.managers.discovery;
19+
20+
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
21+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
22+
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
23+
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
24+
25+
/** Message factory for discovery messages. */
26+
public class DiscoveryMessageFactory implements MessageFactoryProvider {
27+
/** {@inheritDoc} */
28+
@Override public void registerAll(MessageFactory factory) {
29+
factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer());
30+
}
31+
}

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java

Lines changed: 16 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,8 @@
1717

1818
package org.apache.ignite.spi.discovery.tcp;
1919

20-
import java.io.BufferedInputStream;
2120
import java.io.IOException;
22-
import java.io.InputStream;
2321
import java.io.InterruptedIOException;
24-
import java.io.OutputStream;
2522
import java.io.StreamCorruptedException;
2623
import java.net.InetSocketAddress;
2724
import java.net.Socket;
@@ -719,23 +716,23 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
719716
boolean openSock = false;
720717

721718
Socket sock = null;
722-
OutputStream out;
723719

724720
try {
725721
long tsNanos = System.nanoTime();
726722

727723
sock = spi.openSocket(addr, timeoutHelper);
728-
out = spi.socketStream(sock);
724+
725+
TcpDiscoveryIoSession ses = createSession(sock);
729726

730727
openSock = true;
731728

732729
TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);
733730

734731
req.client(true);
735732

736-
spi.writeToSocket(sock, out, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
733+
spi.writeMessage(ses, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
737734

738-
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
735+
TcpDiscoveryHandshakeResponse res = spi.readMessage(ses, ackTimeout0);
739736

740737
UUID rmtNodeId = res.creatorNodeId();
741738

@@ -788,7 +785,7 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
788785
if (msg instanceof TraceableMessage)
789786
tracing.messages().beforeSend((TraceableMessage)msg);
790787

791-
spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
788+
spi.writeMessage(ses, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
792789

793790
spi.stats.onMessageSent(msg, U.millisSinceNanos(tsNanos));
794791

@@ -1179,7 +1176,7 @@ private void forceStopRead() throws InterruptedException {
11791176
+ ":" + sockStream.sock.getPort());
11801177

11811178
try {
1182-
InputStream in = sockStream.stream();
1179+
TcpDiscoveryIoSession ses = createSession(sock);
11831180

11841181
assert sock.getKeepAlive() && sock.getTcpNoDelay() : "Socket wasn't configured properly:" +
11851182
" KeepAlive " + sock.getKeepAlive() +
@@ -1189,7 +1186,7 @@ private void forceStopRead() throws InterruptedException {
11891186
TcpDiscoveryAbstractMessage msg;
11901187

11911188
try {
1192-
msg = U.unmarshal(spi.marshaller(), in, U.resolveClassLoader(spi.ignite().configuration()));
1189+
msg = spi.readMessage(ses, sock.getSoTimeout());
11931190
}
11941191
catch (IgniteCheckedException e) {
11951192
if (log.isDebugEnabled())
@@ -1266,6 +1263,9 @@ private class SocketWriter extends IgniteSpiThread {
12661263
/** */
12671264
private Socket sock;
12681265

1266+
/** */
1267+
private TcpDiscoveryIoSession ses;
1268+
12691269
/** */
12701270
private boolean clientAck;
12711271

@@ -1333,6 +1333,8 @@ private void setSocket(Socket sock, boolean clientAck) {
13331333
synchronized (mux) {
13341334
this.sock = sock;
13351335

1336+
ses = createSession(sock);
1337+
13361338
this.clientAck = clientAck;
13371339

13381340
unackedMsg = null;
@@ -1387,11 +1389,7 @@ void ackReceived(TcpDiscoveryClientAckResponse res) {
13871389
msg.client(true);
13881390

13891391
try {
1390-
spi.writeToSocket(
1391-
sock,
1392-
spi.socketStream(sock),
1393-
msg,
1394-
sockTimeout);
1392+
spi.writeMessage(ses, msg, sockTimeout);
13951393
}
13961394
catch (IOException | IgniteCheckedException e) {
13971395
if (log.isDebugEnabled()) {
@@ -1434,11 +1432,7 @@ void ackReceived(TcpDiscoveryClientAckResponse res) {
14341432
}
14351433
}
14361434

1437-
spi.writeToSocket(
1438-
sock,
1439-
spi.socketStream(sock),
1440-
msg,
1441-
sockTimeout);
1435+
spi.writeMessage(ses, msg, sockTimeout);
14421436

14431437
IgniteUuid latencyCheckId = msg instanceof TcpDiscoveryRingLatencyCheckMessage ?
14441438
msg.id() : null;
@@ -1601,6 +1595,7 @@ public void cancel() {
16011595
clientAck = joinRes.get2();
16021596

16031597
Socket sock = sockStream.socket();
1598+
TcpDiscoveryIoSession ses = createSession(sock);
16041599

16051600
if (isInterrupted())
16061601
throw new InterruptedException();
@@ -1612,17 +1607,14 @@ public void cancel() {
16121607

16131608
sock.setSoTimeout((int)spi.netTimeout);
16141609

1615-
InputStream in = sockStream.stream();
1616-
16171610
assert sock.getKeepAlive() && sock.getTcpNoDelay() : "Socket wasn't configured properly:" +
16181611
" KeepAlive " + sock.getKeepAlive() +
16191612
" TcpNoDelay " + sock.getTcpNoDelay();
16201613

16211614
List<TcpDiscoveryAbstractMessage> msgs = null;
16221615

16231616
while (!isInterrupted()) {
1624-
TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in,
1625-
U.resolveClassLoader(spi.ignite().configuration()));
1617+
TcpDiscoveryAbstractMessage msg = spi.readMessage(ses, sock.getSoTimeout());
16261618

16271619
if (msg instanceof TcpDiscoveryClientReconnectMessage) {
16281620
TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
@@ -2769,9 +2761,6 @@ private static class SocketStream {
27692761
/** */
27702762
private final Socket sock;
27712763

2772-
/** */
2773-
private final InputStream in;
2774-
27752764
/**
27762765
* @param sock Socket.
27772766
* @throws IOException If failed to create stream.
@@ -2780,8 +2769,6 @@ public SocketStream(Socket sock) throws IOException {
27802769
assert sock != null;
27812770

27822771
this.sock = sock;
2783-
2784-
this.in = new BufferedInputStream(sock.getInputStream());
27852772
}
27862773

27872774
/**
@@ -2792,13 +2779,6 @@ Socket socket() {
27922779

27932780
}
27942781

2795-
/**
2796-
* @return Socket input stream.
2797-
*/
2798-
InputStream stream() {
2799-
return in;
2800-
}
2801-
28022782
/** {@inheritDoc} */
28032783
@Override public String toString() {
28042784
return sock.toString();

0 commit comments

Comments
 (0)