Skip to content

Commit 1e08f48

Browse files
metrics collection bugfix
1 parent 5e84cba commit 1e08f48

File tree

4 files changed

+42
-5
lines changed

4 files changed

+42
-5
lines changed

.idea/runConfigurations/kafka_clients_example.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

superstream-clients/dependency-reduced-pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<groupId>ai.superstream</groupId>
55
<artifactId>superstream-clients</artifactId>
66
<name>Superstream Kafka Client Optimizer</name>
7-
<version>1.0.206</version>
7+
<version>1.0.207</version>
88
<description>A Java library that dynamically optimizes Kafka client configuration based on recommendations</description>
99
<url>https://github.com/superstreamlabs/superstream-clients-java</url>
1010
<developers>

superstream-clients/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>ai.superstream</groupId>
88
<artifactId>superstream-clients</artifactId>
9-
<version>1.0.206</version>
9+
<version>1.0.207</version>
1010
<packaging>jar</packaging>
1111

1212
<name>Superstream Kafka Client Optimizer</name>

superstream-clients/src/main/java/ai/superstream/core/ClientStatsReporter.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,12 +201,31 @@ private static String normalizeBootstrapServers(String servers) {
201201
* present in {@code metrics} overwrites the previous value – even when the new
202202
* value is {@code 0.0}, negative or otherwise – but keys that are <em>missing</em>
203203
* are left untouched.
204+
* <p>
205+
* Special handling for compression-rate-avg and record-size-avg: if the new value is 0 and the previous
206+
* value is greater than 0, preserve the previous value.
204207
*/
205208
public void updateProducerMetrics(java.util.Map<String, Double> metrics) {
206209
if (!disabled && metrics != null) {
207210
latestMetrics.updateAndGet(prev -> {
208211
java.util.Map<String, Double> merged = new java.util.HashMap<>(prev);
209-
merged.putAll(metrics);
212+
213+
// Special handling for compression-rate-avg and record-size-avg
214+
for (java.util.Map.Entry<String, Double> entry : metrics.entrySet()) {
215+
String key = entry.getKey();
216+
Double newValue = entry.getValue();
217+
218+
if (("compression-rate-avg".equals(key) || "record-size-avg".equals(key)) && newValue != null && newValue == 0.0) {
219+
Double prevValue = merged.get(key);
220+
if (prevValue != null && prevValue > 0.0) {
221+
// Keep the previous non-zero value instead of overwriting with 0
222+
continue;
223+
}
224+
}
225+
226+
merged.put(key, newValue);
227+
}
228+
210229
return merged;
211230
});
212231
}
@@ -216,12 +235,30 @@ public void updateProducerMetrics(java.util.Map<String, Double> metrics) {
216235
* Merge the latest per-topic metrics. Same rationale as above, but we first
217236
* locate / create the nested map for each topic, then merge its individual
218237
* metric values.
238+
* <p>
239+
* Special handling for compression-rate: if the new value is 0 and the previous
240+
* value is greater than 0, preserve the previous value.
219241
*/
220242
public void updateTopicMetrics(java.util.Map<String, java.util.Map<String, Double>> topicMetrics) {
221243
if (!disabled && topicMetrics != null) {
222244
topicMetrics.forEach((topic, metricMap) -> {
223245
java.util.Map<String, Double> existing = latestTopicMetrics.computeIfAbsent(topic, k -> new java.util.HashMap<>());
224-
existing.putAll(metricMap);
246+
247+
// Special handling for compression-rate
248+
for (java.util.Map.Entry<String, Double> entry : metricMap.entrySet()) {
249+
String key = entry.getKey();
250+
Double newValue = entry.getValue();
251+
252+
if ("compression-rate".equals(key) && newValue != null && newValue == 0.0) {
253+
Double prevValue = existing.get(key);
254+
if (prevValue != null && prevValue > 0.0) {
255+
// Keep the previous non-zero value instead of overwriting with 0
256+
continue;
257+
}
258+
}
259+
260+
existing.put(key, newValue);
261+
}
225262
});
226263
}
227264
}

0 commit comments

Comments
 (0)