Skip to content

Commit fb0e733

Browse files
committed
Changes to move the authority verification logic to the stream creation code.
1 parent 2af1cca commit fb0e733

File tree

6 files changed

+301
-68
lines changed

6 files changed

+301
-68
lines changed

examples/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ dependencies {
4545
protobuf {
4646
protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" }
4747
plugins {
48-
grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
48+
grpc { artifact = "io.grpc:protoc-gen-grpc-java:1.70.0" }
4949
}
5050
generateProtoTasks {
5151
all()*.plugins { grpc {} }

examples/src/main/java/io/grpc/examples/deadline/DeadlineServer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.grpc.examples.helloworld.GreeterGrpc;
2424
import io.grpc.examples.helloworld.HelloReply;
2525
import io.grpc.examples.helloworld.HelloRequest;
26+
import io.grpc.stub.ServerCallStreamObserver;
2627
import io.grpc.stub.StreamObserver;
2728
import java.io.IOException;
2829
import java.util.concurrent.TimeUnit;
@@ -100,6 +101,11 @@ void setClientStub(GreeterGrpc.GreeterBlockingStub clientStub) {
100101

101102
@Override
102103
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
104+
ServerCallStreamObserver<HelloReply> serverCallStreamObserver =
105+
(ServerCallStreamObserver<HelloReply>) responseObserver;
106+
serverCallStreamObserver.setOnCloseHandler(() -> {
107+
System.out.println("rpc close called.");
108+
});
103109
try {
104110
Thread.sleep(500);
105111
} catch (InterruptedException e) {

okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ private void streamReady(Metadata metadata, String path) {
409409
transport.isUsingPlaintext());
410410
// TODO(b/145386688): This access should be guarded by 'this.transport.lock'; instead found:
411411
// 'this.lock'
412-
transport.streamReadyToStart(OkHttpClientStream.this);
412+
transport.streamReadyToStart(OkHttpClientStream.this, authority);
413413
}
414414

415415
Tag tag() {

okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java

Lines changed: 75 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -450,61 +450,7 @@ public ClientStream newStream(
450450
Preconditions.checkNotNull(headers, "headers");
451451
StatsTraceContext statsTraceContext =
452452
StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
453-
if (hostnameVerifier != null && socket instanceof SSLSocket
454-
&& !hostnameVerifier.verify(callOptions.getAuthority(),
455-
((SSLSocket) socket).getSession())) {
456-
if (enablePerRpcAuthorityCheck) {
457-
return new FailingClientStream(Status.UNAVAILABLE.withDescription(
458-
String.format("HostNameVerifier verification failed for authority '%s'",
459-
callOptions.getAuthority())), tracers);
460-
}
461-
}
462-
if (socket instanceof SSLSocket && callOptions.getAuthority() != null
463-
&& channelCredentials != null && channelCredentials instanceof TlsChannelCredentials) {
464-
Status peerVerificationStatus = null;
465-
if (peerVerificationResults.containsKey(callOptions.getAuthority())) {
466-
peerVerificationStatus = peerVerificationResults.get(callOptions.getAuthority());
467-
} else {
468-
TrustManager x509ExtendedTrustManager;
469-
try {
470-
x509ExtendedTrustManager = x509ExtendedTrustManagerClass != null
471-
? getX509ExtendedTrustManager((TlsChannelCredentials) channelCredentials) : null;
472-
if (x509ExtendedTrustManager == null) {
473-
if (GrpcUtil.getFlag(GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK, false)) {
474-
return new FailingClientStream(Status.UNAVAILABLE.withDescription(
475-
"Can't allow authority override in rpc when X509ExtendedTrustManager is not "
476-
+ "available"), tracers);
477-
}
478-
} else {
479-
try {
480-
Certificate[] peerCertificates = sslSession.getPeerCertificates();
481-
X509Certificate[] x509PeerCertificates = new X509Certificate[peerCertificates.length];
482-
for (int i = 0; i < peerCertificates.length; i++) {
483-
x509PeerCertificates[i] = (X509Certificate) peerCertificates[i];
484-
}
485-
checkServerTrustedMethod.invoke(x509ExtendedTrustManager, x509PeerCertificates,
486-
"RSA", new SslSocketWrapper((SSLSocket) socket, callOptions.getAuthority()));
487-
peerVerificationStatus = Status.OK;
488-
} catch (SSLPeerUnverifiedException | InvocationTargetException
489-
| IllegalAccessException e) {
490-
peerVerificationStatus = Status.UNAVAILABLE.withDescription(
491-
String.format("Failure in verifying authority '%s' against peer during rpc",
492-
callOptions.getAuthority())).withCause(e);
493-
}
494-
peerVerificationResults.put(callOptions.getAuthority(), peerVerificationStatus);
495-
}
496-
} catch (GeneralSecurityException e) {
497-
if (GrpcUtil.getFlag(GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK, false)) {
498-
return new FailingClientStream(Status.UNAVAILABLE.withDescription(
499-
"Failure getting X509ExtendedTrustManager from TlsCredentials").withCause(e),
500-
tracers);
501-
}
502-
}
503-
}
504-
if (peerVerificationStatus != null && !peerVerificationStatus.isOk()) {
505-
return new FailingClientStream(peerVerificationStatus, tracers);
506-
}
507-
}
453+
508454
// FIXME: it is likely wrong to pass the transportTracer here as it'll exit the lock's scope
509455
synchronized (lock) { // to make @GuardedBy linter happy
510456
return new OkHttpClientStream(
@@ -548,14 +494,87 @@ private TrustManager getX509ExtendedTrustManager(TlsChannelCredentials tlsCreds)
548494
}
549495

550496
@GuardedBy("lock")
551-
void streamReadyToStart(OkHttpClientStream clientStream) {
497+
void streamReadyToStart(OkHttpClientStream clientStream, String authority) {
552498
if (goAwayStatus != null) {
553499
clientStream.transportState().transportReportStatus(
554500
goAwayStatus, RpcProgress.MISCARRIED, true, new Metadata());
555501
} else if (streams.size() >= maxConcurrentStreams) {
556502
pendingStreams.add(clientStream);
557503
setInUse(clientStream);
558504
} else {
505+
if (hostnameVerifier != null
506+
&& socket instanceof SSLSocket
507+
&& !hostnameVerifier.verify(authority,
508+
((SSLSocket) socket).getSession())) {
509+
if (enablePerRpcAuthorityCheck) {
510+
clientStream.transportState().transportReportStatus(
511+
Status.UNAVAILABLE.withDescription(
512+
String.format("HostNameVerifier verification failed for authority '%s'",
513+
authority)),
514+
RpcProgress.DROPPED, true, new Metadata());
515+
return;
516+
} else {
517+
log.log(Level.WARNING, String.format("HostNameVerifier verification failed for authority "
518+
+ "'%s'. This will be an error in the future.",
519+
authority));
520+
}
521+
}
522+
if (socket instanceof SSLSocket && authority != null
523+
&& channelCredentials != null && channelCredentials instanceof TlsChannelCredentials) {
524+
Status peerVerificationStatus = null;
525+
if (peerVerificationResults.containsKey(authority)) {
526+
peerVerificationStatus = peerVerificationResults.get(authority);
527+
} else {
528+
TrustManager x509ExtendedTrustManager = null;
529+
try {
530+
x509ExtendedTrustManager = x509ExtendedTrustManagerClass != null
531+
? getX509ExtendedTrustManager((TlsChannelCredentials) channelCredentials) : null;
532+
} catch (GeneralSecurityException e) {
533+
peerVerificationStatus = Status.UNAVAILABLE.withCause(e)
534+
.withDescription("Could not verify authority due to failure getting "
535+
+ "X509ExtendedTrustManager from TlsCredentials");
536+
}
537+
if (x509ExtendedTrustManager == null) {
538+
peerVerificationStatus = Status.UNAVAILABLE.withDescription(String.format("Could not verify authority '%s' for "
539+
+ "the rpc with no X509ExtendedTrustManager available",
540+
authority));
541+
}
542+
if (x509ExtendedTrustManager != null) {
543+
try {
544+
Certificate[] peerCertificates = sslSession.getPeerCertificates();
545+
X509Certificate[] x509PeerCertificates = new X509Certificate[peerCertificates.length];
546+
for (int i = 0; i < peerCertificates.length; i++) {
547+
x509PeerCertificates[i] = (X509Certificate) peerCertificates[i];
548+
}
549+
checkServerTrustedMethod.invoke(x509ExtendedTrustManager, x509PeerCertificates,
550+
"RSA", new SslSocketWrapper((SSLSocket) socket, authority));
551+
peerVerificationStatus = Status.OK;
552+
} catch (SSLPeerUnverifiedException | InvocationTargetException
553+
| IllegalAccessException e) {
554+
peerVerificationStatus = Status.UNAVAILABLE.withCause(e).withDescription(
555+
"Peer verification failed");
556+
}
557+
if (peerVerificationStatus != null) {
558+
peerVerificationResults.put(authority, peerVerificationStatus);
559+
}
560+
}
561+
}
562+
if (peerVerificationStatus != null && !peerVerificationStatus.isOk()) {
563+
if (enablePerRpcAuthorityCheck) {
564+
clientStream.transportState().transportReportStatus(
565+
peerVerificationStatus, RpcProgress.DROPPED, true, new Metadata());
566+
return;
567+
} else {
568+
if (peerVerificationStatus.getCause() != null) {
569+
log.log(Level.WARNING, peerVerificationStatus.getDescription()
570+
+ ". This will be an error in the future.", peerVerificationStatus.getCause());
571+
} else {
572+
log.log(Level.WARNING, peerVerificationStatus.getDescription()
573+
+ ". This will be an error in the future.");
574+
}
575+
}
576+
}
577+
}
559578
startStream(clientStream);
560579
}
561580
}

okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
2121
import static org.junit.Assert.assertEquals;
2222
import static org.junit.Assert.assertTrue;
23+
import static org.mockito.ArgumentMatchers.any;
2324
import static org.mockito.ArgumentMatchers.eq;
2425
import static org.mockito.ArgumentMatchers.isA;
2526
import static org.mockito.Mockito.times;
@@ -244,12 +245,13 @@ public void getUnaryRequest() throws IOException {
244245
// GET streams send headers after halfClose is called.
245246
verify(mockedFrameWriter, times(0)).synStream(
246247
eq(false), eq(false), eq(3), eq(0), headersCaptor.capture());
247-
verify(transport, times(0)).streamReadyToStart(isA(OkHttpClientStream.class));
248+
verify(transport, times(0)).streamReadyToStart(isA(OkHttpClientStream.class),
249+
isA(String.class));
248250

249251
byte[] msg = "request".getBytes(Charset.forName("UTF-8"));
250252
stream.writeMessage(new ByteArrayInputStream(msg));
251253
stream.halfClose();
252-
verify(transport).streamReadyToStart(eq(stream));
254+
verify(transport).streamReadyToStart(eq(stream), any(String.class));
253255
stream.transportState().start(3);
254256

255257
verify(mockedFrameWriter)

0 commit comments

Comments
 (0)