Skip to content

Commit 0439995

Browse files
authored
Increase export throughput (#2204)
* Revert "Allow concurrent exports (#2181)" This reverts commit b2c60e7. * Pending exports rewrite * Add log message
1 parent cace287 commit 0439995

File tree

6 files changed

+417
-473
lines changed

6 files changed

+417
-473
lines changed

agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/BatchSpanProcessor.java

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
import java.util.ArrayList;
2929
import java.util.Collections;
3030
import java.util.Queue;
31+
import java.util.Set;
3132
import java.util.concurrent.ArrayBlockingQueue;
3233
import java.util.concurrent.BlockingQueue;
34+
import java.util.concurrent.ConcurrentHashMap;
3335
import java.util.concurrent.TimeUnit;
3436
import java.util.concurrent.atomic.AtomicBoolean;
3537
import java.util.concurrent.atomic.AtomicInteger;
@@ -62,6 +64,7 @@ public static BatchSpanProcessorBuilder builder(TelemetryChannel spanExporter) {
6264
int maxQueueSize,
6365
int maxExportBatchSize,
6466
long exporterTimeoutNanos,
67+
int maxPendingExports,
6568
String queueName) {
6669
MpscArrayQueue<TelemetryItem> queue = new MpscArrayQueue<>(maxQueueSize);
6770
this.worker =
@@ -70,6 +73,7 @@ public static BatchSpanProcessorBuilder builder(TelemetryChannel spanExporter) {
7073
scheduleDelayNanos,
7174
maxExportBatchSize,
7275
exporterTimeoutNanos,
76+
maxPendingExports,
7377
queue,
7478
queue.capacity(),
7579
queueName);
@@ -100,6 +104,7 @@ private static final class Worker implements Runnable {
100104
private final long scheduleDelayNanos;
101105
private final int maxExportBatchSize;
102106
private final long exporterTimeoutNanos;
107+
private final int maxPendingExports;
103108

104109
private long nextExportTime;
105110

@@ -118,21 +123,29 @@ private static final class Worker implements Runnable {
118123
private volatile boolean continueWork = true;
119124
private final ArrayList<TelemetryItem> batch;
120125

126+
private final Set<CompletableResultCode> pendingExports =
127+
Collections.newSetFromMap(new ConcurrentHashMap<>());
128+
121129
private static final OperationLogger queuingSpanLogger =
122130
new OperationLogger(BatchSpanProcessor.class, "Queuing span");
123131

132+
private static final OperationLogger addAsyncExport =
133+
new OperationLogger(BatchSpanProcessor.class, "Add async export");
134+
124135
private Worker(
125136
TelemetryChannel spanExporter,
126137
long scheduleDelayNanos,
127138
int maxExportBatchSize,
128139
long exporterTimeoutNanos,
140+
int maxPendingExports,
129141
Queue<TelemetryItem> queue,
130142
int queueCapacity,
131143
String queueName) {
132144
this.spanExporter = spanExporter;
133145
this.scheduleDelayNanos = scheduleDelayNanos;
134146
this.maxExportBatchSize = maxExportBatchSize;
135147
this.exporterTimeoutNanos = exporterTimeoutNanos;
148+
this.maxPendingExports = maxPendingExports;
136149
this.queue = queue;
137150
this.queueCapacity = queueCapacity;
138151
this.queueName = queueName;
@@ -206,8 +219,12 @@ private void flush() {
206219
}
207220
}
208221
exportCurrentBatch();
209-
flushRequested.get().succeed();
210-
flushRequested.set(null);
222+
CompletableResultCode.ofAll(pendingExports).join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
223+
CompletableResultCode flushResult = flushRequested.get();
224+
if (flushResult != null) {
225+
flushResult.succeed();
226+
flushRequested.set(null);
227+
}
211228
}
212229

213230
private void updateNextExportTime() {
@@ -232,28 +249,6 @@ private CompletableResultCode shutdown() {
232249
}
233250

234251
private CompletableResultCode forceFlush() {
235-
CompletableResultCode overallResult = new CompletableResultCode();
236-
CompletableResultCode workerResult = forceFlushWorker();
237-
workerResult.whenComplete(
238-
() -> {
239-
if (!workerResult.isSuccess()) {
240-
overallResult.fail();
241-
return;
242-
}
243-
CompletableResultCode exporterResult = spanExporter.flush();
244-
exporterResult.whenComplete(
245-
() -> {
246-
if (exporterResult.isSuccess()) {
247-
overallResult.succeed();
248-
} else {
249-
overallResult.fail();
250-
}
251-
});
252-
});
253-
return overallResult;
254-
}
255-
256-
private CompletableResultCode forceFlushWorker() {
257252
CompletableResultCode flushResult = new CompletableResultCode();
258253
// we set the atomic here to trigger the worker loop to do a flush of the entire queue.
259254
if (flushRequested.compareAndSet(null, flushResult)) {
@@ -274,7 +269,20 @@ private void exportCurrentBatch() {
274269
try {
275270
// batching, retry, logging, and writing to disk on failure occur downstream
276271
CompletableResultCode result = spanExporter.send(Collections.unmodifiableList(batch));
277-
result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
272+
if (pendingExports.size() < maxPendingExports - 1) {
273+
addAsyncExport.recordSuccess();
274+
pendingExports.add(result);
275+
result.whenComplete(
276+
() -> {
277+
pendingExports.remove(result);
278+
});
279+
} else {
280+
addAsyncExport.recordFailure(
281+
"Max number of concurrent exports "
282+
+ maxPendingExports
283+
+ " has been hit, may see some export throttling due to this");
284+
result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
285+
}
278286
} finally {
279287
batch.clear();
280288
}

agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/BatchSpanProcessorBuilder.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,14 @@ final class BatchSpanProcessorBuilder {
3434
private static final int DEFAULT_MAX_QUEUE_SIZE = 2048;
3535
private static final int DEFAULT_MAX_EXPORT_BATCH_SIZE = 512;
3636
private static final int DEFAULT_EXPORT_TIMEOUT_MILLIS = 30_000;
37+
private static final int DEFAULT_MAX_PENDING_EXPORTS = 1;
3738

3839
private final TelemetryChannel spanExporter;
3940
private long scheduleDelayNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_SCHEDULE_DELAY_MILLIS);
4041
private int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;
4142
private int maxExportBatchSize = DEFAULT_MAX_EXPORT_BATCH_SIZE;
4243
private long exporterTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_EXPORT_TIMEOUT_MILLIS);
44+
private int maxPendingExports = DEFAULT_MAX_PENDING_EXPORTS;
4345

4446
BatchSpanProcessorBuilder(TelemetryChannel spanExporter) {
4547
this.spanExporter = requireNonNull(spanExporter, "spanExporter");
@@ -122,6 +124,31 @@ public BatchSpanProcessorBuilder setMaxExportBatchSize(int maxExportBatchSize) {
122124
return this;
123125
}
124126

127+
/**
128+
* The maximum number of exports that can be pending at any time.
129+
*
130+
* <p>The {@link BatchSpanProcessor}'s single worker thread will keep processing as many batches
131+
* as it can without blocking on the {@link io.opentelemetry.sdk.common.CompletableResultCode}s
132+
* that are returned from the {@code spanExporter}, but it will limit the total number of pending
133+
* exports in flight to this number.
134+
*
135+
* <p>Default value is {@code 1}.
136+
*
137+
* @param maxPendingExports the maximum number of exports that can be pending at any time.
138+
* @return this.
139+
* @see BatchSpanProcessorBuilder#DEFAULT_MAX_PENDING_EXPORTS
140+
*/
141+
public BatchSpanProcessorBuilder setMaxPendingExports(int maxPendingExports) {
142+
checkArgument(maxPendingExports > 0, "maxPendingExports must be positive.");
143+
this.maxPendingExports = maxPendingExports;
144+
return this;
145+
}
146+
147+
// Visible for testing
148+
int getMaxPendingExports() {
149+
return maxPendingExports;
150+
}
151+
125152
/**
126153
* Returns a new {@link io.opentelemetry.sdk.trace.export.BatchSpanProcessor} that batches, then
127154
* converts spans to proto and forwards them to the given {@code spanExporter}.
@@ -136,6 +163,7 @@ public BatchSpanProcessor build(String queueName) {
136163
maxQueueSize,
137164
maxExportBatchSize,
138165
exporterTimeoutNanos,
166+
maxPendingExports,
139167
queueName);
140168
}
141169
}

0 commit comments

Comments
 (0)