diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 1c01592fd0dd5..dc176cd4540fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -20,6 +20,7 @@ import org.apache.ignite.internal.codegen.InetAddressMessageSerializer; import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryCacheMetricsMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer; @@ -29,6 +30,10 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryMetricsUpdateMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeFullMetricsMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeMetricsMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodesMetricsMapMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer; @@ -37,6 +42,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage; import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCacheMetricsMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; @@ -46,6 +52,10 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodesMetricsMapMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage; @@ -54,6 +64,11 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { /** {@inheritDoc} */ @Override public void registerAll(MessageFactory factory) { + factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new, + new TcpDiscoveryNodeFullMetricsMessageSerializer()); + factory.register((short)-104, TcpDiscoveryNodesMetricsMapMessage::new, new TcpDiscoveryNodesMetricsMapMessageSerializer()); + factory.register((short)-103, TcpDiscoveryCacheMetricsMessage::new, new TcpDiscoveryCacheMetricsMessageSerializer()); + factory.register((short)-102, TcpDiscoveryNodeMetricsMessage::new, new TcpDiscoveryNodeMetricsMessageSerializer()); factory.register((short)-101, InetSocketAddressMessage::new, new InetSocketAddressMessageSerializer()); factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer()); @@ -70,5 +85,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)10, TcpDiscoveryHandshakeResponse::new, new TcpDiscoveryHandshakeResponseSerializer()); factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new TcpDiscoveryAuthFailedMessageSerializer()); factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new TcpDiscoveryDuplicateIdMessageSerializer()); + factory.register((short)13, TcpDiscoveryMetricsUpdateMessage::new, new TcpDiscoveryMetricsUpdateMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java index a1ac2dc3c8c47..7bba244fc7fc6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java @@ -46,11 +46,11 @@ class ClusterNodeMetrics { /** */ public ClusterNodeMetrics(NodeFullMetricsMessage msg) { - nodeMetrics = new ClusterMetricsSnapshot(msg.nodeMetricsMsg()); + nodeMetrics = new ClusterMetricsSnapshot(msg.nodeMetricsMessage()); - cacheMetrics = new HashMap<>(msg.cachesMetrics().size(), 1.0f); + cacheMetrics = new HashMap<>(msg.cachesMetricsMessages().size(), 1.0f); - msg.cachesMetrics().entrySet().forEach(e -> cacheMetrics.put(e.getKey(), new CacheMetricsSnapshot(e.getValue()))); + msg.cachesMetricsMessages().forEach((key, value) -> cacheMetrics.put(key, new CacheMetricsSnapshot(value))); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java index 2974ab93351a6..0ca3ee6800f6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/NodeFullMetricsMessage.java @@ -17,59 +17,59 @@ package org.apache.ignite.internal.processors.cluster; -import java.util.HashMap; import java.util.Map; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; /** Node compound metrics message. */ -public final class NodeFullMetricsMessage implements Message { +public class NodeFullMetricsMessage implements Message { /** */ public static final short TYPE_CODE = 138; /** Node metrics wrapper message. */ - @Order(0) + @Order(value = 0, method = "nodeMetricsMessage") private NodeMetricsMessage nodeMetricsMsg; /** Cache metrics wrapper message. */ - @Order(1) - private Map cachesMetrics; + @Order(value = 1, method = "cachesMetricsMessages") + private Map cachesMetricsMsgs; /** Empty constructor for {@link GridIoMessageFactory}. */ public NodeFullMetricsMessage() { - + // No-op. } /** */ public NodeFullMetricsMessage(ClusterMetrics nodeMetrics, Map cacheMetrics) { nodeMetricsMsg = new NodeMetricsMessage(nodeMetrics); - cachesMetrics = new HashMap<>(cacheMetrics.size(), 1.0f); + cachesMetricsMsgs = U.newHashMap(cacheMetrics.size()); - cacheMetrics.forEach((key, value) -> cachesMetrics.put(key, new CacheMetricsMessage(value))); + cacheMetrics.forEach((key, value) -> cachesMetricsMsgs.put(key, new CacheMetricsMessage(value))); } /** */ - public Map cachesMetrics() { - return cachesMetrics; + public Map cachesMetricsMessages() { + return cachesMetricsMsgs; } /** */ - public void cachesMetrics(Map cacheMetricsMsg) { - cachesMetrics = cacheMetricsMsg; + public void cachesMetricsMessages(Map cacheMetricsMsg) { + cachesMetricsMsgs = cacheMetricsMsg; } /** */ - public NodeMetricsMessage nodeMetricsMsg() { + public NodeMetricsMessage nodeMetricsMessage() { return nodeMetricsMsg; } /** */ - public void nodeMetricsMsg(NodeMetricsMessage nodeMetricsMsg) { + public void nodeMetricsMessage(NodeMetricsMessage nodeMetricsMsg) { this.nodeMetricsMsg = nodeMetricsMsg; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 5ecbe893fb852..238fdd13eabd5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -2526,8 +2526,8 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) { log.debug("Received metrics response: " + msg); } else { - if (msg.hasMetrics()) - processMsgCacheMetrics(msg, System.nanoTime()); + if (!F.isEmpty(msg.serversFullMetricsMessages())) + processCacheMetricsMessage(msg, System.nanoTime()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index aef67a57809d8..12b58668ca4da 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2208,8 +2208,7 @@ private boolean recordable(TcpDiscoveryAbstractMessage msg) { * @param nodeId Node ID. */ private static void removeMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID nodeId) { - msg.removeMetrics(nodeId); - msg.removeCacheMetrics(nodeId); + msg.removeServerMetrics(nodeId); } /** {@inheritDoc} */ @@ -3379,10 +3378,11 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) { if (msgBytes == null) { try { - msgBytes = U.marshal(spi.marshaller(), msg); + msgBytes = clientMsgWorker.ses.serializeMessage(msg); } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal message: " + msg, e); + catch (IgniteCheckedException | IOException e) { + U.error(log, "Failed to serialize message to a client: " + msg + ", client id: " + + clientMsgWorker.clientNodeId, e); break; } @@ -6032,14 +6032,14 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) { long tsNanos = System.nanoTime(); - if (spiStateCopy() == CONNECTED && msg.hasMetrics()) - processMsgCacheMetrics(msg, tsNanos); + if (spiStateCopy() == CONNECTED && !F.isEmpty(msg.serversFullMetricsMessages())) + processCacheMetricsMessage(msg, tsNanos); if (sendMessageToRemotes(msg)) { if (laps == 0 && spiStateCopy() == CONNECTED) { // Message is on its first ring or just created on coordinator. - msg.setMetrics(locNodeId, spi.metricsProvider.metrics()); - msg.setCacheMetrics(locNodeId, spi.metricsProvider.cacheMetrics()); + msg.addServerMetrics(locNodeId, spi.metricsProvider.metrics()); + msg.addServerCacheMetrics(locNodeId, spi.metricsProvider.cacheMetrics()); for (Map.Entry e : clientMsgWorkers.entrySet()) { UUID nodeId = e.getKey(); @@ -6047,15 +6047,15 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) { if (metrics != null) msg.setClientMetrics(locNodeId, nodeId, metrics); - - msg.addClientNodeId(nodeId); } } else { // Message is on its second ring. - removeMetrics(msg, locNodeId); + msg.removeServerMetrics(locNodeId); - Collection clientNodeIds = msg.clientNodeIds(); + Collection clientNodeIds = F.isEmpty(msg.connectedClientsMetricsMessages()) + ? Collections.emptySet() + : msg.connectedClientsMetricsMessages().keySet(); for (TcpDiscoveryNode clientNode : ring.clientNodes()) { if (clientNode.visible()) { @@ -6065,9 +6065,7 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) { if (!clientNode.clientAliveTimeSet()) clientNode.clientAliveTime(spi.clientFailureDetectionTimeout()); - boolean aliveCheck = clientNode.isClientAlive(); - - if (!aliveCheck && isLocalNodeCoordinator()) { + if (!clientNode.isClientAlive() && isLocalNodeCoordinator()) { boolean failedNode; synchronized (mux) { @@ -8416,7 +8414,8 @@ else if (laps == 1) { private int passedLaps(TcpDiscoveryMetricsUpdateMessage msg) { UUID locNodeId = getLocalNodeId(); - boolean hasLocMetrics = hasMetrics(msg, locNodeId); + boolean hasLocMetrics = !F.isEmpty(msg.serversFullMetricsMessages()) + && msg.serversFullMetricsMessages().get(locNodeId) != null; if (locNodeId.equals(msg.creatorNodeId()) && !hasLocMetrics && msg.senderNodeId() != null) return 2; @@ -8425,15 +8424,5 @@ else if (msg.senderNodeId() == null || !hasLocMetrics) else return 1; } - - /** - * @param msg Metrics update message to check. - * @param nodeId Node ID for which the check should be performed. - * @return {@code True} is the message contains metrics of the node with the provided ID. - * {@code False} otherwise. - */ - private boolean hasMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID nodeId) { - return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId); - } } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index f35f411479501..80b7879854606 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -35,10 +35,14 @@ import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.CacheMetricsSnapshot; +import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage; +import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; import org.apache.ignite.internal.processors.tracing.NoopTracing; import org.apache.ignite.internal.processors.tracing.Tracing; -import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; @@ -49,6 +53,8 @@ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodesMetricsMapMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage; import org.jetbrains.annotations.Nullable; @@ -415,27 +421,45 @@ protected void clearNodeSensitiveData(TcpDiscoveryNode node) { } /** */ - public void processMsgCacheMetrics(TcpDiscoveryMetricsUpdateMessage msg, long tsNanos) { - for (Map.Entry e : msg.metrics().entrySet()) { - UUID nodeId = e.getKey(); + public void processCacheMetricsMessage(TcpDiscoveryMetricsUpdateMessage msg, long tsNanos) { + for (Map.Entry e : msg.serversFullMetricsMessages().entrySet()) { + UUID srvrId = e.getKey(); + Map cacheMetricsMsgs = e.getValue().cachesMetricsMessages(); + NodeMetricsMessage srvrMetricsMsg = e.getValue().nodeMetricsMessage(); - TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue(); + assert srvrMetricsMsg != null; - Map cacheMetrics = msg.hasCacheMetrics(nodeId) ? - msg.cacheMetrics().get(nodeId) : Collections.emptyMap(); + Map cacheMetrics; - if (endTimeMetricsSizeProcessWait <= U.currentTimeMillis() - && cacheMetrics.size() >= METRICS_QNT_WARN) { + if (!F.isEmpty(cacheMetricsMsgs)) { + cacheMetrics = U.newHashMap(cacheMetricsMsgs.size()); + + cacheMetricsMsgs.forEach((cacheId, cacheMetricsMsg) -> + cacheMetrics.put(cacheId, new CacheMetricsSnapshot(cacheMetricsMsg))); + } + else + cacheMetrics = Collections.emptyMap(); + + if (endTimeMetricsSizeProcessWait <= U.currentTimeMillis() && cacheMetrics.size() >= METRICS_QNT_WARN) { log.warning("The Discovery message has metrics for " + cacheMetrics.size() + " caches.\n" + "To prevent Discovery blocking use -DIGNITE_DISCOVERY_DISABLE_CACHE_METRICS_UPDATE=true option."); endTimeMetricsSizeProcessWait = U.currentTimeMillis() + LOG_WARN_MSG_TIMEOUT; } - updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tsNanos); + updateMetrics(srvrId, new ClusterMetricsSnapshot(srvrMetricsMsg), cacheMetrics, tsNanos); + + TcpDiscoveryNodesMetricsMapMessage clientsMetricsMsg = F.isEmpty(msg.connectedClientsMetricsMessages()) + ? null + : msg.connectedClientsMetricsMessages().get(srvrId); + + if (clientsMetricsMsg == null) + return; + + assert clientsMetricsMsg.nodesMetricsMessages() != null; - for (T2 t : metricsSet.clientMetrics()) - updateMetrics(t.get1(), t.get2(), cacheMetrics, tsNanos); + clientsMetricsMsg.nodesMetricsMessages().forEach((clientId, clientNodeMetricsMsg) -> + updateMetrics(clientId, new ClusterMetricsSnapshot(clientNodeMetricsMsg), cacheMetrics, tsNanos)); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 5c1946f0eac34..edcab473b4c91 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -170,13 +170,12 @@ T readMessage() throws IgniteCheckedException, IOException { if (MESSAGE_SERIALIZATION != serMode) { detectSslAlert(serMode, in); - throw new IgniteCheckedException("Received unexpected byte while reading discovery message: " + serMode); + // IOException type is important for ServerImpl. It may search the cause (X.hasCause). + // The connection error processing behavior depends on it. + throw new IOException("Received unexpected byte while reading discovery message: " + serMode); } - byte b0 = (byte)in.read(); - byte b1 = (byte)in.read(); - - Message msg = spi.messageFactory().create(makeMessageType(b0, b1)); + Message msg = spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read())); msgReader.reset(); msgReader.setBuffer(msgBuf); @@ -185,19 +184,47 @@ T readMessage() throws IgniteCheckedException, IOException { boolean finished; - do { - // Should be cleared before first operation. - msgBuf.clear(); + msgBuf.clear(); - int read = in.read(msgBuf.array(), 0, msgBuf.limit()); + do { + int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining()); if (read == -1) throw new EOFException("Connection closed before message was fully read."); - msgBuf.limit(read); + if (msgBuf.position() > 0) { + msgBuf.limit(msgBuf.position() + read); + + // We've stored an unprocessed tail before. + msgBuf.rewind(); + } + else + msgBuf.limit(read); finished = msgSer.readFrom(msg, msgReader); - } while (!finished); + + // We rely on the fact that Discovery only sends next message upon receiving a receipt for the previous one. + // This behaviour guarantees that we never read a next message from the buffer right after the end of + // the previous message. + assert msgBuf.remaining() == 0 || !finished : "Some data was read from the socket but left unprocessed."; + + if (finished) + break; + + // We must keep the uprocessed bytes read from the socket. It won't return them again. + byte[] unprocessedTail = null; + + if (msgBuf.remaining() > 0) { + unprocessedTail = new byte[msgBuf.remaining()]; + msgBuf.get(unprocessedTail, 0, msgBuf.remaining()); + } + + msgBuf.clear(); + + if (unprocessedTail != null) + msgBuf.put(unprocessedTail); + } + while (true); return (T)msg; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCacheMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCacheMetricsMessage.java new file mode 100644 index 0000000000000..3fbfc2faa7164 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCacheMetricsMessage.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.cache.CacheMetrics; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * We cannot directly reuse {@link CacheMetricsMessage} in Discovery as it is registered in a message factory of + * Communication component and thus is unavailable in Discovery. We have to extend {@link CacheMetricsMessage} and + * register this subclass in message factory of Discovery component. + */ +public class TcpDiscoveryCacheMetricsMessage extends CacheMetricsMessage { + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryCacheMetricsMessage() { + // No-op. + } + + /** @param cacheMetricsMsg Cache metric message. */ + public TcpDiscoveryCacheMetricsMessage(CacheMetrics cacheMetricsMsg) { + super(cacheMetricsMsg); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -103; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryCacheMetricsMessage.class, this, "super", super.toString()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java index e6835320fa5b2..37787cc5e905b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMetricsUpdateMessage.java @@ -17,25 +17,19 @@ package org.apache.ignite.spi.discovery.tcp.messages; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.UUID; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cluster.ClusterMetrics; -import org.apache.ignite.internal.ClusterMetricsSnapshot; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; /** * Metrics update message. @@ -52,20 +46,24 @@ * second pass). */ @TcpDiscoveryRedirectToClient -public class TcpDiscoveryMetricsUpdateMessage extends TcpDiscoveryAbstractMessage { +public class TcpDiscoveryMetricsUpdateMessage extends TcpDiscoveryAbstractMessage implements Message { /** */ private static final long serialVersionUID = 0L; - /** Map to store nodes metrics. */ + /** Connected clients metrics: server id -> client id -> clients metrics. */ @GridToStringExclude - private final Map metrics = new HashMap<>(); + @Order(value = 5, method = "connectedClientsMetricsMessages") + private Map connectedClientsMetricsMsgs; - /** Client node IDs. */ - private final Collection clientNodeIds = new HashSet<>(); - - /** Cahce metrics by node. */ + /** Servers full metrics: server id -> server metrics + metrics of server's caches. */ @GridToStringExclude - private final Map> cacheMetrics = new HashMap<>(); + @Order(value = 6, method = "serversFullMetricsMessages") + private @Nullable Map serversFullMetricsMsgs; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryMetricsUpdateMessage() { + // No-op. + } /** * Constructor. @@ -79,139 +77,118 @@ public TcpDiscoveryMetricsUpdateMessage(UUID creatorNodeId) { /** * Sets metrics for particular node. * - * @param nodeId Node ID. - * @param metrics Node metrics. + * @param srvrId Server ID. + * @param newMetrics New server metrics to add. */ - public void setMetrics(UUID nodeId, ClusterMetrics metrics) { - assert nodeId != null; - assert metrics != null; - assert !this.metrics.containsKey(nodeId); + public void addServerMetrics(UUID srvrId, ClusterMetrics newMetrics) { + assert srvrId != null; + assert newMetrics != null; + + if (serversFullMetricsMsgs == null) + serversFullMetricsMsgs = new HashMap<>(); + + assert !serversFullMetricsMsgs.containsKey(srvrId); - this.metrics.put(nodeId, new MetricsSet(metrics)); + serversFullMetricsMsgs.compute(srvrId, (srvrId0, srvrFullMetrics) -> { + if (srvrFullMetrics == null) + srvrFullMetrics = new TcpDiscoveryNodeFullMetricsMessage(); + + srvrFullMetrics.nodeMetricsMessage(new TcpDiscoveryNodeMetricsMessage(newMetrics)); + + return srvrFullMetrics; + }); } /** * Sets cache metrics for particular node. * - * @param nodeId Node ID. - * @param metrics Node cache metrics. + * @param srvrId Server ID. + * @param newCachesMetrics News server's caches metrics to add. */ - public void setCacheMetrics(UUID nodeId, Map metrics) { - assert nodeId != null; - assert metrics != null; - assert !this.cacheMetrics.containsKey(nodeId); + public void addServerCacheMetrics(UUID srvrId, Map newCachesMetrics) { + assert srvrId != null; + assert newCachesMetrics != null; - if (!F.isEmpty(metrics)) - this.cacheMetrics.put(nodeId, metrics); - } + if (serversFullMetricsMsgs == null) + serversFullMetricsMsgs = new HashMap<>(); - /** - * Sets metrics for a client node. - * - * @param nodeId Server node ID. - * @param clientNodeId Client node ID. - * @param metrics Node metrics. - */ - public void setClientMetrics(UUID nodeId, UUID clientNodeId, ClusterMetrics metrics) { - assert nodeId != null; - assert clientNodeId != null; - assert metrics != null; - assert this.metrics.containsKey(nodeId); + assert serversFullMetricsMsgs.containsKey(srvrId) && serversFullMetricsMsgs.get(srvrId).cachesMetricsMessages() == null; - this.metrics.get(nodeId).addClientMetrics(clientNodeId, metrics); - } + serversFullMetricsMsgs.compute(srvrId, (srvrId0, srvrFullMetrics) -> { + if (srvrFullMetrics == null) + srvrFullMetrics = new TcpDiscoveryNodeFullMetricsMessage(); - /** - * Removes metrics for particular node from the message. - * - * @param nodeId Node ID. - */ - public void removeMetrics(UUID nodeId) { - assert nodeId != null; + Map newCachesMsgsMap = U.newHashMap(newCachesMetrics.size()); - metrics.remove(nodeId); - } + newCachesMetrics.forEach((cacheId, cacheMetrics) -> + newCachesMsgsMap.put(cacheId, new TcpDiscoveryCacheMetricsMessage(cacheMetrics))); - /** - * Removes cache metrics for particular node from the message. - * - * @param nodeId Node ID. - */ - public void removeCacheMetrics(UUID nodeId) { - assert nodeId != null; + srvrFullMetrics.cachesMetricsMessages(newCachesMsgsMap); - cacheMetrics.remove(nodeId); + return srvrFullMetrics; + }); } /** - * Gets metrics map. + * Sets metrics for a connected client node. * - * @return Metrics map. + * @param srvrId Server node ID. + * @param clientNodeId Connected client node ID. + * @param clientMetrics Client metrics. */ - public Map metrics() { - return metrics; - } + public void setClientMetrics(UUID srvrId, UUID clientNodeId, ClusterMetrics clientMetrics) { + assert srvrId != null; + assert clientNodeId != null; + assert clientMetrics != null; - /** - * Gets cache metrics map. - * - * @return Cache metrics map. - */ - public Map> cacheMetrics() { - return cacheMetrics; - } + if (connectedClientsMetricsMsgs == null) + connectedClientsMetricsMsgs = new HashMap<>(); - /** - * @return {@code True} if this message contains metrics. - */ - public boolean hasMetrics() { - return !metrics.isEmpty(); - } + assert !connectedClientsMetricsMsgs.containsKey(srvrId) + || connectedClientsMetricsMsgs.get(srvrId).nodesMetricsMessages().get(clientNodeId) == null; - /** - * @return {@code True} this message contains cache metrics. - */ - public boolean hasCacheMetrics() { - return !cacheMetrics.isEmpty(); - } + connectedClientsMetricsMsgs.compute(srvrId, (srvrId0, clientsMetricsMsg) -> { + if (clientsMetricsMsg == null) { + clientsMetricsMsg = new TcpDiscoveryNodesMetricsMapMessage(); + clientsMetricsMsg.nodesMetricsMessages(new HashMap<>()); + } - /** - * @param nodeId Node ID. - * @return {@code True} if this message contains metrics. - */ - public boolean hasMetrics(UUID nodeId) { - assert nodeId != null; + clientsMetricsMsg.nodesMetricsMessages().put(clientNodeId, new TcpDiscoveryNodeMetricsMessage(clientMetrics)); - return metrics.get(nodeId) != null; + return clientsMetricsMsg; + }); } /** - * @param nodeId Node ID. + * Removes metrics for particular server node from the message. * - * @return {@code True} if this message contains cache metrics for particular node. + * @param srvrId Server ID. */ - public boolean hasCacheMetrics(UUID nodeId) { - assert nodeId != null; + public void removeServerMetrics(UUID srvrId) { + assert srvrId != null; + assert serversFullMetricsMsgs != null; - return cacheMetrics.get(nodeId) != null; + serversFullMetricsMsgs.remove(srvrId); } - /** - * Gets client node IDs for particular node. - * - * @return Client node IDs. - */ - public Collection clientNodeIds() { - return clientNodeIds; + /** @return Map of server full metrics messages. */ + public Map serversFullMetricsMessages() { + return serversFullMetricsMsgs; } - /** - * Adds client node ID. - * - * @param clientNodeId Client node ID. - */ - public void addClientNodeId(UUID clientNodeId) { - clientNodeIds.add(clientNodeId); + /** @param serversFullMetricsMsgs Map of server full metrics messages. */ + public void serversFullMetricsMessages(Map serversFullMetricsMsgs) { + this.serversFullMetricsMsgs = serversFullMetricsMsgs; + } + + /** @return Map of nodes metrics messages. */ + public @Nullable Map connectedClientsMetricsMessages() { + return connectedClientsMetricsMsgs; + } + + /** @param connectedClientsMetricsMsgs Map of nodes metrics messages. */ + public void connectedClientsMetricsMessages(Map connectedClientsMetricsMsgs) { + this.connectedClientsMetricsMsgs = connectedClientsMetricsMsgs; } /** {@inheritDoc} */ @@ -220,115 +197,12 @@ public void addClientNodeId(UUID clientNodeId) { } /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(TcpDiscoveryMetricsUpdateMessage.class, this, "super", super.toString()); + @Override public short directType() { + return 13; } - /** - * @param nodeId Node ID. - * @param metrics Metrics. - * @return Serialized metrics. - */ - private static byte[] serializeMetrics(UUID nodeId, ClusterMetrics metrics) { - assert nodeId != null; - assert metrics != null; - - byte[] buf = new byte[16 + ClusterMetricsSnapshot.METRICS_SIZE]; - - U.longToBytes(nodeId.getMostSignificantBits(), buf, 0); - U.longToBytes(nodeId.getLeastSignificantBits(), buf, 8); - - ClusterMetricsSnapshot.serialize(buf, 16, metrics); - - return buf; - } - - /** - */ - @SuppressWarnings("PublicInnerClass") - public static class MetricsSet implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Metrics. */ - private byte[] metrics; - - /** Client metrics. */ - private Collection clientMetrics; - - /** - */ - public MetricsSet() { - // No-op. - } - - /** - * @param metrics Metrics. - */ - public MetricsSet(ClusterMetrics metrics) { - assert metrics != null; - - this.metrics = ClusterMetricsSnapshot.serialize(metrics); - } - - /** - * @return Deserialized metrics. - */ - public ClusterMetrics metrics() { - return ClusterMetricsSnapshot.deserialize(metrics, 0); - } - - /** - * @return Client metrics. - */ - public Collection> clientMetrics() { - return F.viewReadOnly(clientMetrics, new C1>() { - @Override public T2 apply(byte[] bytes) { - UUID nodeId = new UUID(U.bytesToLong(bytes, 0), U.bytesToLong(bytes, 8)); - - return new T2<>(nodeId, ClusterMetricsSnapshot.deserialize(bytes, 16)); - } - }); - } - - /** - * @param nodeId Client node ID. - * @param metrics Client metrics. - */ - private void addClientMetrics(UUID nodeId, ClusterMetrics metrics) { - assert nodeId != null; - assert metrics != null; - - if (clientMetrics == null) - clientMetrics = new ArrayList<>(); - - clientMetrics.add(serializeMetrics(nodeId, metrics)); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeByteArray(out, metrics); - - out.writeInt(clientMetrics != null ? clientMetrics.size() : -1); - - if (clientMetrics != null) { - for (byte[] arr : clientMetrics) - U.writeByteArray(out, arr); - } - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - metrics = U.readByteArray(in); - - int clientMetricsSize = in.readInt(); - - if (clientMetricsSize >= 0) { - clientMetrics = new ArrayList<>(clientMetricsSize); - - for (int i = 0; i < clientMetricsSize; i++) - clientMetrics.add(U.readByteArray(in)); - } - } + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryMetricsUpdateMessage.class, this, "super", super.toString()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java new file mode 100644 index 0000000000000..7d121816da41d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFullMetricsMessage.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.processors.cluster.NodeFullMetricsMessage; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * We cannot directly reuse {@link NodeFullMetricsMessage} in Discovery as it is registered in a message factory of + * Communication component and thus is unavailable in Discovery. We have to extend {@link NodeFullMetricsMessage} and + * register this subclass in message factory of Discovery component. + */ +public class TcpDiscoveryNodeFullMetricsMessage extends NodeFullMetricsMessage { + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeFullMetricsMessage() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -105; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryNodeFullMetricsMessage.class, this, "super", super.toString()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java new file mode 100644 index 0000000000000..f62b3df08728f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeMetricsMessage.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * We cannot directly reuse {@link NodeMetricsMessage} in Discovery as it is registered in a message factory of + * Communication component and thus is unavailable in Discovery. We have to extend {@link NodeMetricsMessage} and + * register this subclass in message factory of Discovery component. + */ +public class TcpDiscoveryNodeMetricsMessage extends NodeMetricsMessage { + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeMetricsMessage() { + // No-op. + } + + /** @param nodeMetrics Node metrics. */ + public TcpDiscoveryNodeMetricsMessage(ClusterMetrics nodeMetrics) { + super(nodeMetrics); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -102; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryNodeMetricsMessage.class, this, "super", super.toString()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodesMetricsMapMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodesMetricsMapMessage.java new file mode 100644 index 0000000000000..8ce0234cc06af --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodesMetricsMapMessage.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** Holds map of nodes metrics messages per node id. */ +public class TcpDiscoveryNodesMetricsMapMessage implements Message { + /** Map of nodes metrics messages per node id. */ + @Order(value = 0, method = "nodesMetricsMessages") + private Map nodesMetricsMsgs; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodesMetricsMapMessage() { + // No-op. + } + + /** @return Map of nodes metrics messages per node id. */ + public Map nodesMetricsMessages() { + return nodesMetricsMsgs; + } + + /** @param nodesMetricsMsgs Map of nodes metrics messages per node id. */ + public void nodesMetricsMessages(Map nodesMetricsMsgs) { + this.nodesMetricsMsgs = nodesMetricsMsgs; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -104; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryNodesMetricsMapMessage.class, this, "super", super.toString()); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java index d21204e99e1f2..b084c5f4517ea 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -92,7 +93,7 @@ public void testCacheSize() throws Exception { TcpDiscoveryMetricsUpdateMessage msg = new TcpDiscoveryMetricsUpdateMessage(UUID.randomUUID()); - msg.setCacheMetrics(UUID.randomUUID(), cacheMetrics); + msg.addServerCacheMetrics(UUID.randomUUID(), cacheMetrics); Marshaller marshaller = marshaller(grid(0)); @@ -104,11 +105,13 @@ public void testCacheSize() throws Exception { TcpDiscoveryMetricsUpdateMessage msg2 = (TcpDiscoveryMetricsUpdateMessage)readObj; - Map cacheMetrics2 = msg2.cacheMetrics().values().iterator().next(); + Map cacheMetrics2 = msg2.serversFullMetricsMessages().values().iterator().next() + .cachesMetricsMessages(); - CacheMetrics cacheMetric2 = cacheMetrics2.values().iterator().next(); + CacheMetrics cacheMetric2 = new CacheMetricsSnapshot(cacheMetrics2.values().iterator().next()); - assertEquals("TcpDiscoveryMetricsUpdateMessage serialization error, cacheSize is different", size, cacheMetric2.getCacheSize()); + assertEquals("TcpDiscoveryMetricsUpdateMessage serialization error, cacheSize is different", size, + cacheMetric2.getCacheSize()); IgniteCache cacheNode1 = grid(1).cache(DEFAULT_CACHE_NAME);