Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.ignite.internal.codegen.InetAddressMessageSerializer;
import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryCacheMetricsMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer;
Expand All @@ -29,6 +30,10 @@
import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryMetricsUpdateMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeFullMetricsMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryNodeMetricsMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryNodesMetricsMapMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer;
Expand All @@ -37,6 +42,7 @@
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCacheMetricsMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse;
Expand All @@ -46,6 +52,10 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodesMetricsMapMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
Expand All @@ -54,6 +64,11 @@
public class DiscoveryMessageFactory implements MessageFactoryProvider {
/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new,
new TcpDiscoveryNodeFullMetricsMessageSerializer());
factory.register((short)-104, TcpDiscoveryNodesMetricsMapMessage::new, new TcpDiscoveryNodesMetricsMapMessageSerializer());
factory.register((short)-103, TcpDiscoveryCacheMetricsMessage::new, new TcpDiscoveryCacheMetricsMessageSerializer());
factory.register((short)-102, TcpDiscoveryNodeMetricsMessage::new, new TcpDiscoveryNodeMetricsMessageSerializer());
factory.register((short)-101, InetSocketAddressMessage::new, new InetSocketAddressMessageSerializer());
factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer());

Expand All @@ -70,5 +85,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)10, TcpDiscoveryHandshakeResponse::new, new TcpDiscoveryHandshakeResponseSerializer());
factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new TcpDiscoveryAuthFailedMessageSerializer());
factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new TcpDiscoveryDuplicateIdMessageSerializer());
factory.register((short)13, TcpDiscoveryMetricsUpdateMessage::new, new TcpDiscoveryMetricsUpdateMessageSerializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ class ClusterNodeMetrics {

/** */
public ClusterNodeMetrics(NodeFullMetricsMessage msg) {
nodeMetrics = new ClusterMetricsSnapshot(msg.nodeMetricsMsg());
nodeMetrics = new ClusterMetricsSnapshot(msg.nodeMetricsMessage());

cacheMetrics = new HashMap<>(msg.cachesMetrics().size(), 1.0f);
cacheMetrics = new HashMap<>(msg.cachesMetricsMessages().size(), 1.0f);

msg.cachesMetrics().entrySet().forEach(e -> cacheMetrics.put(e.getKey(), new CacheMetricsSnapshot(e.getValue())));
msg.cachesMetricsMessages().forEach((key, value) -> cacheMetrics.put(key, new CacheMetricsSnapshot(value)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,59 +17,59 @@

package org.apache.ignite.internal.processors.cluster;

import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;

/** Node compound metrics message. */
public final class NodeFullMetricsMessage implements Message {
public class NodeFullMetricsMessage implements Message {
/** */
public static final short TYPE_CODE = 138;

/** Node metrics wrapper message. */
@Order(0)
@Order(value = 0, method = "nodeMetricsMessage")
private NodeMetricsMessage nodeMetricsMsg;

/** Cache metrics wrapper message. */
@Order(1)
private Map<Integer, CacheMetricsMessage> cachesMetrics;
@Order(value = 1, method = "cachesMetricsMessages")
private Map<Integer, CacheMetricsMessage> cachesMetricsMsgs;

/** Empty constructor for {@link GridIoMessageFactory}. */
public NodeFullMetricsMessage() {

// No-op.
}

/** */
public NodeFullMetricsMessage(ClusterMetrics nodeMetrics, Map<Integer, CacheMetrics> cacheMetrics) {
nodeMetricsMsg = new NodeMetricsMessage(nodeMetrics);

cachesMetrics = new HashMap<>(cacheMetrics.size(), 1.0f);
cachesMetricsMsgs = U.newHashMap(cacheMetrics.size());

cacheMetrics.forEach((key, value) -> cachesMetrics.put(key, new CacheMetricsMessage(value)));
cacheMetrics.forEach((key, value) -> cachesMetricsMsgs.put(key, new CacheMetricsMessage(value)));
}

/** */
public Map<Integer, CacheMetricsMessage> cachesMetrics() {
return cachesMetrics;
public Map<Integer, CacheMetricsMessage> cachesMetricsMessages() {
return cachesMetricsMsgs;
}

/** */
public void cachesMetrics(Map<Integer, CacheMetricsMessage> cacheMetricsMsg) {
cachesMetrics = cacheMetricsMsg;
public void cachesMetricsMessages(Map<Integer, CacheMetricsMessage> cacheMetricsMsg) {
cachesMetricsMsgs = cacheMetricsMsg;
}

/** */
public NodeMetricsMessage nodeMetricsMsg() {
public NodeMetricsMessage nodeMetricsMessage() {
return nodeMetricsMsg;
}

/** */
public void nodeMetricsMsg(NodeMetricsMessage nodeMetricsMsg) {
public void nodeMetricsMessage(NodeMetricsMessage nodeMetricsMsg) {
this.nodeMetricsMsg = nodeMetricsMsg;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2526,8 +2526,8 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) {
log.debug("Received metrics response: " + msg);
}
else {
if (msg.hasMetrics())
processMsgCacheMetrics(msg, System.nanoTime());
if (!F.isEmpty(msg.serversFullMetricsMessages()))
processCacheMetricsMessage(msg, System.nanoTime());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2208,8 +2208,7 @@ private boolean recordable(TcpDiscoveryAbstractMessage msg) {
* @param nodeId Node ID.
*/
private static void removeMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID nodeId) {
msg.removeMetrics(nodeId);
msg.removeCacheMetrics(nodeId);
msg.removeServerMetrics(nodeId);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -3379,10 +3378,11 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
if (msgBytes == null) {
try {
msgBytes = U.marshal(spi.marshaller(), msg);
msgBytes = clientMsgWorker.ses.serializeMessage(msg);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal message: " + msg, e);
catch (IgniteCheckedException | IOException e) {
U.error(log, "Failed to serialize message to a client: " + msg + ", client id: "
+ clientMsgWorker.clientNodeId, e);

break;
}
Expand Down Expand Up @@ -6032,30 +6032,30 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) {

long tsNanos = System.nanoTime();

if (spiStateCopy() == CONNECTED && msg.hasMetrics())
processMsgCacheMetrics(msg, tsNanos);
if (spiStateCopy() == CONNECTED && !F.isEmpty(msg.serversFullMetricsMessages()))
processCacheMetricsMessage(msg, tsNanos);

if (sendMessageToRemotes(msg)) {
if (laps == 0 && spiStateCopy() == CONNECTED) {
// Message is on its first ring or just created on coordinator.
msg.setMetrics(locNodeId, spi.metricsProvider.metrics());
msg.setCacheMetrics(locNodeId, spi.metricsProvider.cacheMetrics());
msg.addServerMetrics(locNodeId, spi.metricsProvider.metrics());
msg.addServerCacheMetrics(locNodeId, spi.metricsProvider.cacheMetrics());

for (Map.Entry<UUID, ClientMessageWorker> e : clientMsgWorkers.entrySet()) {
UUID nodeId = e.getKey();
ClusterMetrics metrics = e.getValue().metrics();

if (metrics != null)
msg.setClientMetrics(locNodeId, nodeId, metrics);

msg.addClientNodeId(nodeId);
}
}
else {
// Message is on its second ring.
removeMetrics(msg, locNodeId);
msg.removeServerMetrics(locNodeId);

Collection<UUID> clientNodeIds = msg.clientNodeIds();
Collection<UUID> clientNodeIds = F.isEmpty(msg.connectedClientsMetricsMessages())
? Collections.emptySet()
: msg.connectedClientsMetricsMessages().keySet();

for (TcpDiscoveryNode clientNode : ring.clientNodes()) {
if (clientNode.visible()) {
Expand All @@ -6065,9 +6065,7 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) {
if (!clientNode.clientAliveTimeSet())
clientNode.clientAliveTime(spi.clientFailureDetectionTimeout());

boolean aliveCheck = clientNode.isClientAlive();

if (!aliveCheck && isLocalNodeCoordinator()) {
if (!clientNode.isClientAlive() && isLocalNodeCoordinator()) {
boolean failedNode;

synchronized (mux) {
Expand Down Expand Up @@ -8416,7 +8414,8 @@ else if (laps == 1) {
private int passedLaps(TcpDiscoveryMetricsUpdateMessage msg) {
UUID locNodeId = getLocalNodeId();

boolean hasLocMetrics = hasMetrics(msg, locNodeId);
boolean hasLocMetrics = !F.isEmpty(msg.serversFullMetricsMessages())
&& msg.serversFullMetricsMessages().get(locNodeId) != null;

if (locNodeId.equals(msg.creatorNodeId()) && !hasLocMetrics && msg.senderNodeId() != null)
return 2;
Expand All @@ -8425,15 +8424,5 @@ else if (msg.senderNodeId() == null || !hasLocMetrics)
else
return 1;
}

/**
* @param msg Metrics update message to check.
* @param nodeId Node ID for which the check should be performed.
* @return {@code True} is the message contains metrics of the node with the provided ID.
* {@code False} otherwise.
*/
private boolean hasMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID nodeId) {
return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.ClusterMetricsSnapshot;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.CacheMetricsSnapshot;
import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage;
import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
import org.apache.ignite.internal.processors.tracing.NoopTracing;
import org.apache.ignite.internal.processors.tracing.Tracing;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand All @@ -49,6 +53,8 @@
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodesMetricsMapMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -415,27 +421,45 @@ protected void clearNodeSensitiveData(TcpDiscoveryNode node) {
}

/** */
public void processMsgCacheMetrics(TcpDiscoveryMetricsUpdateMessage msg, long tsNanos) {
for (Map.Entry<UUID, TcpDiscoveryMetricsUpdateMessage.MetricsSet> e : msg.metrics().entrySet()) {
UUID nodeId = e.getKey();
public void processCacheMetricsMessage(TcpDiscoveryMetricsUpdateMessage msg, long tsNanos) {
for (Map.Entry<UUID, TcpDiscoveryNodeFullMetricsMessage> e : msg.serversFullMetricsMessages().entrySet()) {
UUID srvrId = e.getKey();
Map<Integer, CacheMetricsMessage> cacheMetricsMsgs = e.getValue().cachesMetricsMessages();
NodeMetricsMessage srvrMetricsMsg = e.getValue().nodeMetricsMessage();

TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue();
assert srvrMetricsMsg != null;

Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics(nodeId) ?
msg.cacheMetrics().get(nodeId) : Collections.emptyMap();
Map<Integer, CacheMetrics> cacheMetrics;

if (endTimeMetricsSizeProcessWait <= U.currentTimeMillis()
&& cacheMetrics.size() >= METRICS_QNT_WARN) {
if (!F.isEmpty(cacheMetricsMsgs)) {
cacheMetrics = U.newHashMap(cacheMetricsMsgs.size());

cacheMetricsMsgs.forEach((cacheId, cacheMetricsMsg) ->
cacheMetrics.put(cacheId, new CacheMetricsSnapshot(cacheMetricsMsg)));
}
else
cacheMetrics = Collections.emptyMap();

if (endTimeMetricsSizeProcessWait <= U.currentTimeMillis() && cacheMetrics.size() >= METRICS_QNT_WARN) {
log.warning("The Discovery message has metrics for " + cacheMetrics.size() + " caches.\n" +
"To prevent Discovery blocking use -DIGNITE_DISCOVERY_DISABLE_CACHE_METRICS_UPDATE=true option.");

endTimeMetricsSizeProcessWait = U.currentTimeMillis() + LOG_WARN_MSG_TIMEOUT;
}

updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tsNanos);
updateMetrics(srvrId, new ClusterMetricsSnapshot(srvrMetricsMsg), cacheMetrics, tsNanos);

TcpDiscoveryNodesMetricsMapMessage clientsMetricsMsg = F.isEmpty(msg.connectedClientsMetricsMessages())
? null
: msg.connectedClientsMetricsMessages().get(srvrId);

if (clientsMetricsMsg == null)
return;

assert clientsMetricsMsg.nodesMetricsMessages() != null;

for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
updateMetrics(t.get1(), t.get2(), cacheMetrics, tsNanos);
clientsMetricsMsg.nodesMetricsMessages().forEach((clientId, clientNodeMetricsMsg) ->
updateMetrics(clientId, new ClusterMetricsSnapshot(clientNodeMetricsMsg), cacheMetrics, tsNanos));
}
}

Expand Down
Loading