Skip to content

Commit aeb08f4

Browse files
authored
Propagate flush to PeriodicMetricReader's metricExporter. (#7410)
1 parent b0a9deb commit aeb08f4

File tree

2 files changed

+44
-1
lines changed

2 files changed

+44
-1
lines changed

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,21 @@ public MemoryMode getMemoryMode() {
8181

8282
@Override
8383
public CompletableResultCode forceFlush() {
84-
return scheduled.doRun();
84+
CompletableResultCode result = new CompletableResultCode();
85+
CompletableResultCode doRunResult = scheduled.doRun();
86+
doRunResult.whenComplete(
87+
() -> {
88+
CompletableResultCode flushResult = exporter.flush();
89+
flushResult.whenComplete(
90+
() -> {
91+
if (doRunResult.isSuccess() && flushResult.isSuccess()) {
92+
result.succeed();
93+
} else {
94+
result.fail();
95+
}
96+
});
97+
});
98+
return result;
8599
}
86100

87101
@Override

sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,35 @@ void flush() throws Exception {
154154
}
155155
}
156156

157+
@Test
158+
void forceflush_callsFlush() {
159+
MetricExporter metricExporter = mock(MetricExporter.class);
160+
when(metricExporter.export(any()))
161+
.thenReturn(CompletableResultCode.ofSuccess())
162+
.thenReturn(CompletableResultCode.ofSuccess())
163+
.thenThrow(new RuntimeException("Export Failed!"));
164+
when(metricExporter.flush())
165+
.thenReturn(CompletableResultCode.ofSuccess())
166+
.thenReturn(CompletableResultCode.ofFailure())
167+
.thenReturn(CompletableResultCode.ofSuccess());
168+
when(metricExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
169+
170+
PeriodicMetricReader reader =
171+
PeriodicMetricReader.builder(metricExporter)
172+
.setInterval(Duration.ofNanos(Long.MAX_VALUE))
173+
.build();
174+
175+
try {
176+
reader.register(collectionRegistration);
177+
assertThat(reader.forceFlush().join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
178+
assertThat(reader.forceFlush().join(10, TimeUnit.SECONDS).isSuccess()).isFalse();
179+
assertThat(reader.forceFlush().join(10, TimeUnit.SECONDS).isSuccess()).isFalse();
180+
} finally {
181+
reader.shutdown();
182+
}
183+
verify(metricExporter, times(3)).flush();
184+
}
185+
157186
@Test
158187
@Timeout(2)
159188
@SuppressLogger(PeriodicMetricReader.class)

0 commit comments

Comments
 (0)