diff --git a/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java index 8a757d8e9399..28cc87cf917b 100644 --- a/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java +++ b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java @@ -88,4 +88,11 @@ public DroppedMessageMetrics(Verb verb) crossNodeDroppedLatency = Metrics.timer(createMetricName(TYPE, "CrossNodeDroppedLatency", scope)); } } + + public DroppedMessageMetrics(MetricNameFactory factory, String type, String scope) + { + dropped = Metrics.meter(factory.createMetricName("Dropped")); + internalDroppedLatency = Metrics.timer(factory.createMetricName("InternalDroppedLatency")); + crossNodeDroppedLatency = Metrics.timer(factory.createMetricName("CrossNodeDroppedLatency")); + } } diff --git a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java index 2f580cd483b0..589f9cc353eb 100644 --- a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java +++ b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java @@ -98,6 +98,9 @@ private static final class DroppedForVerb // total dropped message counts for server lifetime private final Map droppedMessages = new ConcurrentHashMap<>(); + + // dropped mutations by table + private final Map droppedMutationsByTable = new ConcurrentHashMap<>(); public MessagingMetrics() { @@ -160,6 +163,25 @@ public void recordTotalMessageProcessingTime(Verb verb, InetAddressAndPort from, public void recordDroppedMessage(Message message, long timeElapsed, TimeUnit timeUnit) { recordDroppedMessage(message.verb(), timeElapsed, timeUnit, message.isCrossNode()); + + if (message.verb() == Verb.MUTATION_REQ && message.payload instanceof org.apache.cassandra.db.Mutation) + { + org.apache.cassandra.db.Mutation mutation = (org.apache.cassandra.db.Mutation) message.payload; + for (org.apache.cassandra.db.partitions.PartitionUpdate update : mutation.getPartitionUpdates()) + { + String tableKey = update.metadata().keyspace + '.' + update.metadata().name; + DroppedMessageMetrics tableMetrics = droppedMutationsByTable.computeIfAbsent(tableKey, + k -> { + DefaultNameFactory tableFactory = new DefaultNameFactory("DroppedMutations", tableKey); + return new DroppedMessageMetrics(tableFactory, "DroppedMutations", tableKey); + }); + tableMetrics.dropped.mark(); + if (message.isCrossNode()) + tableMetrics.crossNodeDroppedLatency.update(timeElapsed, timeUnit); + else + tableMetrics.internalDroppedLatency.update(timeElapsed, timeUnit); + } + } } public void recordDroppedMessage(Verb verb, long timeElapsed, TimeUnit timeUnit, boolean isCrossNode) @@ -200,6 +222,14 @@ public Map getDroppedMessages() map.put(entry.getKey().toString(), (int) entry.getValue().metrics.dropped.getCount()); return map; } + + public Map getDroppedMutationsByTable() + { + Map map = new HashMap<>(droppedMutationsByTable.size()); + for (Map.Entry entry : droppedMutationsByTable.entrySet()) + map.put(entry.getKey(), entry.getValue().dropped.getCount()); + return map; + } private void logDroppedMessages() { diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java index 886459e7f6c3..edf488d33ca6 100644 --- a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java +++ b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java @@ -99,6 +99,11 @@ public interface MessagingServiceMBean * dropped message counts for server lifetime */ public Map getDroppedMessages(); + + /** + * dropped mutation counts by table for server lifetime + */ + public Map getDroppedMutationsByTable(); /** * Total number of timeouts happened on this node diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java b/src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java index b77fa8396e1d..4d499bf1fea1 100644 --- a/src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java +++ b/src/java/org/apache/cassandra/net/MessagingServiceMBeanImpl.java @@ -222,6 +222,12 @@ public Map getDroppedMessages() { return metrics.getDroppedMessages(); } + + @Override + public Map getDroppedMutationsByTable() + { + return metrics.getDroppedMutationsByTable(); + } @Override public long getTotalTimeouts()