1919import static com .google .common .truth .Truth .assertThat ;
2020import static com .google .common .truth .Truth .assertWithMessage ;
2121
22+ import com .google .auth .Credentials ;
2223import com .google .bigtable .admin .v2 .BigtableInstanceAdminGrpc ;
2324import com .google .bigtable .admin .v2 .BigtableInstanceAdminGrpc .BigtableInstanceAdminFutureStub ;
2425import com .google .bigtable .admin .v2 .BigtableInstanceAdminGrpc .BigtableInstanceAdminImplBase ;
3435import com .google .bigtable .v2 .BigtableGrpc .BigtableImplBase ;
3536import com .google .bigtable .v2 .CheckAndMutateRowRequest ;
3637import com .google .bigtable .v2 .CheckAndMutateRowResponse ;
38+ import com .google .common .collect .Lists ;
3739import com .google .common .util .concurrent .ListenableFuture ;
3840import com .google .longrunning .GetOperationRequest ;
3941import com .google .longrunning .Operation ;
6264import io .grpc .testing .GrpcCleanupRule ;
6365import java .io .IOException ;
6466import java .net .ServerSocket ;
67+ import java .net .URI ;
68+ import java .util .List ;
69+ import java .util .Map ;
6570import java .util .UUID ;
6671import java .util .concurrent .BlockingDeque ;
6772import java .util .concurrent .BlockingQueue ;
@@ -93,6 +98,7 @@ public class ServeTest {
9398 private FakeTableAdminService tableAdminService ;
9499 private OperationService operationService ;
95100 private ManagedChannel fakeServiceChannel ;
101+ private FakeCredentials fakeCredentials ;
96102
97103 // Proxy
98104 private Serve serve ;
@@ -107,6 +113,8 @@ public void setUp() throws IOException {
107113 tableAdminService = new FakeTableAdminService ();
108114 operationService = new OperationService ();
109115
116+ fakeCredentials = new FakeCredentials ();
117+
110118 grpcCleanup .register (
111119 InProcessServerBuilder .forName (TARGET_SERVER_NAME )
112120 .intercept (metadataInterceptor )
@@ -122,7 +130,9 @@ public void setUp() throws IOException {
122130 InProcessChannelBuilder .forName (TARGET_SERVER_NAME ).usePlaintext ().build ());
123131
124132 // Create the proxy
125- serve = createAndStartCommand (fakeServiceChannel );
133+ // Inject fakes for upstream calls. For unit tests we want to shim communications to the
134+ // bigtable service.
135+ serve = createAndStartCommand (fakeServiceChannel , fakeCredentials );
126136
127137 proxyChannel =
128138 grpcCleanup .register (
@@ -318,11 +328,84 @@ public void onClose(Status status, Metadata trailers) {
318328 assertThat (clientRecvTrailer .get ()).hasValue ("trailer" , "trailer-value" );
319329 }
320330
321- private static Serve createAndStartCommand (ManagedChannel targetChannel ) throws IOException {
331+ @ Test
332+ public void testCredentials () throws InterruptedException , ExecutionException , TimeoutException {
333+ BigtableFutureStub proxyStub = BigtableGrpc .newFutureStub (proxyChannel );
334+
335+ CheckAndMutateRowRequest request =
336+ CheckAndMutateRowRequest .newBuilder ().setTableName ("some-table" ).build ();
337+ ListenableFuture <CheckAndMutateRowResponse > proxyFuture = proxyStub .checkAndMutateRow (request );
338+ StreamObserver <CheckAndMutateRowResponse > serverObserver =
339+ dataService
340+ .calls
341+ .computeIfAbsent (request , (ignored ) -> new LinkedBlockingDeque <>())
342+ .poll (1 , TimeUnit .SECONDS );
343+
344+ assertWithMessage ("Timed out waiting for the proxied RPC on the fake server" )
345+ .that (serverObserver )
346+ .isNotNull ();
347+
348+ serverObserver .onNext (CheckAndMutateRowResponse .newBuilder ().setPredicateMatched (true ).build ());
349+ serverObserver .onCompleted ();
350+ proxyFuture .get (1 , TimeUnit .SECONDS );
351+
352+ assertThat (metadataInterceptor .requestHeaders .poll (1 , TimeUnit .SECONDS ))
353+ .hasValue ("authorization" , "fake-token" );
354+ }
355+
356+ @ Test
357+ public void testCredentialsClobber ()
358+ throws InterruptedException , ExecutionException , TimeoutException {
359+ BigtableFutureStub proxyStub =
360+ BigtableGrpc .newFutureStub (proxyChannel )
361+ .withInterceptors (
362+ new ClientInterceptor () {
363+ @ Override
364+ public <ReqT , RespT > ClientCall <ReqT , RespT > interceptCall (
365+ MethodDescriptor <ReqT , RespT > methodDescriptor ,
366+ CallOptions callOptions ,
367+ Channel channel ) {
368+ return new SimpleForwardingClientCall <ReqT , RespT >(
369+ channel .newCall (methodDescriptor , callOptions )) {
370+ @ Override
371+ public void start (Listener <RespT > responseListener , Metadata headers ) {
372+ headers .put (
373+ Metadata .Key .of ("authorization" , Metadata .ASCII_STRING_MARSHALLER ),
374+ "pre-proxied-value" );
375+ super .start (responseListener , headers );
376+ }
377+ };
378+ }
379+ });
380+
381+ CheckAndMutateRowRequest request =
382+ CheckAndMutateRowRequest .newBuilder ().setTableName ("some-table" ).build ();
383+ ListenableFuture <CheckAndMutateRowResponse > proxyFuture = proxyStub .checkAndMutateRow (request );
384+ StreamObserver <CheckAndMutateRowResponse > serverObserver =
385+ dataService
386+ .calls
387+ .computeIfAbsent (request , (ignored ) -> new LinkedBlockingDeque <>())
388+ .poll (1 , TimeUnit .SECONDS );
389+
390+ assertWithMessage ("Timed out waiting for the proxied RPC on the fake server" )
391+ .that (serverObserver )
392+ .isNotNull ();
393+
394+ serverObserver .onNext (CheckAndMutateRowResponse .newBuilder ().setPredicateMatched (true ).build ());
395+ serverObserver .onCompleted ();
396+ proxyFuture .get (1 , TimeUnit .SECONDS );
397+
398+ Metadata serverRequestHeaders = metadataInterceptor .requestHeaders .poll (1 , TimeUnit .SECONDS );
399+ assertThat (serverRequestHeaders ).hasValue ("authorization" , "fake-token" );
400+ }
401+
402+ private static Serve createAndStartCommand (
403+ ManagedChannel targetChannel , FakeCredentials targetCredentials ) throws IOException {
322404 for (int i = 10 ; i >= 0 ; i --) {
323405 Serve s = new Serve ();
324406 s .dataChannel = targetChannel ;
325407 s .adminChannel = targetChannel ;
408+ s .credentials = targetCredentials ;
326409
327410 try (ServerSocket serverSocket = new ServerSocket (0 )) {
328411 s .listenPort = serverSocket .getLocalPort ();
@@ -419,4 +502,34 @@ public void getOperation(
419502 .add (responseObserver );
420503 }
421504 }
505+
506+ private static class FakeCredentials extends Credentials {
507+ private static final String HEADER_NAME = "authorization" ;
508+ private String fakeValue = "fake-token" ;
509+
510+ @ Override
511+ public String getAuthenticationType () {
512+ return "fake" ;
513+ }
514+
515+ @ Override
516+ public Map <String , List <String >> getRequestMetadata (URI uri ) throws IOException {
517+ return Map .of (HEADER_NAME , Lists .newArrayList (fakeValue ));
518+ }
519+
520+ @ Override
521+ public boolean hasRequestMetadata () {
522+ return true ;
523+ }
524+
525+ @ Override
526+ public boolean hasRequestMetadataOnly () {
527+ return true ;
528+ }
529+
530+ @ Override
531+ public void refresh () throws IOException {
532+ // noop
533+ }
534+ }
422535}
0 commit comments