Skip to content

Commit c8be18f

Browse files
authored
Merge pull request #61 from JasmineJ1230/feature/metrics_exporter
Use delta agg.
2 parents d1d349f + a775e9c commit c8be18f

File tree

1 file changed

+33
-23
lines changed

1 file changed

+33
-23
lines changed

capa-spi-aws-telemetry/src/main/java/group/rxcloud/capa/spi/aws/telemetry/metrics/CloudWatchMetricsExporter.java

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
import io.opentelemetry.api.common.Attributes;
2323
import io.opentelemetry.sdk.common.Clock;
2424
import io.opentelemetry.sdk.common.CompletableResultCode;
25+
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
2526
import io.opentelemetry.sdk.metrics.data.DoublePointData;
2627
import io.opentelemetry.sdk.metrics.data.DoubleSummaryPointData;
2728
import io.opentelemetry.sdk.metrics.data.LongPointData;
2829
import io.opentelemetry.sdk.metrics.data.MetricData;
2930
import io.opentelemetry.sdk.metrics.data.MetricDataType;
31+
import org.jetbrains.annotations.Nullable;
3032
import org.slf4j.Logger;
3133
import org.slf4j.LoggerFactory;
3234
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
@@ -66,7 +68,9 @@ public class CloudWatchMetricsExporter extends CapaMetricsExporterSpi {
6668
private static final int MAX_VALUE_LENGTH = 256;
6769

6870
private static final MetricsCache METRICS_CACHE = new MetricsCache();
71+
6972
private static final String APPID = "AppId";
73+
7074
private static final String UNKNOWN = "UNKNOWN";
7175

7276
public CloudWatchMetricsExporter(Supplier<SamplerConfig> samplerConfig) {
@@ -93,9 +97,9 @@ private static String getAppId() {
9397
static List<Dimension> buildDimension(Attributes attributes) {
9498
List<Dimension> dimensions = new ArrayList<>();
9599
dimensions.add(Dimension.builder()
96-
.name(APPID)
97-
.value(getAppId())
98-
.build());
100+
.name(APPID)
101+
.value(getAppId())
102+
.build());
99103
if (attributes.isEmpty()) {
100104
return dimensions;
101105
}
@@ -105,9 +109,9 @@ static List<Dimension> buildDimension(Attributes attributes) {
105109
valueStr = valueStr.substring(0, MAX_VALUE_LENGTH);
106110
}
107111
dimensions.add(Dimension.builder()
108-
.name(key.getKey())
109-
.value(valueStr)
110-
.build());
112+
.name(key.getKey())
113+
.value(valueStr)
114+
.build());
111115
});
112116
dimensions.sort(new Comparator<Dimension>() {
113117
@Override
@@ -139,7 +143,7 @@ static Map<String, List<CollectedMetrics>> collectedMetricsByNamespace(Collectio
139143

140144
Map<String, List<CollectedMetrics>> metricsMapGroupByNamespace = new HashMap<>();
141145
metricsMap.values()
142-
.forEach(m -> metricsMapGroupByNamespace.computeIfAbsent(m.nameSpace, k -> new ArrayList<>()).add(m));
146+
.forEach(m -> metricsMapGroupByNamespace.computeIfAbsent(m.nameSpace, k -> new ArrayList<>()).add(m));
143147
return metricsMapGroupByNamespace;
144148
}
145149

@@ -158,7 +162,7 @@ private static void processLongPoint(String namespace, String metricName, Map<St
158162
metricsMap.computeIfAbsent(
159163
getKey(namespace, metricName, millis, p.getAttributes()),
160164
k -> new CollectedMetrics(namespace, metricName, millis, buildDimension(p.getAttributes())))
161-
.addPoint(BigDecimal.valueOf(p.getValue()).doubleValue());
165+
.addPoint(BigDecimal.valueOf(p.getValue()).doubleValue());
162166
});
163167
}
164168

@@ -168,7 +172,7 @@ private static void processDoublePoint(String namespace, String metricName,
168172
long millis = TimeUnit.NANOSECONDS.toMillis(p.getEpochNanos());
169173
metricsMap.computeIfAbsent(getKey(namespace, metricName, millis, p.getAttributes()),
170174
k -> new CollectedMetrics(namespace, metricName, millis, buildDimension(p.getAttributes())))
171-
.addPoint(p.getValue());
175+
.addPoint(p.getValue());
172176
});
173177
}
174178

@@ -178,8 +182,8 @@ private static void processDoubleSummary(String namespace, String metricName,
178182
data.forEach(d -> {
179183
long millis = TimeUnit.NANOSECONDS.toMillis(d.getEpochNanos());
180184
StatisticSet.Builder setBuilder = StatisticSet.builder()
181-
.sum(d.getSum())
182-
.sampleCount(BigDecimal.valueOf(d.getCount()).doubleValue());
185+
.sum(d.getSum())
186+
.sampleCount(BigDecimal.valueOf(d.getCount()).doubleValue());
183187
if (d.getPercentileValues() != null) {
184188
d.getPercentileValues().forEach(percentile -> {
185189
if (Double.compare(0, percentile.getPercentile()) == 0) {
@@ -191,15 +195,15 @@ private static void processDoubleSummary(String namespace, String metricName,
191195
}
192196
metricsMap.computeIfAbsent(getKey(namespace, metricName, millis, d.getAttributes()),
193197
k -> new CollectedMetrics(namespace, metricName, millis, buildDimension(d.getAttributes())))
194-
.setStatisticSet(setBuilder.build());
198+
.setStatisticSet(setBuilder.build());
195199
});
196200
}
197201

198202
private static void send(String namespace, List<MetricDatum> data) {
199203
if (data != null && !data.isEmpty()) {
200204
PutMetricDataRequest request = PutMetricDataRequest.builder()
201-
.namespace(namespace)
202-
.metricData(data).build();
205+
.namespace(namespace)
206+
.metricData(data).build();
203207
PutMetricDataResponse response = CloudWatchClientProvider.get().putMetricData(request);
204208
if (!response.sdkHttpResponse().isSuccessful()) {
205209
log.info("Fail to export metrics to cloud watch. statusCode={}, msg={}.",
@@ -224,13 +228,13 @@ private static String getKey(String nameSpace, String metricName, long epocheMil
224228

225229
private static MetricDatum build(CollectedMetrics c, List<Double> values, List<Double> counts) {
226230
return MetricDatum.builder()
227-
.metricName(c.metricName)
228-
.unit(StandardUnit.NONE)
229-
.timestamp(c.instant)
230-
.dimensions(c.dimensions)
231-
.statisticValues(c.statisticSet)
232-
.values(values)
233-
.counts(counts).build();
231+
.metricName(c.metricName)
232+
.unit(StandardUnit.NONE)
233+
.timestamp(c.instant)
234+
.dimensions(c.dimensions)
235+
.statisticValues(c.statisticSet)
236+
.values(values)
237+
.counts(counts).build();
234238
}
235239

236240
private static void convertAndSend(String namespace, List<CollectedMetrics> list) {
@@ -270,6 +274,11 @@ private static void convertAndSend(String namespace, List<CollectedMetrics> list
270274
}
271275
}
272276

277+
@Nullable
278+
@Override
279+
public AggregationTemporality getPreferredTemporality() {
280+
return AggregationTemporality.DELTA;
281+
}
273282

274283
@Override
275284
protected CompletableResultCode doExport(Collection<MetricData> metrics) {
@@ -301,7 +310,8 @@ static final class MetricsCache {
301310

302311
private final AtomicInteger index = new AtomicInteger();
303312

304-
private final ReadWriteLock[] locks = new ReadWriteLock[]{new ReentrantReadWriteLock(), new ReentrantReadWriteLock()};
313+
private final ReadWriteLock[] locks = new ReadWriteLock[]{new ReentrantReadWriteLock(),
314+
new ReentrantReadWriteLock()};
305315

306316
MetricsCache() {
307317
}
@@ -331,7 +341,7 @@ <T> void recordHistogram(String namespace, String metricName, Attributes attribu
331341
histogramCache[currentIndex].computeIfAbsent(getKey(namespace, metricName, millis, attributes),
332342
k ->
333343
new CollectedMetrics(namespace, metricName, millis, buildDimension(attributes)))
334-
.addPoint(value);
344+
.addPoint(value);
335345
} finally {
336346
readLock.unlock();
337347
}

0 commit comments

Comments
 (0)