Skip to content

Commit 923a14a

Browse files
feat: add support of dynamic channel pooling (#4265)
* feat: add support of dynamic channel pooling * set initial pool size to 0 make dynamic channel pool work * chore: generate libraries at Tue Dec 16 06:00:50 UTC 2025 * bump grpc-gcp-java version * fix test * incorporate suggestions * chore: generate libraries at Tue Dec 16 09:56:45 UTC 2025 * make dynamic channel pool default disabled * add verifySameChannelId back and fix partitionDML test * support setting GcpChannelPoolOptions directly --------- Co-authored-by: cloud-java-bot <[email protected]>
1 parent 7837724 commit 923a14a

File tree

8 files changed

+841
-152
lines changed

8 files changed

+841
-152
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.google.cloud.ServiceRpc;
4444
import com.google.cloud.TransportOptions;
4545
import com.google.cloud.grpc.GcpManagedChannelOptions;
46+
import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions;
4647
import com.google.cloud.grpc.GrpcTransportOptions;
4748
import com.google.cloud.spanner.Options.DirectedReadOption;
4849
import com.google.cloud.spanner.Options.QueryOption;
@@ -134,6 +135,72 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
134135
// is enabled, to make sure there are sufficient channels available to move the sessions to a
135136
// different channel if a network connection in a particular channel fails.
136137
@VisibleForTesting static final int GRPC_GCP_ENABLED_DEFAULT_CHANNELS = 8;
138+
139+
// Dynamic Channel Pool (DCP) default values and bounds
140+
/** Default max concurrent RPCs per channel before triggering scale up. */
141+
public static final int DEFAULT_DYNAMIC_POOL_MAX_RPC = 25;
142+
143+
/** Default min concurrent RPCs per channel for scale down check. */
144+
public static final int DEFAULT_DYNAMIC_POOL_MIN_RPC = 15;
145+
146+
/** Default scale down check interval. */
147+
public static final Duration DEFAULT_DYNAMIC_POOL_SCALE_DOWN_INTERVAL = Duration.ofMinutes(3);
148+
149+
/** Default initial number of channels for dynamic pool. */
150+
public static final int DEFAULT_DYNAMIC_POOL_INITIAL_SIZE = 4;
151+
152+
/** Default max number of channels for dynamic pool. */
153+
public static final int DEFAULT_DYNAMIC_POOL_MAX_CHANNELS = 10;
154+
155+
/** Default min number of channels for dynamic pool. */
156+
public static final int DEFAULT_DYNAMIC_POOL_MIN_CHANNELS = 2;
157+
158+
/**
159+
* Default affinity key lifetime for dynamic channel pool. This is how long to keep an affinity
160+
* key after its last use. Zero means keeping keys forever. Default is 10 minutes, which is
161+
* sufficient to ensure that requests within a single transaction use the same channel.
162+
*/
163+
public static final Duration DEFAULT_DYNAMIC_POOL_AFFINITY_KEY_LIFETIME = Duration.ofMinutes(10);
164+
165+
/**
166+
* Default cleanup interval for dynamic channel pool affinity keys. This is how frequently the
167+
* affinity key cleanup process runs. Default is 1 minute (1/10 of default affinity key lifetime).
168+
*/
169+
public static final Duration DEFAULT_DYNAMIC_POOL_CLEANUP_INTERVAL = Duration.ofMinutes(1);
170+
171+
/**
172+
* Creates a {@link GcpChannelPoolOptions} instance with Spanner-specific defaults for dynamic
173+
* channel pooling. These defaults are optimized for typical Spanner workloads.
174+
*
175+
* <p>Default values:
176+
*
177+
* <ul>
178+
* <li>Max size: {@value #DEFAULT_DYNAMIC_POOL_MAX_CHANNELS}
179+
* <li>Min size: {@value #DEFAULT_DYNAMIC_POOL_MIN_CHANNELS}
180+
* <li>Initial size: {@value #DEFAULT_DYNAMIC_POOL_INITIAL_SIZE}
181+
* <li>Max RPC per channel: {@value #DEFAULT_DYNAMIC_POOL_MAX_RPC}
182+
* <li>Min RPC per channel: {@value #DEFAULT_DYNAMIC_POOL_MIN_RPC}
183+
* <li>Scale down interval: 3 minutes
184+
* <li>Affinity key lifetime: 10 minutes
185+
* <li>Cleanup interval: 1 minute
186+
* </ul>
187+
*
188+
* @return a new {@link GcpChannelPoolOptions} instance with Spanner defaults
189+
*/
190+
public static GcpChannelPoolOptions createDefaultDynamicChannelPoolOptions() {
191+
return GcpChannelPoolOptions.newBuilder()
192+
.setMaxSize(DEFAULT_DYNAMIC_POOL_MAX_CHANNELS)
193+
.setMinSize(DEFAULT_DYNAMIC_POOL_MIN_CHANNELS)
194+
.setInitSize(DEFAULT_DYNAMIC_POOL_INITIAL_SIZE)
195+
.setDynamicScaling(
196+
DEFAULT_DYNAMIC_POOL_MIN_RPC,
197+
DEFAULT_DYNAMIC_POOL_MAX_RPC,
198+
DEFAULT_DYNAMIC_POOL_SCALE_DOWN_INTERVAL)
199+
.setAffinityKeyLifetime(DEFAULT_DYNAMIC_POOL_AFFINITY_KEY_LIFETIME)
200+
.setCleanupInterval(DEFAULT_DYNAMIC_POOL_CLEANUP_INTERVAL)
201+
.build();
202+
}
203+
137204
private final TransportChannelProvider channelProvider;
138205

139206
@SuppressWarnings("rawtypes")
@@ -153,6 +220,8 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
153220
private final Duration partitionedDmlTimeout;
154221
private final boolean grpcGcpExtensionEnabled;
155222
private final GcpManagedChannelOptions grpcGcpOptions;
223+
private final boolean dynamicChannelPoolEnabled;
224+
private final GcpChannelPoolOptions gcpChannelPoolOptions;
156225
private final boolean autoThrottleAdministrativeRequests;
157226
private final RetrySettings retryAdministrativeRequestsSettings;
158227
private final boolean trackTransactionStarter;
@@ -800,6 +869,26 @@ protected SpannerOptions(Builder builder) {
800869
partitionedDmlTimeout = builder.partitionedDmlTimeout;
801870
grpcGcpExtensionEnabled = builder.grpcGcpExtensionEnabled;
802871
grpcGcpOptions = builder.grpcGcpOptions;
872+
873+
// Dynamic channel pooling is disabled by default.
874+
// It is only enabled when:
875+
// 1. enableDynamicChannelPool() was explicitly called, AND
876+
// 2. grpc-gcp extension is enabled, AND
877+
// 3. numChannels was not explicitly set
878+
if (builder.dynamicChannelPoolEnabled != null && builder.dynamicChannelPoolEnabled) {
879+
// DCP was explicitly enabled, but respect numChannels if set
880+
dynamicChannelPoolEnabled = grpcGcpExtensionEnabled && !builder.numChannelsExplicitlySet;
881+
} else {
882+
// DCP is disabled by default, or was explicitly disabled
883+
dynamicChannelPoolEnabled = false;
884+
}
885+
886+
// Use user-provided GcpChannelPoolOptions or create Spanner-specific defaults
887+
gcpChannelPoolOptions =
888+
builder.gcpChannelPoolOptions != null
889+
? builder.gcpChannelPoolOptions
890+
: createDefaultDynamicChannelPoolOptions();
891+
803892
autoThrottleAdministrativeRequests = builder.autoThrottleAdministrativeRequests;
804893
retryAdministrativeRequestsSettings = builder.retryAdministrativeRequestsSettings;
805894
trackTransactionStarter = builder.trackTransactionStarter;
@@ -1010,6 +1099,7 @@ public static class Builder
10101099
private GrpcInterceptorProvider interceptorProvider;
10111100

10121101
private Integer numChannels;
1102+
private boolean numChannelsExplicitlySet = false;
10131103

10141104
private String transportChannelExecutorThreadNameFormat = "Cloud-Spanner-TransportChannel-%d";
10151105

@@ -1027,6 +1117,8 @@ public static class Builder
10271117
private Duration partitionedDmlTimeout = Duration.ofHours(2L);
10281118
private boolean grpcGcpExtensionEnabled = true;
10291119
private GcpManagedChannelOptions grpcGcpOptions;
1120+
private Boolean dynamicChannelPoolEnabled;
1121+
private GcpChannelPoolOptions gcpChannelPoolOptions;
10301122
private RetrySettings retryAdministrativeRequestsSettings =
10311123
DEFAULT_ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS;
10321124
private boolean autoThrottleAdministrativeRequests = false;
@@ -1099,6 +1191,8 @@ protected Builder() {
10991191
this.partitionedDmlTimeout = options.partitionedDmlTimeout;
11001192
this.grpcGcpExtensionEnabled = options.grpcGcpExtensionEnabled;
11011193
this.grpcGcpOptions = options.grpcGcpOptions;
1194+
this.dynamicChannelPoolEnabled = options.dynamicChannelPoolEnabled;
1195+
this.gcpChannelPoolOptions = options.gcpChannelPoolOptions;
11021196
this.autoThrottleAdministrativeRequests = options.autoThrottleAdministrativeRequests;
11031197
this.retryAdministrativeRequestsSettings = options.retryAdministrativeRequestsSettings;
11041198
this.trackTransactionStarter = options.trackTransactionStarter;
@@ -1189,6 +1283,7 @@ public Builder setInterceptorProvider(GrpcInterceptorProvider interceptorProvide
11891283
*/
11901284
public Builder setNumChannels(int numChannels) {
11911285
this.numChannels = numChannels;
1286+
this.numChannelsExplicitlySet = true;
11921287
return this;
11931288
}
11941289

@@ -1578,6 +1673,62 @@ public Builder disableGrpcGcpExtension() {
15781673
return this;
15791674
}
15801675

1676+
/**
1677+
* Enables dynamic channel pooling. When enabled, the client will automatically scale the number
1678+
* of channels based on load. This requires the gRPC-GCP extension to be enabled.
1679+
*
1680+
* <p>Dynamic channel pooling is disabled by default. Use this method to explicitly enable it.
1681+
* Note that calling {@link #setNumChannels(int)} will disable dynamic channel pooling even if
1682+
* this method was called.
1683+
*/
1684+
public Builder enableDynamicChannelPool() {
1685+
this.dynamicChannelPoolEnabled = true;
1686+
return this;
1687+
}
1688+
1689+
/**
1690+
* Disables dynamic channel pooling. When disabled, the client will use a static number of
1691+
* channels as configured by {@link #setNumChannels(int)}.
1692+
*
1693+
* <p>Dynamic channel pooling is disabled by default, so this method is typically not needed
1694+
* unless you want to explicitly disable it after enabling it.
1695+
*/
1696+
public Builder disableDynamicChannelPool() {
1697+
this.dynamicChannelPoolEnabled = false;
1698+
return this;
1699+
}
1700+
1701+
/**
1702+
* Sets the channel pool options for dynamic channel pooling. Use this to configure the dynamic
1703+
* channel pool behavior when {@link #enableDynamicChannelPool()} is enabled.
1704+
*
1705+
* <p>If not set, Spanner-specific defaults will be used (see {@link
1706+
* #createDefaultDynamicChannelPoolOptions()}).
1707+
*
1708+
* <p>Example usage:
1709+
*
1710+
* <pre>{@code
1711+
* SpannerOptions options = SpannerOptions.newBuilder()
1712+
* .setProjectId("my-project")
1713+
* .enableDynamicChannelPool()
1714+
* .setGcpChannelPoolOptions(
1715+
* GcpChannelPoolOptions.newBuilder()
1716+
* .setMaxSize(15)
1717+
* .setMinSize(3)
1718+
* .setInitSize(5)
1719+
* .setDynamicScaling(10, 30, Duration.ofMinutes(5))
1720+
* .build())
1721+
* .build();
1722+
* }</pre>
1723+
*
1724+
* @param gcpChannelPoolOptions the channel pool options to use
1725+
* @return this builder for chaining
1726+
*/
1727+
public Builder setGcpChannelPoolOptions(GcpChannelPoolOptions gcpChannelPoolOptions) {
1728+
this.gcpChannelPoolOptions = Preconditions.checkNotNull(gcpChannelPoolOptions);
1729+
return this;
1730+
}
1731+
15811732
/**
15821733
* Sets the host of an emulator to use. By default the value is read from an environment
15831734
* variable. If the environment variable is not set, this will be <code>null</code>.
@@ -1990,6 +2141,26 @@ public GcpManagedChannelOptions getGrpcGcpOptions() {
19902141
return grpcGcpOptions;
19912142
}
19922143

2144+
/**
2145+
* Returns whether dynamic channel pooling is enabled. Dynamic channel pooling is disabled by
2146+
* default. Use {@link Builder#enableDynamicChannelPool()} to explicitly enable it. Note that
2147+
* calling {@link Builder#setNumChannels(int)} will disable dynamic channel pooling even if it was
2148+
* explicitly enabled.
2149+
*/
2150+
public boolean isDynamicChannelPoolEnabled() {
2151+
return dynamicChannelPoolEnabled;
2152+
}
2153+
2154+
/**
2155+
* Returns the channel pool options for dynamic channel pooling. If no options were explicitly
2156+
* set, returns the Spanner-specific defaults.
2157+
*
2158+
* @see #createDefaultDynamicChannelPoolOptions()
2159+
*/
2160+
public GcpChannelPoolOptions getGcpChannelPoolOptions() {
2161+
return gcpChannelPoolOptions;
2162+
}
2163+
19932164
public boolean isAutoThrottleAdministrativeRequests() {
19942165
return autoThrottleAdministrativeRequests;
19952166
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ public class GapicSpannerRpc implements SpannerRpc {
281281
private final boolean endToEndTracingEnabled;
282282
private final int numChannels;
283283
private final boolean isGrpcGcpExtensionEnabled;
284+
private final boolean isDynamicChannelPoolEnabled;
284285

285286
private final GrpcCallContext baseGrpcCallContext;
286287

@@ -337,6 +338,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
337338
this.endToEndTracingEnabled = options.isEndToEndTracingEnabled();
338339
this.numChannels = options.getNumChannels();
339340
this.isGrpcGcpExtensionEnabled = options.isGrpcGcpExtensionEnabled();
341+
this.isDynamicChannelPoolEnabled = options.isDynamicChannelPoolEnabled();
340342
this.baseGrpcCallContext = createBaseCallContext();
341343

342344
if (initializeStubs) {
@@ -475,7 +477,8 @@ public GapicSpannerRpc(final SpannerOptions options) {
475477
.withCheckInterval(pdmlSettings.getStreamWatchdogCheckInterval()));
476478
}
477479
this.partitionedDmlStub =
478-
GrpcSpannerStubWithStubSettingsAndClientContext.create(pdmlSettings.build());
480+
GrpcSpannerStubWithStubSettingsAndClientContext.create(
481+
pdmlSettings.build(), clientContext);
479482
this.instanceAdminStubSettings =
480483
options.getInstanceAdminStubSettings().toBuilder()
481484
.setTransportChannelProvider(channelProvider)
@@ -569,10 +572,14 @@ private static String parseGrpcGcpApiConfig() {
569572
}
570573
}
571574

572-
// Enhance metric options for gRPC-GCP extension.
573-
private static GcpManagedChannelOptions grpcGcpOptionsWithMetrics(SpannerOptions options) {
575+
// Enhance gRPC-GCP options with metrics and dynamic channel pool configuration.
576+
private static GcpManagedChannelOptions grpcGcpOptionsWithMetricsAndDcp(SpannerOptions options) {
574577
GcpManagedChannelOptions grpcGcpOptions =
575578
MoreObjects.firstNonNull(options.getGrpcGcpOptions(), new GcpManagedChannelOptions());
579+
GcpManagedChannelOptions.Builder optionsBuilder =
580+
GcpManagedChannelOptions.newBuilder(grpcGcpOptions);
581+
582+
// Configure metrics options with OpenTelemetry meter
576583
GcpMetricsOptions metricsOptions =
577584
MoreObjects.firstNonNull(
578585
grpcGcpOptions.getMetricsOptions(), GcpMetricsOptions.newBuilder().build());
@@ -581,9 +588,21 @@ private static GcpManagedChannelOptions grpcGcpOptionsWithMetrics(SpannerOptions
581588
if (metricsOptions.getNamePrefix().equals("")) {
582589
metricsOptionsBuilder.withNamePrefix("cloud.google.com/java/spanner/gcp-channel-pool/");
583590
}
584-
return GcpManagedChannelOptions.newBuilder(grpcGcpOptions)
585-
.withMetricsOptions(metricsOptionsBuilder.build())
586-
.build();
591+
// Pass OpenTelemetry meter to grpc-gcp for channel pool metrics
592+
if (metricsOptions.getOpenTelemetryMeter() == null) {
593+
metricsOptionsBuilder.withOpenTelemetryMeter(
594+
options.getOpenTelemetry().getMeter("com.google.cloud.spanner"));
595+
}
596+
optionsBuilder.withMetricsOptions(metricsOptionsBuilder.build());
597+
598+
// Configure dynamic channel pool options if enabled.
599+
// Uses the GcpChannelPoolOptions from SpannerOptions, which contains Spanner-specific defaults
600+
// or user-provided configuration.
601+
if (options.isDynamicChannelPoolEnabled()) {
602+
optionsBuilder.withChannelPoolOptions(options.getGcpChannelPoolOptions());
603+
}
604+
605+
return optionsBuilder.build();
587606
}
588607

589608
@SuppressWarnings("rawtypes")
@@ -595,7 +614,11 @@ private static void maybeEnableGrpcGcpExtension(
595614
}
596615

597616
final String jsonApiConfig = parseGrpcGcpApiConfig();
598-
final GcpManagedChannelOptions grpcGcpOptions = grpcGcpOptionsWithMetrics(options);
617+
final GcpManagedChannelOptions grpcGcpOptions = grpcGcpOptionsWithMetricsAndDcp(options);
618+
619+
// When dynamic channel pool is enabled, use the DCP initial size as the pool size.
620+
// When disabled, use the explicitly configured numChannels.
621+
final int poolSize = options.isDynamicChannelPoolEnabled() ? 0 : options.getNumChannels();
599622

600623
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> apiFunction =
601624
channelBuilder -> {
@@ -605,7 +628,7 @@ private static void maybeEnableGrpcGcpExtension(
605628
return GcpManagedChannelBuilder.forDelegateBuilder(channelBuilder)
606629
.withApiConfigJsonString(jsonApiConfig)
607630
.withOptions(grpcGcpOptions)
608-
.setPoolSize(options.getNumChannels());
631+
.setPoolSize(poolSize);
609632
};
610633

611634
// Disable the GAX channel pooling functionality by setting the GAX channel pool size to 1.
@@ -2060,20 +2083,34 @@ <ReqT, RespT> GrpcCallContext newCallContext(
20602083
if (affinity != null) {
20612084
if (this.isGrpcGcpExtensionEnabled) {
20622085
// Set channel affinity in gRPC-GCP.
2063-
// Compute bounded channel hint to prevent gRPC-GCP affinity map from getting unbounded.
2064-
int boundedChannelHint = affinity.intValue() % this.numChannels;
2086+
String affinityKey;
2087+
if (this.isDynamicChannelPoolEnabled) {
2088+
// When dynamic channel pooling is enabled, we use the raw affinity value as the key.
2089+
// This allows grpc-gcp to use round-robin for new keys, enabling new channels
2090+
// (created during scale-up) to receive requests. The affinity key lifetime setting
2091+
// ensures the affinity map doesn't grow unbounded.
2092+
affinityKey = String.valueOf(affinity);
2093+
} else {
2094+
// When DCP is disabled, compute bounded channel hint to prevent
2095+
// gRPC-GCP affinity map from getting unbounded.
2096+
int boundedChannelHint = affinity.intValue() % this.numChannels;
2097+
affinityKey = String.valueOf(boundedChannelHint);
2098+
}
20652099
context =
20662100
context.withCallOptions(
2067-
context
2068-
.getCallOptions()
2069-
.withOption(
2070-
GcpManagedChannel.AFFINITY_KEY, String.valueOf(boundedChannelHint)));
2101+
context.getCallOptions().withOption(GcpManagedChannel.AFFINITY_KEY, affinityKey));
20712102
} else {
20722103
// Set channel affinity in GAX.
20732104
context = context.withChannelAffinity(affinity.intValue());
20742105
}
20752106
}
2076-
int requestIdChannel = convertToRequestIdChannelNumber(affinity);
2107+
// When grpc-gcp extension with dynamic channel pooling is enabled, the actual channel ID
2108+
// will be set by RequestIdInterceptor after grpc-gcp selects the channel.
2109+
// Set to 0 (unknown) here as a placeholder.
2110+
int requestIdChannel =
2111+
(this.isGrpcGcpExtensionEnabled && this.isDynamicChannelPoolEnabled)
2112+
? 0
2113+
: convertToRequestIdChannelNumber(affinity);
20772114
if (requestId == null) {
20782115
requestId = requestIdCreator.nextRequestId(requestIdChannel);
20792116
} else {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/RequestIdInterceptor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.cloud.spanner.XGoogSpannerRequestId.REQUEST_ID_CALL_OPTIONS_KEY;
2020
import static com.google.cloud.spanner.XGoogSpannerRequestId.REQUEST_ID_HEADER_KEY;
2121

22+
import com.google.cloud.grpc.GcpManagedChannel;
2223
import com.google.cloud.spanner.XGoogSpannerRequestId;
2324
import io.grpc.CallOptions;
2425
import io.grpc.Channel;
@@ -47,6 +48,15 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
4748
public void start(Listener<RespT> responseListener, Metadata headers) {
4849
XGoogSpannerRequestId requestId = callOptions.getOption(REQUEST_ID_CALL_OPTIONS_KEY);
4950
if (requestId != null) {
51+
// If grpc-gcp has set the actual channel ID, use it to update the request ID.
52+
// This provides the real channel ID used after channel selection, especially
53+
// important when dynamic channel pooling is enabled.
54+
Integer gcpChannelId = callOptions.getOption(GcpManagedChannel.CHANNEL_ID_KEY);
55+
if (gcpChannelId != null) {
56+
// Channel IDs from grpc-gcp are 0-based, add 1 to match request ID convention
57+
// where 0 means unknown and >0 means a known channel.
58+
requestId.setChannelId(gcpChannelId + 1);
59+
}
5060
requestId.incrementAttempt();
5161
headers.put(REQUEST_ID_HEADER_KEY, requestId.getHeaderValue());
5262
}

0 commit comments

Comments
 (0)