Skip to content

Commit 671dee0

Browse files
committed
Review comments.
1 parent 1ff1f0b commit 671dee0

File tree

7 files changed

+209
-36
lines changed

7 files changed

+209
-36
lines changed

core/src/main/java/io/grpc/internal/CertificateUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
/**
3333
* Contains certificate/key PEM file utility method(s) for internal usage.
3434
*/
35-
public class CertificateUtils {
35+
public final class CertificateUtils {
3636
/**
3737
* Creates X509TrustManagers using the provided CA certs.
3838
*/

netty/src/main/java/io/grpc/netty/GrpcHttp2OutboundHeaders.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,14 @@ static GrpcHttp2OutboundHeaders clientRequestHeaders(byte[][] serializedMetadata
4646
return new GrpcHttp2OutboundHeaders(preHeaders, serializedMetadata);
4747
}
4848

49-
String getAuthority() {
49+
@Override
50+
public String authority() {
5051
for (int i = 0; i < preHeaders.length / 2; i++) {
5152
if (preHeaders[i].equals(Http2Headers.PseudoHeaderName.AUTHORITY.value())) {
5253
return preHeaders[i + 1].toString();
5354
}
5455
}
55-
return null;
56+
return "";
5657
}
5758

5859
static GrpcHttp2OutboundHeaders serverResponseHeaders(byte[][] serializedMetadata) {

netty/src/main/java/io/grpc/netty/NettyClientHandler.java

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.grpc.Attributes;
2929
import io.grpc.ChannelLogger;
3030
import io.grpc.InternalChannelz;
31+
import io.grpc.InternalStatus;
3132
import io.grpc.Metadata;
3233
import io.grpc.Status;
3334
import io.grpc.StatusException;
@@ -586,17 +587,6 @@ protected boolean isGracefulShutdownComplete() {
586587
&& ((StreamBufferingEncoder) encoder()).numBufferedStreams() == 0;
587588
}
588589

589-
private String getAuthorityPseudoHeader(Http2Headers http2Headers) {
590-
if (http2Headers instanceof GrpcHttp2OutboundHeaders) {
591-
return ((GrpcHttp2OutboundHeaders) http2Headers).getAuthority();
592-
}
593-
try {
594-
return http2Headers.authority().toString();
595-
} catch (UnsupportedOperationException e) {
596-
return null;
597-
}
598-
}
599-
600590
/**
601591
* Attempts to create a new stream from the given command. If there are too many active streams,
602592
* the creation request is queued.
@@ -613,28 +603,48 @@ private void createStream(CreateStreamCommand command, ChannelPromise promise)
613603
return;
614604
}
615605

616-
String authority = getAuthorityPseudoHeader(command.headers());
606+
CharSequence authority = command.headers().authority();
617607
if (authority != null) {
618-
Status authorityVerificationStatus = peerVerificationResults.get(authority);
619-
if (authorityVerificationStatus == null && attributes != null
620-
&& attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER) != null) {
621-
authorityVerificationStatus = attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER)
622-
.verifyAuthority(((GrpcHttp2OutboundHeaders) command.headers()).getAuthority());
623-
peerVerificationResults.put(authority, authorityVerificationStatus);
608+
Status authorityVerificationStatus = peerVerificationResults.get(authority.toString());
609+
if (authorityVerificationStatus == null) {
610+
if (attributes != null
611+
&& attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER) != null) {
612+
authorityVerificationStatus = attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER)
613+
.verifyAuthority(authority.toString());
614+
peerVerificationResults.put(authority.toString(), authorityVerificationStatus);
615+
if (!authorityVerificationStatus.isOk() && !enablePerRpcAuthorityCheck) {
616+
logger.log(Level.WARNING, String.format("%s.%s",
617+
authorityVerificationStatus.getDescription(), enablePerRpcAuthorityCheck
618+
? "" : " This will be an error in the future."),
619+
InternalStatus.asRuntimeExceptionWithoutStacktrace(authorityVerificationStatus, null));
620+
}
621+
} else {
622+
authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
623+
"Authority verifier not found to verify authority");
624+
command.stream().setNonExistent();
625+
command.stream().transportReportStatus(
626+
authorityVerificationStatus, RpcProgress.DROPPED, true, new Metadata());
627+
promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(authorityVerificationStatus, null));
628+
return;
629+
}
624630
}
625631
if (authorityVerificationStatus != null && !authorityVerificationStatus.isOk()) {
626-
logger.log(Level.WARNING, String.format("%s.%s",
627-
authorityVerificationStatus.getDescription(), enablePerRpcAuthorityCheck
628-
? "" : "This will be an error in the future."),
629-
authorityVerificationStatus.getCause());
630632
if (enablePerRpcAuthorityCheck) {
631633
command.stream().setNonExistent();
632634
command.stream().transportReportStatus(
633-
authorityVerificationStatus, RpcProgress.DROPPED, true, new Metadata());
634-
promise.setFailure(authorityVerificationStatus.getCause());
635+
authorityVerificationStatus, RpcProgress.DROPPED, true, new Metadata());
636+
promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(authorityVerificationStatus, null));
635637
return;
636638
}
637639
}
640+
} else {
641+
Status authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
642+
"Missing authority header");
643+
command.stream().setNonExistent();
644+
command.stream().transportReportStatus(
645+
Status.UNAVAILABLE, RpcProgress.DROPPED, true, new Metadata());
646+
promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(authorityVerificationStatus, null));
647+
return;
638648
}
639649
// Get the stream ID for the new stream.
640650
int streamId;

netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -762,8 +762,7 @@ public static ProtocolNegotiator tls(SslContext sslContext,
762762
*/
763763
public static ProtocolNegotiator tls(SslContext sslContext,
764764
X509TrustManager x509ExtendedTrustManager) {
765-
return tls(sslContext, null, Optional.absent(),
766-
x509ExtendedTrustManager);
765+
return tls(sslContext, null, Optional.absent(), x509ExtendedTrustManager);
767766
}
768767

769768
public static ProtocolNegotiator.ClientFactory tlsClientFactory(SslContext sslContext,

netty/src/main/java/io/grpc/netty/X509AuthorityVerifier.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import javax.net.ssl.SSLPeerUnverifiedException;
2929
import javax.net.ssl.X509TrustManager;
3030

31-
public class X509AuthorityVerifier implements AuthorityVerifier {
31+
final class X509AuthorityVerifier implements AuthorityVerifier {
3232
private final SSLEngine sslEngine;
3333
private final X509TrustManager x509ExtendedTrustManager;
3434

@@ -57,17 +57,15 @@ public X509AuthorityVerifier(SSLEngine sslEngine, X509TrustManager x509ExtendedT
5757

5858
@Override
5959
public Status verifyAuthority(@Nonnull String authority) {
60-
// sslEngine won't be set when creating ClientTlsHandler from InternalProtocolNegotiators
61-
// for example.
6260
if (sslEngine == null || x509ExtendedTrustManager == null) {
63-
return Status.FAILED_PRECONDITION.withDescription(
61+
return Status.UNAVAILABLE.withDescription(
6462
"Can't allow authority override in rpc when SslEngine or X509ExtendedTrustManager"
6563
+ " is not available");
6664
}
6765
Status peerVerificationStatus;
6866
try {
6967
// Because the authority pseudo-header can contain a port number:
70-
// https://www.rfc-editor.org/rfc/rfc7540#section-8.1.2.3com
68+
// https://www.rfc-editor.org/rfc/rfc7540#section-8.1.2.3
7169
verifyAuthorityAllowedForPeerCert(removeAnyPortNumber(authority));
7270
peerVerificationStatus = Status.OK;
7371
} catch (SSLPeerUnverifiedException | CertificateException | InvocationTargetException
@@ -99,6 +97,9 @@ private void verifyAuthorityAllowedForPeerCert(String authority)
9997
for (int i = 0; i < peerCertificates.length; i++) {
10098
x509PeerCertificates[i] = (X509Certificate) peerCertificates[i];
10199
}
100+
if (checkServerTrustedMethod == null) {
101+
throw new IllegalStateException("checkServerTrustedMethod not found");
102+
}
102103
checkServerTrustedMethod.invoke(
103104
x509ExtendedTrustManager, x509PeerCertificates, "RSA", sslEngineWrapper);
104105
}

netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java

Lines changed: 163 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import static org.junit.Assert.assertNotNull;
3737
import static org.junit.Assert.assertNull;
3838
import static org.junit.Assert.assertTrue;
39+
import static org.junit.Assert.fail;
3940
import static org.mockito.ArgumentMatchers.any;
4041
import static org.mockito.ArgumentMatchers.eq;
4142
import static org.mockito.ArgumentMatchers.same;
@@ -64,6 +65,7 @@
6465
import io.grpc.internal.ClientStreamListener.RpcProgress;
6566
import io.grpc.internal.ClientTransport;
6667
import io.grpc.internal.ClientTransport.PingCallback;
68+
import io.grpc.internal.GrpcAttributes;
6769
import io.grpc.internal.GrpcUtil;
6870
import io.grpc.internal.KeepAliveManager;
6971
import io.grpc.internal.ManagedClientTransport;
@@ -90,10 +92,12 @@
9092
import io.netty.handler.codec.http2.Http2Stream;
9193
import io.netty.util.AsciiString;
9294
import java.io.InputStream;
95+
import java.security.cert.CertificateException;
9396
import java.text.MessageFormat;
9497
import java.util.LinkedList;
9598
import java.util.List;
9699
import java.util.Queue;
100+
import java.util.concurrent.ExecutionException;
97101
import java.util.concurrent.atomic.AtomicBoolean;
98102
import java.util.concurrent.atomic.AtomicReference;
99103
import java.util.logging.Handler;
@@ -108,6 +112,7 @@
108112
import org.mockito.ArgumentCaptor;
109113
import org.mockito.ArgumentMatchers;
110114
import org.mockito.Mock;
115+
import org.mockito.Mockito;
111116
import org.mockito.invocation.InvocationOnMock;
112117
import org.mockito.junit.MockitoJUnit;
113118
import org.mockito.junit.MockitoRule;
@@ -189,7 +194,11 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
189194
})
190195
.when(streamListener)
191196
.messagesAvailable(ArgumentMatchers.<StreamListener.MessageProducer>any());
192-
197+
doAnswer((attributes) -> Attributes.newBuilder().set(
198+
GrpcAttributes.ATTR_AUTHORITY_VERIFIER,
199+
(authority) -> Status.OK).build())
200+
.when(listener)
201+
.filterTransport(ArgumentMatchers.any(Attributes.class));
193202
lifecycleManager = new ClientTransportLifecycleManager(listener);
194203
// This mocks the keepalive manager only for there's in which we verify it. For other tests
195204
// it'll be null which will be testing if we behave correctly when it's not present.
@@ -919,6 +928,159 @@ public void exceptionCaughtShouldCloseConnection() throws Exception {
919928
assertFalse(channel().isOpen());
920929
}
921930

931+
@Test
932+
public void missingAuthorityHeader_streamCreationShouldFail() throws Exception {
933+
Http2Headers grpcHeadersWithoutAuthority = new DefaultHttp2Headers()
934+
.scheme(HTTPS)
935+
.path(as("/fakemethod"))
936+
.method(HTTP_METHOD)
937+
.add(as("auth"), as("sometoken"))
938+
.add(CONTENT_TYPE_HEADER, CONTENT_TYPE_GRPC)
939+
.add(TE_HEADER, TE_TRAILERS);
940+
ChannelFuture channelFuture = enqueue(newCreateStreamCommand(grpcHeadersWithoutAuthority, streamTransportState));
941+
try {
942+
channelFuture.get();
943+
fail("Expected stream creation failure");
944+
} catch (ExecutionException e) {
945+
assertThat(e.getCause().getMessage()).isEqualTo("UNAVAILABLE: Missing authority header");
946+
}
947+
}
948+
949+
@Test
950+
public void missingAuthorityVerifierInAttributes_streamCreationShouldFail() throws Exception {
951+
doAnswer(
952+
new Answer<Void>() {
953+
@Override
954+
public Void answer(InvocationOnMock invocation) throws Throwable {
955+
StreamListener.MessageProducer producer =
956+
(StreamListener.MessageProducer) invocation.getArguments()[0];
957+
InputStream message;
958+
while ((message = producer.next()) != null) {
959+
streamListenerMessageQueue.add(message);
960+
}
961+
return null;
962+
}
963+
})
964+
.when(streamListener)
965+
.messagesAvailable(ArgumentMatchers.<StreamListener.MessageProducer>any());
966+
doAnswer((attributes) -> Attributes.EMPTY)
967+
.when(listener)
968+
.filterTransport(ArgumentMatchers.any(Attributes.class));
969+
lifecycleManager = new ClientTransportLifecycleManager(listener);
970+
// This mocks the keepalive manager only for there's in which we verify it. For other tests
971+
// it'll be null which will be testing if we behave correctly when it's not present.
972+
if (setKeepaliveManagerFor.contains(testNameRule.getMethodName())) {
973+
mockKeepAliveManager = mock(KeepAliveManager.class);
974+
}
975+
976+
initChannel(new GrpcHttp2ClientHeadersDecoder(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE));
977+
streamTransportState = new TransportStateImpl(
978+
handler(),
979+
channel().eventLoop(),
980+
DEFAULT_MAX_MESSAGE_SIZE,
981+
transportTracer);
982+
streamTransportState.setListener(streamListener);
983+
984+
grpcHeaders = new DefaultHttp2Headers()
985+
.scheme(HTTPS)
986+
.authority(as("www.fake.com"))
987+
.path(as("/fakemethod"))
988+
.method(HTTP_METHOD)
989+
.add(as("auth"), as("sometoken"))
990+
.add(CONTENT_TYPE_HEADER, CONTENT_TYPE_GRPC)
991+
.add(TE_HEADER, TE_TRAILERS);
992+
993+
// Simulate receipt of initial remote settings.
994+
ByteBuf serializedSettings = serializeSettings(new Http2Settings());
995+
channelRead(serializedSettings);
996+
channel().releaseOutbound();
997+
998+
ChannelFuture channelFuture = createStream();
999+
try {
1000+
channelFuture.get();
1001+
fail("Expected stream creation failure");
1002+
} catch (ExecutionException e) {
1003+
assertThat(e.getCause().getMessage()).isEqualTo("UNAVAILABLE: Authority verifier not found to verify authority");
1004+
}
1005+
}
1006+
1007+
@Test
1008+
public void authorityVerificationSuccess_streamCreationSucceeds() throws Exception {
1009+
NettyClientHandler.enablePerRpcAuthorityCheck = true;
1010+
try {
1011+
ChannelFuture channelFuture = createStream();
1012+
channelFuture.get();
1013+
} finally {
1014+
NettyClientHandler.enablePerRpcAuthorityCheck = false;
1015+
}
1016+
}
1017+
1018+
@Test
1019+
public void authorityVerificationFailure_streamCreationFails() throws Exception {
1020+
NettyClientHandler.enablePerRpcAuthorityCheck = true;
1021+
try {
1022+
doAnswer(
1023+
new Answer<Void>() {
1024+
@Override
1025+
public Void answer(InvocationOnMock invocation) throws Throwable {
1026+
StreamListener.MessageProducer producer =
1027+
(StreamListener.MessageProducer) invocation.getArguments()[0];
1028+
InputStream message;
1029+
while ((message = producer.next()) != null) {
1030+
streamListenerMessageQueue.add(message);
1031+
}
1032+
return null;
1033+
}
1034+
})
1035+
.when(streamListener)
1036+
.messagesAvailable(ArgumentMatchers.<StreamListener.MessageProducer>any());
1037+
doAnswer((attributes) -> Attributes.newBuilder().set(
1038+
GrpcAttributes.ATTR_AUTHORITY_VERIFIER,
1039+
(authority) -> Status.UNAVAILABLE.withCause(
1040+
new CertificateException("Peer verification failed"))).build())
1041+
.when(listener)
1042+
.filterTransport(ArgumentMatchers.any(Attributes.class));
1043+
lifecycleManager = new ClientTransportLifecycleManager(listener);
1044+
// This mocks the keepalive manager only for there's in which we verify it. For other tests
1045+
// it'll be null which will be testing if we behave correctly when it's not present.
1046+
if (setKeepaliveManagerFor.contains(testNameRule.getMethodName())) {
1047+
mockKeepAliveManager = mock(KeepAliveManager.class);
1048+
}
1049+
1050+
initChannel(new GrpcHttp2ClientHeadersDecoder(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE));
1051+
streamTransportState = new TransportStateImpl(
1052+
handler(),
1053+
channel().eventLoop(),
1054+
DEFAULT_MAX_MESSAGE_SIZE,
1055+
transportTracer);
1056+
streamTransportState.setListener(streamListener);
1057+
1058+
grpcHeaders = new DefaultHttp2Headers()
1059+
.scheme(HTTPS)
1060+
.authority(as("www.fake.com"))
1061+
.path(as("/fakemethod"))
1062+
.method(HTTP_METHOD)
1063+
.add(as("auth"), as("sometoken"))
1064+
.add(CONTENT_TYPE_HEADER, CONTENT_TYPE_GRPC)
1065+
.add(TE_HEADER, TE_TRAILERS);
1066+
1067+
// Simulate receipt of initial remote settings.
1068+
ByteBuf serializedSettings = serializeSettings(new Http2Settings());
1069+
channelRead(serializedSettings);
1070+
channel().releaseOutbound();
1071+
1072+
ChannelFuture channelFuture = createStream();
1073+
try {
1074+
channelFuture.get();
1075+
fail("Expected stream creation failure");
1076+
} catch (ExecutionException e) {
1077+
assertThat(e.getMessage()).isEqualTo("io.grpc.InternalStatusRuntimeException: UNAVAILABLE");
1078+
}
1079+
} finally {
1080+
NettyClientHandler.enablePerRpcAuthorityCheck = false;
1081+
}
1082+
}
1083+
9221084
@Override
9231085
protected void makeStream() throws Exception {
9241086
createStream();

netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -888,7 +888,7 @@ public void authorityOverrideInCallOptions_noX509ExtendedTrustManager_newStreamC
888888
Status status = ((StatusException) ex.getCause()).getStatus();
889889
assertThat(status.getDescription()).isEqualTo("Can't allow authority override in rpc "
890890
+ "when SslEngine or X509ExtendedTrustManager is not available");
891-
assertThat(status.getCode()).isEqualTo(Code.FAILED_PRECONDITION);
891+
assertThat(status.getCode()).isEqualTo(Code.UNAVAILABLE);
892892
}
893893
} finally {
894894
NettyClientHandler.enablePerRpcAuthorityCheck = false;

0 commit comments

Comments
 (0)