Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -396,4 +396,27 @@
<method>*create*</method>
<to>*</to>
</difference>
<difference>
<differenceType>4001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer</className>
<to>com/google/api/gax/grpc/ChannelPrimer</to>
</difference>
<difference>
<differenceType>4001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/NoOpChannelPrimer</className>
<to>com/google/api/gax/grpc/ChannelPrimer</to>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool</className>
<method>*create*</method>
<to>*</to>
</difference>
<difference>
<!-- InternalApi was updated -->
<differenceType>7005</differenceType>
<className>com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider</className>
<method>*create*</method>
<to>*</to>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.grpc.ChannelPrimer;
import com.google.auth.Credentials;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.InstanceName;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.cloud.bigtable.gaxx.grpc.ChannelPrimer;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.grpc.ChannelPrimer;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.ClientContext;
import com.google.auth.Credentials;
Expand All @@ -34,6 +33,7 @@
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsProvider;
import com.google.cloud.bigtable.data.v2.stub.metrics.NoopMetricsProvider;
import com.google.cloud.bigtable.gaxx.grpc.BigtableTransportChannelProvider;
import com.google.cloud.bigtable.gaxx.grpc.ChannelPrimer;
import io.grpc.ManagedChannelBuilder;
import io.grpc.opentelemetry.GrpcOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.ChannelPrimer;
import com.google.api.core.SettableApiFuture;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.cloud.bigtable.gaxx.grpc.ChannelPrimer;
import io.grpc.ManagedChannel;

@InternalApi
Expand All @@ -31,4 +33,11 @@ private NoOpChannelPrimer() {}
public void primeChannel(ManagedChannel managedChannel) {
// No op
}

@Override
public SettableApiFuture<PingAndWarmResponse> sendPrimeRequestsAsync(ManagedChannel var1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
public SettableApiFuture<PingAndWarmResponse> sendPrimeRequestsAsync(ManagedChannel var1) {
public SettableApiFuture<PingAndWarmResponse> sendPrimeRequestsAsync(ManagedChannel channel) {

SettableApiFuture future = SettableApiFuture.create();
future.set(PingAndWarmResponse.getDefaultInstance());
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.ChannelFactory;
import com.google.api.gax.grpc.ChannelPrimer;
import com.google.cloud.bigtable.gaxx.grpc.ChannelPoolHealthChecker.ProbeResult;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand All @@ -31,9 +31,11 @@
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -62,11 +64,11 @@ public class BigtableChannelPool extends ManagedChannel {
private final BigtableChannelPoolSettings settings;
private final ChannelFactory channelFactory;

private final ChannelPrimer channelPrimer;
private ChannelPrimer channelPrimer;
private final ScheduledExecutorService executor;

private final Object entryWriteLock = new Object();
@VisibleForTesting final AtomicReference<ImmutableList<Entry>> entries = new AtomicReference<>();
private ChannelPoolHealthChecker channelPoolHealthChecker;
private final AtomicInteger indexTicker = new AtomicInteger();
private final String authority;

Expand Down Expand Up @@ -96,6 +98,10 @@ public static BigtableChannelPool create(
this.settings = settings;
this.channelFactory = channelFactory;
this.channelPrimer = channelPrimer;
Clock systemClock = Clock.systemUTC();
this.channelPoolHealthChecker =
new ChannelPoolHealthChecker(() -> entries.get(), channelPrimer, executor, systemClock);
this.channelPoolHealthChecker.start();

ImmutableList.Builder<Entry> initialListBuilder = ImmutableList.builder();

Expand Down Expand Up @@ -445,15 +451,27 @@ static class Entry {

private final AtomicInteger maxOutstanding = new AtomicInteger();

@VisibleForTesting
final ConcurrentLinkedQueue<ProbeResult> probeHistory = new ConcurrentLinkedQueue<>();

// we keep both so that we don't have to check size() on the ConcurrentLinkedQueue all the time
AtomicInteger failedProbesInWindow = new AtomicInteger();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final?

AtomicInteger successfulProbesInWindow = new AtomicInteger();

// Flag that the channel should be closed once all of the outstanding RPC complete.
private final AtomicBoolean shutdownRequested = new AtomicBoolean();
// Flag that the channel has been closed.
private final AtomicBoolean shutdownInitiated = new AtomicBoolean();

private Entry(ManagedChannel channel) {
@VisibleForTesting
Entry(ManagedChannel channel) {
this.channel = channel;
}

ManagedChannel getManagedChannel() {
return this.channel;
}

int getAndResetMaxOutstanding() {
return maxOutstanding.getAndSet(outstandingRpcs.get());
}
Expand All @@ -468,7 +486,7 @@ private boolean retain() {
// register desire to start RPC
int currentOutstanding = outstandingRpcs.incrementAndGet();

// Rough book keeping
// Rough bookkeeping
int prevMax = maxOutstanding.get();
if (currentOutstanding > prevMax) {
maxOutstanding.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.ChannelFactory;
import com.google.api.gax.grpc.ChannelPoolSettings;
import com.google.api.gax.grpc.ChannelPrimer;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.TransportChannel;
Expand Down
Loading
Loading