Skip to content

Commit aeedb00

Browse files
nicholslmutianf
andauthored
chore: Move channel priming out of InstantiatingGrpcTransportProvider (#2644)
* chore: Move channel priming out of InstantiatingGrpcTransportProvider Change-Id: I7214aa3016bd7e7f7f167c64cbaa04134b54a352 * delete comment Change-Id: I26d9b4929be6ba941eb88da05244fc031a747b68 * fixes Change-Id: Ib414d250f9f1ce0555450e81a706f6c0619d80f2 * Update channel priming annotation Co-authored-by: Mattie Fu <[email protected]> * Update google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java Co-authored-by: Mattie Fu <[email protected]> * fixes Change-Id: I2f27c788652c9e5e12ea470355098c73b29ab40a * add no op channel primer Change-Id: I343b3dc44d12638be2af32398b6aaa8529c7ebfb * add noop channel primer test Change-Id: I2a11cd44ddd9caaea4db918f66aa7313a31a0057 * lint Change-Id: Ifc564bcd182d4ea18e8e63ce78787281d589d8fd * feat/add-async-channel-ping Change-Id: I3e390b6f7a6c9beaee52d74f37ef557629af7759 * lint Change-Id: I5e2b13674c7fc945d5567b7f86478c847017aa41 * fixes Change-Id: Ibaa32ee3425b23596a3141bb4e29cfb068caf0ee * fixes Change-Id: I3d43864fee0b05cc64f27aabcb2ffaf84c9c0ab8 * minor change to test Change-Id: I63f9f0e930f95d48ce56962e49d6bdca0fc5918d * clirr-ignored-differences.xml Change-Id: Ia4e25a7c80256762e241f30a585056aa78fb2878 * increase timeout for testing purposes Change-Id: I96fb32e098862546aae0de498637459c5a9e197d * increase offset jitter to make more resilient to flakes Change-Id: I8754683cae05ed9fd0267a1fd16746756450c15e * more fault tolerance Change-Id: I997c3752d8b5ae13084762a7eb3fc6b042ead206 * revert flaky test changes Change-Id: I8e0ad310444c754df71dc61782b23d007bbfdaba --------- Co-authored-by: Mattie Fu <[email protected]>
1 parent e219c38 commit aeedb00

File tree

8 files changed

+218
-55
lines changed

8 files changed

+218
-55
lines changed

google-cloud-bigtable/clirr-ignored-differences.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,4 +382,18 @@
382382
<method>*</method>
383383
<to>*</to>
384384
</difference>
385+
<difference>
386+
<!-- InternalApi was updated -->
387+
<differenceType>7004</differenceType>
388+
<className>com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool</className>
389+
<method>*create*</method>
390+
<to>*</to>
391+
</difference>
392+
<difference>
393+
<!-- InternalApi was updated -->
394+
<differenceType>7004</differenceType>
395+
<className>com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider</className>
396+
<method>*create*</method>
397+
<to>*</to>
398+
</difference>
385399
</differences>

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java

Lines changed: 54 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package com.google.cloud.bigtable.data.v2.stub;
1717

18-
import com.google.api.core.BetaApi;
18+
import com.google.api.core.InternalApi;
1919
import com.google.api.core.SettableApiFuture;
2020
import com.google.api.gax.grpc.ChannelPrimer;
2121
import com.google.auth.Credentials;
@@ -41,13 +41,13 @@
4141

4242
/**
4343
* A channel warmer that ensures that a Bigtable channel is ready to be used before being added to
44-
* the active {@link com.google.api.gax.grpc.ChannelPool}.
44+
* the active {@link com.google.cloud.bigtable.gaxx.grpc.BigtableChannelPool}.
4545
*
4646
* <p>This implementation is subject to change in the future, but currently it will prime the
4747
* channel by sending a ReadRow request for a hardcoded, non-existent row key.
4848
*/
49-
@BetaApi("Channel priming is not currently stable and might change in the future")
50-
class BigtableChannelPrimer implements ChannelPrimer {
49+
@InternalApi
50+
public class BigtableChannelPrimer implements ChannelPrimer {
5151
private static Logger LOG = Logger.getLogger(BigtableChannelPrimer.class.toString());
5252

5353
static final Metadata.Key<String> REQUEST_PARAMS =
@@ -96,43 +96,12 @@ public void primeChannel(ManagedChannel managedChannel) {
9696
}
9797

9898
private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOException {
99-
sendPrimeRequests(managedChannel);
99+
sendPrimeRequestsBlocking(managedChannel);
100100
}
101101

102-
private void sendPrimeRequests(ManagedChannel managedChannel) {
102+
private void sendPrimeRequestsBlocking(ManagedChannel managedChannel) {
103103
try {
104-
ClientCall<PingAndWarmRequest, PingAndWarmResponse> clientCall =
105-
managedChannel.newCall(
106-
BigtableGrpc.getPingAndWarmMethod(),
107-
CallOptions.DEFAULT
108-
.withCallCredentials(callCredentials)
109-
.withDeadline(Deadline.after(1, TimeUnit.MINUTES)));
110-
111-
SettableApiFuture<PingAndWarmResponse> future = SettableApiFuture.create();
112-
clientCall.start(
113-
new ClientCall.Listener<PingAndWarmResponse>() {
114-
PingAndWarmResponse response;
115-
116-
@Override
117-
public void onMessage(PingAndWarmResponse message) {
118-
response = message;
119-
}
120-
121-
@Override
122-
public void onClose(Status status, Metadata trailers) {
123-
if (status.isOk()) {
124-
future.set(response);
125-
} else {
126-
future.setException(status.asException());
127-
}
128-
}
129-
},
130-
createMetadata(headers, request));
131-
clientCall.sendMessage(request);
132-
clientCall.halfClose();
133-
clientCall.request(Integer.MAX_VALUE);
134-
135-
future.get(1, TimeUnit.MINUTES);
104+
sendPrimeRequestsAsync(managedChannel).get(1, TimeUnit.MINUTES);
136105
} catch (Throwable e) {
137106
// TODO: Not sure if we should swallow the error here. We are pre-emptively swapping
138107
// channels if the new
@@ -141,6 +110,53 @@ public void onClose(Status status, Metadata trailers) {
141110
}
142111
}
143112

113+
public SettableApiFuture<PingAndWarmResponse> sendPrimeRequestsAsync(
114+
ManagedChannel managedChannel) {
115+
ClientCall<PingAndWarmRequest, PingAndWarmResponse> clientCall =
116+
managedChannel.newCall(
117+
BigtableGrpc.getPingAndWarmMethod(),
118+
CallOptions.DEFAULT
119+
.withCallCredentials(callCredentials)
120+
.withDeadline(Deadline.after(1, TimeUnit.MINUTES)));
121+
122+
SettableApiFuture<PingAndWarmResponse> future = SettableApiFuture.create();
123+
clientCall.start(
124+
new ClientCall.Listener<PingAndWarmResponse>() {
125+
private PingAndWarmResponse response;
126+
127+
@Override
128+
public void onMessage(PingAndWarmResponse message) {
129+
response = message;
130+
}
131+
132+
@Override
133+
public void onClose(Status status, Metadata trailers) {
134+
if (status.isOk()) {
135+
future.set(response);
136+
} else {
137+
// Propagate the gRPC error to the future.
138+
future.setException(status.asException(trailers));
139+
}
140+
}
141+
},
142+
createMetadata(headers, request));
143+
144+
try {
145+
// Send the request message.
146+
clientCall.sendMessage(request);
147+
// Signal that no more messages will be sent.
148+
clientCall.halfClose();
149+
// Request the response from the server.
150+
clientCall.request(Integer.MAX_VALUE);
151+
} catch (Throwable t) {
152+
// If sending fails, cancel the call and notify the future.
153+
clientCall.cancel("Failed to send priming request", t);
154+
future.setException(t);
155+
}
156+
157+
return future;
158+
}
159+
144160
private static Metadata createMetadata(Map<String, String> headers, PingAndWarmRequest request) {
145161
Metadata metadata = new Metadata();
146162

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.api.gax.core.BackgroundResource;
2121
import com.google.api.gax.core.CredentialsProvider;
2222
import com.google.api.gax.core.FixedCredentialsProvider;
23+
import com.google.api.gax.grpc.ChannelPrimer;
2324
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
2425
import com.google.api.gax.rpc.ClientContext;
2526
import com.google.auth.Credentials;
@@ -121,20 +122,22 @@ public static BigtableClientContext create(EnhancedBigtableStubSettings settings
121122
setupCookieHolder(transportProvider);
122123
}
123124

125+
ChannelPrimer channelPrimer = NoOpChannelPrimer.create();
126+
124127
// Inject channel priming if enabled
125128
if (builder.isRefreshingChannel()) {
126-
transportProvider.setChannelPrimer(
129+
channelPrimer =
127130
BigtableChannelPrimer.create(
128131
builder.getProjectId(),
129132
builder.getInstanceId(),
130133
builder.getAppProfileId(),
131134
credentials,
132-
builder.getHeaderProvider().getHeaders()));
135+
builder.getHeaderProvider().getHeaders());
133136
}
134137

135138
BigtableTransportChannelProvider btTransportProvider =
136139
BigtableTransportChannelProvider.create(
137-
(InstantiatingGrpcChannelProvider) transportProvider.build());
140+
(InstantiatingGrpcChannelProvider) transportProvider.build(), channelPrimer);
138141

139142
builder.setTransportChannelProvider(btTransportProvider);
140143
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.api.gax.grpc.ChannelPrimer;
20+
import io.grpc.ManagedChannel;
21+
22+
@InternalApi
23+
public class NoOpChannelPrimer implements ChannelPrimer {
24+
static NoOpChannelPrimer create() {
25+
return new NoOpChannelPrimer();
26+
}
27+
28+
private NoOpChannelPrimer() {}
29+
30+
@Override
31+
public void primeChannel(ManagedChannel managedChannel) {
32+
// No op
33+
}
34+
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.google.api.core.InternalApi;
1919
import com.google.api.gax.grpc.ChannelFactory;
20+
import com.google.api.gax.grpc.ChannelPrimer;
2021
import com.google.common.annotations.VisibleForTesting;
2122
import com.google.common.base.Preconditions;
2223
import com.google.common.collect.ImmutableList;
@@ -60,6 +61,8 @@ public class BigtableChannelPool extends ManagedChannel {
6061

6162
private final BigtableChannelPoolSettings settings;
6263
private final ChannelFactory channelFactory;
64+
65+
private final ChannelPrimer channelPrimer;
6366
private final ScheduledExecutorService executor;
6467

6568
private final Object entryWriteLock = new Object();
@@ -68,9 +71,12 @@ public class BigtableChannelPool extends ManagedChannel {
6871
private final String authority;
6972

7073
public static BigtableChannelPool create(
71-
BigtableChannelPoolSettings settings, ChannelFactory channelFactory) throws IOException {
74+
BigtableChannelPoolSettings settings,
75+
ChannelFactory channelFactory,
76+
ChannelPrimer channelPrimer)
77+
throws IOException {
7278
return new BigtableChannelPool(
73-
settings, channelFactory, Executors.newSingleThreadScheduledExecutor());
79+
settings, channelFactory, channelPrimer, Executors.newSingleThreadScheduledExecutor());
7480
}
7581

7682
/**
@@ -84,15 +90,19 @@ public static BigtableChannelPool create(
8490
BigtableChannelPool(
8591
BigtableChannelPoolSettings settings,
8692
ChannelFactory channelFactory,
93+
ChannelPrimer channelPrimer,
8794
ScheduledExecutorService executor)
8895
throws IOException {
8996
this.settings = settings;
9097
this.channelFactory = channelFactory;
98+
this.channelPrimer = channelPrimer;
9199

92100
ImmutableList.Builder<Entry> initialListBuilder = ImmutableList.builder();
93101

94102
for (int i = 0; i < settings.getInitialChannelCount(); i++) {
95-
initialListBuilder.add(new Entry(channelFactory.createSingleChannel()));
103+
ManagedChannel newChannel = channelFactory.createSingleChannel();
104+
channelPrimer.primeChannel(newChannel);
105+
initialListBuilder.add(new Entry(newChannel));
96106
}
97107

98108
entries.set(initialListBuilder.build());
@@ -316,7 +326,9 @@ private void expand(int desiredSize) {
316326

317327
for (int i = 0; i < desiredSize - localEntries.size(); i++) {
318328
try {
319-
newEntries.add(new Entry(channelFactory.createSingleChannel()));
329+
ManagedChannel newChannel = channelFactory.createSingleChannel();
330+
this.channelPrimer.primeChannel(newChannel);
331+
newEntries.add(new Entry(newChannel));
320332
} catch (IOException e) {
321333
LOG.log(Level.WARNING, "Failed to add channel", e);
322334
}
@@ -354,7 +366,9 @@ void refresh() {
354366

355367
for (int i = 0; i < newEntries.size(); i++) {
356368
try {
357-
newEntries.set(i, new Entry(channelFactory.createSingleChannel()));
369+
ManagedChannel newChannel = channelFactory.createSingleChannel();
370+
this.channelPrimer.primeChannel(newChannel);
371+
newEntries.set(i, new Entry(newChannel));
358372
} catch (IOException e) {
359373
LOG.log(Level.WARNING, "Failed to refresh channel, leaving old channel", e);
360374
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.google.api.core.InternalApi;
1919
import com.google.api.gax.grpc.ChannelFactory;
2020
import com.google.api.gax.grpc.ChannelPoolSettings;
21+
import com.google.api.gax.grpc.ChannelPrimer;
2122
import com.google.api.gax.grpc.GrpcTransportChannel;
2223
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
2324
import com.google.api.gax.rpc.TransportChannel;
@@ -38,10 +39,13 @@
3839
public final class BigtableTransportChannelProvider implements TransportChannelProvider {
3940

4041
private final InstantiatingGrpcChannelProvider delegate;
42+
private final ChannelPrimer channelPrimer;
4143

4244
private BigtableTransportChannelProvider(
43-
InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider) {
45+
InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider,
46+
ChannelPrimer channelPrimer) {
4447
delegate = Preconditions.checkNotNull(instantiatingGrpcChannelProvider);
48+
this.channelPrimer = channelPrimer;
4549
}
4650

4751
@Override
@@ -63,7 +67,7 @@ public BigtableTransportChannelProvider withExecutor(ScheduledExecutorService ex
6367
public BigtableTransportChannelProvider withExecutor(Executor executor) {
6468
InstantiatingGrpcChannelProvider newChannelProvider =
6569
(InstantiatingGrpcChannelProvider) delegate.withExecutor(executor);
66-
return new BigtableTransportChannelProvider(newChannelProvider);
70+
return new BigtableTransportChannelProvider(newChannelProvider, channelPrimer);
6771
}
6872

6973
@Override
@@ -75,7 +79,7 @@ public boolean needsHeaders() {
7579
public BigtableTransportChannelProvider withHeaders(Map<String, String> headers) {
7680
InstantiatingGrpcChannelProvider newChannelProvider =
7781
(InstantiatingGrpcChannelProvider) delegate.withHeaders(headers);
78-
return new BigtableTransportChannelProvider(newChannelProvider);
82+
return new BigtableTransportChannelProvider(newChannelProvider, channelPrimer);
7983
}
8084

8185
@Override
@@ -87,7 +91,7 @@ public boolean needsEndpoint() {
8791
public TransportChannelProvider withEndpoint(String endpoint) {
8892
InstantiatingGrpcChannelProvider newChannelProvider =
8993
(InstantiatingGrpcChannelProvider) delegate.withEndpoint(endpoint);
90-
return new BigtableTransportChannelProvider(newChannelProvider);
94+
return new BigtableTransportChannelProvider(newChannelProvider, channelPrimer);
9195
}
9296

9397
@Deprecated
@@ -101,7 +105,7 @@ public boolean acceptsPoolSize() {
101105
public TransportChannelProvider withPoolSize(int size) {
102106
InstantiatingGrpcChannelProvider newChannelProvider =
103107
(InstantiatingGrpcChannelProvider) delegate.withPoolSize(size);
104-
return new BigtableTransportChannelProvider(newChannelProvider);
108+
return new BigtableTransportChannelProvider(newChannelProvider, channelPrimer);
105109
}
106110

107111
/** Expected to only be called once when BigtableClientContext is created */
@@ -130,7 +134,8 @@ public TransportChannel getTransportChannel() throws IOException {
130134
BigtableChannelPoolSettings btPoolSettings =
131135
BigtableChannelPoolSettings.copyFrom(delegate.getChannelPoolSettings());
132136

133-
BigtableChannelPool btChannelPool = BigtableChannelPool.create(btPoolSettings, channelFactory);
137+
BigtableChannelPool btChannelPool =
138+
BigtableChannelPool.create(btPoolSettings, channelFactory, channelPrimer);
134139

135140
return GrpcTransportChannel.create(btChannelPool);
136141
}
@@ -149,12 +154,13 @@ public boolean needsCredentials() {
149154
public TransportChannelProvider withCredentials(Credentials credentials) {
150155
InstantiatingGrpcChannelProvider newChannelProvider =
151156
(InstantiatingGrpcChannelProvider) delegate.withCredentials(credentials);
152-
return new BigtableTransportChannelProvider(newChannelProvider);
157+
return new BigtableTransportChannelProvider(newChannelProvider, channelPrimer);
153158
}
154159

155160
/** Creates a BigtableTransportChannelProvider. */
156161
public static BigtableTransportChannelProvider create(
157-
InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider) {
158-
return new BigtableTransportChannelProvider(instantiatingGrpcChannelProvider);
162+
InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider,
163+
ChannelPrimer channelPrimer) {
164+
return new BigtableTransportChannelProvider(instantiatingGrpcChannelProvider, channelPrimer);
159165
}
160166
}

0 commit comments

Comments
 (0)