Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.micrometer.common.util.internal.logging.WarnThenDebugLogger;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.instrument.step.StepMeterRegistry;
import io.micrometer.core.instrument.util.NamedThreadFactory;
import org.jspecify.annotations.Nullable;
Expand All @@ -33,9 +35,12 @@

import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.ToDoubleFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -79,6 +84,11 @@ public class CloudWatchMeterRegistry extends StepMeterRegistry {

private static final WarnThenDebugLogger tooManyTagsLogger = new WarnThenDebugLogger(CloudWatchMeterRegistry.class);

/**
* Cache of per-meter `publishAverage` flags captured when meters are created.
*/
private final Map<Meter.Id, Boolean> publishAvgFlags = new ConcurrentHashMap<>();

public CloudWatchMeterRegistry(CloudWatchConfig config, Clock clock, CloudWatchAsyncClient cloudWatchAsyncClient) {
this(config, clock, cloudWatchAsyncClient, new NamedThreadFactory("cloudwatch-metrics-publisher"));
}
Expand All @@ -93,6 +103,27 @@ public CloudWatchMeterRegistry(CloudWatchConfig config, Clock clock, CloudWatchA
start(threadFactory);
}

@Override
protected Timer newTimer(Meter.Id id, DistributionStatisticConfig config, PauseDetector pauseDetector) {
publishAvgFlags.put(id, config.isPublishingAverage());
return super.newTimer(id, config, pauseDetector);
}

@Override
protected DistributionSummary newDistributionSummary(Meter.Id id, DistributionStatisticConfig config,
double scale) {
publishAvgFlags.put(id, config.isPublishingAverage());
return super.newDistributionSummary(id, config, scale);
}

@Override
protected <T> FunctionTimer newFunctionTimer(Meter.Id id, T obj, ToLongFunction<T> countFunction,
ToDoubleFunction<T> totalTimeFunction, TimeUnit totalTimeFunctionUnit) {
// FunctionTimer does NOT get DistributionStatisticConfig → fallback to DEFAULT
publishAvgFlags.put(id, DistributionStatisticConfig.DEFAULT.isPublishingAverage());
return super.newFunctionTimer(id, obj, countFunction, totalTimeFunction, totalTimeFunctionUnit);
}

@Override
protected void publish() {
boolean interrupted = false;
Expand Down Expand Up @@ -193,10 +224,17 @@ Stream<MetricDatum> timerData(Timer timer) {
.add(metricDatum(timer.getId(), "sum", getBaseTimeUnit().name(), timer.totalTime(getBaseTimeUnit())));
long count = timer.count();
metrics.add(metricDatum(timer.getId(), "count", StandardUnit.COUNT, (double) count));
Boolean publishAvg = publishAvgFlags.getOrDefault(timer.getId(),
DistributionStatisticConfig.DEFAULT.isPublishingAverage());

if (count > 0) {
metrics.add(metricDatum(timer.getId(), "avg", getBaseTimeUnit().name(), timer.mean(getBaseTimeUnit())));
if (publishAvg) {
metrics.add(
metricDatum(timer.getId(), "avg", getBaseTimeUnit().name(), timer.mean(getBaseTimeUnit())));
}
metrics.add(metricDatum(timer.getId(), "max", getBaseTimeUnit().name(), timer.max(getBaseTimeUnit())));
}

return metrics.build();
}

Expand All @@ -206,10 +244,16 @@ Stream<MetricDatum> summaryData(DistributionSummary summary) {
metrics.add(metricDatum(summary.getId(), "sum", summary.totalAmount()));
long count = summary.count();
metrics.add(metricDatum(summary.getId(), "count", StandardUnit.COUNT, (double) count));
Boolean publishAvg = publishAvgFlags.getOrDefault(summary.getId(),
DistributionStatisticConfig.DEFAULT.isPublishingAverage());

if (count > 0) {
metrics.add(metricDatum(summary.getId(), "avg", summary.mean()));
if (publishAvg) {
metrics.add(metricDatum(summary.getId(), "avg", summary.mean()));
}
metrics.add(metricDatum(summary.getId(), "max", summary.max()));
}

return metrics.build();
}

Expand Down Expand Up @@ -247,9 +291,15 @@ Stream<MetricDatum> functionTimerData(FunctionTimer timer) {
double count = timer.count();
metrics.add(metricDatum(timer.getId(), "count", StandardUnit.COUNT, count));
metrics.add(metricDatum(timer.getId(), "sum", sum));
Boolean publishAvg = publishAvgFlags.getOrDefault(timer.getId(),
DistributionStatisticConfig.DEFAULT.isPublishingAverage());

if (count > 0) {
metrics.add(metricDatum(timer.getId(), "avg", timer.mean(getBaseTimeUnit())));
if (publishAvg) {
metrics.add(metricDatum(timer.getId(), "avg", timer.mean(getBaseTimeUnit())));
}
}

return metrics.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.micrometer.datadog;

import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.instrument.step.StepMeterRegistry;
import io.micrometer.core.instrument.util.MeterPartition;
import io.micrometer.core.instrument.util.NamedThreadFactory;
Expand All @@ -33,7 +35,10 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.ToDoubleFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Stream;
import java.util.Objects;

import static io.micrometer.core.instrument.util.StringEscapeUtils.escapeJson;
import static java.util.stream.Collectors.joining;
Expand All @@ -53,6 +58,11 @@ public class DatadogMeterRegistry extends StepMeterRegistry {

private final HttpSender httpClient;

/**
* Cache of per-meter `publishAverage` settings captured at meter creation time.
*/
private final Map<Meter.Id, Boolean> publishAvgFlags = new ConcurrentHashMap<>();

/**
* Metric names for which we have posted metadata concerning type and base unit
*/
Expand Down Expand Up @@ -104,6 +114,29 @@ public void start(ThreadFactory threadFactory) {
super.start(threadFactory);
}

@Override
protected Timer newTimer(Meter.Id id, DistributionStatisticConfig config, PauseDetector pauseDetector) {
publishAvgFlags.put(id, config.isPublishingAverage());
return super.newTimer(id, config, pauseDetector);
}

@Override
protected DistributionSummary newDistributionSummary(Meter.Id id, DistributionStatisticConfig config,
double scale) {
publishAvgFlags.put(id, config.isPublishingAverage());
return super.newDistributionSummary(id, config, scale);
}

@Override
protected <T> FunctionTimer newFunctionTimer(Meter.Id id, T obj, ToLongFunction<T> countFunction,
ToDoubleFunction<T> totalTimeFunction, TimeUnit totalTimeFunctionUnit) {
// FunctionTimer never receives a DistributionStatisticConfig.
// So we fall back to DEFAULT.isPublishingAverage().

publishAvgFlags.put(id, DistributionStatisticConfig.DEFAULT.isPublishingAverage());
return super.newFunctionTimer(id, obj, countFunction, totalTimeFunction, totalTimeFunctionUnit);
}

@Override
protected void publish() {
Map<String, DatadogMetricMetadata> metadataToSend = new HashMap<>();
Expand Down Expand Up @@ -160,29 +193,44 @@ private Stream<String> writeTimer(FunctionTimer timer, Map<String, DatadogMetric
Meter.Id id = timer.getId();

addToMetadataList(metadata, id, "count", Statistic.COUNT, "occurrence");
addToMetadataList(metadata, id, "avg", Statistic.VALUE, null);

if (publishAvgFlags.getOrDefault(id, DistributionStatisticConfig.DEFAULT.isPublishingAverage())) {
addToMetadataList(metadata, id, "avg", Statistic.VALUE, null);
}

addToMetadataList(metadata, id, "sum", Statistic.TOTAL_TIME, null);

// we can't know anything about max and percentiles originating from a function
// timer
return Stream.of(writeMetric(id, "count", wallTime, timer.count(), Statistic.COUNT, "occurrence"),
writeMetric(id, "avg", wallTime, timer.mean(getBaseTimeUnit()), Statistic.VALUE, null),
writeMetric(id, "sum", wallTime, timer.totalTime(getBaseTimeUnit()), Statistic.TOTAL_TIME, null));

publishAvgFlags.getOrDefault(id, DistributionStatisticConfig.DEFAULT.isPublishingAverage())
? writeMetric(id, "avg", wallTime, timer.mean(getBaseTimeUnit()), Statistic.VALUE, null) : null,

writeMetric(id, "sum", wallTime, timer.totalTime(getBaseTimeUnit()), Statistic.TOTAL_TIME, null))
// filter out null avg when disabled
.filter(Objects::nonNull);
}

private Stream<String> writeTimer(Timer timer, Map<String, DatadogMetricMetadata> metadata) {
final long wallTime = clock.wallTime();
final Stream.Builder<String> metrics = Stream.builder();

Meter.Id id = timer.getId();
metrics.add(writeMetric(id, "sum", wallTime, timer.totalTime(getBaseTimeUnit()), Statistic.TOTAL_TIME, null));
metrics.add(writeMetric(id, "count", wallTime, (double) timer.count(), Statistic.COUNT, "occurrence"));
metrics.add(writeMetric(id, "avg", wallTime, timer.mean(getBaseTimeUnit()), Statistic.VALUE, null));
metrics.add(writeMetric(id, "max", wallTime, timer.max(getBaseTimeUnit()), Statistic.MAX, null));

metrics.add(writeMetric(id, "sum", wallTime, timer.totalTime(getBaseTimeUnit()), Statistic.TOTAL_TIME, null));
addToMetadataList(metadata, id, "sum", Statistic.TOTAL_TIME, null);

metrics.add(writeMetric(id, "count", wallTime, (double) timer.count(), Statistic.COUNT, "occurrence"));
addToMetadataList(metadata, id, "count", Statistic.COUNT, "occurrence");
addToMetadataList(metadata, id, "avg", Statistic.VALUE, null);

// publish avg only when enabled
if (publishAvgFlags.getOrDefault(id, DistributionStatisticConfig.DEFAULT.isPublishingAverage())) {
metrics.add(writeMetric(id, "avg", wallTime, timer.mean(getBaseTimeUnit()), Statistic.VALUE, null));
addToMetadataList(metadata, id, "avg", Statistic.VALUE, null);
}

metrics.add(writeMetric(id, "max", wallTime, timer.max(getBaseTimeUnit()), Statistic.MAX, null));
addToMetadataList(metadata, id, "max", Statistic.MAX, null);

return metrics.build();
Expand All @@ -193,14 +241,20 @@ private Stream<String> writeSummary(DistributionSummary summary, Map<String, Dat
final Stream.Builder<String> metrics = Stream.builder();

Meter.Id id = summary.getId();
metrics.add(writeMetric(id, "sum", wallTime, summary.totalAmount(), Statistic.TOTAL, null));
metrics.add(writeMetric(id, "count", wallTime, (double) summary.count(), Statistic.COUNT, "occurrence"));
metrics.add(writeMetric(id, "avg", wallTime, summary.mean(), Statistic.VALUE, null));
metrics.add(writeMetric(id, "max", wallTime, summary.max(), Statistic.MAX, null));

metrics.add(writeMetric(id, "sum", wallTime, summary.totalAmount(), Statistic.TOTAL, null));
addToMetadataList(metadata, id, "sum", Statistic.TOTAL, null);

metrics.add(writeMetric(id, "count", wallTime, (double) summary.count(), Statistic.COUNT, "occurrence"));
addToMetadataList(metadata, id, "count", Statistic.COUNT, "occurrence");
addToMetadataList(metadata, id, "avg", Statistic.VALUE, null);

// avg (only when enabled)
if (publishAvgFlags.getOrDefault(id, DistributionStatisticConfig.DEFAULT.isPublishingAverage())) {
metrics.add(writeMetric(id, "avg", wallTime, summary.mean(), Statistic.VALUE, null));
addToMetadataList(metadata, id, "avg", Statistic.VALUE, null);
}

metrics.add(writeMetric(id, "max", wallTime, summary.max(), Statistic.MAX, null));
addToMetadataList(metadata, id, "max", Statistic.MAX, null);

return metrics.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.micrometer.humio;

import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.step.StepMeterRegistry;
import io.micrometer.core.instrument.util.DoubleFormat;
Expand All @@ -32,8 +33,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.ToDoubleFunction;
import java.util.function.ToLongFunction;

import static io.micrometer.core.instrument.util.StringEscapeUtils.escapeJson;
import static java.util.stream.Collectors.joining;
Expand All @@ -52,6 +56,11 @@ public class HumioMeterRegistry extends StepMeterRegistry {

private final Logger logger = LoggerFactory.getLogger(HumioMeterRegistry.class);

/**
* Cache of per-meter `publishAverage` flags captured when meters are created.
*/
private final Map<Meter.Id, Boolean> publishAvgFlags = new ConcurrentHashMap<>();

private final HumioConfig config;

private final HttpSender httpClient;
Expand Down Expand Up @@ -126,6 +135,32 @@ protected void publish() {
}
}

@Override
protected Timer newTimer(Meter.Id id, io.micrometer.core.instrument.distribution.DistributionStatisticConfig config,
io.micrometer.core.instrument.distribution.pause.PauseDetector pauseDetector) {

publishAvgFlags.put(id, config.isPublishingAverage());
return super.newTimer(id, config, pauseDetector);
}

@Override
protected DistributionSummary newDistributionSummary(Meter.Id id,
io.micrometer.core.instrument.distribution.DistributionStatisticConfig config, double scale) {

publishAvgFlags.put(id, config.isPublishingAverage());
return super.newDistributionSummary(id, config, scale);
}

@Override
protected <T> FunctionTimer newFunctionTimer(Meter.Id id, T obj, ToLongFunction<T> countFunction,
ToDoubleFunction<T> totalTimeFunction, TimeUnit totalTimeFunctionUnit) {

// FunctionTimer has NO DistributionStatisticConfig → fallback to DEFAULT
publishAvgFlags.put(id,
io.micrometer.core.instrument.distribution.DistributionStatisticConfig.DEFAULT.isPublishingAverage());
return super.newFunctionTimer(id, obj, countFunction, totalTimeFunction, totalTimeFunctionUnit);
}

@Override
protected TimeUnit getBaseTimeUnit() {
return TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -225,8 +260,21 @@ String writeCounter(Counter counter) {

// VisibleForTesting
String writeFunctionTimer(FunctionTimer timer) {
return writeEvent(timer, event("count", timer.count()), event("sum", timer.totalTime(getBaseTimeUnit())),
event("avg", timer.mean(getBaseTimeUnit())));
Boolean publishAvg = publishAvgFlags.getOrDefault(timer.getId(),
io.micrometer.core.instrument.distribution.DistributionStatisticConfig.DEFAULT
.isPublishingAverage());

Attribute avgAttr = publishAvg ? event("avg", timer.mean(getBaseTimeUnit())) : null;

if (avgAttr != null) {
return writeEvent(timer, event("count", timer.count()),
event("sum", timer.totalTime(getBaseTimeUnit())), avgAttr);
}
else {
return writeEvent(timer, event("count", timer.count()),
event("sum", timer.totalTime(getBaseTimeUnit())));
}

}

// VisibleForTesting
Expand All @@ -238,15 +286,42 @@ String writeLongTaskTimer(LongTaskTimer timer) {
// VisibleForTesting
String writeTimer(Timer timer) {
HistogramSnapshot snap = timer.takeSnapshot();
return writeEvent(timer, event("count", (double) snap.count()), event("sum", snap.total(getBaseTimeUnit())),
event("avg", snap.mean(getBaseTimeUnit())), event("max", snap.max(getBaseTimeUnit())));

Boolean publishAvg = publishAvgFlags.getOrDefault(timer.getId(),
DistributionStatisticConfig.DEFAULT.isPublishingAverage());

Attribute avgAttr = publishAvg ? event("avg", snap.mean(getBaseTimeUnit())) : null;

if (avgAttr != null) {
return writeEvent(timer, event("count", (double) snap.count()),
event("sum", snap.total(getBaseTimeUnit())), avgAttr,
event("max", snap.max(getBaseTimeUnit())));
}
else {
return writeEvent(timer, event("count", (double) snap.count()),
event("sum", snap.total(getBaseTimeUnit())), event("max", snap.max(getBaseTimeUnit())));
}

}

// VisibleForTesting
String writeSummary(DistributionSummary summary) {
HistogramSnapshot snap = summary.takeSnapshot();
return writeEvent(summary, event("count", (double) snap.count()), event("sum", snap.total()),
event("avg", snap.mean()), event("max", snap.max()));
Boolean publishAvg = publishAvgFlags.getOrDefault(summary.getId(),
io.micrometer.core.instrument.distribution.DistributionStatisticConfig.DEFAULT
.isPublishingAverage());

Attribute avgAttr = publishAvg ? event("avg", snap.mean()) : null;

if (avgAttr != null) {
return writeEvent(summary, event("count", (double) snap.count()), event("sum", snap.total()), avgAttr,
event("max", snap.max()));
}
else {
return writeEvent(summary, event("count", (double) snap.count()), event("sum", snap.total()),
event("max", snap.max()));
}

}

// VisibleForTesting
Expand Down
Loading