Skip to content

Commit 0f50569

Browse files
committed
CNDB-7237: add metrics to track replica response sizes
1 parent 71f1fc0 commit 0f50569

11 files changed

+646
-0
lines changed

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,8 @@ public enum CassandraRelevantProperties
435435
TABLE_METRICS_DEFAULT_HISTOGRAMS_AGGREGATION("cassandra.table_metrics_default_histograms_aggregation", TableMetrics.MetricsAggregation.INDIVIDUAL.name()),
436436
// Determines if table metrics should be also exported to shared global metric
437437
TABLE_METRICS_EXPORT_GLOBALS("cassandra.table_metrics_export_globals", "true"),
438+
// Enable/disable replica response size metrics collection
439+
REPLICA_RESPONSE_SIZE_METRICS_ENABLED("cassandra.replica_response_size_metrics_enabled", "true"),
438440
FILE_CACHE_SIZE_IN_MB("cassandra.file_cache_size_in_mb", "2048"),
439441
CUSTOM_HINTS_RATE_LIMITER_FACTORY("cassandra.custom_hints_rate_limiter_factory"),
440442

src/java/org/apache/cassandra/db/MultiRangeReadResponse.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,12 @@ public String toDebugString(ReadCommand command, DecoratedKey key)
108108
throw new UnsupportedOperationException();
109109
}
110110

111+
@Override
112+
public boolean supportsResponseSizeTracking()
113+
{
114+
return false;
115+
}
116+
111117
/**
112118
* A local response that is not meant to be serialized or used for caching remote endpoint's multi-range response.
113119
*/
@@ -235,6 +241,12 @@ public boolean isDigestResponse()
235241
{
236242
return false;
237243
}
244+
245+
@Override
246+
public boolean supportsResponseSizeTracking()
247+
{
248+
return false;
249+
}
238250
}
239251

240252
/**

src/java/org/apache/cassandra/db/ReadResponse.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,18 @@ public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data
7575

7676
public abstract boolean isDigestResponse();
7777

78+
/**
79+
* Indicates whether this response type supports response size tracking for metrics.
80+
* Some response types (like MultiRangeReadResponse) may not support payload size calculation
81+
* and will throw UnsupportedOperationException when attempting to serialize for size calculation.
82+
*
83+
* @return true if this response supports size tracking, false otherwise
84+
*/
85+
public boolean supportsResponseSizeTracking()
86+
{
87+
return true;
88+
}
89+
7890
/**
7991
* Creates a string of the requested partition in this read response suitable for debugging.
8092
*/
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.cassandra.metrics;
19+
20+
import com.codahale.metrics.Counter;
21+
import com.codahale.metrics.Histogram;
22+
23+
import org.apache.cassandra.config.CassandraRelevantProperties;
24+
25+
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
26+
27+
/**
28+
* Metrics for tracking result sizes coming from replicas/writers to coordinators.
29+
*/
30+
public class ReplicaResponseSizeMetrics
31+
{
32+
private static final String TYPE = "ReplicaResponseSize";
33+
34+
/**
35+
* Controls whether replica response size metrics collection is enabled.
36+
*/
37+
private static final boolean METRICS_ENABLED = CassandraRelevantProperties.REPLICA_RESPONSE_SIZE_METRICS_ENABLED.getBoolean();
38+
39+
/** Total bytes received from replicas in response messages */
40+
public static final Counter totalBytesReceived = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "TotalBytesReceived", null));
41+
42+
/** Histogram of response sizes from replicas */
43+
public static final Histogram bytesReceivedPerResponse = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "BytesReceivedPerResponse", null), true);
44+
45+
/** Total bytes received from replicas in read responses */
46+
public static final Counter readResponseBytesReceived = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "ReadResponseBytesReceived", null));
47+
48+
/** Histogram of read response sizes from replicas */
49+
public static final Histogram readResponseBytesPerResponse = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "ReadResponseBytesPerResponse", null), true);
50+
51+
/** Total bytes received from replicas in write responses */
52+
public static final Counter writeResponseBytesReceived = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "WriteResponseBytesReceived", null));
53+
54+
/** Histogram of write response sizes from replicas */
55+
public static final Histogram writeResponseBytesPerResponse = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "WriteResponseBytesPerResponse", null), true);
56+
57+
/**
58+
* Check if metrics collection is enabled
59+
* @return true if metrics are enabled, false otherwise
60+
*/
61+
public static boolean isMetricsEnabled()
62+
{
63+
return METRICS_ENABLED;
64+
}
65+
66+
/**
67+
* Record the size of a read response received from a replica
68+
* @param responseSize the size of the response in bytes
69+
*/
70+
public static void recordReadResponseSize(int responseSize)
71+
{
72+
if (!METRICS_ENABLED)
73+
return;
74+
75+
totalBytesReceived.inc(responseSize);
76+
bytesReceivedPerResponse.update(responseSize);
77+
readResponseBytesReceived.inc(responseSize);
78+
readResponseBytesPerResponse.update(responseSize);
79+
}
80+
81+
/**
82+
* Record the size of a write response received from a replica
83+
* @param responseSize the size of the response in bytes
84+
*/
85+
public static void recordWriteResponseSize(int responseSize)
86+
{
87+
if (!METRICS_ENABLED)
88+
return;
89+
90+
totalBytesReceived.inc(responseSize);
91+
bytesReceivedPerResponse.update(responseSize);
92+
writeResponseBytesReceived.inc(responseSize);
93+
writeResponseBytesPerResponse.update(responseSize);
94+
}
95+
}

src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import org.apache.cassandra.sensors.RequestSensors;
4848
import org.apache.cassandra.sensors.RequestTracker;
4949
import org.apache.cassandra.utils.concurrent.SimpleCondition;
50+
import org.apache.cassandra.metrics.ReplicaResponseSizeMetrics;
51+
import org.apache.cassandra.net.MessagingService;
5052

5153
import static java.util.concurrent.TimeUnit.NANOSECONDS;
5254
import static org.apache.cassandra.locator.Replicas.countInOurDc;
@@ -262,6 +264,24 @@ public boolean waitingFor(InetAddressAndPort from)
262264
* null message means "response from local write"
263265
*/
264266
public abstract void onResponse(Message<T> msg);
267+
268+
/**
269+
* Track the size of a response message from a replica
270+
* @param msg the response message
271+
*/
272+
protected void trackReplicaResponseSize(Message<T> msg)
273+
{
274+
if (!ReplicaResponseSizeMetrics.isMetricsEnabled())
275+
return;
276+
277+
// Only track remote responses (local responses have null from field)
278+
// Also check that we have a valid payload and serializer
279+
if (msg != null && msg.from() != null && msg.payload != null && msg.verb().serializer() != null)
280+
{
281+
int responseSize = msg.payloadSize(MessagingService.current_version);
282+
ReplicaResponseSizeMetrics.recordWriteResponseSize(responseSize);
283+
}
284+
}
265285

266286
public void signal()
267287
{

src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public DatacenterSyncWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
7373

7474
public void onResponse(Message<T> message)
7575
{
76+
trackReplicaResponseSize(message);
7677
try
7778
{
7879
String dataCenter = message == null

src/java/org/apache/cassandra/service/WriteResponseHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.cassandra.net.Message;
2828
import org.apache.cassandra.db.WriteType;
2929
import org.apache.cassandra.net.Verb;
30+
import org.apache.cassandra.metrics.ReplicaResponseSizeMetrics;
3031

3132
/**
3233
* Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL consistency levels.
@@ -64,6 +65,7 @@ public boolean trackLatencyForSnitch(Verb responseVerb, boolean isTimeout)
6465
@Override
6566
public void onResponse(Message<T> m)
6667
{
68+
trackReplicaResponseSize(m);
6769
if (responsesUpdater.decrementAndGet(this) == 0)
6870
signal();
6971
//Must be last after all subclass processing

src/java/org/apache/cassandra/service/reads/ReadCallback.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import org.apache.cassandra.sensors.RequestTracker;
4343
import org.apache.cassandra.tracing.Tracing;
4444
import org.apache.cassandra.utils.concurrent.SimpleCondition;
45+
import org.apache.cassandra.metrics.ReplicaResponseSizeMetrics;
46+
import org.apache.cassandra.net.MessagingService;
4547

4648
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4749

@@ -159,7 +161,10 @@ public int blockFor()
159161
public void onResponse(Message<ReadResponse> message)
160162
{
161163
assertWaitingFor(message.from());
164+
162165
resolver.preprocess(message);
166+
167+
trackReplicaResponseSize(message);
163168

164169
/*
165170
* Ensure that data is present and the response accumulator has properly published the
@@ -171,6 +176,25 @@ public void onResponse(Message<ReadResponse> message)
171176
condition.signalAll();
172177
}
173178

179+
/**
180+
* Track the size of a response message from a replica
181+
* @param message the response message
182+
*/
183+
private void trackReplicaResponseSize(Message<ReadResponse> message)
184+
{
185+
if (!ReplicaResponseSizeMetrics.isMetricsEnabled())
186+
return;
187+
188+
// Only track remote responses (local responses have null from field)
189+
// check that we have a valid payload and serializer and the response type supports size tracking
190+
if (message != null && message.from() != null && message.payload != null
191+
&& message.verb().serializer() != null && message.payload.supportsResponseSizeTracking())
192+
{
193+
int responseSize = message.payloadSize(MessagingService.current_version);
194+
ReplicaResponseSizeMetrics.recordReadResponseSize(responseSize);
195+
}
196+
}
197+
174198
public void response(ReadResponse result)
175199
{
176200
Verb kind = command.isRangeRequest() ? Verb.RANGE_RSP : Verb.READ_RSP;
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.cassandra.metrics;
19+
20+
import org.junit.BeforeClass;
21+
import org.junit.Test;
22+
23+
import com.codahale.metrics.Snapshot;
24+
25+
import static org.junit.Assert.*;
26+
27+
public class ReplicaResponseSizeMetricsTest
28+
{
29+
@BeforeClass
30+
public static void setup()
31+
{
32+
// Note: Counter metrics cannot be reset, so tests track deltas
33+
}
34+
35+
@Test
36+
public void testReadResponseMetrics()
37+
{
38+
long initialTotal = ReplicaResponseSizeMetrics.totalBytesReceived.getCount();
39+
long initialRead = ReplicaResponseSizeMetrics.readResponseBytesReceived.getCount();
40+
41+
int responseSize = 1024;
42+
ReplicaResponseSizeMetrics.recordReadResponseSize(responseSize);
43+
44+
assertEquals(initialTotal + responseSize, ReplicaResponseSizeMetrics.totalBytesReceived.getCount());
45+
assertEquals(initialRead + responseSize, ReplicaResponseSizeMetrics.readResponseBytesReceived.getCount());
46+
47+
Snapshot readSnapshot = ReplicaResponseSizeMetrics.readResponseBytesPerResponse.getSnapshot();
48+
assertTrue(readSnapshot.size() > 0);
49+
assertTrue(readSnapshot.getMax() >= responseSize);
50+
}
51+
52+
@Test
53+
public void testWriteResponseMetrics()
54+
{
55+
long initialTotal = ReplicaResponseSizeMetrics.totalBytesReceived.getCount();
56+
long initialWrite = ReplicaResponseSizeMetrics.writeResponseBytesReceived.getCount();
57+
58+
int responseSize = 256;
59+
ReplicaResponseSizeMetrics.recordWriteResponseSize(responseSize);
60+
61+
assertEquals(initialTotal + responseSize, ReplicaResponseSizeMetrics.totalBytesReceived.getCount());
62+
assertEquals(initialWrite + responseSize, ReplicaResponseSizeMetrics.writeResponseBytesReceived.getCount());
63+
64+
Snapshot writeSnapshot = ReplicaResponseSizeMetrics.writeResponseBytesPerResponse.getSnapshot();
65+
assertTrue(writeSnapshot.size() > 0);
66+
}
67+
68+
@Test
69+
public void testMultipleResponses()
70+
{
71+
long initialTotal = ReplicaResponseSizeMetrics.totalBytesReceived.getCount();
72+
73+
int[] sizes = {100, 200, 300, 400, 500};
74+
int expectedTotal = 0;
75+
76+
for (int size : sizes)
77+
{
78+
// Alternate between read and write responses
79+
if (size % 2 == 0)
80+
ReplicaResponseSizeMetrics.recordReadResponseSize(size);
81+
else
82+
ReplicaResponseSizeMetrics.recordWriteResponseSize(size);
83+
expectedTotal += size;
84+
}
85+
86+
assertEquals(initialTotal + expectedTotal, ReplicaResponseSizeMetrics.totalBytesReceived.getCount());
87+
88+
Snapshot totalSnapshot = ReplicaResponseSizeMetrics.bytesReceivedPerResponse.getSnapshot();
89+
assertTrue(totalSnapshot.size() >= sizes.length);
90+
assertTrue(totalSnapshot.getMean() > 0);
91+
}
92+
}

0 commit comments

Comments
 (0)