1616
1717package com .google .cloud .pubsublite ;
1818
19+ import com .google .api .gax .core .GaxProperties ;
20+ import com .google .api .gax .grpc .GaxGrpcProperties ;
21+ import com .google .api .gax .rpc .ApiClientHeaderProvider ;
1922import com .google .auth .oauth2 .GoogleCredentials ;
2023import com .google .cloud .pubsublite .internal .ChannelCache ;
2124import com .google .common .collect .ImmutableList ;
25+ import io .grpc .CallOptions ;
2226import io .grpc .Channel ;
27+ import io .grpc .ClientCall ;
28+ import io .grpc .ClientInterceptor ;
29+ import io .grpc .ClientInterceptors ;
30+ import io .grpc .ForwardingClientCall .SimpleForwardingClientCall ;
31+ import io .grpc .Metadata ;
32+ import io .grpc .Metadata .Key ;
33+ import io .grpc .MethodDescriptor ;
2334import io .grpc .auth .MoreCallCredentials ;
2435import io .grpc .stub .AbstractStub ;
2536import java .io .IOException ;
37+ import java .util .ArrayList ;
38+ import java .util .List ;
39+ import java .util .Map ;
40+ import java .util .Map .Entry ;
2641import java .util .function .Function ;
2742
2843public class Stubs {
@@ -31,13 +46,44 @@ public class Stubs {
3146 public static <StubT extends AbstractStub <StubT >> StubT defaultStub (
3247 String target , Function <Channel , StubT > stubFactory ) throws IOException {
3348 return stubFactory
34- .apply (channels .get (target ))
49+ .apply (ClientInterceptors . intercept ( channels .get (target ), getClientInterceptors () ))
3550 .withCallCredentials (
3651 MoreCallCredentials .from (
3752 GoogleCredentials .getApplicationDefault ()
3853 .createScoped (
3954 ImmutableList .of ("https://www.googleapis.com/auth/cloud-platform" ))));
4055 }
4156
57+ private static List <ClientInterceptor > getClientInterceptors () {
58+ List <ClientInterceptor > clientInterceptors = new ArrayList <>();
59+ Map <String , String > apiClientHeaders =
60+ ApiClientHeaderProvider .newBuilder ()
61+ .setClientLibToken ("gccl" , GaxProperties .getLibraryVersion (Stubs .class ))
62+ .setTransportToken (
63+ GaxGrpcProperties .getGrpcTokenName (), GaxGrpcProperties .getGrpcVersion ())
64+ .build ()
65+ .getHeaders ();
66+ clientInterceptors .add (
67+ new ClientInterceptor () {
68+ @ Override
69+ public <ReqT , RespT > ClientCall <ReqT , RespT > interceptCall (
70+ MethodDescriptor <ReqT , RespT > method , CallOptions callOptions , Channel next ) {
71+ ClientCall <ReqT , RespT > call = next .newCall (method , callOptions );
72+ return new SimpleForwardingClientCall <ReqT , RespT >(call ) {
73+ @ Override
74+ public void start (ClientCall .Listener <RespT > responseListener , Metadata headers ) {
75+ for (Entry <String , String > apiClientHeader : apiClientHeaders .entrySet ()) {
76+ headers .put (
77+ Key .of (apiClientHeader .getKey (), Metadata .ASCII_STRING_MARSHALLER ),
78+ apiClientHeader .getValue ());
79+ }
80+ super .start (responseListener , headers );
81+ }
82+ };
83+ }
84+ });
85+ return clientInterceptors ;
86+ }
87+
4288 private Stubs () {}
4389}
0 commit comments