Skip to content

Commit 178182c

Browse files
mutianfblakeli0
andauthored
fix: provide API to share the same background executor for channel po… (#4030)
…ol resizing and refresh Before this change ChannelPool creates a new SingleThreadScheduledExecutor every time getTransportChannel is called. We can reuse the same background executor from the clientSettings and can reduce the number of threads when the user creates many instances of the client. --------- Co-authored-by: Blake Li <[email protected]>
1 parent ad05c34 commit 178182c

File tree

9 files changed

+179
-53
lines changed

9 files changed

+179
-53
lines changed

gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java

Lines changed: 61 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
package com.google.api.gax.grpc;
3131

3232
import com.google.api.core.InternalApi;
33+
import com.google.api.gax.core.FixedExecutorProvider;
3334
import com.google.common.annotations.VisibleForTesting;
3435
import com.google.common.base.Preconditions;
3536
import com.google.common.collect.ImmutableList;
@@ -48,6 +49,7 @@
4849
import java.util.concurrent.CancellationException;
4950
import java.util.concurrent.Executors;
5051
import java.util.concurrent.ScheduledExecutorService;
52+
import java.util.concurrent.ScheduledFuture;
5153
import java.util.concurrent.TimeUnit;
5254
import java.util.concurrent.atomic.AtomicBoolean;
5355
import java.util.concurrent.atomic.AtomicInteger;
@@ -72,33 +74,45 @@ class ChannelPool extends ManagedChannel {
7274

7375
private final ChannelPoolSettings settings;
7476
private final ChannelFactory channelFactory;
75-
private final ScheduledExecutorService executor;
77+
private final FixedExecutorProvider backgroundExecutorProvider;
78+
79+
private ScheduledFuture<?> refreshFuture = null;
80+
private ScheduledFuture<?> resizeFuture = null;
7681

7782
private final Object entryWriteLock = new Object();
7883
@VisibleForTesting final AtomicReference<ImmutableList<Entry>> entries = new AtomicReference<>();
7984
private final AtomicInteger indexTicker = new AtomicInteger();
8085
private final String authority;
8186

82-
static ChannelPool create(ChannelPoolSettings settings, ChannelFactory channelFactory)
87+
static ChannelPool create(
88+
ChannelPoolSettings settings,
89+
ChannelFactory channelFactory,
90+
@Nullable ScheduledExecutorService backgroundExecutor)
8391
throws IOException {
84-
return new ChannelPool(settings, channelFactory, Executors.newSingleThreadScheduledExecutor());
92+
93+
FixedExecutorProvider executorProvider =
94+
backgroundExecutor == null
95+
? FixedExecutorProvider.create(Executors.newSingleThreadScheduledExecutor(), true)
96+
: FixedExecutorProvider.create(backgroundExecutor, false);
97+
return new ChannelPool(settings, channelFactory, executorProvider);
8598
}
8699

87100
/**
88101
* Initializes the channel pool. Assumes that all channels have the same authority.
89102
*
90103
* @param settings options for controling the ChannelPool sizing behavior
91104
* @param channelFactory method to create the channels
92-
* @param executor periodically refreshes the channels
105+
* @param executorProvider provides the executor that periodically refreshes the channels
93106
*/
94107
@VisibleForTesting
95108
ChannelPool(
96109
ChannelPoolSettings settings,
97110
ChannelFactory channelFactory,
98-
ScheduledExecutorService executor)
111+
FixedExecutorProvider executorProvider)
99112
throws IOException {
100113
this.settings = settings;
101114
this.channelFactory = channelFactory;
115+
this.backgroundExecutorProvider = executorProvider;
102116

103117
ImmutableList.Builder<Entry> initialListBuilder = ImmutableList.builder();
104118

@@ -108,21 +122,26 @@ static ChannelPool create(ChannelPoolSettings settings, ChannelFactory channelFa
108122

109123
entries.set(initialListBuilder.build());
110124
authority = entries.get().get(0).channel.authority();
111-
this.executor = executor;
112125

113126
if (!settings.isStaticSize()) {
114-
executor.scheduleAtFixedRate(
115-
this::resizeSafely,
116-
ChannelPoolSettings.RESIZE_INTERVAL.getSeconds(),
117-
ChannelPoolSettings.RESIZE_INTERVAL.getSeconds(),
118-
TimeUnit.SECONDS);
127+
resizeFuture =
128+
backgroundExecutorProvider
129+
.getExecutor()
130+
.scheduleAtFixedRate(
131+
this::resizeSafely,
132+
ChannelPoolSettings.RESIZE_INTERVAL.getSeconds(),
133+
ChannelPoolSettings.RESIZE_INTERVAL.getSeconds(),
134+
TimeUnit.SECONDS);
119135
}
120136
if (settings.isPreemptiveRefreshEnabled()) {
121-
executor.scheduleAtFixedRate(
122-
this::refreshSafely,
123-
REFRESH_PERIOD.getSeconds(),
124-
REFRESH_PERIOD.getSeconds(),
125-
TimeUnit.SECONDS);
137+
refreshFuture =
138+
backgroundExecutorProvider
139+
.getExecutor()
140+
.scheduleAtFixedRate(
141+
this::refreshSafely,
142+
REFRESH_PERIOD.getSeconds(),
143+
REFRESH_PERIOD.getSeconds(),
144+
TimeUnit.SECONDS);
126145
}
127146
}
128147

@@ -153,14 +172,25 @@ Channel getChannel(int affinity) {
153172
public ManagedChannel shutdown() {
154173
LOG.fine("Initiating graceful shutdown due to explicit request");
155174

175+
// Resize and refresh tasks can block on channel priming. We don't need
176+
// to wait for the channels to be ready since we're shutting down the
177+
// pool. Allowing interrupt to speed it up.
178+
if (resizeFuture != null) {
179+
resizeFuture.cancel(true);
180+
}
181+
if (refreshFuture != null) {
182+
refreshFuture.cancel(true);
183+
}
184+
156185
List<Entry> localEntries = entries.get();
157186
for (Entry entry : localEntries) {
158187
entry.channel.shutdown();
159188
}
160-
if (executor != null) {
161-
// shutdownNow will cancel scheduled tasks
162-
executor.shutdownNow();
189+
190+
if (backgroundExecutorProvider.shouldAutoClose()) {
191+
backgroundExecutorProvider.getExecutor().shutdown();
163192
}
193+
164194
return this;
165195
}
166196

@@ -173,7 +203,7 @@ public boolean isShutdown() {
173203
return false;
174204
}
175205
}
176-
return executor == null || executor.isShutdown();
206+
return true;
177207
}
178208

179209
/** {@inheritDoc} */
@@ -185,21 +215,28 @@ public boolean isTerminated() {
185215
return false;
186216
}
187217
}
188-
189-
return executor == null || executor.isTerminated();
218+
return true;
190219
}
191220

192221
/** {@inheritDoc} */
193222
@Override
194223
public ManagedChannel shutdownNow() {
195224
LOG.fine("Initiating immediate shutdown due to explicit request");
196225

226+
if (resizeFuture != null) {
227+
resizeFuture.cancel(true);
228+
}
229+
if (refreshFuture != null) {
230+
refreshFuture.cancel(true);
231+
}
232+
197233
List<Entry> localEntries = entries.get();
198234
for (Entry entry : localEntries) {
199235
entry.channel.shutdownNow();
200236
}
201-
if (executor != null) {
202-
executor.shutdownNow();
237+
238+
if (backgroundExecutorProvider.shouldAutoClose()) {
239+
backgroundExecutorProvider.getExecutor().shutdownNow();
203240
}
204241
return this;
205242
}
@@ -216,10 +253,6 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
216253
}
217254
entry.channel.awaitTermination(awaitTimeNanos, TimeUnit.NANOSECONDS);
218255
}
219-
if (executor != null) {
220-
long awaitTimeNanos = endTimeNanos - System.nanoTime();
221-
executor.awaitTermination(awaitTimeNanos, TimeUnit.NANOSECONDS);
222-
}
223256
return isTerminated();
224257
}
225258

gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProvider.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP
129129

130130
private final int processorCount;
131131
private final Executor executor;
132+
@Nullable private final ScheduledExecutorService backgroundExecutor;
132133
private final HeaderProvider headerProvider;
133134
private final boolean useS2A;
134135
private final String endpoint;
@@ -181,6 +182,7 @@ public enum HardBoundTokenTypes {
181182
private InstantiatingGrpcChannelProvider(Builder builder) {
182183
this.processorCount = builder.processorCount;
183184
this.executor = builder.executor;
185+
this.backgroundExecutor = builder.backgroundExecutor;
184186
this.headerProvider = builder.headerProvider;
185187
this.useS2A = builder.useS2A;
186188
this.endpoint = builder.endpoint;
@@ -245,6 +247,16 @@ public TransportChannelProvider withExecutor(Executor executor) {
245247
return toBuilder().setExecutor(executor).build();
246248
}
247249

250+
@Override
251+
public boolean needsBackgroundExecutor() {
252+
return backgroundExecutor == null;
253+
}
254+
255+
@Override
256+
public TransportChannelProvider withBackgroundExecutor(ScheduledExecutorService executor) {
257+
return toBuilder().setBackgroundExecutor(executor).build();
258+
}
259+
248260
@Override
249261
public boolean needsHeaders() {
250262
return headerProvider == null;
@@ -356,7 +368,9 @@ private TransportChannel createChannel() throws IOException {
356368
return GrpcTransportChannel.newBuilder()
357369
.setManagedChannel(
358370
ChannelPool.create(
359-
channelPoolSettings, InstantiatingGrpcChannelProvider.this::createSingleChannel))
371+
channelPoolSettings,
372+
InstantiatingGrpcChannelProvider.this::createSingleChannel,
373+
backgroundExecutor))
360374
.setDirectPath(this.canUseDirectPath())
361375
.build();
362376
}
@@ -839,6 +853,11 @@ public ChannelPoolSettings getChannelPoolSettings() {
839853
return channelPoolSettings;
840854
}
841855

856+
/** Gets the background executor for channel refresh and resize. */
857+
ScheduledExecutorService getBackgroundExecutor() {
858+
return backgroundExecutor;
859+
}
860+
842861
@Override
843862
public boolean shouldAutoClose() {
844863
return true;
@@ -855,6 +874,7 @@ public static Builder newBuilder() {
855874
public static final class Builder {
856875
@Deprecated private int processorCount;
857876
private Executor executor;
877+
private ScheduledExecutorService backgroundExecutor;
858878
private HeaderProvider headerProvider;
859879
private String endpoint;
860880
private String mtlsEndpoint;
@@ -891,6 +911,7 @@ private Builder() {
891911
private Builder(InstantiatingGrpcChannelProvider provider) {
892912
this.processorCount = provider.processorCount;
893913
this.executor = provider.executor;
914+
this.backgroundExecutor = provider.backgroundExecutor;
894915
this.headerProvider = provider.headerProvider;
895916
this.endpoint = provider.endpoint;
896917
this.useS2A = provider.useS2A;
@@ -950,6 +971,15 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) {
950971
return setExecutor((Executor) executorProvider.getExecutor());
951972
}
952973

974+
/**
975+
* Sets the background executor for this TransportChannelProvider. The life cycle of the
976+
* executor should be managed by the caller.
977+
*/
978+
public Builder setBackgroundExecutor(ScheduledExecutorService executor) {
979+
this.backgroundExecutor = executor;
980+
return this;
981+
}
982+
953983
/**
954984
* Sets the HeaderProvider for this TransportChannelProvider.
955985
*

0 commit comments

Comments
 (0)