Skip to content

Commit a9de263

Browse files
authored
Separate queue for metrics to avoid dropping telemetry (#2062)
* Separate queue for metrics to avoid dropping telemetry * And bump version
1 parent cb2c800 commit a9de263

File tree

6 files changed

+105
-54
lines changed

6 files changed

+105
-54
lines changed

agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/configuration/Configuration.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,10 @@ public static class PreviewConfiguration {
244244

245245
public List<InstrumentationKeyOverride> instrumentationKeyOverrides = new ArrayList<>();
246246

247-
public int exportQueueCapacity = 2048;
247+
public int generalExportQueueCapacity = 2048;
248+
// metrics get flooded every 60 seconds by default, so need larger queue size to avoid dropping
249+
// telemetry (they are much smaller so a larger queue size is ok)
250+
public int metricsExportQueueCapacity = 65536;
248251
}
249252

250253
public static class InheritedAttribute {

agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/init/AiComponentInstaller.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,8 @@ private static AppIdSupplier start(Instrumentation instrumentation) {
173173
.setIkeyEndpointMap(ikeyEndpointMap)
174174
.setStatsbeatModule(statsbeatModule)
175175
.setReadOnlyFileSystem(readOnlyFileSystem)
176-
.setMaxExportQueueSize(config.preview.exportQueueCapacity)
176+
.setGeneralExportQueueSize(config.preview.generalExportQueueCapacity)
177+
.setMetricsExportQueueSize(config.preview.metricsExportQueueCapacity)
177178
.setAadAuthentication(config.preview.authentication)
178179
.build();
179180

agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/BatchSpanProcessor.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ public static BatchSpanProcessorBuilder builder(TelemetryChannel spanExporter) {
6161
long scheduleDelayNanos,
6262
int maxQueueSize,
6363
int maxExportBatchSize,
64-
long exporterTimeoutNanos) {
64+
long exporterTimeoutNanos,
65+
String queueName) {
6566
MpscArrayQueue<TelemetryItem> queue = new MpscArrayQueue<>(maxQueueSize);
6667
this.worker =
6768
new Worker(
@@ -70,7 +71,8 @@ public static BatchSpanProcessorBuilder builder(TelemetryChannel spanExporter) {
7071
maxExportBatchSize,
7172
exporterTimeoutNanos,
7273
queue,
73-
queue.capacity());
74+
queue.capacity(),
75+
queueName);
7476
Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
7577
workerThread.start();
7678
}
@@ -103,6 +105,7 @@ private static final class Worker implements Runnable {
103105

104106
private final Queue<TelemetryItem> queue;
105107
private final int queueCapacity;
108+
private final String queueName;
106109
// When waiting on the spans queue, exporter thread sets this atomic to the number of more
107110
// spans it needs before doing an export. Writer threads would then wait for the queue to reach
108111
// spansNeeded size before notifying the exporter thread about new entries.
@@ -124,25 +127,32 @@ private Worker(
124127
int maxExportBatchSize,
125128
long exporterTimeoutNanos,
126129
Queue<TelemetryItem> queue,
127-
int queueCapacity) {
130+
int queueCapacity,
131+
String queueName) {
128132
this.spanExporter = spanExporter;
129133
this.scheduleDelayNanos = scheduleDelayNanos;
130134
this.maxExportBatchSize = maxExportBatchSize;
131135
this.exporterTimeoutNanos = exporterTimeoutNanos;
132136
this.queue = queue;
133137
this.queueCapacity = queueCapacity;
138+
this.queueName = queueName;
134139
this.signal = new ArrayBlockingQueue<>(1);
135140
this.batch = new ArrayList<>(this.maxExportBatchSize);
136141
}
137142

138143
private void addSpan(TelemetryItem span) {
139144
if (!queue.offer(span)) {
140145
queuingSpanLogger.recordFailure(
141-
"Max export queue size of "
146+
"Max "
147+
+ queueName
148+
+ " export queue capacity of "
142149
+ queueCapacity
143-
+ " has been hit, dropping a span (max export queue size can be increased"
144-
+ " in the applicationinsights.json configuration file, e.g."
145-
+ " { \"preview\": { \"exportQueueCapacity\": "
150+
+ " has been hit, dropping a telemetry record (max "
151+
+ queueName
152+
+ " export queue capacity can be increased in the applicationinsights.json"
153+
+ " configuration file, e.g. { \"preview\": { \""
154+
+ queueName
155+
+ "ExportQueueCapacity\": "
146156
+ (queueCapacity * 2)
147157
+ " } }");
148158
} else {

agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/BatchSpanProcessorBuilder.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,13 @@ public BatchSpanProcessorBuilder setMaxExportBatchSize(int maxExportBatchSize) {
129129
* @return a new {@link io.opentelemetry.sdk.trace.export.BatchSpanProcessor}.
130130
* @throws NullPointerException if the {@code spanExporter} is {@code null}.
131131
*/
132-
public BatchSpanProcessor build() {
132+
public BatchSpanProcessor build(String queueName) {
133133
return new BatchSpanProcessor(
134-
spanExporter, scheduleDelayNanos, maxQueueSize, maxExportBatchSize, exporterTimeoutNanos);
134+
spanExporter,
135+
scheduleDelayNanos,
136+
maxQueueSize,
137+
maxExportBatchSize,
138+
exporterTimeoutNanos,
139+
queueName);
135140
}
136141
}

agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryClient.java

Lines changed: 74 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,14 @@ public class TelemetryClient {
9595
private final Cache<String, String> ikeyEndpointMap;
9696
private final StatsbeatModule statsbeatModule;
9797
private final boolean readOnlyFileSystem;
98-
private final int exportQueueCapacity;
98+
private final int generalExportQueueCapacity;
99+
private final int metricsExportQueueCapacity;
99100

100101
@Nullable private final Configuration.AadAuthentication aadAuthentication;
101102

102103
private final Object channelInitLock = new Object();
103-
private volatile @MonotonicNonNull BatchSpanProcessor channelBatcher;
104+
private volatile @MonotonicNonNull BatchSpanProcessor generalChannelBatcher;
105+
private volatile @MonotonicNonNull BatchSpanProcessor metricsChannelBatcher;
104106
private volatile @MonotonicNonNull BatchSpanProcessor statsbeatChannelBatcher;
105107

106108
public static TelemetryClient.Builder builder() {
@@ -124,7 +126,8 @@ public TelemetryClient(Builder builder) {
124126
this.ikeyEndpointMap = builder.ikeyEndpointMap;
125127
this.statsbeatModule = builder.statsbeatModule;
126128
this.readOnlyFileSystem = builder.readOnlyFileSystem;
127-
this.exportQueueCapacity = builder.exportQueueCapacity;
129+
this.generalExportQueueCapacity = builder.generalExportQueueCapacity;
130+
this.metricsExportQueueCapacity = builder.metricsExportQueueCapacity;
128131
this.aadAuthentication = builder.aadAuthentication;
129132
}
130133

@@ -182,7 +185,11 @@ public void trackAsync(TelemetryItem telemetry) {
182185
// batching, retry, throttling, and writing to disk on failure occur downstream
183186
// for simplicity not reporting back success/failure from this layer
184187
// only that it was successfully delivered to the next layer
185-
getChannelBatcher().trackAsync(telemetry);
188+
if (data instanceof MetricsData) {
189+
getMetricsChannelBatcher().trackAsync(telemetry);
190+
} else {
191+
getGeneralChannelBatcher().trackAsync(telemetry);
192+
}
186193
}
187194

188195
public void trackStatsbeatAsync(TelemetryItem telemetry) {
@@ -193,50 +200,69 @@ public void trackStatsbeatAsync(TelemetryItem telemetry) {
193200
}
194201

195202
public CompletableResultCode flushChannelBatcher() {
196-
if (channelBatcher != null) {
197-
return channelBatcher.forceFlush();
203+
if (generalChannelBatcher != null) {
204+
return generalChannelBatcher.forceFlush();
198205
} else {
199206
return CompletableResultCode.ofSuccess();
200207
}
201208
}
202209

203-
public BatchSpanProcessor getChannelBatcher() {
204-
if (channelBatcher == null) {
210+
private BatchSpanProcessor getGeneralChannelBatcher() {
211+
if (generalChannelBatcher == null) {
205212
synchronized (channelInitLock) {
206-
if (channelBatcher == null) {
207-
File telemetryFolder;
208-
LocalFileLoader localFileLoader = null;
209-
LocalFileWriter localFileWriter = null;
210-
if (!readOnlyFileSystem) {
211-
telemetryFolder = LocalStorageUtils.getOfflineTelemetryFolder();
212-
LocalFileCache localFileCache = new LocalFileCache(telemetryFolder);
213-
localFileLoader =
214-
new LocalFileLoader(
215-
localFileCache, telemetryFolder, statsbeatModule.getNonessentialStatsbeat());
216-
localFileWriter =
217-
new LocalFileWriter(
218-
localFileCache, telemetryFolder, statsbeatModule.getNonessentialStatsbeat());
219-
}
220-
221-
TelemetryChannel channel =
222-
TelemetryChannel.create(
223-
endpointProvider.getIngestionEndpointUrl(),
224-
localFileWriter,
225-
ikeyEndpointMap,
226-
statsbeatModule,
227-
false,
228-
aadAuthentication);
229-
230-
if (!readOnlyFileSystem) {
231-
LocalFileSender.start(localFileLoader, channel);
232-
}
213+
if (generalChannelBatcher == null) {
214+
generalChannelBatcher = initChannelBatcher(generalExportQueueCapacity, 512, "general");
215+
}
216+
}
217+
}
218+
return generalChannelBatcher;
219+
}
233220

234-
channelBatcher =
235-
BatchSpanProcessor.builder(channel).setMaxQueueSize(exportQueueCapacity).build();
221+
// metrics get flooded every 60 seconds by default, so need much larger queue size to avoid
222+
// dropping telemetry (they are much smaller so a larger queue size and larger batch size are ok)
223+
private BatchSpanProcessor getMetricsChannelBatcher() {
224+
if (metricsChannelBatcher == null) {
225+
synchronized (channelInitLock) {
226+
if (metricsChannelBatcher == null) {
227+
metricsChannelBatcher = initChannelBatcher(metricsExportQueueCapacity, 2048, "metrics");
236228
}
237229
}
238230
}
239-
return channelBatcher;
231+
return metricsChannelBatcher;
232+
}
233+
234+
private BatchSpanProcessor initChannelBatcher(
235+
int exportQueueCapacity, int maxExportBatchSize, String queueName) {
236+
LocalFileLoader localFileLoader = null;
237+
LocalFileWriter localFileWriter = null;
238+
if (!readOnlyFileSystem) {
239+
File telemetryFolder = LocalStorageUtils.getOfflineTelemetryFolder();
240+
LocalFileCache localFileCache = new LocalFileCache(telemetryFolder);
241+
localFileLoader =
242+
new LocalFileLoader(
243+
localFileCache, telemetryFolder, statsbeatModule.getNonessentialStatsbeat());
244+
localFileWriter =
245+
new LocalFileWriter(
246+
localFileCache, telemetryFolder, statsbeatModule.getNonessentialStatsbeat());
247+
}
248+
249+
TelemetryChannel channel =
250+
TelemetryChannel.create(
251+
endpointProvider.getIngestionEndpointUrl(),
252+
localFileWriter,
253+
ikeyEndpointMap,
254+
statsbeatModule,
255+
false,
256+
aadAuthentication);
257+
258+
if (!readOnlyFileSystem) {
259+
LocalFileSender.start(localFileLoader, channel);
260+
}
261+
262+
return BatchSpanProcessor.builder(channel)
263+
.setMaxQueueSize(exportQueueCapacity)
264+
.setMaxExportBatchSize(maxExportBatchSize)
265+
.build(queueName);
240266
}
241267

242268
public BatchSpanProcessor getStatsbeatChannelBatcher() {
@@ -266,7 +292,7 @@ public BatchSpanProcessor getStatsbeatChannelBatcher() {
266292
LocalFileSender.start(localFileLoader, channel);
267293
}
268294

269-
statsbeatChannelBatcher = BatchSpanProcessor.builder(channel).build();
295+
statsbeatChannelBatcher = BatchSpanProcessor.builder(channel).build("statsbeat");
270296
}
271297
}
272298
}
@@ -486,7 +512,8 @@ public static class Builder {
486512
private Cache<String, String> ikeyEndpointMap;
487513
private StatsbeatModule statsbeatModule;
488514
private boolean readOnlyFileSystem;
489-
private int exportQueueCapacity;
515+
private int generalExportQueueCapacity;
516+
private int metricsExportQueueCapacity;
490517
@Nullable private Configuration.AadAuthentication aadAuthentication;
491518

492519
public Builder setCustomDimensions(Map<String, String> customDimensions) {
@@ -533,8 +560,13 @@ public Builder setReadOnlyFileSystem(boolean readOnlyFileSystem) {
533560
return this;
534561
}
535562

536-
public Builder setMaxExportQueueSize(int exportQueueCapacity) {
537-
this.exportQueueCapacity = exportQueueCapacity;
563+
public Builder setGeneralExportQueueSize(int generalExportQueueCapacity) {
564+
this.generalExportQueueCapacity = generalExportQueueCapacity;
565+
return this;
566+
}
567+
568+
public Builder setMetricsExportQueueSize(int metricsExportQueueCapacity) {
569+
this.metricsExportQueueCapacity = metricsExportQueueCapacity;
538570
return this;
539571
}
540572

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// Project properties
2-
version=3.2.5-BETA
2+
version=3.2.5-BETA.2
33
group=com.microsoft.azure
44
# gradle default is 256m which causes sporadic failures - https://docs.gradle.org/current/userguide/build_environment.html#sec:configuring_jvm_memory
55
org.gradle.jvmargs=-XX:MaxMetaspaceSize=512m

0 commit comments

Comments
 (0)