Skip to content

Commit 2400cdc

Browse files
authored
AAD branch: fix shutdown hook (#1705)
* Fix flushing shutdown hook
* Introduce low-level client where retry and write-to-disk will take place
* Optimizations
* More comments
* Spotbugs
* LGTM
* Comments and renames
* Return buffers to pool on exception
* Comment out work-in-progress
* Extract class
1 parent 934bc7b commit 2400cdc

File tree

15 files changed

+334
-209
lines changed

15 files changed

+334
-209
lines changed

agent/agent-tooling/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ dependencies {
6060
implementation group: 'org.apache.httpcomponents', name: 'httpclient', version: versions.apacheHttpClient
6161

6262
implementation group: 'org.checkerframework', name: 'checker-qual', version: versions.checker
63+
implementation 'com.google.code.findbugs:annotations:3.0.1'
6364

6465
implementation(project(':core')) {
6566
// excluding commons-logging and replacing it with jcl-over-slf4j (below)

agent/agent-tooling/gradle/dependency-locks/compileClasspath.lockfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.12.2
1616
com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.12.2
1717
com.fasterxml.jackson:jackson-bom:2.12.2
1818
com.fasterxml.woodstox:woodstox-core:6.2.4
19+
com.google.code.findbugs:annotations:3.0.1
1920
com.google.code.findbugs:jsr305:3.0.2
2021
com.google.errorprone:error_prone_annotations:2.5.1
2122
com.google.guava:failureaccess:1.0.1
@@ -64,6 +65,7 @@ io.projectreactor:reactor-core:3.4.3
6465
jakarta.activation:jakarta.activation-api:1.2.1
6566
jakarta.xml.bind:jakarta.xml.bind-api:2.3.2
6667
net.bytebuddy:byte-buddy:1.10.18
68+
net.jcip:jcip-annotations:1.0
6769
org.apache.commons:commons-lang3:3.11
6870
org.apache.commons:commons-text:1.9
6971
org.apache.httpcomponents:httpclient:4.5.13

agent/agent-tooling/gradle/dependency-locks/runtimeClasspath.lockfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ com.github.stephenc.jcip:jcip-annotations:1.0-1
2424
com.google.auto.service:auto-service-annotations:1.0-rc7
2525
com.google.auto.service:auto-service:1.0-rc7
2626
com.google.auto:auto-common:0.10
27+
com.google.code.findbugs:annotations:3.0.1
2728
com.google.code.findbugs:jsr305:3.0.2
2829
com.google.code.gson:gson:2.8.2
2930
com.google.errorprone:error_prone_annotations:2.5.1
@@ -99,6 +100,7 @@ net.bytebuddy:byte-buddy-agent:1.10.18
99100
net.bytebuddy:byte-buddy:1.10.18
100101
net.java.dev.jna:jna-platform:5.7.0
101102
net.java.dev.jna:jna:5.7.0
103+
net.jcip:jcip-annotations:1.0
102104
net.minidev:accessors-smart:1.2
103105
net.minidev:json-smart:2.3
104106
org.apache.commons:commons-lang3:3.11

agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/AiComponentInstaller.java

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.microsoft.applicationinsights.agent.bootstrap.diagnostics.SdkVersionFinder;
2929
import com.microsoft.applicationinsights.agent.internal.instrumentation.sdk.*;
3030
import com.microsoft.applicationinsights.agent.internal.wasbootstrap.MainEntryPoint;
31+
import com.microsoft.applicationinsights.agent.internal.wasbootstrap.OpenTelemetryConfigurer;
3132
import com.microsoft.applicationinsights.agent.internal.wasbootstrap.configuration.Configuration;
3233
import com.microsoft.applicationinsights.agent.internal.wasbootstrap.configuration.Configuration.JmxMetric;
3334
import com.microsoft.applicationinsights.agent.internal.wasbootstrap.configuration.Configuration.ProcessorConfig;
@@ -46,6 +47,7 @@
4647
import com.microsoft.applicationinsights.profiler.config.ServiceProfilerServiceConfig;
4748
import io.opentelemetry.instrumentation.api.aisdk.AiLazyConfiguration;
4849
import io.opentelemetry.javaagent.spi.ComponentInstaller;
50+
import io.opentelemetry.sdk.common.CompletableResultCode;
4951
import org.apache.http.HttpHost;
5052
import org.checkerframework.checker.nullness.qual.Nullable;
5153
import org.slf4j.Logger;
@@ -169,21 +171,7 @@ private static void start(Instrumentation instrumentation) {
169171

170172
// this is currently used by Micrometer instrumentation in addition to 2.x SDK
171173
BytecodeUtil.setDelegate(new BytecodeUtilImpl());
172-
Runtime.getRuntime().addShutdownHook(new Thread() {
173-
@Override
174-
public void run() {
175-
startupLogger.debug("running shutdown hook");
176-
try {
177-
telemetryClient.flush();
178-
telemetryClient.shutdown(5, SECONDS);
179-
startupLogger.debug("completed shutdown hook");
180-
} catch (InterruptedException e) {
181-
startupLogger.debug("interrupted while flushing telemetry during shutdown");
182-
} catch (Throwable t) {
183-
startupLogger.debug(t.getMessage(), t);
184-
}
185-
}
186-
});
174+
Runtime.getRuntime().addShutdownHook(new ShutdownHook(telemetryClient));
187175

188176
RpConfiguration rpConfiguration = MainEntryPoint.getRpConfiguration();
189177
if (rpConfiguration != null) {
@@ -303,4 +291,37 @@ private static ParamXmlElement newParamXml(String name, String value) {
303291
paramXml.setValue(value);
304292
return paramXml;
305293
}
294+
295+
private static class ShutdownHook extends Thread {
296+
private final TelemetryClient telemetryClient;
297+
298+
public ShutdownHook(TelemetryClient telemetryClient) {
299+
this.telemetryClient = telemetryClient;
300+
}
301+
302+
@Override
303+
public void run() {
304+
startupLogger.debug("running shutdown hook");
305+
CompletableResultCode otelFlush = OpenTelemetryConfigurer.flush();
306+
CompletableResultCode result = new CompletableResultCode();
307+
otelFlush.whenComplete(() -> {
308+
CompletableResultCode batchingClientFlush = telemetryClient.flushChannelBatcher();
309+
batchingClientFlush.whenComplete(() -> {
310+
if (otelFlush.isSuccess() && batchingClientFlush.isSuccess()) {
311+
result.succeed();
312+
} else {
313+
result.fail();
314+
}
315+
});
316+
});
317+
result.join(5, SECONDS);
318+
if (result.isSuccess()) {
319+
startupLogger.debug("flushing telemetry on shutdown completed successfully");
320+
} else if (Thread.interrupted()) {
321+
startupLogger.debug("interrupted while flushing telemetry on shutdown");
322+
} else {
323+
startupLogger.debug("flushing telemetry on shutdown has taken more than 5 seconds, shutting down anyways...");
324+
}
325+
}
326+
}
306327
}

agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/instrumentation/sdk/BytecodeUtilImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ private SeverityLevel getSeverityLevel(int value) {
334334
@Override
335335
public void flush() {
336336
// this is not null because sdk instrumentation is not added until Global.setTelemetryClient() is called
337-
checkNotNull(Global.getTelemetryClient()).flush();
337+
checkNotNull(Global.getTelemetryClient()).flushChannelBatcher();
338338
}
339339

340340
@Override

agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/wasbootstrap/OpenTelemetryConfigurer.java

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,24 @@
1515
import com.microsoft.applicationinsights.agent.internal.propagator.DelegatingPropagator;
1616
import com.microsoft.applicationinsights.agent.internal.sampling.DelegatingSampler;
1717
import com.microsoft.applicationinsights.agent.internal.sampling.Samplers;
18+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
1819
import io.opentelemetry.sdk.autoconfigure.spi.SdkTracerProviderConfigurer;
20+
import io.opentelemetry.sdk.common.CompletableResultCode;
1921
import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder;
2022
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
2123
import io.opentelemetry.sdk.trace.export.SpanExporter;
2224

2325
public class OpenTelemetryConfigurer implements SdkTracerProviderConfigurer {
2426

27+
private static volatile BatchSpanProcessor batchSpanProcessor;
28+
29+
public static CompletableResultCode flush() {
30+
return batchSpanProcessor.forceFlush();
31+
}
32+
2533
@Override
34+
@SuppressFBWarnings(value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
35+
justification = "this method is only called once during initialization")
2636
public void configure(SdkTracerProviderBuilder tracerProvider) {
2737
TelemetryClient telemetryClient = Global.getTelemetryClient();
2838
if (telemetryClient == null) {
@@ -47,37 +57,23 @@ public void configure(SdkTracerProviderBuilder tracerProvider) {
4757
// Reversing the order of processors before passing it to SpanProcessor
4858
Collections.reverse(processors);
4959

50-
SpanExporter exporter = new Exporter(TelemetryClient.getActive());
60+
SpanExporter currExporter = new Exporter(TelemetryClient.getActive());
5161

5262
// NOTE if changing the span processor to something async, flush it in the shutdown hook before flushing TelemetryClient
5363
if (!processors.isEmpty()) {
54-
SpanExporter currExporter = null;
5564
for (ProcessorConfig processorConfig : processors) {
56-
57-
if (currExporter == null) {
58-
currExporter = processorConfig.type == ProcessorType.attribute ?
59-
new ExporterWithAttributeProcessor(processorConfig, exporter) :
60-
new ExporterWithSpanProcessor(processorConfig, exporter);
61-
62-
} else {
63-
currExporter = processorConfig.type == ProcessorType.attribute ?
64-
new ExporterWithAttributeProcessor(processorConfig, currExporter) :
65-
new ExporterWithSpanProcessor(processorConfig, currExporter);
66-
}
65+
currExporter = processorConfig.type == ProcessorType.attribute ?
66+
new ExporterWithAttributeProcessor(processorConfig, currExporter) :
67+
new ExporterWithSpanProcessor(processorConfig, currExporter);
6768
}
68-
69-
// using batch size 1 here because batching is done at a lower level
70-
// but still using BatchSpanProcessor in order to get off of the application thread as soon as possible
71-
tracerProvider.addSpanProcessor(BatchSpanProcessor.builder(currExporter)
72-
.setMaxExportBatchSize(1)
73-
.build());
74-
75-
} else {
76-
// using batch size 1 here because batching is done at a lower level
77-
// but still using BatchSpanProcessor in order to get off of the application thread as soon as possible
78-
tracerProvider.addSpanProcessor(BatchSpanProcessor.builder(exporter)
79-
.setMaxExportBatchSize(1)
80-
.build());
8169
}
70+
71+
// using BatchSpanProcessor in order to get off of the application thread as soon as possible
72+
// using batch size 1 because need to convert to SpanData as soon as possible to grab data for live metrics
73+
// real batching is done at a lower level
74+
batchSpanProcessor = BatchSpanProcessor.builder(currExporter)
75+
.setMaxExportBatchSize(1)
76+
.build();
77+
tracerProvider.addSpanProcessor(batchSpanProcessor);
8278
}
8379
}

agent/exporter/src/main/java/com/microsoft/applicationinsights/agent/Exporter.java

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,13 @@ public CompletableResultCode export(Collection<SpanData> spans) {
126126
}
127127

128128
try {
129-
List<TelemetryItem> telemetryItems = new ArrayList<>();
130129
for (SpanData span : spans) {
131130
logger.debug("exporting span: {}", span);
132-
export(span, telemetryItems);
131+
export(span);
133132
}
134-
telemetryClient.trackAsync(telemetryItems);
135-
// FIXME (trask)
133+
// batching, retry, throttling, and writing to disk on failure occur downstream
134+
// for simplicity not reporting back success/failure from this layer
135+
// only that it was successfully delivered to the next layer
136136
return CompletableResultCode.ofSuccess();
137137
} catch (Throwable t) {
138138
logger.error(t.getMessage(), t);
@@ -150,30 +150,30 @@ public CompletableResultCode shutdown() {
150150
return CompletableResultCode.ofSuccess();
151151
}
152152

153-
private void export(SpanData span, List<TelemetryItem> telemetryItems) {
153+
private void export(SpanData span) {
154154
SpanKind kind = span.getKind();
155155
String instrumentationName = span.getInstrumentationLibraryInfo().getName();
156156
Matcher matcher = COMPONENT_PATTERN.matcher(instrumentationName);
157157
String stdComponent = matcher.matches() ? matcher.group(1) : null;
158158
if (kind == SpanKind.INTERNAL) {
159159
Boolean isLog = span.getAttributes().get(AI_LOG_KEY);
160160
if (isLog != null && isLog) {
161-
exportLogSpan(span, telemetryItems);
161+
exportLogSpan(span);
162162
} else if ("spring-scheduling".equals(stdComponent) && !span.getParentSpanContext().isValid()) {
163163
// TODO (trask) need semantic convention for determining whether to map INTERNAL to request or
164164
// dependency (or need clarification to use SERVER for this)
165-
exportRequest(span, telemetryItems);
165+
exportRequest(span);
166166
} else {
167-
exportRemoteDependency(span, true, telemetryItems);
167+
exportRemoteDependency(span, true);
168168
}
169169
} else if (kind == SpanKind.CLIENT || kind == SpanKind.PRODUCER) {
170-
exportRemoteDependency(span, false, telemetryItems);
170+
exportRemoteDependency(span, false);
171171
} else if (kind == SpanKind.CONSUMER && !span.getParentSpanContext().isRemote()) {
172172
// TODO need spec clarification, but it seems polling for messages can be CONSUMER also
173173
// in which case the span will not have a remote parent and should be treated as a dependency instead of a request
174-
exportRemoteDependency(span, false, telemetryItems);
174+
exportRemoteDependency(span, false);
175175
} else if (kind == SpanKind.SERVER || kind == SpanKind.CONSUMER) {
176-
exportRequest(span, telemetryItems);
176+
exportRequest(span);
177177
} else {
178178
throw new UnsupportedOperationException(kind.name());
179179
}
@@ -195,8 +195,7 @@ private static List<TelemetryExceptionDetails> minimalParse(String errorStack) {
195195
return Collections.singletonList(details);
196196
}
197197

198-
private void exportRemoteDependency(SpanData span, boolean inProc,
199-
List<TelemetryItem> telemetryItems) {
198+
private void exportRemoteDependency(SpanData span, boolean inProc) {
200199
TelemetryItem telemetry = new TelemetryItem();
201200
RemoteDependencyData data = new RemoteDependencyData();
202201
telemetryClient.initRemoteDependencyTelemetry(telemetry, data);
@@ -228,8 +227,8 @@ private void exportRemoteDependency(SpanData span, boolean inProc,
228227

229228
float samplingPercentage = getSamplingPercentage(span.getSpanContext().getTraceState());
230229
telemetry.setSampleRate(samplingPercentage);
231-
telemetryItems.add(telemetry);
232-
exportEvents(span, samplingPercentage, telemetryItems);
230+
telemetryClient.trackAsync(telemetry);
231+
exportEvents(span, samplingPercentage);
233232
}
234233

235234
private static float getSamplingPercentage(TraceState traceState) {
@@ -259,16 +258,16 @@ private void applySemanticConventions(Attributes attributes, RemoteDependencyDat
259258
}
260259
}
261260

262-
private void exportLogSpan(SpanData span, List<TelemetryItem> telemetryItems) {
261+
private void exportLogSpan(SpanData span) {
263262
String errorStack = span.getAttributes().get(AI_LOG_ERROR_STACK_KEY);
264263
if (errorStack == null) {
265-
trackTrace(span, telemetryItems);
264+
trackTrace(span);
266265
} else {
267-
trackTraceAsException(span, errorStack, telemetryItems);
266+
trackTraceAsException(span, errorStack);
268267
}
269268
}
270269

271-
private void trackTrace(SpanData span, List<TelemetryItem> telemetryItems) {
270+
private void trackTrace(SpanData span) {
272271
Attributes attributes = span.getAttributes();
273272
String level = attributes.get(AI_LOG_LEVEL_KEY);
274273
String loggerName = attributes.get(AI_LOGGER_NAME_KEY);
@@ -292,10 +291,10 @@ private void trackTrace(SpanData span, List<TelemetryItem> telemetryItems) {
292291

293292
float samplingPercentage = getSamplingPercentage(span.getSpanContext().getTraceState());
294293
telemetry.setSampleRate(samplingPercentage);
295-
telemetryItems.add(telemetry);
294+
telemetryClient.trackAsync(telemetry);
296295
}
297296

298-
private void trackTraceAsException(SpanData span, String errorStack, List<TelemetryItem> telemetryItems) {
297+
private void trackTraceAsException(SpanData span, String errorStack) {
299298
Attributes attributes = span.getAttributes();
300299
String level = attributes.get(AI_LOG_LEVEL_KEY);
301300
String loggerName = attributes.get(AI_LOGGER_NAME_KEY);
@@ -318,7 +317,7 @@ private void trackTraceAsException(SpanData span, String errorStack, List<Teleme
318317

319318
float samplingPercentage = getSamplingPercentage(span.getSpanContext().getTraceState());
320319
telemetry.setSampleRate(samplingPercentage);
321-
telemetryItems.add(telemetry);
320+
telemetryClient.trackAsync(telemetry);
322321
}
323322

324323
private static void setLoggerProperties(MonitorDomain data, String level, String loggerName) {
@@ -498,7 +497,7 @@ private static int getDefaultPortForDbSystem(String dbSystem) {
498497
}
499498
}
500499

501-
private void exportRequest(SpanData span, List<TelemetryItem> telemetryItems) {
500+
private void exportRequest(SpanData span) {
502501
TelemetryItem telemetry = new TelemetryItem();
503502
RequestData data = new RequestData();
504503
telemetryClient.initRequestTelemetry(telemetry, data);
@@ -585,8 +584,8 @@ private void exportRequest(SpanData span, List<TelemetryItem> telemetryItems) {
585584

586585
float samplingPercentage = getSamplingPercentage(span.getSpanContext().getTraceState());
587586
telemetry.setSampleRate(samplingPercentage);
588-
telemetryItems.add(telemetry);
589-
exportEvents(span, samplingPercentage, telemetryItems);
587+
telemetryClient.trackAsync(telemetry);
588+
exportEvents(span, samplingPercentage);
590589
}
591590

592591
private String getTelemetryName(SpanData span) {
@@ -611,7 +610,7 @@ private static String nullAwareConcat(String str1, String str2, String separator
611610
return str1 + separator + str2;
612611
}
613612

614-
private void exportEvents(SpanData span, float samplingPercentage, List<TelemetryItem> telemetryItems) {
613+
private void exportEvents(SpanData span, float samplingPercentage) {
615614
for (EventData event : span.getEvents()) {
616615
boolean lettuce51 =
617616
span.getInstrumentationLibraryInfo().getName().equals("io.opentelemetry.javaagent.lettuce-5.1");
@@ -635,17 +634,17 @@ private void exportEvents(SpanData span, float samplingPercentage, List<Telemetr
635634
// TODO map OpenTelemetry exception to Application Insights exception better
636635
String stacktrace = event.getAttributes().get(SemanticAttributes.EXCEPTION_STACKTRACE);
637636
if (stacktrace != null) {
638-
trackException(stacktrace, span, operationId, span.getSpanId(), samplingPercentage, telemetryItems);
637+
trackException(stacktrace, span, operationId, span.getSpanId(), samplingPercentage);
639638
}
640639
} else {
641640
telemetry.setSampleRate(samplingPercentage);
642-
telemetryItems.add(telemetry);
641+
telemetryClient.trackAsync(telemetry);
643642
}
644643
}
645644
}
646645

647646
private void trackException(String errorStack, SpanData span, String operationId,
648-
String id, float samplingPercentage, List<TelemetryItem> telemetryItems) {
647+
String id, float samplingPercentage) {
649648
TelemetryItem telemetry = new TelemetryItem();
650649
TelemetryExceptionData data = new TelemetryExceptionData();
651650
telemetryClient.initExceptionTelemetry(telemetry, data);
@@ -655,7 +654,7 @@ private void trackException(String errorStack, SpanData span, String operationId
655654
telemetry.setTime(getFormattedTime(span.getEndEpochNanos()));
656655
telemetry.setSampleRate(samplingPercentage);
657656
data.setExceptions(minimalParse(errorStack));
658-
telemetryItems.add(telemetry);
657+
telemetryClient.trackAsync(telemetry);
659658
}
660659

661660
private static final long NANOSECONDS_PER_DAY = DAYS.toNanos(1);

agent/instrumentation/gradle/dependency-locks/runtimeClasspath.lockfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ com.github.stephenc.jcip:jcip-annotations:1.0-1
2525
com.google.auto.service:auto-service-annotations:1.0-rc7
2626
com.google.auto.service:auto-service:1.0-rc7
2727
com.google.auto:auto-common:0.10
28+
com.google.code.findbugs:annotations:3.0.1
2829
com.google.code.findbugs:jsr305:3.0.2
2930
com.google.code.gson:gson:2.8.2
3031
com.google.errorprone:error_prone_annotations:2.5.1
@@ -171,6 +172,7 @@ net.bytebuddy:byte-buddy-agent:1.10.18
171172
net.bytebuddy:byte-buddy:1.10.18
172173
net.java.dev.jna:jna-platform:5.7.0
173174
net.java.dev.jna:jna:5.7.0
175+
net.jcip:jcip-annotations:1.0
174176
net.minidev:accessors-smart:1.2
175177
net.minidev:json-smart:2.3
176178
org.apache.commons:commons-lang3:3.11

0 commit comments

Comments
 (0)