Skip to content

Commit 1505c4a

Browse files
authored
fix quick pulse memory leak (#1879)
* fix quick pulse memory leak * add/update tests * use list instead of arrayblocking queue * use quickpulsestatus instead of headerinfo
1 parent 24aa1ab commit 1505c4a

File tree

5 files changed

+79
-6
lines changed

5 files changed

+79
-6
lines changed

agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/quickpulse/QuickPulseCoordinator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ private long sendData() {
8080
QuickPulseHeaderInfo currentQuickPulseHeaderInfo = dataSender.getQuickPulseHeaderInfo();
8181

8282
this.handleReceivedHeaders(currentQuickPulseHeaderInfo);
83-
83+
QuickPulseDataCollector.INSTANCE.setQuickPulseStatus(
84+
currentQuickPulseHeaderInfo.getQuickPulseStatus());
8485
switch (currentQuickPulseHeaderInfo.getQuickPulseStatus()) {
8586
case ERROR:
8687
pingMode = true;
@@ -105,6 +106,7 @@ private long sendData() {
105106
private long ping() {
106107
QuickPulseHeaderInfo pingResult = pingSender.ping(qpsServiceRedirectedEndpoint);
107108
this.handleReceivedHeaders(pingResult);
109+
QuickPulseDataCollector.INSTANCE.setQuickPulseStatus(pingResult.getQuickPulseStatus());
108110
switch (pingResult.getQuickPulseStatus()) {
109111
case ERROR:
110112
return waitOnErrorInMillis;

agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/quickpulse/QuickPulseDataCollector.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ private CountAndDuration(long count, long duration) {
113113
static class Counters {
114114
private static final long MAX_COUNT = 524287L;
115115
private static final long MAX_DURATION = 17592186044415L;
116+
private static final int MAX_DOCUMENTS_SIZE = 1000;
116117

117118
public final AtomicInteger exceptions = new AtomicInteger(0);
118119

@@ -139,6 +140,7 @@ static CountAndDuration decodeCountAndDuration(long countAndDuration) {
139140
private final AtomicReference<Counters> counters = new AtomicReference<>(null);
140141
private final MemoryMXBean memory;
141142
private final CpuPerformanceCounterCalculator cpuPerformanceCounterCalculator;
143+
private volatile QuickPulseStatus quickPulseStatus;
142144

143145
QuickPulseDataCollector() {
144146
CpuPerformanceCounterCalculator temp;
@@ -162,17 +164,28 @@ static CountAndDuration decodeCountAndDuration(long countAndDuration) {
162164
}
163165
cpuPerformanceCounterCalculator = temp;
164166
memory = ManagementFactory.getMemoryMXBean();
167+
quickPulseStatus = QuickPulseStatus.QP_IS_OFF;
165168
}
166169

167170
public synchronized void disable() {
168171
counters.set(null);
172+
quickPulseStatus = QuickPulseStatus.QP_IS_OFF;
169173
}
170174

171175
public synchronized void enable(TelemetryClient telemetryClient) {
172176
this.telemetryClient = telemetryClient;
173177
counters.set(new Counters());
174178
}
175179

180+
public synchronized void setQuickPulseStatus(QuickPulseStatus quickPulseStatus) {
181+
this.quickPulseStatus = quickPulseStatus;
182+
}
183+
184+
// Used only in tests
185+
public synchronized QuickPulseStatus getQuickPulseStatus() {
186+
return this.quickPulseStatus;
187+
}
188+
176189
@Nullable
177190
public synchronized FinalCounters getAndRestart() {
178191
Counters currentCounters = counters.getAndSet(new Counters());
@@ -194,8 +207,8 @@ synchronized FinalCounters peek() {
194207
}
195208

196209
public void add(TelemetryItem telemetryItem) {
197-
if (telemetryClient == null) {
198-
// quick pulse is not enabled
210+
if (telemetryClient == null || quickPulseStatus != QuickPulseStatus.QP_IS_ON) {
211+
// quick pulse is not enabled or quick pulse data sender is not enabled
199212
return;
200213
}
201214

@@ -252,7 +265,9 @@ private void addDependency(RemoteDependencyData telemetry, int itemCount) {
252265
quickPulseDependencyDocument.setProperties(
253266
aggregateProperties(telemetry.getProperties(), telemetry.getMeasurements()));
254267
synchronized (counters.documentList) {
255-
counters.documentList.add(quickPulseDependencyDocument);
268+
if (counters.documentList.size() < Counters.MAX_DOCUMENTS_SIZE) {
269+
counters.documentList.add(quickPulseDependencyDocument);
270+
}
256271
}
257272
}
258273

@@ -287,7 +302,9 @@ private void addException(TelemetryExceptionData exceptionData, int itemCount) {
287302
quickPulseExceptionDocument.setExceptionType(exceptionList.get(0).getTypeName());
288303
}
289304
synchronized (counters.documentList) {
290-
counters.documentList.add(quickPulseExceptionDocument);
305+
if (counters.documentList.size() < Counters.MAX_DOCUMENTS_SIZE) {
306+
counters.documentList.add(quickPulseExceptionDocument);
307+
}
291308
}
292309
}
293310

@@ -315,7 +332,9 @@ private void addRequest(RequestData requestTelemetry, int itemCount) {
315332
quickPulseRequestDocument.setProperties(
316333
aggregateProperties(requestTelemetry.getProperties(), requestTelemetry.getMeasurements()));
317334
synchronized (counters.documentList) {
318-
counters.documentList.add(quickPulseRequestDocument);
335+
if (counters.documentList.size() < Counters.MAX_DOCUMENTS_SIZE) {
336+
counters.documentList.add(quickPulseRequestDocument);
337+
}
319338
}
320339
}
321340

agent/agent-tooling/src/test/java/com/microsoft/applicationinsights/agent/internal/quickpulse/QuickPulseCoordinatorTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,29 @@
2121

2222
package com.microsoft.applicationinsights.agent.internal.quickpulse;
2323

24+
import static org.assertj.core.api.Assertions.assertThat;
2425
import static org.mockito.ArgumentMatchers.any;
2526
import static org.mockito.ArgumentMatchers.notNull;
2627
import static org.mockito.Mockito.mock;
2728

29+
import org.junit.jupiter.api.AfterEach;
30+
import org.junit.jupiter.api.BeforeEach;
2831
import org.junit.jupiter.api.Disabled;
2932
import org.junit.jupiter.api.Test;
3033
import org.mockito.Mockito;
3134

3235
class QuickPulseCoordinatorTest {
36+
37+
@BeforeEach
38+
void setup() {
39+
QuickPulseDataCollector.INSTANCE.disable();
40+
}
41+
42+
@AfterEach
43+
void tearDown() {
44+
QuickPulseDataCollector.INSTANCE.disable();
45+
}
46+
3347
@Test
3448
void testOnlyPings() throws InterruptedException {
3549
QuickPulseDataFetcher mockFetcher = mock(QuickPulseDataFetcher.class);
@@ -65,6 +79,9 @@ void testOnlyPings() throws InterruptedException {
6579
Mockito.verify(mockSender, Mockito.never()).getQuickPulseHeaderInfo();
6680

6781
Mockito.verify(mockPingSender, Mockito.atLeast(1)).ping(null);
82+
// make sure QP_IS_OFF after ping
83+
assertThat(QuickPulseDataCollector.INSTANCE.getQuickPulseStatus())
84+
.isEqualTo(QuickPulseStatus.QP_IS_OFF);
6885
}
6986

7087
@Test
@@ -107,6 +124,9 @@ void testOnePingAndThenOnePost() throws InterruptedException {
107124
Mockito.verify(mockSender, Mockito.times(1)).getQuickPulseHeaderInfo();
108125

109126
Mockito.verify(mockPingSender, Mockito.atLeast(1)).ping(null);
127+
// Make sure QP_IS_OFF after one post and ping
128+
assertThat(QuickPulseDataCollector.INSTANCE.getQuickPulseStatus())
129+
.isEqualTo(QuickPulseStatus.QP_IS_OFF);
110130
}
111131

112132
// FIXME (trask) sporadically failing on CI

agent/agent-tooling/src/test/java/com/microsoft/applicationinsights/agent/internal/quickpulse/QuickPulseDataCollectorTests.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ void nullCountersAfterDisable() {
7878
void requestTelemetryIsCounted_DurationIsSum() {
7979
TelemetryClient telemetryClient = new TelemetryClient();
8080
telemetryClient.setInstrumentationKey(FAKE_INSTRUMENTATION_KEY);
81+
QuickPulseDataCollector.INSTANCE.setQuickPulseStatus(QuickPulseStatus.QP_IS_ON);
8182
QuickPulseDataCollector.INSTANCE.enable(telemetryClient);
8283

8384
// add a success and peek
@@ -120,6 +121,7 @@ void requestTelemetryIsCounted_DurationIsSum() {
120121
void dependencyTelemetryIsCounted_DurationIsSum() {
121122
TelemetryClient telemetryClient = new TelemetryClient();
122123
telemetryClient.setInstrumentationKey(FAKE_INSTRUMENTATION_KEY);
124+
QuickPulseDataCollector.INSTANCE.setQuickPulseStatus(QuickPulseStatus.QP_IS_ON);
123125
QuickPulseDataCollector.INSTANCE.enable(telemetryClient);
124126

125127
// add a success and peek.
@@ -162,6 +164,7 @@ void dependencyTelemetryIsCounted_DurationIsSum() {
162164
void exceptionTelemetryIsCounted() {
163165
TelemetryClient telemetryClient = new TelemetryClient();
164166
telemetryClient.setInstrumentationKey(FAKE_INSTRUMENTATION_KEY);
167+
QuickPulseDataCollector.INSTANCE.setQuickPulseStatus(QuickPulseStatus.QP_IS_ON);
165168
QuickPulseDataCollector.INSTANCE.enable(telemetryClient);
166169

167170
TelemetryItem telemetry = createExceptionTelemetry(new Exception());
@@ -253,4 +256,30 @@ private static void assertCountersReset(FinalCounters counters) {
253256

254257
assertThat(counters.exceptions).isEqualTo(0);
255258
}
259+
260+
@Test
261+
void checkDocumentsListSize() {
262+
TelemetryClient telemetryClient = new TelemetryClient();
263+
telemetryClient.setInstrumentationKey(FAKE_INSTRUMENTATION_KEY);
264+
QuickPulseDataCollector.INSTANCE.setQuickPulseStatus(QuickPulseStatus.QP_IS_ON);
265+
QuickPulseDataCollector.INSTANCE.enable(telemetryClient);
266+
267+
final long duration = 112233L;
268+
TelemetryItem telemetry =
269+
createRequestTelemetry("request-test", new Date(), duration, "200", true);
270+
telemetry.setInstrumentationKey(FAKE_INSTRUMENTATION_KEY);
271+
for (int i = 0; i < 1005; i++) {
272+
QuickPulseDataCollector.INSTANCE.add(telemetry);
273+
}
274+
// check max documentList size
275+
assertThat(QuickPulseDataCollector.INSTANCE.getAndRestart().documentList.size())
276+
.isEqualTo(1000);
277+
278+
QuickPulseDataCollector.INSTANCE.setQuickPulseStatus(QuickPulseStatus.QP_IS_OFF);
279+
for (int i = 0; i < 5; i++) {
280+
QuickPulseDataCollector.INSTANCE.add(telemetry);
281+
}
282+
// no telemetry items are added when QP_IS_OFF
283+
assertThat(QuickPulseDataCollector.INSTANCE.getAndRestart().documentList.size()).isEqualTo(0);
284+
}
256285
}

agent/agent-tooling/src/test/java/com/microsoft/applicationinsights/agent/internal/quickpulse/QuickPulseIntegrationTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ public void testPostRequest() throws InterruptedException {
110110
telemetryClient.setConnectionString(connectionString);
111111
QuickPulseDataFetcher dataFetcher =
112112
new QuickPulseDataFetcher(sendQueue, telemetryClient, "machine1", "instance1", null);
113+
QuickPulseDataCollector.INSTANCE.setQuickPulseStatus(QuickPulseStatus.QP_IS_ON);
113114
QuickPulseDataCollector.INSTANCE.enable(telemetryClient);
114115
final long duration = 112233L;
115116
// Request Telemetry
@@ -148,6 +149,8 @@ public void testPostRequest() throws InterruptedException {
148149
Thread.sleep(50);
149150
assertTrue(pingCountDown.await(1, TimeUnit.SECONDS));
150151
assertThat(quickPulseHeaderInfo.getQuickPulseStatus()).isEqualTo(QuickPulseStatus.QP_IS_ON);
152+
assertThat(QuickPulseDataCollector.INSTANCE.getQuickPulseStatus())
153+
.isEqualTo(QuickPulseStatus.QP_IS_ON);
151154
assertTrue(postCountDown.await(1, TimeUnit.SECONDS));
152155
senderThread.interrupt();
153156
coordinatorThread.interrupt();

0 commit comments

Comments
 (0)