Skip to content

Commit 8349df0

Browse files
authored
Fix netty ByteBuf leak (#1822)
* Fix netty ByteBuf leak * Consume responses everywhere
1 parent f5e8c73 commit 8349df0

File tree

5 files changed

+38
-12
lines changed

5 files changed

+38
-12
lines changed

agent/agent-profiler/agent-service-profiler/src/main/java/com/microsoft/applicationinsights/serviceprofilerapi/client/ProfilerFrontendClientV2.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ public Mono<BlobAccessPass> getUploadAccess(UUID profileId) {
8585
// this shouldn't happen, the mono should complete with a response or a failure
8686
throw new AssertionError("http response mono returned empty");
8787
}
88+
// response body is not consumed below
89+
consumeResponseBody(response);
90+
8891
if (response.getStatusCode() >= 300) {
8992
throw new HttpResponseException(response);
9093
}
@@ -97,6 +100,13 @@ public Mono<BlobAccessPass> getUploadAccess(UUID profileId) {
97100
});
98101
}
99102

103+
// need to consume response, otherwise get netty ByteBuf leak warnings:
104+
// io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before
105+
// it's garbage-collected (see https://github.com/Azure/azure-sdk-for-java/issues/10467)
106+
private static void consumeResponseBody(HttpResponse response) {
107+
response.getBody().subscribe();
108+
}
109+
100110
public Mono<HttpResponse> executePostWithRedirect(URL requestUrl) {
101111

102112
HttpRequest request = new HttpRequest(HttpMethod.POST, requestUrl);
@@ -124,6 +134,7 @@ public Mono<ArtifactAcceptedResponse> reportUploadFinish(UUID profileId, String
124134
int statusCode = response.getStatusCode();
125135
if (statusCode != 201 && statusCode != 202) {
126136
LOGGER.error("Trace upload failed: {}", statusCode);
137+
consumeResponseBody(response);
127138
return Mono.error(new AssertionError("http request failed"));
128139
}
129140

@@ -168,9 +179,7 @@ public Mono<String> getSettings(Date oldTimeStamp) {
168179
return Mono.error(new AssertionError("http response mono returned empty"));
169180
}
170181
if (response.getStatusCode() >= 300) {
171-
// FIXME (trask) does azure http client throw HttpResponseException already on >=
172-
// 300 response
173-
// above?
182+
consumeResponseBody(response);
174183
return Mono.error(new HttpResponseException(response));
175184
}
176185
return response.getBodyAsString();

agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/httpclient/LazyHttpClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,13 @@ public Mono<HttpResponse> send(HttpRequest request, Context context) {
147147
return getDelegate().send(request, context);
148148
}
149149

150+
// need to consume response, otherwise get netty ByteBuf leak warnings:
151+
// io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before
152+
// it's garbage-collected (see https://github.com/Azure/azure-sdk-for-java/issues/10467)
153+
public static void consumeResponseBody(HttpResponse response) {
154+
response.getBody().subscribe();
155+
}
156+
150157
private static HttpPipelinePolicy getAuthenticationPolicy(
151158
Configuration.AadAuthentication configuration) {
152159
switch (configuration.type) {

agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/quickpulse/QuickPulseDataSender.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.azure.core.http.HttpPipeline;
2525
import com.azure.core.http.HttpRequest;
2626
import com.azure.core.http.HttpResponse;
27+
import com.microsoft.applicationinsights.agent.internal.httpclient.LazyHttpClient;
2728
import java.util.concurrent.ArrayBlockingQueue;
2829

2930
class QuickPulseDataSender implements Runnable {
@@ -57,7 +58,14 @@ public void run() {
5758

5859
long sendTime = System.nanoTime();
5960
try (HttpResponse response = httpPipeline.send(post).block()) {
60-
if (response != null && networkHelper.isSuccess(response)) {
61+
if (response == null) {
62+
// this shouldn't happen, the mono should complete with a response or a failure
63+
throw new AssertionError("http response mono returned empty");
64+
}
65+
// response body is not consumed below
66+
LazyHttpClient.consumeResponseBody(response);
67+
68+
if (networkHelper.isSuccess(response)) {
6169
QuickPulseHeaderInfo quickPulseHeaderInfo =
6270
networkHelper.getQuickPulseHeaderInfo(response);
6371
switch (quickPulseHeaderInfo.getQuickPulseStatus()) {

agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/quickpulse/QuickPulsePingSender.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.microsoft.applicationinsights.agent.internal.common.ExceptionStats;
2828
import com.microsoft.applicationinsights.agent.internal.common.ExceptionUtils;
2929
import com.microsoft.applicationinsights.agent.internal.common.Strings;
30+
import com.microsoft.applicationinsights.agent.internal.httpclient.LazyHttpClient;
3031
import com.microsoft.applicationinsights.agent.internal.telemetry.TelemetryClient;
3132
import java.util.Date;
3233
import java.util.concurrent.atomic.AtomicBoolean;
@@ -91,11 +92,15 @@ public QuickPulseHeaderInfo ping(String redirectedEndpoint) {
9192
request.setBody(buildPingEntity(currentDate.getTime()));
9293

9394
long sendTime = System.nanoTime();
94-
HttpResponse response = null;
95-
try {
95+
try (HttpResponse response = httpPipeline.send(request).block()) {
96+
if (response == null) {
97+
// this shouldn't happen, the mono should complete with a response or a failure
98+
throw new AssertionError("http response mono returned empty");
99+
}
100+
// response body is not consumed below
101+
LazyHttpClient.consumeResponseBody(response);
96102

97-
response = httpPipeline.send(request).block();
98-
if (response != null && networkHelper.isSuccess(response)) {
103+
if (networkHelper.isSuccess(response)) {
99104
QuickPulseHeaderInfo quickPulseHeaderInfo = networkHelper.getQuickPulseHeaderInfo(response);
100105
switch (quickPulseHeaderInfo.getQuickPulseStatus()) {
101106
case QP_IS_OFF:
@@ -111,10 +116,6 @@ public QuickPulseHeaderInfo ping(String redirectedEndpoint) {
111116
} catch (Throwable t) {
112117
exceptionStats.recordFailure(t.getMessage(), t);
113118
ExceptionUtils.parseError(t, getQuickPulseEndpoint(), friendlyExceptionThrown, logger);
114-
} finally {
115-
if (response != null) {
116-
response.close();
117-
}
118119
}
119120
return onPingError(sendTime);
120121
}

agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ private CompletableResultCode internalSend(
202202
.subscribe(
203203
response -> {
204204
parseResponseCode(response.getStatusCode(), byteBuffers, byteBuffers);
205+
LazyHttpClient.consumeResponseBody(response);
205206
},
206207
error -> {
207208
StatsbeatModule.get().getNetworkStatsbeat().incrementRequestFailureCount();

0 commit comments

Comments
 (0)