Skip to content

Commit 1ffad2b

Browse files
authored
Add batching to AAD branch (#1689)
1 parent 14c7f39 commit 1ffad2b

File tree

11 files changed

+397
-36
lines changed

11 files changed

+397
-36
lines changed

agent/agent-tooling/gradle/dependency-locks/runtimeClasspath.lockfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ org.apache.httpcomponents:httpclient:4.5.13
9595
org.apache.httpcomponents:httpcore:4.4.13
9696
org.checkerframework:checker-qual:3.12.0
9797
org.codehaus.woodstox:stax2-api:4.2.1
98+
org.jctools:jctools-core:3.3.0
9899
org.jetbrains.kotlin:kotlin-bom:1.4.21
99100
org.reactivestreams:reactive-streams:1.0.3
100101
org.slf4j:jcl-over-slf4j:1.7.30

agent/exporter/gradle/dependency-locks/runtimeClasspath.lockfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,5 +69,6 @@ org.apache.httpcomponents:httpclient:4.5.13
6969
org.apache.httpcomponents:httpcore:4.4.13
7070
org.checkerframework:checker-qual:3.8.0
7171
org.codehaus.woodstox:stax2-api:4.2.1
72+
org.jctools:jctools-core:3.3.0
7273
org.reactivestreams:reactive-streams:1.0.3
7374
org.slf4j:slf4j-api:1.7.30

agent/exporter/src/main/java/com/microsoft/applicationinsights/agent/Exporter.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@
3131

3232
import com.azure.monitor.opentelemetry.exporter.implementation.models.*;
3333
import com.microsoft.applicationinsights.TelemetryUtil;
34-
import reactor.util.context.Context;
35-
import com.azure.core.util.tracing.Tracer;
3634
import com.google.common.base.Joiner;
3735
import com.google.common.base.Strings;
3836
import com.microsoft.applicationinsights.TelemetryClient;
@@ -127,20 +125,18 @@ public CompletableResultCode export(Collection<SpanData> spans) {
127125
return CompletableResultCode.ofSuccess();
128126
}
129127

130-
CompletableResultCode completableResultCode = new CompletableResultCode();
131128
try {
132129
List<TelemetryItem> telemetryItems = new ArrayList<>();
133130
for (SpanData span : spans) {
134131
logger.debug("exporting span: {}", span);
135132
export(span, telemetryItems);
136133
}
137-
telemetryClient.trackAsync(telemetryItems)
138-
.subscriberContext(Context.of(Tracer.DISABLE_TRACING_KEY, true))
139-
.subscribe(ignored -> { }, error -> completableResultCode.fail(), completableResultCode::succeed);
140-
return completableResultCode;
134+
telemetryClient.trackAsync(telemetryItems);
135+
// FIXME (trask)
136+
return CompletableResultCode.ofSuccess();
141137
} catch (Throwable t) {
142138
logger.error(t.getMessage(), t);
143-
return completableResultCode.fail();
139+
return CompletableResultCode.ofFailure();
144140
}
145141
}
146142

agent/instrumentation/gradle/dependency-locks/runtimeClasspath.lockfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ org.apache.httpcomponents:httpclient:4.5.13
167167
org.apache.httpcomponents:httpcore:4.4.13
168168
org.checkerframework:checker-qual:3.12.0
169169
org.codehaus.woodstox:stax2-api:4.2.1
170+
org.jctools:jctools-core:3.3.0
170171
org.jetbrains.kotlin:kotlin-bom:1.4.21
171172
org.reactivestreams:reactive-streams:1.0.3
172173
org.slf4j:jcl-over-slf4j:1.7.30

core/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ dependencies {
6363

6464
implementation group: 'com.azure', name: 'azure-core', version: '1.15.0'
6565
implementation group: 'io.opentelemetry', name: 'opentelemetry-api', version: versions.opentelemetry
66+
implementation group: 'org.jctools', name: 'jctools-core', version: '3.3.0'
6667

6768
testImplementation group: 'junit', name: 'junit', version: versions.junit
6869
testImplementation group: 'org.hamcrest', name: 'hamcrest-core', version: versions.hamcrest

core/gradle/dependency-locks/compileClasspath.lockfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,6 @@ org.apache.httpcomponents:httpclient:4.5.13
6161
org.apache.httpcomponents:httpcore:4.4.13
6262
org.checkerframework:checker-qual:3.8.0
6363
org.codehaus.woodstox:stax2-api:4.2.1
64+
org.jctools:jctools-core:3.3.0
6465
org.reactivestreams:reactive-streams:1.0.3
6566
org.slf4j:slf4j-api:1.7.30

core/gradle/dependency-locks/runtimeClasspath.lockfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,5 +69,6 @@ org.apache.httpcomponents:httpclient:4.5.13
6969
org.apache.httpcomponents:httpcore:4.4.13
7070
org.checkerframework:checker-qual:3.8.0
7171
org.codehaus.woodstox:stax2-api:4.2.1
72+
org.jctools:jctools-core:3.3.0
7273
org.reactivestreams:reactive-streams:1.0.3
7374
org.slf4j:slf4j-api:1.7.30

core/spotbugs.exclude.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,4 +84,11 @@
8484
<Bug pattern="NM_METHOD_NAMING_CONVENTION" />
8585
</Or>
8686
</Match>
87+
<Match>
88+
<Or>
89+
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
90+
<Bug pattern="RV_RETURN_VALUE_IGNORED" />
91+
</Or>
92+
<Class name="com.microsoft.applicationinsights.BatchSpanProcessor$Worker" />
93+
</Match>
8794
</FindBugsFilter>
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package com.microsoft.applicationinsights;
7+
8+
import com.azure.core.util.tracing.Tracer;
9+
import com.azure.monitor.opentelemetry.exporter.implementation.ApplicationInsightsClientImpl;
10+
import com.azure.monitor.opentelemetry.exporter.implementation.models.ExportResult;
11+
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem;
12+
import io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueue;
13+
import io.opentelemetry.sdk.common.CompletableResultCode;
14+
import io.opentelemetry.sdk.internal.DaemonThreadFactory;
15+
import reactor.core.publisher.Mono;
16+
import reactor.util.context.Context;
17+
18+
import java.util.ArrayList;
19+
import java.util.Collections;
20+
import java.util.Queue;
21+
import java.util.concurrent.ArrayBlockingQueue;
22+
import java.util.concurrent.BlockingQueue;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.AtomicInteger;
26+
import java.util.concurrent.atomic.AtomicReference;
27+
import java.util.logging.Level;
28+
import java.util.logging.Logger;
29+
30+
// copied from io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder
31+
public final class BatchSpanProcessor {
32+
33+
private static final String WORKER_THREAD_NAME =
34+
BatchSpanProcessor.class.getSimpleName() + "_WorkerThread";
35+
36+
private final Worker worker;
37+
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
38+
39+
/**
40+
* Returns a new Builder for {@link BatchSpanProcessor}.
41+
*
42+
* @param spanExporter the {@code SpanExporter} to where the Spans are pushed.
43+
* @return a new {@link BatchSpanProcessor}.
44+
* @throws NullPointerException if the {@code spanExporter} is {@code null}.
45+
*/
46+
public static BatchSpanProcessorBuilder builder(ApplicationInsightsClientImpl spanExporter) {
47+
return new BatchSpanProcessorBuilder(spanExporter);
48+
}
49+
50+
BatchSpanProcessor(
51+
ApplicationInsightsClientImpl spanExporter,
52+
long scheduleDelayNanos,
53+
int maxQueueSize,
54+
int maxExportBatchSize,
55+
long exporterTimeoutNanos) {
56+
this.worker =
57+
new Worker(
58+
spanExporter,
59+
scheduleDelayNanos,
60+
maxExportBatchSize,
61+
exporterTimeoutNanos,
62+
new MpscArrayQueue<>(maxQueueSize));
63+
Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
64+
workerThread.start();
65+
}
66+
67+
public void trackAsync(TelemetryItem span) {
68+
worker.addSpan(span);
69+
}
70+
71+
public CompletableResultCode shutdown() {
72+
if (isShutdown.getAndSet(true)) {
73+
return CompletableResultCode.ofSuccess();
74+
}
75+
return worker.shutdown();
76+
}
77+
78+
public CompletableResultCode forceFlush() {
79+
return worker.forceFlush();
80+
}
81+
82+
// Worker is a thread that batches multiple spans and calls the registered SpanExporter to export
83+
// the data.
84+
private static final class Worker implements Runnable {
85+
86+
private static final Logger logger = Logger.getLogger(Worker.class.getName());
87+
private final ApplicationInsightsClientImpl spanExporter;
88+
private final long scheduleDelayNanos;
89+
private final int maxExportBatchSize;
90+
private final long exporterTimeoutNanos;
91+
92+
private long nextExportTime;
93+
94+
private final Queue<TelemetryItem> queue;
95+
// When waiting on the spans queue, exporter thread sets this atomic to the number of more
96+
// spans it needs before doing an export. Writer threads would then wait for the queue to reach
97+
// spansNeeded size before notifying the exporter thread about new entries.
98+
// Integer.MAX_VALUE is used to imply that exporter thread is not expecting any signal. Since
99+
// exporter thread doesn't expect any signal initially, this value is initialized to
100+
// Integer.MAX_VALUE.
101+
private final AtomicInteger spansNeeded = new AtomicInteger(Integer.MAX_VALUE);
102+
private final BlockingQueue<Boolean> signal;
103+
private final AtomicReference<CompletableResultCode> flushRequested = new AtomicReference<>();
104+
private volatile boolean continueWork = true;
105+
private final ArrayList<TelemetryItem> batch;
106+
107+
private Worker(
108+
ApplicationInsightsClientImpl spanExporter,
109+
long scheduleDelayNanos,
110+
int maxExportBatchSize,
111+
long exporterTimeoutNanos,
112+
Queue<TelemetryItem> queue) {
113+
this.spanExporter = spanExporter;
114+
this.scheduleDelayNanos = scheduleDelayNanos;
115+
this.maxExportBatchSize = maxExportBatchSize;
116+
this.exporterTimeoutNanos = exporterTimeoutNanos;
117+
this.queue = queue;
118+
this.signal = new ArrayBlockingQueue<>(1);
119+
this.batch = new ArrayList<>(this.maxExportBatchSize);
120+
}
121+
122+
private void addSpan(TelemetryItem span) {
123+
if (queue.offer(span)) {
124+
// FIXME (trask) log dropped span
125+
// droppedSpans.add(1);
126+
} else {
127+
if (queue.size() >= spansNeeded.get()) {
128+
signal.offer(true);
129+
}
130+
}
131+
}
132+
133+
@Override
134+
public void run() {
135+
updateNextExportTime();
136+
137+
while (continueWork) {
138+
if (flushRequested.get() != null) {
139+
flush();
140+
}
141+
while (!queue.isEmpty() && batch.size() < maxExportBatchSize) {
142+
batch.add(queue.poll());
143+
}
144+
if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) {
145+
exportCurrentBatch();
146+
updateNextExportTime();
147+
}
148+
if (queue.isEmpty()) {
149+
try {
150+
long pollWaitTime = nextExportTime - System.nanoTime();
151+
if (pollWaitTime > 0) {
152+
spansNeeded.set(maxExportBatchSize - batch.size());
153+
signal.poll(pollWaitTime, TimeUnit.NANOSECONDS);
154+
spansNeeded.set(Integer.MAX_VALUE);
155+
}
156+
} catch (InterruptedException e) {
157+
Thread.currentThread().interrupt();
158+
return;
159+
}
160+
}
161+
}
162+
}
163+
164+
private void flush() {
165+
int spansToFlush = queue.size();
166+
while (spansToFlush > 0) {
167+
TelemetryItem span = queue.poll();
168+
assert span != null;
169+
batch.add(span);
170+
spansToFlush--;
171+
if (batch.size() >= maxExportBatchSize) {
172+
exportCurrentBatch();
173+
}
174+
}
175+
exportCurrentBatch();
176+
flushRequested.get().succeed();
177+
flushRequested.set(null);
178+
}
179+
180+
private void updateNextExportTime() {
181+
nextExportTime = System.nanoTime() + scheduleDelayNanos;
182+
}
183+
184+
private CompletableResultCode shutdown() {
185+
final CompletableResultCode result = new CompletableResultCode();
186+
187+
final CompletableResultCode flushResult = forceFlush();
188+
flushResult.whenComplete(
189+
() -> {
190+
continueWork = false;
191+
if (!flushResult.isSuccess()) {
192+
result.fail();
193+
} else {
194+
result.succeed();
195+
}
196+
});
197+
198+
return result;
199+
}
200+
201+
private CompletableResultCode forceFlush() {
202+
CompletableResultCode flushResult = new CompletableResultCode();
203+
// we set the atomic here to trigger the worker loop to do a flush of the entire queue.
204+
if (flushRequested.compareAndSet(null, flushResult)) {
205+
signal.offer(true);
206+
}
207+
CompletableResultCode possibleResult = flushRequested.get();
208+
// there's a race here where the flush happening in the worker loop could complete before we
209+
// get what's in the atomic. In that case, just return success, since we know it succeeded in
210+
// the interim.
211+
return possibleResult == null ? CompletableResultCode.ofSuccess() : possibleResult;
212+
}
213+
214+
private void exportCurrentBatch() {
215+
if (batch.isEmpty()) {
216+
return;
217+
}
218+
219+
try {
220+
spanExporter.trackAsync(Collections.unmodifiableList(batch))
221+
.subscriberContext(Context.of(Tracer.DISABLE_TRACING_KEY, true))
222+
.subscribe();
223+
// FIXME (trask)
224+
//.subscribe(ignored -> { }, error -> completableResultCode.fail(), completableResultCode::succeed);
225+
// FIXME (trask)
226+
// result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
227+
// if (!result.isSuccess()) {
228+
// logger.log(Level.FINE, "Exporter failed");
229+
// }
230+
} catch (RuntimeException e) {
231+
logger.log(Level.WARNING, "Exporter threw an Exception", e);
232+
} finally {
233+
batch.clear();
234+
}
235+
}
236+
}
237+
}

0 commit comments

Comments
 (0)