2121import static com .google .common .truth .Truth .assertThat ;
2222import static com .google .common .truth .Truth .assertWithMessage ;
2323
24+ import com .google .auth .Credentials ;
2425import com .google .bigtable .admin .v2 .BigtableInstanceAdminGrpc ;
2526import com .google .bigtable .admin .v2 .BigtableInstanceAdminGrpc .BigtableInstanceAdminFutureStub ;
2627import com .google .bigtable .admin .v2 .BigtableInstanceAdminGrpc .BigtableInstanceAdminImplBase ;
3637import com .google .bigtable .v2 .BigtableGrpc .BigtableImplBase ;
3738import com .google .bigtable .v2 .CheckAndMutateRowRequest ;
3839import com .google .bigtable .v2 .CheckAndMutateRowResponse ;
40+ import com .google .common .collect .Lists ;
3941import com .google .common .collect .Range ;
4042import com .google .common .util .concurrent .ListenableFuture ;
4143import com .google .longrunning .GetOperationRequest ;
6769import io .grpc .testing .GrpcCleanupRule ;
6870import java .io .IOException ;
6971import java .net .ServerSocket ;
72+ import java .net .URI ;
7073import java .time .Duration ;
74+ import java .util .List ;
75+ import java .util .Map ;
7176import java .util .UUID ;
7277import java .util .concurrent .BlockingDeque ;
7378import java .util .concurrent .BlockingQueue ;
@@ -100,6 +105,7 @@ public class ServeTest {
100105 private FakeTableAdminService tableAdminService ;
101106 private OperationService operationService ;
102107 private ManagedChannel fakeServiceChannel ;
108+ private FakeCredentials fakeCredentials ;
103109
104110 // Proxy
105111 private Serve serve ;
@@ -115,6 +121,8 @@ public void setUp() throws IOException {
115121 tableAdminService = new FakeTableAdminService ();
116122 operationService = new OperationService ();
117123
124+ fakeCredentials = new FakeCredentials ();
125+
118126 grpcCleanup .register (
119127 InProcessServerBuilder .forName (targetServerName )
120128 .intercept (callContextInterceptor )
@@ -131,7 +139,9 @@ public void setUp() throws IOException {
131139 InProcessChannelBuilder .forName (targetServerName ).usePlaintext ().build ());
132140
133141 // Create the proxy
134- serve = createAndStartCommand (fakeServiceChannel );
142+ // Inject fakes for upstream calls. For unit tests we want to shim communications to the
143+ // bigtable service.
144+ serve = createAndStartCommand (fakeServiceChannel , fakeCredentials );
135145
136146 proxyChannel =
137147 grpcCleanup .register (
@@ -363,11 +373,86 @@ public void testDeadlinePropagation()
363373 .isIn (Range .closed (Duration .ofMinutes (9 ), Duration .ofMinutes (10 )));
364374 }
365375
366- private static Serve createAndStartCommand (ManagedChannel targetChannel ) throws IOException {
376+ @ Test
377+ public void testCredentials () throws InterruptedException , ExecutionException , TimeoutException {
378+ BigtableFutureStub proxyStub = BigtableGrpc .newFutureStub (proxyChannel );
379+
380+ CheckAndMutateRowRequest request =
381+ CheckAndMutateRowRequest .newBuilder ().setTableName ("some-table" ).build ();
382+ final ListenableFuture <CheckAndMutateRowResponse > proxyFuture =
383+ proxyStub .checkAndMutateRow (request );
384+ StreamObserver <CheckAndMutateRowResponse > serverObserver =
385+ dataService
386+ .calls
387+ .computeIfAbsent (request , (ignored ) -> new LinkedBlockingDeque <>())
388+ .poll (1 , TimeUnit .SECONDS );
389+
390+ assertWithMessage ("Timed out waiting for the proxied RPC on the fake server" )
391+ .that (serverObserver )
392+ .isNotNull ();
393+
394+ serverObserver .onNext (CheckAndMutateRowResponse .newBuilder ().setPredicateMatched (true ).build ());
395+ serverObserver .onCompleted ();
396+ proxyFuture .get (1 , TimeUnit .SECONDS );
397+
398+ assertThat (metadataInterceptor .requestHeaders .poll (1 , TimeUnit .SECONDS ))
399+ .hasValue ("authorization" , "fake-token" );
400+ }
401+
402+ @ Test
403+ public void testCredentialsClobber ()
404+ throws InterruptedException , ExecutionException , TimeoutException {
405+ BigtableFutureStub proxyStub =
406+ BigtableGrpc .newFutureStub (proxyChannel )
407+ .withInterceptors (
408+ new ClientInterceptor () {
409+ @ Override
410+ public <ReqT , RespT > ClientCall <ReqT , RespT > interceptCall (
411+ MethodDescriptor <ReqT , RespT > methodDescriptor ,
412+ CallOptions callOptions ,
413+ Channel channel ) {
414+ return new SimpleForwardingClientCall <ReqT , RespT >(
415+ channel .newCall (methodDescriptor , callOptions )) {
416+ @ Override
417+ public void start (Listener <RespT > responseListener , Metadata headers ) {
418+ headers .put (
419+ Metadata .Key .of ("authorization" , Metadata .ASCII_STRING_MARSHALLER ),
420+ "pre-proxied-value" );
421+ super .start (responseListener , headers );
422+ }
423+ };
424+ }
425+ });
426+
427+ CheckAndMutateRowRequest request =
428+ CheckAndMutateRowRequest .newBuilder ().setTableName ("some-table" ).build ();
429+ final ListenableFuture <CheckAndMutateRowResponse > proxyFuture =
430+ proxyStub .checkAndMutateRow (request );
431+ StreamObserver <CheckAndMutateRowResponse > serverObserver =
432+ dataService
433+ .calls
434+ .computeIfAbsent (request , (ignored ) -> new LinkedBlockingDeque <>())
435+ .poll (1 , TimeUnit .SECONDS );
436+
437+ assertWithMessage ("Timed out waiting for the proxied RPC on the fake server" )
438+ .that (serverObserver )
439+ .isNotNull ();
440+
441+ serverObserver .onNext (CheckAndMutateRowResponse .newBuilder ().setPredicateMatched (true ).build ());
442+ serverObserver .onCompleted ();
443+ proxyFuture .get (1 , TimeUnit .SECONDS );
444+
445+ Metadata serverRequestHeaders = metadataInterceptor .requestHeaders .poll (1 , TimeUnit .SECONDS );
446+ assertThat (serverRequestHeaders ).hasValue ("authorization" , "fake-token" );
447+ }
448+
449+ private static Serve createAndStartCommand (
450+ ManagedChannel targetChannel , FakeCredentials targetCredentials ) throws IOException {
367451 for (int i = 10 ; i >= 0 ; i --) {
368452 Serve s = new Serve ();
369453 s .dataChannel = targetChannel ;
370454 s .adminChannel = targetChannel ;
455+ s .credentials = targetCredentials ;
371456
372457 try (ServerSocket serverSocket = new ServerSocket (0 )) {
373458 s .listenPort = serverSocket .getLocalPort ();
@@ -477,4 +562,34 @@ public void getOperation(
477562 .add (responseObserver );
478563 }
479564 }
565+
566+ private static class FakeCredentials extends Credentials {
567+ private static final String HEADER_NAME = "authorization" ;
568+ private String fakeValue = "fake-token" ;
569+
570+ @ Override
571+ public String getAuthenticationType () {
572+ return "fake" ;
573+ }
574+
575+ @ Override
576+ public Map <String , List <String >> getRequestMetadata (URI uri ) throws IOException {
577+ return Map .of (HEADER_NAME , Lists .newArrayList (fakeValue ));
578+ }
579+
580+ @ Override
581+ public boolean hasRequestMetadata () {
582+ return true ;
583+ }
584+
585+ @ Override
586+ public boolean hasRequestMetadataOnly () {
587+ return true ;
588+ }
589+
590+ @ Override
591+ public void refresh () throws IOException {
592+ // noop
593+ }
594+ }
480595}
0 commit comments