1919import com .google .bigtable .v2 .BigtableGrpc ;
2020import com .google .bigtable .v2 .PingAndWarmRequest ;
2121import com .google .bigtable .v2 .PingAndWarmResponse ;
22- import com .google .cloud .bigtable .examples .proxy .metrics .CallLabels ;
22+ import com .google .cloud .bigtable .examples .proxy .core .CallLabels ;
23+ import com .google .cloud .bigtable .examples .proxy .core .CallLabels .PrimingKey ;
2324import com .google .cloud .bigtable .examples .proxy .metrics .Metrics ;
2425import com .google .cloud .bigtable .examples .proxy .metrics .Tracer ;
2526import com .google .common .util .concurrent .ListenableFuture ;
3435import io .grpc .ManagedChannel ;
3536import io .grpc .ManagedChannelBuilder ;
3637import io .grpc .Metadata ;
38+ import io .grpc .Metadata .Key ;
3739import io .grpc .MethodDescriptor ;
3840import io .grpc .Status ;
39- import io .grpc .StatusRuntimeException ;
40- import java .net .URLEncoder ;
41- import java .nio .charset .StandardCharsets ;
41+ import java .time .Duration ;
4242import java .util .List ;
4343import java .util .Optional ;
44+ import java .util .Random ;
4445import java .util .concurrent .ExecutionException ;
4546import java .util .concurrent .ScheduledExecutorService ;
4647import java .util .concurrent .ScheduledFuture ;
5051import org .slf4j .Logger ;
5152import org .slf4j .LoggerFactory ;
5253
54+ /**
55+ * Decorator for a Bigtable data plane connection to add channel warming via PingAndWarm. Channel
56+ * warming will happen on creation and then every 3 minutes (with jitter).
57+ */
5358public class DataChannel extends ManagedChannel {
5459 private static final Logger LOGGER = LoggerFactory .getLogger (DataChannel .class );
5560
61+ private static final Metadata .Key <String > GFE_DEBUG_REQ_HEADER =
62+ Key .of ("X-Return-Encrypted-Headers" , Metadata .ASCII_STRING_MARSHALLER );
63+ private static final Metadata .Key <String > GFE_DEBUG_RESP_HEADER =
64+ Key .of ("X-Encrypted-Debug-Headers" , Metadata .ASCII_STRING_MARSHALLER );
65+
66+ private static final Duration WARM_PERIOD = Duration .ofMinutes (3 );
67+ private static final Duration MAX_JITTER = Duration .ofSeconds (10 );
68+
69+ private final Random random = new Random ();
5670 private final ManagedChannel inner ;
5771 private final Metrics metrics ;
5872 private final ResourceCollector resourceCollector ;
5973 private final CallCredentials callCredentials ;
60- private final ScheduledFuture <?> antiIdleTask ;
74+ private final ScheduledExecutorService warmingExecutor ;
75+ private volatile ScheduledFuture <?> antiIdleTask ;
6176
6277 private final AtomicBoolean closed = new AtomicBoolean ();
78+ private final Object scheduleLock = new Object ();
6379
6480 public DataChannel (
6581 ResourceCollector resourceCollector ,
@@ -80,8 +96,12 @@ public DataChannel(
8096 .keepAliveTime (30 , TimeUnit .SECONDS )
8197 .keepAliveTimeout (10 , TimeUnit .SECONDS )
8298 .build ();
99+
100+ this .warmingExecutor = warmingExecutor ;
83101 this .metrics = metrics ;
84102
103+ new StateTransitionWatcher ().run ();
104+
85105 try {
86106 warm ();
87107 } catch (RuntimeException e ) {
@@ -93,40 +113,57 @@ public DataChannel(
93113 throw e ;
94114 }
95115
96- antiIdleTask = warmingExecutor .scheduleAtFixedRate (this ::warmQuietly , 3 , 3 , TimeUnit .MINUTES );
116+ antiIdleTask =
117+ warmingExecutor .schedule (this ::warmTask , nextWarmup ().toMillis (), TimeUnit .MILLISECONDS );
97118 metrics .updateChannelCount (1 );
98119 }
99120
100- private void warmQuietly () {
121+ private Duration nextWarmup () {
122+ return WARM_PERIOD .minus (
123+ Duration .ofMillis ((long ) (MAX_JITTER .toMillis () * random .nextDouble ())));
124+ }
125+
126+ private void warmTask () {
101127 try {
102128 warm ();
103129 } catch (RuntimeException e ) {
104130 LOGGER .warn ("anti idle ping failed, forcing reconnect" , e );
105131 inner .enterIdle ();
132+ } finally {
133+ synchronized (scheduleLock ) {
134+ if (!closed .get ()) {
135+ antiIdleTask =
136+ warmingExecutor .schedule (
137+ this ::warmTask , nextWarmup ().toMillis (), TimeUnit .MILLISECONDS );
138+ }
139+ }
106140 }
107141 }
108142
109143 private void warm () {
110- List <PingAndWarmRequest > requests = resourceCollector .getRequests ();
111- if (requests .isEmpty ()) {
144+ List <PrimingKey > primingKeys = resourceCollector .getPrimingKeys ();
145+ if (primingKeys .isEmpty ()) {
112146 return ;
113147 }
114148
149+ LOGGER .debug ("Warming channel {} with: {}" , inner , primingKeys );
150+
115151 List <ListenableFuture <PingAndWarmResponse >> futures =
116- requests .stream ().map (this ::sendPingAndWarm ).collect (Collectors .toList ());
152+ primingKeys .stream ().map (this ::sendPingAndWarm ).collect (Collectors .toList ());
117153
118154 int successCount = 0 ;
119155 int failures = 0 ;
120156 for (ListenableFuture <PingAndWarmResponse > future : futures ) {
121- PingAndWarmRequest request = requests .get (successCount + failures );
157+ PrimingKey request = primingKeys .get (successCount + failures );
122158 try {
123159 future .get ();
124160 successCount ++;
125161 } catch (ExecutionException e ) {
126- // All permenant errors are ignored and treated as a success
162+ // All permanent errors are ignored and treated as a success
127163 // The priming request for that generated the error will be dropped
128- if (e .getCause () instanceof StatusRuntimeException ) {
129- StatusRuntimeException se = (StatusRuntimeException ) e .getCause ();
164+ if (e .getCause () instanceof PingAndWarmException ) {
165+ PingAndWarmException se = (PingAndWarmException ) e .getCause ();
166+
130167 switch (se .getStatus ().getCode ()) {
131168 case INTERNAL :
132169 case PERMISSION_DENIED :
@@ -139,8 +176,15 @@ private void warm() {
139176 default :
140177 // noop
141178 }
179+ LOGGER .warn (
180+ "Failed to prime channel with request: {}, status: {}, debug response headers: {}" ,
181+ request ,
182+ se .getStatus (),
183+ Optional .ofNullable (se .getDebugHeaders ()).orElse ("<missing>" ));
184+ } else {
185+ LOGGER .warn ("Unexpected failure priming channel with request: {}" , request , e .getCause ());
142186 }
143- LOGGER . warn ( "Failed to prime channel with request: {}" , request , e . getCause ());
187+
144188 failures ++;
145189 } catch (InterruptedException e ) {
146190 throw new RuntimeException ("Interrupted while priming channel with request: " + request , e );
@@ -151,13 +195,13 @@ private void warm() {
151195 }
152196 }
153197
154- private ListenableFuture <PingAndWarmResponse > sendPingAndWarm (PingAndWarmRequest request ) {
155- CallLabels callLabels =
156- CallLabels . create (
157- BigtableGrpc . getPingAndWarmMethod (),
158- Optional . of ( "bigtableproxy" ),
159- Optional . of ( request . getName ()),
160- Optional . of ( request . getAppProfileId ()) );
198+ private ListenableFuture <PingAndWarmResponse > sendPingAndWarm (PrimingKey primingKey ) {
199+ Metadata metadata = primingKey . composeMetadata ();
200+ metadata . put ( GFE_DEBUG_REQ_HEADER , "gfe_response_only" );
201+ PingAndWarmRequest request = primingKey . composeProto ();
202+ request = request . toBuilder (). setName ( request . getName ()). build ();
203+
204+ CallLabels callLabels = CallLabels . create ( BigtableGrpc . getPingAndWarmMethod (), metadata );
161205 Tracer tracer = new Tracer (metrics , callLabels );
162206
163207 CallOptions callOptions =
@@ -169,17 +213,11 @@ private ListenableFuture<PingAndWarmResponse> sendPingAndWarm(PingAndWarmRequest
169213 ClientCall <PingAndWarmRequest , PingAndWarmResponse > call =
170214 inner .newCall (BigtableGrpc .getPingAndWarmMethod (), callOptions );
171215
172- Metadata metadata = new Metadata ();
173- metadata .put (
174- CallLabels .REQUEST_PARAMS ,
175- String .format (
176- "name=%s&app_profile_id=%s" ,
177- URLEncoder .encode (request .getName (), StandardCharsets .UTF_8 ),
178- URLEncoder .encode (request .getAppProfileId (), StandardCharsets .UTF_8 )));
179-
180216 SettableFuture <PingAndWarmResponse > f = SettableFuture .create ();
181217 call .start (
182218 new Listener <>() {
219+ String debugHeaders = null ;
220+
183221 @ Override
184222 public void onMessage (PingAndWarmResponse response ) {
185223 if (!f .set (response )) {
@@ -188,14 +226,22 @@ public void onMessage(PingAndWarmResponse response) {
188226 }
189227 }
190228
229+ @ Override
230+ public void onHeaders (Metadata headers ) {
231+ debugHeaders = headers .get (GFE_DEBUG_RESP_HEADER );
232+ }
233+
191234 @ Override
192235 public void onClose (Status status , Metadata trailers ) {
193236 tracer .onCallFinished (status );
194237
195238 if (status .isOk ()) {
196- f .setException (new IllegalStateException ("PingAndWarm was missing a response" ));
239+ f .setException (
240+ new PingAndWarmException (
241+ "PingAndWarm was missing a response" , debugHeaders , trailers , status ));
197242 } else {
198- f .setException (status .asRuntimeException ());
243+ f .setException (
244+ new PingAndWarmException ("PingAndWarm failed" , debugHeaders , trailers , status ));
199245 }
200246 }
201247 },
@@ -207,12 +253,45 @@ public void onClose(Status status, Metadata trailers) {
207253 return f ;
208254 }
209255
256+ static class PingAndWarmException extends RuntimeException {
257+
258+ private final String debugHeaders ;
259+ private final Metadata trailers ;
260+ private final Status status ;
261+
262+ public PingAndWarmException (
263+ String message , String debugHeaders , Metadata trailers , Status status ) {
264+ super (String .format ("PingAndWarm failed, status: " + status ));
265+ this .debugHeaders = debugHeaders ;
266+ this .trailers = trailers ;
267+ this .status = status ;
268+ }
269+
270+ public String getDebugHeaders () {
271+ return debugHeaders ;
272+ }
273+
274+ public Metadata getTrailers () {
275+ return trailers ;
276+ }
277+
278+ public Status getStatus () {
279+ return status ;
280+ }
281+ }
282+
210283 @ Override
211284 public ManagedChannel shutdown () {
212- if (closed .compareAndSet (false , true )) {
285+ final boolean closing ;
286+
287+ synchronized (scheduleLock ) {
288+ closing = closed .compareAndSet (false , true );
289+ antiIdleTask .cancel (true );
290+ }
291+ if (closing ) {
213292 metrics .updateChannelCount (-1 );
214293 }
215- antiIdleTask . cancel ( true );
294+
216295 return inner .shutdown ();
217296 }
218297
@@ -228,10 +307,17 @@ public boolean isTerminated() {
228307
229308 @ Override
230309 public ManagedChannel shutdownNow () {
231- if (closed .compareAndSet (false , true )) {
310+ final boolean closing ;
311+
312+ synchronized (scheduleLock ) {
313+ closing = closed .compareAndSet (false , true );
314+ antiIdleTask .cancel (true );
315+ }
316+
317+ if (closing ) {
232318 metrics .updateChannelCount (-1 );
233319 }
234- antiIdleTask . cancel ( true );
320+
235321 return inner .shutdownNow ();
236322 }
237323
@@ -282,4 +368,20 @@ public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
282368 public String authority () {
283369 return inner .authority ();
284370 }
371+
372+ class StateTransitionWatcher implements Runnable {
373+ private ConnectivityState prevState = null ;
374+
375+ @ Override
376+ public void run () {
377+ if (closed .get ()) {
378+ return ;
379+ }
380+
381+ ConnectivityState newState = inner .getState (false );
382+ metrics .recordChannelStateChange (prevState , newState );
383+ prevState = newState ;
384+ inner .notifyWhenStateChanged (prevState , this );
385+ }
386+ }
285387}
0 commit comments