Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.managers.discovery;

import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;

/** Message factory for discovery messages. */
public class DiscoveryMessageFactory implements MessageFactoryProvider {
/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@

package org.apache.ignite.spi.discovery.tcp;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.StreamCorruptedException;
import java.net.InetSocketAddress;
import java.net.Socket;
Expand Down Expand Up @@ -719,23 +716,23 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
boolean openSock = false;

Socket sock = null;
OutputStream out;

try {
long tsNanos = System.nanoTime();

sock = spi.openSocket(addr, timeoutHelper);
out = spi.socketStream(sock);

TcpDiscoveryIoSession ses = createSession(sock);

openSock = true;

TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);

req.client(true);

spi.writeToSocket(sock, out, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
spi.writeMessage(ses, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));

TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
TcpDiscoveryHandshakeResponse res = spi.readMessage(ses, ackTimeout0);

UUID rmtNodeId = res.creatorNodeId();

Expand Down Expand Up @@ -788,7 +785,7 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
if (msg instanceof TraceableMessage)
tracing.messages().beforeSend((TraceableMessage)msg);

spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
spi.writeMessage(ses, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));

spi.stats.onMessageSent(msg, U.millisSinceNanos(tsNanos));

Expand Down Expand Up @@ -1179,7 +1176,7 @@ private void forceStopRead() throws InterruptedException {
+ ":" + sockStream.sock.getPort());

try {
InputStream in = sockStream.stream();
TcpDiscoveryIoSession ses = createSession(sock);

assert sock.getKeepAlive() && sock.getTcpNoDelay() : "Socket wasn't configured properly:" +
" KeepAlive " + sock.getKeepAlive() +
Expand All @@ -1189,7 +1186,7 @@ private void forceStopRead() throws InterruptedException {
TcpDiscoveryAbstractMessage msg;

try {
msg = U.unmarshal(spi.marshaller(), in, U.resolveClassLoader(spi.ignite().configuration()));
msg = spi.readMessage(ses, sock.getSoTimeout());
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
Expand Down Expand Up @@ -1266,6 +1263,9 @@ private class SocketWriter extends IgniteSpiThread {
/** */
private Socket sock;

/** */
private TcpDiscoveryIoSession ses;

/** */
private boolean clientAck;

Expand Down Expand Up @@ -1333,6 +1333,8 @@ private void setSocket(Socket sock, boolean clientAck) {
synchronized (mux) {
this.sock = sock;

ses = createSession(sock);

this.clientAck = clientAck;

unackedMsg = null;
Expand Down Expand Up @@ -1387,11 +1389,7 @@ void ackReceived(TcpDiscoveryClientAckResponse res) {
msg.client(true);

try {
spi.writeToSocket(
sock,
spi.socketStream(sock),
msg,
sockTimeout);
spi.writeMessage(ses, msg, sockTimeout);
}
catch (IOException | IgniteCheckedException e) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -1434,11 +1432,7 @@ void ackReceived(TcpDiscoveryClientAckResponse res) {
}
}

spi.writeToSocket(
sock,
spi.socketStream(sock),
msg,
sockTimeout);
spi.writeMessage(ses, msg, sockTimeout);

IgniteUuid latencyCheckId = msg instanceof TcpDiscoveryRingLatencyCheckMessage ?
msg.id() : null;
Expand Down Expand Up @@ -1601,6 +1595,7 @@ public void cancel() {
clientAck = joinRes.get2();

Socket sock = sockStream.socket();
TcpDiscoveryIoSession ses = createSession(sock);

if (isInterrupted())
throw new InterruptedException();
Expand All @@ -1612,17 +1607,14 @@ public void cancel() {

sock.setSoTimeout((int)spi.netTimeout);

InputStream in = sockStream.stream();

assert sock.getKeepAlive() && sock.getTcpNoDelay() : "Socket wasn't configured properly:" +
" KeepAlive " + sock.getKeepAlive() +
" TcpNoDelay " + sock.getTcpNoDelay();

List<TcpDiscoveryAbstractMessage> msgs = null;

while (!isInterrupted()) {
TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in,
U.resolveClassLoader(spi.ignite().configuration()));
TcpDiscoveryAbstractMessage msg = spi.readMessage(ses, sock.getSoTimeout());

if (msg instanceof TcpDiscoveryClientReconnectMessage) {
TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
Expand Down Expand Up @@ -2769,9 +2761,6 @@ private static class SocketStream {
/** */
private final Socket sock;

/** */
private final InputStream in;

/**
* @param sock Socket.
* @throws IOException If failed to create stream.
Expand All @@ -2780,8 +2769,6 @@ public SocketStream(Socket sock) throws IOException {
assert sock != null;

this.sock = sock;

this.in = new BufferedInputStream(sock.getInputStream());
}

/**
Expand All @@ -2792,13 +2779,6 @@ Socket socket() {

}

/**
* @return Socket input stream.
*/
InputStream stream() {
return in;
}

/** {@inheritDoc} */
@Override public String toString() {
return sock.toString();
Expand Down
Loading
Loading