Skip to content

Commit fac2280

Browse files
committed
Add publishAverage support across push registries
- Added DistributionStatisticConfig.publishAverage flag handling in: • DatadogMeterRegistry • CloudWatchMeterRegistry • HumioMeterRegistry - Captured per-meter publishAverage at meter creation (Timer, Summary, FunctionTimer) - Updated AVG emission logic to respect the per-meter flag - Defaulted FunctionTimer to DistributionStatisticConfig.DEFAULT - No changes to protocols or non-AVG metrics This enables users to disable average publishing consistently across all push-based registries.
1 parent 33d71a2 commit fac2280

File tree

5 files changed

+194
-17
lines changed

5 files changed

+194
-17
lines changed

implementations/micrometer-registry-cloudwatch2/src/main/java/io/micrometer/cloudwatch2/CloudWatchMeterRegistry.java

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import io.micrometer.common.util.internal.logging.WarnThenDebugLogger;
2020
import io.micrometer.core.instrument.Timer;
2121
import io.micrometer.core.instrument.*;
22+
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
23+
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
2224
import io.micrometer.core.instrument.step.StepMeterRegistry;
2325
import io.micrometer.core.instrument.util.NamedThreadFactory;
2426
import org.jspecify.annotations.Nullable;
@@ -33,9 +35,12 @@
3335

3436
import java.time.Instant;
3537
import java.util.*;
38+
import java.util.concurrent.ConcurrentHashMap;
3639
import java.util.concurrent.CountDownLatch;
3740
import java.util.concurrent.ThreadFactory;
3841
import java.util.concurrent.TimeUnit;
42+
import java.util.function.ToDoubleFunction;
43+
import java.util.function.ToLongFunction;
3944
import java.util.stream.Stream;
4045

4146
import static java.util.stream.Collectors.toList;
@@ -79,6 +84,11 @@ public class CloudWatchMeterRegistry extends StepMeterRegistry {
7984

8085
private static final WarnThenDebugLogger tooManyTagsLogger = new WarnThenDebugLogger(CloudWatchMeterRegistry.class);
8186

87+
/**
88+
* Cache of per-meter `publishAverage` flags captured when meters are created.
89+
*/
90+
private final Map<Meter.Id, Boolean> publishAvgFlags = new ConcurrentHashMap<>();
91+
8292
public CloudWatchMeterRegistry(CloudWatchConfig config, Clock clock, CloudWatchAsyncClient cloudWatchAsyncClient) {
8393
this(config, clock, cloudWatchAsyncClient, new NamedThreadFactory("cloudwatch-metrics-publisher"));
8494
}
@@ -93,6 +103,27 @@ public CloudWatchMeterRegistry(CloudWatchConfig config, Clock clock, CloudWatchA
93103
start(threadFactory);
94104
}
95105

106+
@Override
107+
protected Timer newTimer(Meter.Id id, DistributionStatisticConfig config, PauseDetector pauseDetector) {
108+
publishAvgFlags.put(id, config.isPublishingAverage());
109+
return super.newTimer(id, config, pauseDetector);
110+
}
111+
112+
@Override
113+
protected DistributionSummary newDistributionSummary(Meter.Id id, DistributionStatisticConfig config,
114+
double scale) {
115+
publishAvgFlags.put(id, config.isPublishingAverage());
116+
return super.newDistributionSummary(id, config, scale);
117+
}
118+
119+
@Override
120+
protected <T> FunctionTimer newFunctionTimer(Meter.Id id, T obj, ToLongFunction<T> countFunction,
121+
ToDoubleFunction<T> totalTimeFunction, TimeUnit totalTimeFunctionUnit) {
122+
// FunctionTimer does NOT get DistributionStatisticConfig → fallback to DEFAULT
123+
publishAvgFlags.put(id, DistributionStatisticConfig.DEFAULT.isPublishingAverage());
124+
return super.newFunctionTimer(id, obj, countFunction, totalTimeFunction, totalTimeFunctionUnit);
125+
}
126+
96127
@Override
97128
protected void publish() {
98129
boolean interrupted = false;
@@ -193,10 +224,17 @@ Stream<MetricDatum> timerData(Timer timer) {
193224
.add(metricDatum(timer.getId(), "sum", getBaseTimeUnit().name(), timer.totalTime(getBaseTimeUnit())));
194225
long count = timer.count();
195226
metrics.add(metricDatum(timer.getId(), "count", StandardUnit.COUNT, (double) count));
227+
Boolean publishAvg = publishAvgFlags.getOrDefault(timer.getId(),
228+
DistributionStatisticConfig.DEFAULT.isPublishingAverage());
229+
196230
if (count > 0) {
197-
metrics.add(metricDatum(timer.getId(), "avg", getBaseTimeUnit().name(), timer.mean(getBaseTimeUnit())));
231+
if (publishAvg) {
232+
metrics.add(
233+
metricDatum(timer.getId(), "avg", getBaseTimeUnit().name(), timer.mean(getBaseTimeUnit())));
234+
}
198235
metrics.add(metricDatum(timer.getId(), "max", getBaseTimeUnit().name(), timer.max(getBaseTimeUnit())));
199236
}
237+
200238
return metrics.build();
201239
}
202240

@@ -206,10 +244,16 @@ Stream<MetricDatum> summaryData(DistributionSummary summary) {
206244
metrics.add(metricDatum(summary.getId(), "sum", summary.totalAmount()));
207245
long count = summary.count();
208246
metrics.add(metricDatum(summary.getId(), "count", StandardUnit.COUNT, (double) count));
247+
Boolean publishAvg = publishAvgFlags.getOrDefault(summary.getId(),
248+
DistributionStatisticConfig.DEFAULT.isPublishingAverage());
249+
209250
if (count > 0) {
210-
metrics.add(metricDatum(summary.getId(), "avg", summary.mean()));
251+
if (publishAvg) {
252+
metrics.add(metricDatum(summary.getId(), "avg", summary.mean()));
253+
}
211254
metrics.add(metricDatum(summary.getId(), "max", summary.max()));
212255
}
256+
213257
return metrics.build();
214258
}
215259

@@ -247,9 +291,15 @@ Stream<MetricDatum> functionTimerData(FunctionTimer timer) {
247291
double count = timer.count();
248292
metrics.add(metricDatum(timer.getId(), "count", StandardUnit.COUNT, count));
249293
metrics.add(metricDatum(timer.getId(), "sum", sum));
294+
Boolean publishAvg = publishAvgFlags.getOrDefault(timer.getId(),
295+
DistributionStatisticConfig.DEFAULT.isPublishingAverage());
296+
250297
if (count > 0) {
251-
metrics.add(metricDatum(timer.getId(), "avg", timer.mean(getBaseTimeUnit())));
298+
if (publishAvg) {
299+
metrics.add(metricDatum(timer.getId(), "avg", timer.mean(getBaseTimeUnit())));
300+
}
252301
}
302+
253303
return metrics.build();
254304
}
255305

implementations/micrometer-registry-datadog/src/main/java/io/micrometer/datadog/DatadogConfig.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,4 @@ default Validated<?> validate() {
7676
checkRequired("uri", DatadogConfig::uri));
7777
}
7878

79-
default boolean publishAverage() {
80-
return getBoolean(this, "publishAverage").orElse(true);
81-
}
82-
8379
}

implementations/micrometer-registry-datadog/src/main/java/io/micrometer/datadog/DatadogMeterRegistry.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package io.micrometer.datadog;
1717

1818
import io.micrometer.core.instrument.*;
19+
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
20+
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
1921
import io.micrometer.core.instrument.step.StepMeterRegistry;
2022
import io.micrometer.core.instrument.util.MeterPartition;
2123
import io.micrometer.core.instrument.util.NamedThreadFactory;
@@ -33,6 +35,8 @@
3335
import java.util.concurrent.ConcurrentHashMap;
3436
import java.util.concurrent.ThreadFactory;
3537
import java.util.concurrent.TimeUnit;
38+
import java.util.function.ToDoubleFunction;
39+
import java.util.function.ToLongFunction;
3640
import java.util.stream.Stream;
3741
import java.util.Objects;
3842

@@ -54,6 +58,11 @@ public class DatadogMeterRegistry extends StepMeterRegistry {
5458

5559
private final HttpSender httpClient;
5660

61+
/**
62+
* Cache of per-meter `publishAverage` settings captured at meter creation time.
63+
*/
64+
private final Map<Meter.Id, Boolean> publishAvgFlags = new ConcurrentHashMap<>();
65+
5766
/**
5867
* Metric names for which we have posted metadata concerning type and base unit
5968
*/
@@ -105,6 +114,29 @@ public void start(ThreadFactory threadFactory) {
105114
super.start(threadFactory);
106115
}
107116

117+
@Override
118+
protected Timer newTimer(Meter.Id id, DistributionStatisticConfig config, PauseDetector pauseDetector) {
119+
publishAvgFlags.put(id, config.isPublishingAverage());
120+
return super.newTimer(id, config, pauseDetector);
121+
}
122+
123+
@Override
124+
protected DistributionSummary newDistributionSummary(Meter.Id id, DistributionStatisticConfig config,
125+
double scale) {
126+
publishAvgFlags.put(id, config.isPublishingAverage());
127+
return super.newDistributionSummary(id, config, scale);
128+
}
129+
130+
@Override
131+
protected <T> FunctionTimer newFunctionTimer(Meter.Id id, T obj, ToLongFunction<T> countFunction,
132+
ToDoubleFunction<T> totalTimeFunction, TimeUnit totalTimeFunctionUnit) {
133+
// FunctionTimer never receives a DistributionStatisticConfig.
134+
// So we fall back to DEFAULT.isPublishingAverage().
135+
136+
publishAvgFlags.put(id, DistributionStatisticConfig.DEFAULT.isPublishingAverage());
137+
return super.newFunctionTimer(id, obj, countFunction, totalTimeFunction, totalTimeFunctionUnit);
138+
}
139+
108140
@Override
109141
protected void publish() {
110142
Map<String, DatadogMetricMetadata> metadataToSend = new HashMap<>();
@@ -162,7 +194,7 @@ private Stream<String> writeTimer(FunctionTimer timer, Map<String, DatadogMetric
162194

163195
addToMetadataList(metadata, id, "count", Statistic.COUNT, "occurrence");
164196

165-
if (config.publishAverage()) {
197+
if (publishAvgFlags.getOrDefault(id, DistributionStatisticConfig.DEFAULT.isPublishingAverage())) {
166198
addToMetadataList(metadata, id, "avg", Statistic.VALUE, null);
167199
}
168200

@@ -172,7 +204,7 @@ private Stream<String> writeTimer(FunctionTimer timer, Map<String, DatadogMetric
172204
// timer
173205
return Stream.of(writeMetric(id, "count", wallTime, timer.count(), Statistic.COUNT, "occurrence"),
174206

175-
config.publishAverage()
207+
publishAvgFlags.getOrDefault(id, DistributionStatisticConfig.DEFAULT.isPublishingAverage())
176208
? writeMetric(id, "avg", wallTime, timer.mean(getBaseTimeUnit()), Statistic.VALUE, null) : null,
177209

178210
writeMetric(id, "sum", wallTime, timer.totalTime(getBaseTimeUnit()), Statistic.TOTAL_TIME, null))
@@ -193,7 +225,7 @@ private Stream<String> writeTimer(Timer timer, Map<String, DatadogMetricMetadata
193225
addToMetadataList(metadata, id, "count", Statistic.COUNT, "occurrence");
194226

195227
// publish avg only when enabled
196-
if (config.publishAverage()) {
228+
if (publishAvgFlags.getOrDefault(id, DistributionStatisticConfig.DEFAULT.isPublishingAverage())) {
197229
metrics.add(writeMetric(id, "avg", wallTime, timer.mean(getBaseTimeUnit()), Statistic.VALUE, null));
198230
addToMetadataList(metadata, id, "avg", Statistic.VALUE, null);
199231
}
@@ -217,7 +249,7 @@ private Stream<String> writeSummary(DistributionSummary summary, Map<String, Dat
217249
addToMetadataList(metadata, id, "count", Statistic.COUNT, "occurrence");
218250

219251
// avg (only when enabled)
220-
if (config.publishAverage()) {
252+
if (publishAvgFlags.getOrDefault(id, DistributionStatisticConfig.DEFAULT.isPublishingAverage())) {
221253
metrics.add(writeMetric(id, "avg", wallTime, summary.mean(), Statistic.VALUE, null));
222254
addToMetadataList(metadata, id, "avg", Statistic.VALUE, null);
223255
}

implementations/micrometer-registry-humio/src/main/java/io/micrometer/humio/HumioMeterRegistry.java

Lines changed: 81 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.micrometer.humio;
1717

1818
import io.micrometer.core.instrument.*;
19+
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
1920
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
2021
import io.micrometer.core.instrument.step.StepMeterRegistry;
2122
import io.micrometer.core.instrument.util.DoubleFormat;
@@ -32,8 +33,11 @@
3233
import java.util.ArrayList;
3334
import java.util.List;
3435
import java.util.Map;
36+
import java.util.concurrent.ConcurrentHashMap;
3537
import java.util.concurrent.ThreadFactory;
3638
import java.util.concurrent.TimeUnit;
39+
import java.util.function.ToDoubleFunction;
40+
import java.util.function.ToLongFunction;
3741

3842
import static io.micrometer.core.instrument.util.StringEscapeUtils.escapeJson;
3943
import static java.util.stream.Collectors.joining;
@@ -52,6 +56,11 @@ public class HumioMeterRegistry extends StepMeterRegistry {
5256

5357
private final Logger logger = LoggerFactory.getLogger(HumioMeterRegistry.class);
5458

59+
/**
60+
* Cache of per-meter `publishAverage` flags captured when meters are created.
61+
*/
62+
private final Map<Meter.Id, Boolean> publishAvgFlags = new ConcurrentHashMap<>();
63+
5564
private final HumioConfig config;
5665

5766
private final HttpSender httpClient;
@@ -126,6 +135,32 @@ protected void publish() {
126135
}
127136
}
128137

138+
@Override
139+
protected Timer newTimer(Meter.Id id, io.micrometer.core.instrument.distribution.DistributionStatisticConfig config,
140+
io.micrometer.core.instrument.distribution.pause.PauseDetector pauseDetector) {
141+
142+
publishAvgFlags.put(id, config.isPublishingAverage());
143+
return super.newTimer(id, config, pauseDetector);
144+
}
145+
146+
@Override
147+
protected DistributionSummary newDistributionSummary(Meter.Id id,
148+
io.micrometer.core.instrument.distribution.DistributionStatisticConfig config, double scale) {
149+
150+
publishAvgFlags.put(id, config.isPublishingAverage());
151+
return super.newDistributionSummary(id, config, scale);
152+
}
153+
154+
@Override
155+
protected <T> FunctionTimer newFunctionTimer(Meter.Id id, T obj, ToLongFunction<T> countFunction,
156+
ToDoubleFunction<T> totalTimeFunction, TimeUnit totalTimeFunctionUnit) {
157+
158+
// FunctionTimer has NO DistributionStatisticConfig → fallback to DEFAULT
159+
publishAvgFlags.put(id,
160+
io.micrometer.core.instrument.distribution.DistributionStatisticConfig.DEFAULT.isPublishingAverage());
161+
return super.newFunctionTimer(id, obj, countFunction, totalTimeFunction, totalTimeFunctionUnit);
162+
}
163+
129164
@Override
130165
protected TimeUnit getBaseTimeUnit() {
131166
return TimeUnit.MILLISECONDS;
@@ -225,8 +260,21 @@ String writeCounter(Counter counter) {
225260

226261
// VisibleForTesting
227262
String writeFunctionTimer(FunctionTimer timer) {
228-
return writeEvent(timer, event("count", timer.count()), event("sum", timer.totalTime(getBaseTimeUnit())),
229-
event("avg", timer.mean(getBaseTimeUnit())));
263+
Boolean publishAvg = publishAvgFlags.getOrDefault(timer.getId(),
264+
io.micrometer.core.instrument.distribution.DistributionStatisticConfig.DEFAULT
265+
.isPublishingAverage());
266+
267+
Attribute avgAttr = publishAvg ? event("avg", timer.mean(getBaseTimeUnit())) : null;
268+
269+
if (avgAttr != null) {
270+
return writeEvent(timer, event("count", timer.count()),
271+
event("sum", timer.totalTime(getBaseTimeUnit())), avgAttr);
272+
}
273+
else {
274+
return writeEvent(timer, event("count", timer.count()),
275+
event("sum", timer.totalTime(getBaseTimeUnit())));
276+
}
277+
230278
}
231279

232280
// VisibleForTesting
@@ -238,15 +286,42 @@ String writeLongTaskTimer(LongTaskTimer timer) {
238286
// VisibleForTesting
239287
String writeTimer(Timer timer) {
240288
HistogramSnapshot snap = timer.takeSnapshot();
241-
return writeEvent(timer, event("count", (double) snap.count()), event("sum", snap.total(getBaseTimeUnit())),
242-
event("avg", snap.mean(getBaseTimeUnit())), event("max", snap.max(getBaseTimeUnit())));
289+
290+
Boolean publishAvg = publishAvgFlags.getOrDefault(timer.getId(),
291+
DistributionStatisticConfig.DEFAULT.isPublishingAverage());
292+
293+
Attribute avgAttr = publishAvg ? event("avg", snap.mean(getBaseTimeUnit())) : null;
294+
295+
if (avgAttr != null) {
296+
return writeEvent(timer, event("count", (double) snap.count()),
297+
event("sum", snap.total(getBaseTimeUnit())), avgAttr,
298+
event("max", snap.max(getBaseTimeUnit())));
299+
}
300+
else {
301+
return writeEvent(timer, event("count", (double) snap.count()),
302+
event("sum", snap.total(getBaseTimeUnit())), event("max", snap.max(getBaseTimeUnit())));
303+
}
304+
243305
}
244306

245307
// VisibleForTesting
246308
String writeSummary(DistributionSummary summary) {
247309
HistogramSnapshot snap = summary.takeSnapshot();
248-
return writeEvent(summary, event("count", (double) snap.count()), event("sum", snap.total()),
249-
event("avg", snap.mean()), event("max", snap.max()));
310+
Boolean publishAvg = publishAvgFlags.getOrDefault(summary.getId(),
311+
io.micrometer.core.instrument.distribution.DistributionStatisticConfig.DEFAULT
312+
.isPublishingAverage());
313+
314+
Attribute avgAttr = publishAvg ? event("avg", snap.mean()) : null;
315+
316+
if (avgAttr != null) {
317+
return writeEvent(summary, event("count", (double) snap.count()), event("sum", snap.total()), avgAttr,
318+
event("max", snap.max()));
319+
}
320+
else {
321+
return writeEvent(summary, event("count", (double) snap.count()), event("sum", snap.total()),
322+
event("max", snap.max()));
323+
}
324+
250325
}
251326

252327
// VisibleForTesting

0 commit comments

Comments
 (0)