Skip to content

Commit 4077ee0

Browse files
thpierceAsakerMohd
andauthored
Added ForceFlush Implementation (#885)
### Description: Passed the forceflush function from SdkMeteringProvider to the AwsSpanMetricProcessor to forceFlush remaining metrics on shutdown to the cwAgent/Collector. ### Tesing: Increased the metricExporter interval and the BatchSpanProcessor delay to 10 minutes using: ``` OTEL_METRIC_EXPORT_INTERVAL=600000 \ OTEL_BSP_SCHEDULE_DELAY=600000 \ ``` Without the force flush change, exiting the sample app only flushed the traces without the metrics. With the forceFlush change, both traces and metrics were flushed to the collector. By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. --------- Co-authored-by: Mohamed Asaker <[email protected]>
1 parent cb31105 commit 4077ee0

File tree

4 files changed

+43
-11
lines changed

4 files changed

+43
-11
lines changed

awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import io.opentelemetry.api.common.Attributes;
1919
import io.opentelemetry.api.common.AttributesBuilder;
20-
import io.opentelemetry.api.metrics.MeterProvider;
2120
import io.opentelemetry.contrib.awsxray.AlwaysRecordSampler;
2221
import io.opentelemetry.contrib.awsxray.ResourceHolder;
2322
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
@@ -164,14 +163,16 @@ private SdkTracerProviderBuilder customizeTracerProviderBuilder(
164163
MetricReader metricReader =
165164
PeriodicMetricReader.builder(metricsExporter).setInterval(exportInterval).build();
166165

167-
MeterProvider meterProvider =
166+
SdkMeterProvider meterProvider =
168167
SdkMeterProvider.builder()
169168
.setResource(ResourceHolder.getResource())
170169
.registerMetricReader(metricReader)
171170
.build();
171+
172172
// Construct and set application signals metrics processor
173173
SpanProcessor spanMetricsProcessor =
174-
AwsSpanMetricsProcessorBuilder.create(meterProvider, ResourceHolder.getResource())
174+
AwsSpanMetricsProcessorBuilder.create(
175+
meterProvider, ResourceHolder.getResource(), meterProvider::forceFlush)
175176
.build();
176177
tracerProviderBuilder.addSpanProcessor(spanMetricsProcessor);
177178
}

awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessor.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import io.opentelemetry.api.metrics.LongHistogram;
2323
import io.opentelemetry.api.trace.StatusCode;
2424
import io.opentelemetry.context.Context;
25+
import io.opentelemetry.sdk.common.CompletableResultCode;
2526
import io.opentelemetry.sdk.resources.Resource;
2627
import io.opentelemetry.sdk.trace.ReadWriteSpan;
2728
import io.opentelemetry.sdk.trace.ReadableSpan;
2829
import io.opentelemetry.sdk.trace.SpanProcessor;
2930
import io.opentelemetry.sdk.trace.data.SpanData;
3031
import java.util.Map;
32+
import java.util.function.Supplier;
3133
import javax.annotation.concurrent.Immutable;
3234

3335
/**
@@ -64,29 +66,38 @@ public final class AwsSpanMetricsProcessor implements SpanProcessor {
6466

6567
private final MetricAttributeGenerator generator;
6668
private final Resource resource;
69+
private final Supplier<CompletableResultCode> forceFlushAction;
6770

6871
/** Use {@link AwsSpanMetricsProcessorBuilder} to construct this processor. */
6972
static AwsSpanMetricsProcessor create(
7073
LongHistogram errorHistogram,
7174
LongHistogram faultHistogram,
7275
DoubleHistogram latencyHistogram,
7376
MetricAttributeGenerator generator,
74-
Resource resource) {
77+
Resource resource,
78+
Supplier<CompletableResultCode> forceFlushAction) {
7579
return new AwsSpanMetricsProcessor(
76-
errorHistogram, faultHistogram, latencyHistogram, generator, resource);
80+
errorHistogram, faultHistogram, latencyHistogram, generator, resource, forceFlushAction);
7781
}
7882

7983
private AwsSpanMetricsProcessor(
8084
LongHistogram errorHistogram,
8185
LongHistogram faultHistogram,
8286
DoubleHistogram latencyHistogram,
8387
MetricAttributeGenerator generator,
84-
Resource resource) {
88+
Resource resource,
89+
Supplier<CompletableResultCode> forceFlushAction) {
8590
this.errorHistogram = errorHistogram;
8691
this.faultHistogram = faultHistogram;
8792
this.latencyHistogram = latencyHistogram;
8893
this.generator = generator;
8994
this.resource = resource;
95+
this.forceFlushAction = forceFlushAction;
96+
}
97+
98+
@Override
99+
public CompletableResultCode forceFlush() {
100+
return forceFlushAction.get();
90101
}
91102

92103
@Override

awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorBuilder.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import io.opentelemetry.api.metrics.LongHistogram;
2323
import io.opentelemetry.api.metrics.Meter;
2424
import io.opentelemetry.api.metrics.MeterProvider;
25+
import io.opentelemetry.sdk.common.CompletableResultCode;
2526
import io.opentelemetry.sdk.resources.Resource;
27+
import java.util.function.Supplier;
2628

2729
/** A builder for {@link AwsSpanMetricsProcessor} */
2830
public final class AwsSpanMetricsProcessorBuilder {
@@ -42,18 +44,29 @@ public final class AwsSpanMetricsProcessorBuilder {
4244
private final MeterProvider meterProvider;
4345
private final Resource resource;
4446

47+
// ForceFlush action provided from {@link SdkMeterProvider#forceFlush()} so that when the
48+
// application exits The spanMetricProcessor calls the meterProvder.forceFlush to flush
49+
// any remaining metrics before shutdown
50+
private final Supplier<CompletableResultCode> forceFlushAction;
51+
4552
// Optional builder elements
4653
private MetricAttributeGenerator generator = DEFAULT_GENERATOR;
4754
private String scopeName = DEFAULT_SCOPE_NAME;
4855

4956
public static AwsSpanMetricsProcessorBuilder create(
50-
MeterProvider meterProvider, Resource resource) {
51-
return new AwsSpanMetricsProcessorBuilder(meterProvider, resource);
57+
MeterProvider meterProvider,
58+
Resource resource,
59+
Supplier<CompletableResultCode> forceFlushAction) {
60+
return new AwsSpanMetricsProcessorBuilder(meterProvider, resource, forceFlushAction);
5261
}
5362

54-
private AwsSpanMetricsProcessorBuilder(MeterProvider meterProvider, Resource resource) {
63+
private AwsSpanMetricsProcessorBuilder(
64+
MeterProvider meterProvider,
65+
Resource resource,
66+
Supplier<CompletableResultCode> forceFlushAction) {
5567
this.meterProvider = meterProvider;
5668
this.resource = resource;
69+
this.forceFlushAction = forceFlushAction;
5770
}
5871

5972
/**
@@ -86,6 +99,6 @@ public AwsSpanMetricsProcessor build() {
8699
meter.histogramBuilder(LATENCY).setUnit(LATENCY_UNITS).build();
87100

88101
return AwsSpanMetricsProcessor.create(
89-
errorHistogram, faultHistogram, latencyHistogram, generator, resource);
102+
errorHistogram, faultHistogram, latencyHistogram, generator, resource, forceFlushAction);
90103
}
91104
}

awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsSpanMetricsProcessorTest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@ private enum ExpectedStatusMetric {
7777
private MetricAttributeGenerator generatorMock;
7878
private AwsSpanMetricsProcessor awsSpanMetricsProcessor;
7979

80+
// Mock forceFlush function that returns success when invoked similar
81+
// to the default implementation of forceFlush.
82+
private CompletableResultCode forceFlushAction() {
83+
return CompletableResultCode.ofSuccess();
84+
}
85+
8086
@BeforeEach
8187
public void setUpMocks() {
8288
errorHistogramMock = mock(LongHistogram.class);
@@ -90,7 +96,8 @@ public void setUpMocks() {
9096
faultHistogramMock,
9197
latencyHistogramMock,
9298
generatorMock,
93-
testResource);
99+
testResource,
100+
this::forceFlushAction);
94101
}
95102

96103
@Test

0 commit comments

Comments
 (0)