Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
package com.google.api.gax.grpc;

import com.google.api.core.InternalApi;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand All @@ -48,6 +49,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -72,33 +74,45 @@ class ChannelPool extends ManagedChannel {

private final ChannelPoolSettings settings;
private final ChannelFactory channelFactory;
private final ScheduledExecutorService executor;
private final FixedExecutorProvider backgroundExecutorProvider;

private ScheduledFuture<?> refreshFuture = null;
private ScheduledFuture<?> resizeFuture = null;

private final Object entryWriteLock = new Object();
@VisibleForTesting final AtomicReference<ImmutableList<Entry>> entries = new AtomicReference<>();
private final AtomicInteger indexTicker = new AtomicInteger();
private final String authority;

static ChannelPool create(ChannelPoolSettings settings, ChannelFactory channelFactory)
static ChannelPool create(
ChannelPoolSettings settings,
ChannelFactory channelFactory,
@Nullable ScheduledExecutorService backgroundExecutor)
throws IOException {
return new ChannelPool(settings, channelFactory, Executors.newSingleThreadScheduledExecutor());

FixedExecutorProvider executorProvider =
backgroundExecutor == null
? FixedExecutorProvider.create(Executors.newSingleThreadScheduledExecutor(), true)
: FixedExecutorProvider.create(backgroundExecutor, false);
return new ChannelPool(settings, channelFactory, executorProvider);
}

/**
* Initializes the channel pool. Assumes that all channels have the same authority.
*
* @param settings options for controling the ChannelPool sizing behavior
* @param channelFactory method to create the channels
* @param executor periodically refreshes the channels
* @param executorProvider provides the executor that periodically refreshes the channels
*/
@VisibleForTesting
ChannelPool(
ChannelPoolSettings settings,
ChannelFactory channelFactory,
ScheduledExecutorService executor)
FixedExecutorProvider executorProvider)
throws IOException {
this.settings = settings;
this.channelFactory = channelFactory;
this.backgroundExecutorProvider = executorProvider;

ImmutableList.Builder<Entry> initialListBuilder = ImmutableList.builder();

Expand All @@ -108,21 +122,26 @@ static ChannelPool create(ChannelPoolSettings settings, ChannelFactory channelFa

entries.set(initialListBuilder.build());
authority = entries.get().get(0).channel.authority();
this.executor = executor;

if (!settings.isStaticSize()) {
executor.scheduleAtFixedRate(
this::resizeSafely,
ChannelPoolSettings.RESIZE_INTERVAL.getSeconds(),
ChannelPoolSettings.RESIZE_INTERVAL.getSeconds(),
TimeUnit.SECONDS);
resizeFuture =
backgroundExecutorProvider
.getExecutor()
.scheduleAtFixedRate(
this::resizeSafely,
ChannelPoolSettings.RESIZE_INTERVAL.getSeconds(),
ChannelPoolSettings.RESIZE_INTERVAL.getSeconds(),
TimeUnit.SECONDS);
}
if (settings.isPreemptiveRefreshEnabled()) {
executor.scheduleAtFixedRate(
this::refreshSafely,
REFRESH_PERIOD.getSeconds(),
REFRESH_PERIOD.getSeconds(),
TimeUnit.SECONDS);
refreshFuture =
backgroundExecutorProvider
.getExecutor()
.scheduleAtFixedRate(
this::refreshSafely,
REFRESH_PERIOD.getSeconds(),
REFRESH_PERIOD.getSeconds(),
TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -153,14 +172,25 @@ Channel getChannel(int affinity) {
public ManagedChannel shutdown() {
LOG.fine("Initiating graceful shutdown due to explicit request");

// Resize and refresh tasks can block on channel priming. We don't need
// to wait for the channels to be ready since we're shutting down the
// pool. Allowing interrupt to speed it up.
if (resizeFuture != null) {
resizeFuture.cancel(true);
}
if (refreshFuture != null) {
refreshFuture.cancel(true);
}

List<Entry> localEntries = entries.get();
for (Entry entry : localEntries) {
entry.channel.shutdown();
}
if (executor != null) {
// shutdownNow will cancel scheduled tasks
executor.shutdownNow();

if (backgroundExecutorProvider.shouldAutoClose()) {
backgroundExecutorProvider.getExecutor().shutdown();
}

return this;
}

Expand All @@ -173,7 +203,7 @@ public boolean isShutdown() {
return false;
}
}
return executor == null || executor.isShutdown();
return true;
}

/** {@inheritDoc} */
Expand All @@ -185,21 +215,28 @@ public boolean isTerminated() {
return false;
}
}

return executor == null || executor.isTerminated();
return true;
}

/** {@inheritDoc} */
@Override
public ManagedChannel shutdownNow() {
LOG.fine("Initiating immediate shutdown due to explicit request");

if (resizeFuture != null) {
resizeFuture.cancel(true);
}
if (refreshFuture != null) {
refreshFuture.cancel(true);
}

List<Entry> localEntries = entries.get();
for (Entry entry : localEntries) {
entry.channel.shutdownNow();
}
if (executor != null) {
executor.shutdownNow();

if (backgroundExecutorProvider.shouldAutoClose()) {
backgroundExecutorProvider.getExecutor().shutdownNow();
}
return this;
}
Expand All @@ -216,10 +253,6 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
}
entry.channel.awaitTermination(awaitTimeNanos, TimeUnit.NANOSECONDS);
}
if (executor != null) {
long awaitTimeNanos = endTimeNanos - System.nanoTime();
executor.awaitTermination(awaitTimeNanos, TimeUnit.NANOSECONDS);
}
return isTerminated();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP

private final int processorCount;
private final Executor executor;
@Nullable private final ScheduledExecutorService backgroundExecutor;
private final HeaderProvider headerProvider;
private final boolean useS2A;
private final String endpoint;
Expand Down Expand Up @@ -181,6 +182,7 @@ public enum HardBoundTokenTypes {
private InstantiatingGrpcChannelProvider(Builder builder) {
this.processorCount = builder.processorCount;
this.executor = builder.executor;
this.backgroundExecutor = builder.backgroundExecutor;
this.headerProvider = builder.headerProvider;
this.useS2A = builder.useS2A;
this.endpoint = builder.endpoint;
Expand Down Expand Up @@ -245,6 +247,16 @@ public TransportChannelProvider withExecutor(Executor executor) {
return toBuilder().setExecutor(executor).build();
}

@Override
public boolean needsBackgroundExecutor() {
return backgroundExecutor == null;
}

@Override
public TransportChannelProvider withBackgroundExecutor(ScheduledExecutorService executor) {
Copy link
Contributor

@blakeli0 blakeli0 Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we plan to use it? Do we expect customers to create an InstantiatingGrpcChannelProvider with their own executor?
I see that the same executor is being passed to both executor and backgroundExecutor in ClientContext, if that's the expected use case, we don't have to create new setter methods in TransportChannel.

Copy link
Contributor Author

@mutianf mutianf Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ChannelProvider can have 2 executors: background executor and executor. Executor is used for handling rpc callbakcs. Background executor is used for all other background tasks. In the GrpcChannelProvider case, it doesn't need an executor because Grpc ManagedChannel provides a default executor that's optimized for performance. And the background executor is used for managing channels in the channel pool. Normally we don't want to use the same background executor for callbacks and background tasks because it impacts the performances. The settings you see in ClientContext is a bit confusing and it's from an old fix where we deprecated overriding the default grpc executor on the managed channel: 95c4c7b#diff-4e2f6e463b9b7d89de68e3f1a87765080045880c8dad018750f269e311f2471f. I don't think we actually go into the if statement by default

    if (transportChannelProvider.needsExecutor() && settings.getExecutorProvider() != null) {
      transportChannelProvider = transportChannelProvider.withExecutor(backgroundExecutor);
    }

because settings.getExecutorProvider() is null. Maybe it'll be easier to understand if the code is transportChannelProvider.withExecutor(settings.getExecutorProvider().getExecutor());? I made the change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, that's much easier to understand. Thanks!

return toBuilder().setBackgroundExecutor(executor).build();
}

@Override
public boolean needsHeaders() {
return headerProvider == null;
Expand Down Expand Up @@ -356,7 +368,9 @@ private TransportChannel createChannel() throws IOException {
return GrpcTransportChannel.newBuilder()
.setManagedChannel(
ChannelPool.create(
channelPoolSettings, InstantiatingGrpcChannelProvider.this::createSingleChannel))
channelPoolSettings,
InstantiatingGrpcChannelProvider.this::createSingleChannel,
backgroundExecutor))
.setDirectPath(this.canUseDirectPath())
.build();
}
Expand Down Expand Up @@ -839,6 +853,11 @@ public ChannelPoolSettings getChannelPoolSettings() {
return channelPoolSettings;
}

/** Gets the background executor for channel refresh and resize. */
ScheduledExecutorService getBackgroundExecutor() {
return backgroundExecutor;
}

@Override
public boolean shouldAutoClose() {
return true;
Expand All @@ -855,6 +874,7 @@ public static Builder newBuilder() {
public static final class Builder {
@Deprecated private int processorCount;
private Executor executor;
private ScheduledExecutorService backgroundExecutor;
private HeaderProvider headerProvider;
private String endpoint;
private String mtlsEndpoint;
Expand Down Expand Up @@ -891,6 +911,7 @@ private Builder() {
private Builder(InstantiatingGrpcChannelProvider provider) {
this.processorCount = provider.processorCount;
this.executor = provider.executor;
this.backgroundExecutor = provider.backgroundExecutor;
this.headerProvider = provider.headerProvider;
this.endpoint = provider.endpoint;
this.useS2A = provider.useS2A;
Expand Down Expand Up @@ -950,6 +971,15 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) {
return setExecutor((Executor) executorProvider.getExecutor());
}

/**
* Sets the background executor for this TransportChannelProvider. The life cycle of the
* executor should be managed by the caller.
*/
public Builder setBackgroundExecutor(ScheduledExecutorService executor) {
this.backgroundExecutor = executor;
return this;
}

/**
* Sets the HeaderProvider for this TransportChannelProvider.
*
Expand Down
Loading
Loading