Skip to content

Commit 47fecdf

Browse files
authored
Handles the edge case in Netty client where HTTP/2 stream gets cleane… (#6649)
* Handles the edge case in Netty client where HTTP/2 stream gets cleaned up before metrics collection completes, causing NPE to be thrown * Remove unused imports and reuse method
1 parent a313987 commit 47fecdf

File tree

6 files changed

+59
-17
lines changed

6 files changed

+59
-17
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Netty NIO HTTP Client",
4+
"contributor": "",
5+
"description": "Handles the edge case in Netty client where HTTP/2 stream gets cleaned up before metrics collection completes, causing NPE to be thrown. See [#6561](https://github.com/aws/aws-sdk-java-v2/issues/6561)."
6+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import software.amazon.awssdk.http.nio.netty.internal.AwaitCloseChannelPoolMap;
5151
import software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration;
5252
import software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor;
53+
import software.amazon.awssdk.http.nio.netty.internal.NettyRequestMetrics;
5354
import software.amazon.awssdk.http.nio.netty.internal.NonManagedEventLoopGroup;
5455
import software.amazon.awssdk.http.nio.netty.internal.RequestContext;
5556
import software.amazon.awssdk.http.nio.netty.internal.SdkChannelOptions;
@@ -132,7 +133,9 @@ private NettyNioAsyncHttpClient(DefaultBuilder builder, AttributeMap serviceDefa
132133
public CompletableFuture<Void> execute(AsyncExecuteRequest request) {
133134
failIfAlpnUsedWithHttp(request);
134135
RequestContext ctx = createRequestContext(request);
135-
ctx.metricCollector().reportMetric(HTTP_CLIENT_NAME, clientName()); // TODO: Can't this be done in core?
136+
NettyRequestMetrics.ifMetricsAreEnabled(
137+
ctx.metricCollector(),
138+
c -> c.reportMetric(HTTP_CLIENT_NAME, clientName())); // TODO: Can't this be done in core?
136139
return new NettyRequestExecutor(ctx).execute();
137140
}
138141

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestMetrics.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package software.amazon.awssdk.http.nio.netty.internal;
1717

18+
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyClientLogger.getLogger;
19+
1820
import io.netty.channel.Channel;
1921
import io.netty.handler.codec.http2.Http2Connection;
2022
import io.netty.handler.codec.http2.Http2Stream;
@@ -25,6 +27,7 @@
2527
import java.util.function.Consumer;
2628
import software.amazon.awssdk.annotations.SdkInternalApi;
2729
import software.amazon.awssdk.http.Http2Metric;
30+
import software.amazon.awssdk.http.nio.netty.internal.utils.NettyClientLogger;
2831
import software.amazon.awssdk.metrics.MetricCollector;
2932
import software.amazon.awssdk.metrics.NoOpMetricCollector;
3033

@@ -33,6 +36,9 @@
3336
*/
3437
@SdkInternalApi
3538
public class NettyRequestMetrics {
39+
40+
private static final NettyClientLogger logger = getLogger(NettyRequestMetrics.class);
41+
3642
private NettyRequestMetrics() {
3743
}
3844

@@ -45,7 +51,11 @@ public static boolean metricsAreEnabled(MetricCollector metricCollector) {
4551

4652
public static void ifMetricsAreEnabled(MetricCollector metrics, Consumer<MetricCollector> metricsConsumer) {
4753
if (metricsAreEnabled(metrics)) {
48-
metricsConsumer.accept(metrics);
54+
try {
55+
metricsConsumer.accept(metrics);
56+
} catch (Exception e) {
57+
logger.warn(null, () -> "Failed to collect metrics", e);
58+
}
4959
}
5060
}
5161

@@ -54,13 +64,9 @@ public static void ifMetricsAreEnabled(MetricCollector metrics, Consumer<MetricC
5464
* the stream has been initialized. If the stream is not initialized when this is invoked, an exception will be thrown.
5565
*/
5666
public static void publishHttp2StreamMetrics(MetricCollector metricCollector, Channel channel) {
57-
if (!metricsAreEnabled(metricCollector)) {
58-
return;
59-
}
6067

61-
getHttp2Connection(channel).ifPresent(http2Connection -> {
62-
writeHttp2RequestMetrics(metricCollector, channel, http2Connection);
63-
});
68+
ifMetricsAreEnabled(metricCollector, collector -> getHttp2Connection(channel)
69+
.ifPresent(http2Connection -> writeHttp2RequestMetrics(collector, channel, http2Connection)));
6470
}
6571

6672
private static Optional<Http2Connection> getHttp2Connection(Channel channel) {
@@ -78,10 +84,12 @@ private static void writeHttp2RequestMetrics(MetricCollector metricCollector,
7884
int streamId = channel.attr(ChannelAttributeKey.HTTP2_FRAME_STREAM).get().id();
7985

8086
Http2Stream stream = http2Connection.stream(streamId);
81-
metricCollector.reportMetric(Http2Metric.LOCAL_STREAM_WINDOW_SIZE_IN_BYTES,
82-
http2Connection.local().flowController().windowSize(stream));
83-
metricCollector.reportMetric(Http2Metric.REMOTE_STREAM_WINDOW_SIZE_IN_BYTES,
84-
http2Connection.remote().flowController().windowSize(stream));
87+
if (stream != null) {
88+
metricCollector.reportMetric(Http2Metric.LOCAL_STREAM_WINDOW_SIZE_IN_BYTES,
89+
http2Connection.local().flowController().windowSize(stream));
90+
metricCollector.reportMetric(Http2Metric.REMOTE_STREAM_WINDOW_SIZE_IN_BYTES,
91+
http2Connection.remote().flowController().windowSize(stream));
92+
}
8593
}
8694

8795
/**

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClientWireMockTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,16 @@
7575
import org.junit.Before;
7676
import org.junit.Rule;
7777
import org.junit.Test;
78+
import org.junit.jupiter.api.Assertions;
7879
import org.junit.runner.RunWith;
80+
import org.mockito.ArgumentMatchers;
7981
import org.mockito.Mockito;
8082
import org.mockito.junit.MockitoJUnitRunner;
8183
import org.mockito.stubbing.Answer;
84+
import software.amazon.awssdk.http.EmptyPublisher;
8285
import software.amazon.awssdk.http.HttpMetric;
8386
import software.amazon.awssdk.http.HttpTestUtils;
87+
import software.amazon.awssdk.http.Protocol;
8488
import software.amazon.awssdk.http.ProtocolNegotiation;
8589
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
8690
import software.amazon.awssdk.http.SdkHttpFullRequest;
@@ -93,6 +97,7 @@
9397
import software.amazon.awssdk.http.nio.netty.internal.SdkChannelPool;
9498
import software.amazon.awssdk.http.nio.netty.internal.SdkChannelPoolMap;
9599
import software.amazon.awssdk.metrics.MetricCollection;
100+
import software.amazon.awssdk.metrics.MetricCollector;
96101
import software.amazon.awssdk.utils.AttributeMap;
97102

98103
@RunWith(MockitoJUnitRunner.class)
@@ -736,6 +741,31 @@ public void metricsAreCollectedForClosedClientCalls() throws Exception {
736741
assertThat(metrics.metricValues(HttpMetric.AVAILABLE_CONCURRENCY).get(0)).isBetween(0, 1);
737742
}
738743

744+
@Test
745+
public void metricReportingThrowException_shouldNotFailRequest() throws Exception {
746+
try (SdkAsyncHttpClient client = NettyNioAsyncHttpClient.builder()
747+
.protocol(Protocol.HTTP2)
748+
.maxConcurrency(10)
749+
.http2Configuration(c -> c.maxStreams(3L)
750+
.initialWindowSize(65535 * 3))
751+
.build()) {
752+
MetricCollector metricCollector = Mockito.mock(MetricCollector.class);
753+
Mockito.doThrow(new RuntimeException()).when(metricCollector).reportMetric(ArgumentMatchers.any(), ArgumentMatchers.any());
754+
755+
String body = randomAlphabetic(10);
756+
URI uri = URI.create("http://localhost:" + mockServer.port());
757+
stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withBody(body)));
758+
SdkHttpRequest request = createRequest(uri);
759+
AsyncExecuteRequest asyncRequest = AsyncExecuteRequest.builder()
760+
.request(request)
761+
.requestContentPublisher(new EmptyPublisher())
762+
.responseHandler(new RecordingResponseHandler())
763+
.metricCollector(metricCollector)
764+
.build();
765+
Assertions.assertDoesNotThrow(() -> client.execute(asyncRequest).join());
766+
}
767+
}
768+
739769
private void verifyChannelRelease(Channel channel) throws InterruptedException {
740770
Thread.sleep(1000);
741771
assertThat(channel.attr(AttributeKey.valueOf("channelPool")).get()).isNull();

test/architecture-tests/archunit_store/a03c3b13-315b-478c-b6c8-82ffd09dafa8

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ Method <software.amazon.awssdk.core.async.listener.AsyncResponseTransformerListe
33
Method <software.amazon.awssdk.core.async.listener.PublisherListener$NotifyingPublisher.invoke(java.lang.Runnable, java.lang.String)> calls method <software.amazon.awssdk.utils.Logger.error(java.util.function.Supplier, java.lang.Throwable)> in (PublisherListener.java:75)
44
Method <software.amazon.awssdk.core.async.listener.SubscriberListener$NotifyingSubscriber.invoke(java.lang.Runnable, java.lang.String)> calls method <software.amazon.awssdk.utils.Logger.error(java.util.function.Supplier, java.lang.Throwable)> in (SubscriberListener.java:104)
55
Method <software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody.subscribe(org.reactivestreams.Subscriber)> calls method <software.amazon.awssdk.utils.Logger.error(java.util.function.Supplier, java.lang.Throwable)> in (ByteBuffersAsyncRequestBody.java:117)
6-
Method <software.amazon.awssdk.core.sync.ResponseTransformer.toFile(java.nio.file.Path)> calls method <software.amazon.awssdk.utils.Logger.error(java.util.function.Supplier, java.lang.Throwable)> in (ResponseTransformer.java:122)
76
Method <software.amazon.awssdk.imds.internal.AsyncTokenCache.startRefresh()> calls method <software.amazon.awssdk.utils.Logger.error(java.util.function.Supplier, java.lang.Throwable)> in (AsyncTokenCache.java:113)
87
Method <software.amazon.awssdk.metrics.publishers.cloudwatch.CloudWatchMetricPublisher.close()> calls method <software.amazon.awssdk.utils.Logger.error(java.util.function.Supplier, java.lang.Throwable)> in (CloudWatchMetricPublisher.java:344)
98
Method <software.amazon.awssdk.metrics.publishers.cloudwatch.CloudWatchMetricPublisher.close()> calls method <software.amazon.awssdk.utils.Logger.error(java.util.function.Supplier, java.lang.Throwable)> in (CloudWatchMetricPublisher.java:346)
Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
11
Class <software.amazon.awssdk.core.HttpChecksumConstant> does not reside in any package ['..internal..'] in (HttpChecksumConstant.java:0)
2-
Class <software.amazon.awssdk.core.checksums.Crc32CChecksum> does not reside in any package ['..internal..'] in (Crc32CChecksum.java:0)
3-
Class <software.amazon.awssdk.core.checksums.Crc32Checksum> does not reside in any package ['..internal..'] in (Crc32Checksum.java:0)
4-
Class <software.amazon.awssdk.core.checksums.Md5Checksum> does not reside in any package ['..internal..'] in (Md5Checksum.java:0)
52
Class <software.amazon.awssdk.core.checksums.Sha1Checksum> does not reside in any package ['..internal..'] in (Sha1Checksum.java:0)
6-
Class <software.amazon.awssdk.core.checksums.Sha256Checksum> does not reside in any package ['..internal..'] in (Sha256Checksum.java:0)
73
Class <software.amazon.awssdk.core.endpointdiscovery.providers.EndpointDiscoveryProvider> does not reside in any package ['..internal..'] in (EndpointDiscoveryProvider.java:0)
84
Class <software.amazon.awssdk.core.endpointdiscovery.providers.ProfileEndpointDiscoveryProvider> does not reside in any package ['..internal..'] in (ProfileEndpointDiscoveryProvider.java:0)

0 commit comments

Comments
 (0)