2828import com .google .common .cache .Cache ;
2929import com .google .common .cache .CacheBuilder ;
3030import com .google .spanner .admin .database .v1 .DatabaseName ;
31- import io .grpc .CallOptions ;
32- import io .grpc .Channel ;
33- import io .grpc .ClientCall ;
34- import io .grpc .ClientInterceptor ;
31+ import io .grpc .*;
3532import io .grpc .ForwardingClientCall .SimpleForwardingClientCall ;
3633import io .grpc .ForwardingClientCallListener .SimpleForwardingClientCallListener ;
37- import io .grpc .Grpc ;
38- import io .grpc .Metadata ;
39- import io .grpc .MethodDescriptor ;
4034import io .opencensus .stats .MeasureMap ;
4135import io .opencensus .stats .Stats ;
4236import io .opencensus .stats .StatsRecorder ;
5347import java .util .HashMap ;
5448import java .util .Map ;
5549import java .util .concurrent .ExecutionException ;
50+ import java .util .concurrent .atomic .AtomicBoolean ;
5651import java .util .logging .Level ;
5752import java .util .logging .Logger ;
5853import java .util .regex .Matcher ;
@@ -103,9 +98,13 @@ class HeaderInterceptor implements ClientInterceptor {
10398 public <ReqT , RespT > ClientCall <ReqT , RespT > interceptCall (
10499 MethodDescriptor <ReqT , RespT > method , CallOptions callOptions , Channel next ) {
105100 ApiTracer tracer = callOptions .getOption (TRACER_KEY );
101+ SpannerGrpcStreamTracer streamTracer = new SpannerGrpcStreamTracer ();
102+ CallOptions newOptions =
103+ callOptions .withStreamTracerFactory (new SpannerGrpcStreamTracer .Factory (streamTracer ));
106104 CompositeTracer compositeTracer =
107105 tracer instanceof CompositeTracer ? (CompositeTracer ) tracer : null ;
108- return new SimpleForwardingClientCall <ReqT , RespT >(next .newCall (method , callOptions )) {
106+ final AtomicBoolean headersReceived = new AtomicBoolean (false );
107+ return new SimpleForwardingClientCall <ReqT , RespT >(next .newCall (method , newOptions )) {
109108 @ Override
110109 public void start (Listener <RespT > responseListener , Metadata headers ) {
111110 try {
@@ -127,6 +126,7 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
127126 new SimpleForwardingClientCallListener <RespT >(responseListener ) {
128127 @ Override
129128 public void onHeaders (Metadata metadata ) {
129+ headersReceived .set (true );
130130 Boolean isDirectPathUsed =
131131 isDirectPathUsed (getAttributes ().get (Grpc .TRANSPORT_ATTR_REMOTE_ADDR ));
132132 addDirectPathUsedAttribute (compositeTracer , isDirectPathUsed );
@@ -135,6 +135,21 @@ public void onHeaders(Metadata metadata) {
135135 metadata , tagContext , attributes , span , compositeTracer , isDirectPathUsed );
136136 super .onHeaders (metadata );
137137 }
138+
139+ @ Override
140+ public void onClose (Status status , Metadata trailers ) {
141+ if (streamTracer .isOutBoundMessageSent () && !headersReceived .get ()) {
142+ // RPC was sent, but no response headers were received. This can happen in
143+ // case of a timeout, for example.
144+ if (compositeTracer != null ) {
145+ compositeTracer .recordGfeHeaderMissingCount (1L );
146+ if (GapicSpannerRpc .isEnableAFEServerTiming ()) {
147+ // compositeTracer.recordAfeHeaderMissingCount(1L);
148+ }
149+ }
150+ }
151+ super .onClose (status , trailers );
152+ }
138153 },
139154 headers );
140155 } catch (ExecutionException executionException ) {
0 commit comments