From 8faafece73cbe41bc4b6ccc516e2636556091e8b Mon Sep 17 00:00:00 2001
From: Brandon Williams
Date: Mon, 7 Jul 2025 13:22:14 -0500
Subject: [PATCH] CNDB-7237: add metrics to track replica response sizes

---
 .../metrics/ReplicaResponseSizeMetrics.java   |  77 ++++++++++++++
 .../service/AbstractWriteResponseHandler.java |  15 +++
 .../DatacenterSyncWriteResponseHandler.java   |   1 +
 .../service/WriteResponseHandler.java         |   1 +
 .../cassandra/service/reads/ReadCallback.java |   7 ++
 .../ReplicaResponseSizeMetricsTest.java       | 105 +++++++++++++++++++
 6 files changed, 206 insertions(+)
 create mode 100644 src/java/org/apache/cassandra/metrics/ReplicaResponseSizeMetrics.java
 create mode 100644 test/unit/org/apache/cassandra/metrics/ReplicaResponseSizeMetricsTest.java

diff --git a/src/java/org/apache/cassandra/metrics/ReplicaResponseSizeMetrics.java b/src/java/org/apache/cassandra/metrics/ReplicaResponseSizeMetrics.java
new file mode 100644
index 000000000000..df56cee0daa1
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ReplicaResponseSizeMetrics.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+/**
+ * Metrics tracking the size of response messages that coordinators receive from replicas,
+ * in aggregate and split by read and write responses.
+ */
+public final class ReplicaResponseSizeMetrics
+{
+    private static final String TYPE = "ReplicaResponseSize";
+
+    /** Total bytes received from replicas in response messages */
+    public static final Counter totalBytesReceived = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "TotalBytesReceived", null));
+
+    /** Histogram of response sizes from replicas */
+    public static final Histogram bytesReceivedPerResponse = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "BytesReceivedPerResponse", null), true);
+
+    /** Total bytes received from replicas in read responses */
+    public static final Counter readResponseBytesReceived = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "ReadResponseBytesReceived", null));
+
+    /** Histogram of read response sizes from replicas */
+    public static final Histogram readResponseBytesPerResponse = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "ReadResponseBytesPerResponse", null), true);
+
+    /** Total bytes received from replicas in write responses */
+    public static final Counter writeResponseBytesReceived = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "WriteResponseBytesReceived", null));
+
+    /** Histogram of write response sizes from replicas */
+    public static final Histogram writeResponseBytesPerResponse = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "WriteResponseBytesPerResponse", null), true);
+
+    private ReplicaResponseSizeMetrics()
+    {
+        // static metric holder, not meant to be instantiated
+    }
+
+    /**
+     * Record the size of a response received from a replica
+     * @param responseSize the size of the response in bytes
+     * @param isReadResponse true if this is a read response, false for write response
+     */
+    public static void recordReplicaResponseSize(int responseSize, boolean isReadResponse)
+    {
+        totalBytesReceived.inc(responseSize);
+        bytesReceivedPerResponse.update(responseSize);
+
+        if (isReadResponse)
+        {
+            readResponseBytesReceived.inc(responseSize);
+            readResponseBytesPerResponse.update(responseSize);
+        }
+        else
+        {
+            writeResponseBytesReceived.inc(responseSize);
+            writeResponseBytesPerResponse.update(responseSize);
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 5711df1afed3..ec42cb50d4a4 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -47,6 +47,8 @@
 import org.apache.cassandra.sensors.RequestSensors;
 import org.apache.cassandra.sensors.RequestTracker;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.apache.cassandra.metrics.ReplicaResponseSizeMetrics;
+import org.apache.cassandra.net.MessagingService;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.cassandra.locator.Replicas.countInOurDc;
@@ -262,6 +264,19 @@ public boolean waitingFor(InetAddressAndPort from)
      * null message means "response from local write"
      */
     public abstract void onResponse(Message msg);
+
+    /**
+     * Track the size of a response message from a replica
+     * @param msg the response message; null (a local write) is not counted
+     */
+    protected void trackReplicaResponseSize(Message msg)
+    {
+        if (msg != null)
+        {
+            int responseSize = msg.payloadSize(MessagingService.current_version);
+            ReplicaResponseSizeMetrics.recordReplicaResponseSize(responseSize, false);
+        }
+    }
 
     public void signal()
     {
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index 018fea2ec936..9597c7d516b2 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -73,6 +73,7 @@ public DatacenterSyncWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
 
     public void onResponse(Message message)
     {
+        trackReplicaResponseSize(message);
         try
         {
             String dataCenter = message == null
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 83667cc7dc66..aa9d95d38051 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -64,6 +64,7 @@ public boolean trackLatencyForSnitch(Verb responseVerb, boolean isTimeout)
     @Override
     public void onResponse(Message m)
     {
+        trackReplicaResponseSize(m);
         if (responsesUpdater.decrementAndGet(this) == 0)
             signal();
         //Must be last after all subclass processing
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index 074f947cd21e..436b32a1bbae 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -42,6 +42,8 @@
 import org.apache.cassandra.sensors.RequestTracker;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.apache.cassandra.metrics.ReplicaResponseSizeMetrics;
+import org.apache.cassandra.net.MessagingService;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
@@ -159,6 +161,11 @@ public int blockFor()
     public void onResponse(Message message)
     {
         assertWaitingFor(message.from());
+
+        // Track the response size from replica to coordinator
+        int responseSize = message.payloadSize(MessagingService.current_version);
+        ReplicaResponseSizeMetrics.recordReplicaResponseSize(responseSize, true);
+
         resolver.preprocess(message);
 
         /*
diff --git a/test/unit/org/apache/cassandra/metrics/ReplicaResponseSizeMetricsTest.java b/test/unit/org/apache/cassandra/metrics/ReplicaResponseSizeMetricsTest.java
new file mode 100644
index 000000000000..73be691c7059
--- /dev/null
+++ b/test/unit/org/apache/cassandra/metrics/ReplicaResponseSizeMetricsTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import org.junit.Test;
+
+import com.codahale.metrics.Snapshot;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * The metrics under test are static and cannot be reset, so every test asserts on deltas
+ * relative to the values observed at its start.
+ */
+public class ReplicaResponseSizeMetricsTest
+{
+    @Test
+    public void testReadResponseMetrics()
+    {
+        long initialTotal = ReplicaResponseSizeMetrics.totalBytesReceived.getCount();
+        long initialRead = ReplicaResponseSizeMetrics.readResponseBytesReceived.getCount();
+        long initialSamples = ReplicaResponseSizeMetrics.readResponseBytesPerResponse.getCount();
+
+        // Record a read response
+        int responseSize = 1024;
+        ReplicaResponseSizeMetrics.recordReplicaResponseSize(responseSize, true);
+
+        // Verify counters
+        assertEquals(initialTotal + responseSize, ReplicaResponseSizeMetrics.totalBytesReceived.getCount());
+        assertEquals(initialRead + responseSize, ReplicaResponseSizeMetrics.readResponseBytesReceived.getCount());
+
+        // Verify the histogram recorded exactly one new sample
+        assertEquals(initialSamples + 1, ReplicaResponseSizeMetrics.readResponseBytesPerResponse.getCount());
+        // Check that the histogram contains values in the expected range
+        Snapshot readSnapshot = ReplicaResponseSizeMetrics.readResponseBytesPerResponse.getSnapshot();
+        assertTrue(readSnapshot.getMax() >= responseSize);
+    }
+
+    @Test
+    public void testWriteResponseMetrics()
+    {
+        long initialTotal = ReplicaResponseSizeMetrics.totalBytesReceived.getCount();
+        long initialWrite = ReplicaResponseSizeMetrics.writeResponseBytesReceived.getCount();
+        long initialSamples = ReplicaResponseSizeMetrics.writeResponseBytesPerResponse.getCount();
+
+        // Record a write response
+        int responseSize = 256;
+        ReplicaResponseSizeMetrics.recordReplicaResponseSize(responseSize, false);
+
+        // Verify counters
+        assertEquals(initialTotal + responseSize, ReplicaResponseSizeMetrics.totalBytesReceived.getCount());
+        assertEquals(initialWrite + responseSize, ReplicaResponseSizeMetrics.writeResponseBytesReceived.getCount());
+
+        // Verify the histogram recorded exactly one new sample
+        assertEquals(initialSamples + 1, ReplicaResponseSizeMetrics.writeResponseBytesPerResponse.getCount());
+    }
+
+    @Test
+    public void testMultipleResponses()
+    {
+        long initialTotal = ReplicaResponseSizeMetrics.totalBytesReceived.getCount();
+        long initialRead = ReplicaResponseSizeMetrics.readResponseBytesReceived.getCount();
+        long initialWrite = ReplicaResponseSizeMetrics.writeResponseBytesReceived.getCount();
+        long initialSamples = ReplicaResponseSizeMetrics.bytesReceivedPerResponse.getCount();
+
+        // Record multiple responses, alternating write/read by position so both branches run
+        int[] sizes = {100, 200, 300, 400, 500};
+        int expectedTotal = 0;
+        int expectedRead = 0;
+
+        for (int i = 0; i < sizes.length; i++)
+        {
+            boolean isRead = i % 2 == 1;
+            ReplicaResponseSizeMetrics.recordReplicaResponseSize(sizes[i], isRead);
+            expectedTotal += sizes[i];
+            if (isRead)
+                expectedRead += sizes[i];
+        }
+
+        // Verify the total and the read/write split
+        assertEquals(initialTotal + expectedTotal, ReplicaResponseSizeMetrics.totalBytesReceived.getCount());
+        assertEquals(initialRead + expectedRead, ReplicaResponseSizeMetrics.readResponseBytesReceived.getCount());
+        assertEquals(initialWrite + expectedTotal - expectedRead, ReplicaResponseSizeMetrics.writeResponseBytesReceived.getCount());
+
+        // Verify the histogram captured every response
+        assertEquals(initialSamples + sizes.length, ReplicaResponseSizeMetrics.bytesReceivedPerResponse.getCount());
+        assertTrue(ReplicaResponseSizeMetrics.bytesReceivedPerResponse.getSnapshot().getMean() > 0);
+    }
+}