Skip to content

Commit 671444b

Browse files
committed
Review comments
1 parent 671dee0 commit 671444b

File tree

3 files changed

+91
-81
lines changed

3 files changed

+91
-81
lines changed

netty/src/main/java/io/grpc/netty/GrpcHttp2OutboundHeaders.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,6 @@ static GrpcHttp2OutboundHeaders clientRequestHeaders(byte[][] serializedMetadata
4646
return new GrpcHttp2OutboundHeaders(preHeaders, serializedMetadata);
4747
}
4848

49-
@Override
50-
public String authority() {
51-
for (int i = 0; i < preHeaders.length / 2; i++) {
52-
if (preHeaders[i].equals(Http2Headers.PseudoHeaderName.AUTHORITY.value())) {
53-
return preHeaders[i + 1].toString();
54-
}
55-
}
56-
return "";
57-
}
58-
5949
static GrpcHttp2OutboundHeaders serverResponseHeaders(byte[][] serializedMetadata) {
6050
AsciiString[] preHeaders = new AsciiString[] {
6151
Http2Headers.PseudoHeaderName.STATUS.value(), Utils.STATUS_OK,
@@ -76,6 +66,18 @@ private GrpcHttp2OutboundHeaders(AsciiString[] preHeaders, byte[][] serializedMe
7666
this.preHeaders = preHeaders;
7767
}
7868

69+
@Override
70+
public CharSequence authority() {
71+
CharSequence authority = null;
72+
for (int i = 0; i < preHeaders.length / 2; i++) {
73+
if (preHeaders[i].equals(Http2Headers.PseudoHeaderName.AUTHORITY.value())) {
74+
authority = preHeaders[i + 1];
75+
}
76+
}
77+
assert authority != null;
78+
return authority;
79+
}
80+
7981
@Override
8082
@SuppressWarnings("ReferenceEquality") // STATUS.value() never changes.
8183
public CharSequence status() {

netty/src/main/java/io/grpc/netty/NettyClientHandler.java

Lines changed: 40 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -603,38 +603,46 @@ private void createStream(CreateStreamCommand command, ChannelPromise promise)
603603
return;
604604
}
605605

606-
CharSequence authority = command.headers().authority();
607-
if (authority != null) {
608-
Status authorityVerificationStatus = peerVerificationResults.get(authority.toString());
609-
if (authorityVerificationStatus == null) {
610-
if (attributes != null
611-
&& attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER) != null) {
612-
authorityVerificationStatus = attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER)
613-
.verifyAuthority(authority.toString());
614-
peerVerificationResults.put(authority.toString(), authorityVerificationStatus);
615-
if (!authorityVerificationStatus.isOk() && !enablePerRpcAuthorityCheck) {
616-
logger.log(Level.WARNING, String.format("%s.%s",
617-
authorityVerificationStatus.getDescription(), enablePerRpcAuthorityCheck
618-
? "" : " This will be an error in the future."),
619-
InternalStatus.asRuntimeExceptionWithoutStacktrace(authorityVerificationStatus, null));
606+
CharSequence authorityHeader = command.headers().authority();
607+
if (authorityHeader != null) {
608+
// No need to verify authority for the rpc outgoing header if it is same as the authority
609+
// for the transport
610+
if (!authority.contentEquals(authorityHeader)) {
611+
Status authorityVerificationStatus = peerVerificationResults.get(
612+
authorityHeader.toString());
613+
if (authorityVerificationStatus == null) {
614+
if (attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER) != null) {
615+
authorityVerificationStatus = attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER)
616+
.verifyAuthority(authorityHeader.toString());
617+
peerVerificationResults.put(authorityHeader.toString(), authorityVerificationStatus);
618+
if (!authorityVerificationStatus.isOk() && !enablePerRpcAuthorityCheck) {
619+
logger.log(Level.WARNING, String.format("%s.%s",
620+
authorityVerificationStatus.getDescription(),
621+
enablePerRpcAuthorityCheck
622+
? "" : " This will be an error in the future."),
623+
InternalStatus.asRuntimeExceptionWithoutStacktrace(
624+
authorityVerificationStatus, null));
625+
}
626+
} else {
627+
authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
628+
"Authority verifier not found to verify authority");
629+
command.stream().setNonExistent();
630+
command.stream().transportReportStatus(
631+
authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata());
632+
promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
633+
authorityVerificationStatus, null));
634+
return;
620635
}
621-
} else {
622-
authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
623-
"Authority verifier not found to verify authority");
624-
command.stream().setNonExistent();
625-
command.stream().transportReportStatus(
626-
authorityVerificationStatus, RpcProgress.DROPPED, true, new Metadata());
627-
promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(authorityVerificationStatus, null));
628-
return;
629636
}
630-
}
631-
if (authorityVerificationStatus != null && !authorityVerificationStatus.isOk()) {
632-
if (enablePerRpcAuthorityCheck) {
633-
command.stream().setNonExistent();
634-
command.stream().transportReportStatus(
635-
authorityVerificationStatus, RpcProgress.DROPPED, true, new Metadata());
636-
promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(authorityVerificationStatus, null));
637-
return;
637+
if (authorityVerificationStatus != null && !authorityVerificationStatus.isOk()) {
638+
if (enablePerRpcAuthorityCheck) {
639+
command.stream().setNonExistent();
640+
command.stream().transportReportStatus(
641+
authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata());
642+
promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
643+
authorityVerificationStatus, null));
644+
return;
645+
}
638646
}
639647
}
640648
} else {
@@ -643,7 +651,8 @@ private void createStream(CreateStreamCommand command, ChannelPromise promise)
643651
command.stream().setNonExistent();
644652
command.stream().transportReportStatus(
645653
Status.UNAVAILABLE, RpcProgress.DROPPED, true, new Metadata());
646-
promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(authorityVerificationStatus, null));
654+
promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
655+
authorityVerificationStatus, null));
647656
return;
648657
}
649658
// Get the stream ID for the new stream.

netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java

Lines changed: 39 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@
112112
import org.mockito.ArgumentCaptor;
113113
import org.mockito.ArgumentMatchers;
114114
import org.mockito.Mock;
115-
import org.mockito.Mockito;
116115
import org.mockito.invocation.InvocationOnMock;
117116
import org.mockito.junit.MockitoJUnit;
118117
import org.mockito.junit.MockitoRule;
@@ -937,7 +936,8 @@ public void missingAuthorityHeader_streamCreationShouldFail() throws Exception {
937936
.add(as("auth"), as("sometoken"))
938937
.add(CONTENT_TYPE_HEADER, CONTENT_TYPE_GRPC)
939938
.add(TE_HEADER, TE_TRAILERS);
940-
ChannelFuture channelFuture = enqueue(newCreateStreamCommand(grpcHeadersWithoutAuthority, streamTransportState));
939+
ChannelFuture channelFuture = enqueue(newCreateStreamCommand(
940+
grpcHeadersWithoutAuthority, streamTransportState));
941941
try {
942942
channelFuture.get();
943943
fail("Expected stream creation failure");
@@ -948,24 +948,23 @@ public void missingAuthorityHeader_streamCreationShouldFail() throws Exception {
948948

949949
@Test
950950
public void missingAuthorityVerifierInAttributes_streamCreationShouldFail() throws Exception {
951-
doAnswer(
952-
new Answer<Void>() {
953-
@Override
954-
public Void answer(InvocationOnMock invocation) throws Throwable {
955-
StreamListener.MessageProducer producer =
956-
(StreamListener.MessageProducer) invocation.getArguments()[0];
957-
InputStream message;
958-
while ((message = producer.next()) != null) {
959-
streamListenerMessageQueue.add(message);
960-
}
961-
return null;
962-
}
963-
})
964-
.when(streamListener)
965-
.messagesAvailable(ArgumentMatchers.<StreamListener.MessageProducer>any());
951+
doAnswer(new Answer<Void>() {
952+
@Override
953+
public Void answer(InvocationOnMock invocation) throws Throwable {
954+
StreamListener.MessageProducer producer =
955+
(StreamListener.MessageProducer) invocation.getArguments()[0];
956+
InputStream message;
957+
while ((message = producer.next()) != null) {
958+
streamListenerMessageQueue.add(message);
959+
}
960+
return null;
961+
}
962+
})
963+
.when(streamListener)
964+
.messagesAvailable(ArgumentMatchers.<StreamListener.MessageProducer>any());
966965
doAnswer((attributes) -> Attributes.EMPTY)
967-
.when(listener)
968-
.filterTransport(ArgumentMatchers.any(Attributes.class));
966+
.when(listener)
967+
.filterTransport(ArgumentMatchers.any(Attributes.class));
969968
lifecycleManager = new ClientTransportLifecycleManager(listener);
970969
// This mocks the keepalive manager only for there's in which we verify it. For other tests
971970
// it'll be null which will be testing if we behave correctly when it's not present.
@@ -1000,7 +999,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
1000999
channelFuture.get();
10011000
fail("Expected stream creation failure");
10021001
} catch (ExecutionException e) {
1003-
assertThat(e.getCause().getMessage()).isEqualTo("UNAVAILABLE: Authority verifier not found to verify authority");
1002+
assertThat(e.getCause().getMessage()).isEqualTo(
1003+
"UNAVAILABLE: Authority verifier not found to verify authority");
10041004
}
10051005
}
10061006

@@ -1019,27 +1019,26 @@ public void authorityVerificationSuccess_streamCreationSucceeds() throws Excepti
10191019
public void authorityVerificationFailure_streamCreationFails() throws Exception {
10201020
NettyClientHandler.enablePerRpcAuthorityCheck = true;
10211021
try {
1022-
doAnswer(
1023-
new Answer<Void>() {
1024-
@Override
1025-
public Void answer(InvocationOnMock invocation) throws Throwable {
1026-
StreamListener.MessageProducer producer =
1027-
(StreamListener.MessageProducer) invocation.getArguments()[0];
1028-
InputStream message;
1029-
while ((message = producer.next()) != null) {
1030-
streamListenerMessageQueue.add(message);
1031-
}
1032-
return null;
1033-
}
1034-
})
1035-
.when(streamListener)
1036-
.messagesAvailable(ArgumentMatchers.<StreamListener.MessageProducer>any());
1022+
doAnswer(new Answer<Void>() {
1023+
@Override
1024+
public Void answer(InvocationOnMock invocation) throws Throwable {
1025+
StreamListener.MessageProducer producer =
1026+
(StreamListener.MessageProducer) invocation.getArguments()[0];
1027+
InputStream message;
1028+
while ((message = producer.next()) != null) {
1029+
streamListenerMessageQueue.add(message);
1030+
}
1031+
return null;
1032+
}
1033+
})
1034+
.when(streamListener)
1035+
.messagesAvailable(ArgumentMatchers.<StreamListener.MessageProducer>any());
10371036
doAnswer((attributes) -> Attributes.newBuilder().set(
1038-
GrpcAttributes.ATTR_AUTHORITY_VERIFIER,
1039-
(authority) -> Status.UNAVAILABLE.withCause(
1040-
new CertificateException("Peer verification failed"))).build())
1041-
.when(listener)
1042-
.filterTransport(ArgumentMatchers.any(Attributes.class));
1037+
GrpcAttributes.ATTR_AUTHORITY_VERIFIER,
1038+
(authority) -> Status.UNAVAILABLE.withCause(
1039+
new CertificateException("Peer verification failed"))).build())
1040+
.when(listener)
1041+
.filterTransport(ArgumentMatchers.any(Attributes.class));
10431042
lifecycleManager = new ClientTransportLifecycleManager(listener);
10441043
// This mocks the keepalive manager only for there's in which we verify it. For other tests
10451044
// it'll be null which will be testing if we behave correctly when it's not present.

0 commit comments

Comments
 (0)