diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java index da7bd4f956..d9145068fc 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java @@ -18,6 +18,8 @@ import com.google.api.core.InternalApi; import com.google.api.gax.grpc.ChannelFactory; import com.google.api.gax.grpc.ChannelPrimer; +import com.google.cloud.bigtable.data.v2.stub.BigtableChannelPrimer; +import com.google.cloud.bigtable.gaxx.grpc.ChannelPoolHealthChecker.ProbeResult; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -31,9 +33,11 @@ import io.grpc.MethodDescriptor; import io.grpc.Status; import java.io.IOException; +import java.time.Clock; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -62,11 +66,11 @@ public class BigtableChannelPool extends ManagedChannel { private final BigtableChannelPoolSettings settings; private final ChannelFactory channelFactory; - private final ChannelPrimer channelPrimer; + private ChannelPrimer channelPrimer; private final ScheduledExecutorService executor; - private final Object entryWriteLock = new Object(); @VisibleForTesting final AtomicReference> entries = new AtomicReference<>(); + private ChannelPoolHealthChecker channelPoolHealthChecker; private final AtomicInteger indexTicker = new AtomicInteger(); private final String authority; @@ -96,6 +100,11 @@ public static BigtableChannelPool create( this.settings = settings; 
this.channelFactory = channelFactory; this.channelPrimer = channelPrimer; + Clock systemClock = Clock.systemUTC(); + this.channelPoolHealthChecker = + new ChannelPoolHealthChecker( + () -> entries.get(), (BigtableChannelPrimer) channelPrimer, executor, systemClock); + this.channelPoolHealthChecker.start(); ImmutableList.Builder initialListBuilder = ImmutableList.builder(); @@ -445,15 +454,27 @@ static class Entry { private final AtomicInteger maxOutstanding = new AtomicInteger(); + @VisibleForTesting + final ConcurrentLinkedQueue probeHistory = new ConcurrentLinkedQueue<>(); + + // we keep both so that we don't have to check size() on the ConcurrentLinkedQueue all the time + AtomicInteger failedProbesInWindow = new AtomicInteger(); + AtomicInteger successfulProbesInWindow = new AtomicInteger(); + // Flag that the channel should be closed once all of the outstanding RPC complete. private final AtomicBoolean shutdownRequested = new AtomicBoolean(); // Flag that the channel has been closed. 
private final AtomicBoolean shutdownInitiated = new AtomicBoolean(); - private Entry(ManagedChannel channel) { + @VisibleForTesting + Entry(ManagedChannel channel) { this.channel = channel; } + ManagedChannel getManagedChannel() { + return this.channel; + } + int getAndResetMaxOutstanding() { return maxOutstanding.getAndSet(outstandingRpcs.get()); } @@ -468,7 +489,7 @@ private boolean retain() { // register desire to start RPC int currentOutstanding = outstandingRpcs.incrementAndGet(); - // Rough book keeping + // Rough bookkeeping int prevMax = maxOutstanding.get(); if (currentOutstanding > prevMax) { maxOutstanding.incrementAndGet(); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPoolHealthChecker.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPoolHealthChecker.java new file mode 100644 index 0000000000..b6c6c62157 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPoolHealthChecker.java @@ -0,0 +1,220 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package com.google.cloud.bigtable.gaxx.grpc;
+
+import com.google.api.core.SettableApiFuture;
+import com.google.bigtable.v2.PingAndWarmResponse;
+import com.google.cloud.bigtable.data.v2.stub.BigtableChannelPrimer;
+import com.google.cloud.bigtable.gaxx.grpc.BigtableChannelPool.Entry;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+/**
+ * Health checker for the channels of a {@link BigtableChannelPool}.
+ *
+ * <p>Two periodic tasks are scheduled on the supplied executor, each every {@link #PROBE_RATE}
+ * with a random initial delay:
+ *
+ * <ul>
+ *   <li>{@link #runProbes()} sends one prime request per pool entry and records the outcome in
+ *       the entry's probe history.
+ *   <li>{@link #detectAndRemoveOutlierEntries()} looks for the entry with the most failed probes
+ *       inside the sliding window and calls {@code enterIdle()} on its channel.
+ * </ul>
+ *
+ * <p>An entry is only judged once it has at least {@code MIN_PROBES_FOR_EVALUATION} probes in the
+ * window, at most one channel is idled per {@code MIN_EVICTION_INTERVAL}, and nothing is idled
+ * when {@code POOLWIDE_BAD_CHANNEL_CIRCUITBREAKER_PERCENT} or more of the pool looks unhealthy.
+ */
+public class ChannelPoolHealthChecker {
+
+  // Configuration constants
+  /** Length of the sliding window over which probe results are counted. */
+  private static final Duration WINDOW_DURATION = Duration.ofMinutes(5);
+
+  /** Period of both the probing task and the outlier detection task. */
+  static final Duration PROBE_RATE = Duration.ofSeconds(30);
+
+  /** Upper bound used when reading the result of a probe future. */
+  @VisibleForTesting static final Duration PROBE_DEADLINE = Duration.ofMillis(500);
+
+  /** Minimum time between two evictions. */
+  private static final Duration MIN_EVICTION_INTERVAL = Duration.ofMinutes(10);
+
+  /** Entries with fewer probes than this in the window are always treated as healthy. */
+  private static final int MIN_PROBES_FOR_EVALUATION = 4;
+
+  /** Failure percentage at or above which a single entry is considered unhealthy. */
+  private static final int SINGLE_CHANNEL_FAILURE_PERCENT_THRESHOLD = 60;
+
+  /** Percentage of unhealthy entries at or above which no eviction takes place. */
+  private static final int POOLWIDE_BAD_CHANNEL_CIRCUITBREAKER_PERCENT = 70;
+
+  /** Inner class to represent the result of a single probe. */
+  static class ProbeResult {
+    final Instant startTime;
+    final boolean success;
+
+    ProbeResult(Instant startTime, boolean success) {
+      this.startTime = startTime;
+      this.success = success;
+    }
+
+    public boolean isSuccessful() {
+      return success;
+    }
+  }
+
+  // Class fields
+  private final Supplier<ImmutableList<Entry>> entrySupplier;
+  private Instant lastEviction;
+  private final ScheduledExecutorService executor;
+  private final BigtableChannelPrimer channelPrimer;
+  private final Clock clock;
+
+  // Handles of the periodic tasks created by start(); null until start() has been called.
+  @Nullable private ScheduledFuture<?> probeTask;
+  @Nullable private ScheduledFuture<?> detectTask;
+
+  /**
+   * Constructor for the pool health checker.
+   *
+   * @param entrySupplier returns the current list of pool entries each time it is invoked
+   * @param channelPrimer used to send the probe request on each channel
+   * @param executor runs the periodic tasks and the probe completion callbacks; it is supplied by
+   *     the caller and is never shut down by this class
+   * @param clock time source for probe timestamps, window pruning and eviction rate limiting
+   */
+  public ChannelPoolHealthChecker(
+      Supplier<ImmutableList<Entry>> entrySupplier,
+      BigtableChannelPrimer channelPrimer,
+      ScheduledExecutorService executor,
+      Clock clock) {
+    this.entrySupplier = entrySupplier;
+    this.lastEviction = Instant.MIN;
+    this.channelPrimer = channelPrimer;
+    this.executor = executor;
+    this.clock = clock;
+  }
+
+  /** Schedules the periodic probing and outlier detection tasks. */
+  synchronized void start() {
+    probeTask = scheduleWithRandomInitialDelay(this::runProbes);
+    detectTask = scheduleWithRandomInitialDelay(this::detectAndRemoveOutlierEntries);
+  }
+
+  /** Schedules {@code task} every PROBE_RATE, starting after a random delay below PROBE_RATE. */
+  private ScheduledFuture<?> scheduleWithRandomInitialDelay(Runnable task) {
+    long periodMillis = PROBE_RATE.toMillis();
+    long initialDelayMillis = ThreadLocalRandom.current().nextLong(periodMillis);
+    return executor.scheduleAtFixedRate(
+        task, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Stops health checking by cancelling the periodic tasks created by {@link #start()}.
+   *
+   * <p>Only the tasks are cancelled. The executor was handed in through the constructor and may
+   * be running other work, so shutting it down is left to whoever created it.
+   */
+  public synchronized void stop() {
+    if (probeTask != null) {
+      probeTask.cancel(false);
+      probeTask = null;
+    }
+    if (detectTask != null) {
+      detectTask.cancel(false);
+      detectTask = null;
+    }
+  }
+
+  /** Runs probes on all the channels in the pool. */
+  @VisibleForTesting
+  void runProbes() {
+    for (Entry entry : this.entrySupplier.get()) {
+      Instant startTime = clock.instant();
+      try {
+        SettableApiFuture<PingAndWarmResponse> probeFuture =
+            channelPrimer.sendPrimeRequestsAsync(entry.getManagedChannel());
+        probeFuture.addListener(() -> onComplete(entry, startTime, probeFuture), executor);
+      } catch (RuntimeException e) {
+        // An exception escaping a scheduleAtFixedRate task suppresses every later run, so a probe
+        // that cannot even be started is recorded as a failure instead of being propagated.
+        addProbeResult(entry, new ProbeResult(startTime, false));
+      }
+    }
+  }
+
+  /**
+   * Callback that will update Entry data on probe complete.
+   *
+   * <p>Any failure to obtain the response (execution error, cancellation, timeout, interruption)
+   * is recorded as a failed probe.
+   *
+   * @param entry the pool entry that was probed
+   * @param startTime the instant at which the probe was started
+   * @param probeFuture the future returned by the primer for this probe
+   */
+  @VisibleForTesting
+  void onComplete(
+      Entry entry, Instant startTime, SettableApiFuture<PingAndWarmResponse> probeFuture) {
+    boolean success;
+    try {
+      // When invoked as a listener the future is already complete, so this returns immediately.
+      // NOTE(review): a probe that never completes never reaches this callback; presumably the
+      // primer applies its own RPC deadline - confirm.
+      probeFuture.get(PROBE_DEADLINE.toMillis(), TimeUnit.MILLISECONDS);
+      success = true;
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the executor thread that ran this callback.
+      Thread.currentThread().interrupt();
+      success = false;
+    } catch (Exception e) {
+      success = false;
+    }
+    addProbeResult(entry, new ProbeResult(startTime, success));
+  }
+
+  /** Appends {@code result} to the entry's history and bumps the matching window counter. */
+  @VisibleForTesting
+  void addProbeResult(Entry entry, ProbeResult result) {
+    entry.probeHistory.add(result);
+    if (result.isSuccessful()) {
+      entry.successfulProbesInWindow.incrementAndGet();
+    } else {
+      entry.failedProbesInWindow.incrementAndGet();
+    }
+  }
+
+  /**
+   * Drops probe results that started before the current window and decrements the matching
+   * counters, so the counters keep describing only results that are still in the history.
+   */
+  @VisibleForTesting
+  void pruneHistoryFor(Entry entry) {
+    Instant windowStart = clock.instant().minus(WINDOW_DURATION);
+    ProbeResult oldest;
+    while ((oldest = entry.probeHistory.peek()) != null
+        && oldest.startTime.isBefore(windowStart)) {
+      ProbeResult removedResult = entry.probeHistory.poll();
+      if (removedResult == null) {
+        // The queue was drained between peek() and poll(); nothing left to prune.
+        break;
+      }
+      if (removedResult.isSuccessful()) {
+        entry.successfulProbesInWindow.decrementAndGet();
+      } else {
+        entry.failedProbesInWindow.decrementAndGet();
+      }
+    }
+  }
+
+  /** Checks if a single entry is currently healthy based on its probe history. */
+  @VisibleForTesting
+  boolean isEntryHealthy(Entry entry) {
+    pruneHistoryFor(entry); // Ensure window is current before calculation
+
+    int failedProbes = entry.failedProbesInWindow.get();
+    int totalProbes = failedProbes + entry.successfulProbesInWindow.get();
+
+    if (totalProbes < MIN_PROBES_FOR_EVALUATION) {
+      return true; // Not enough data, assume healthy.
+    }
+
+    double failureRate = ((double) failedProbes / totalProbes) * 100.0;
+    return failureRate < SINGLE_CHANNEL_FAILURE_PERCENT_THRESHOLD;
+  }
+
+  /**
+   * Finds a channel that is an outlier in terms of health.
+   *
+   * @return the unhealthy entry with the most failed probes in the window, or {@code null} when
+   *     an eviction happened less than one window ago, the pool is empty, no entry is unhealthy,
+   *     or the pool-wide circuit breaker applies
+   */
+  @Nullable
+  @VisibleForTesting
+  Entry findOutlierEntry() {
+    if (lastEviction.plus(WINDOW_DURATION).isAfter(clock.instant())) {
+      return null;
+    }
+
+    // Read the pool once so the unhealthy list and the pool size describe the same snapshot.
+    List<Entry> poolEntries = this.entrySupplier.get();
+    if (poolEntries.isEmpty()) {
+      return null;
+    }
+
+    // isEntryHealthy prunes each entry's history before evaluating it.
+    List<Entry> unhealthyEntries =
+        poolEntries.stream()
+            .filter(entry -> !isEntryHealthy(entry))
+            .collect(Collectors.toList());
+    if (unhealthyEntries.isEmpty()) {
+      return null;
+    }
+
+    // If at least CIRCUITBREAKER_PERCENT of channels are unhealthy we won't evict
+    double unhealthyPercent = (double) unhealthyEntries.size() / poolEntries.size() * 100.0;
+    if (unhealthyPercent >= POOLWIDE_BAD_CHANNEL_CIRCUITBREAKER_PERCENT) {
+      return null;
+    }
+
+    return unhealthyEntries.stream()
+        .max(Comparator.comparingInt(entry -> entry.failedProbesInWindow.get()))
+        .orElse(null);
+  }
+
+  /**
+   * Detects an outlier channel and moves it to idle mode, at most once per
+   * {@code MIN_EVICTION_INTERVAL}.
+   */
+  @VisibleForTesting
+  void detectAndRemoveOutlierEntries() {
+    if (clock.instant().isBefore(lastEviction.plus(MIN_EVICTION_INTERVAL))) {
+      // Primitive but effective rate-limiting.
+      return;
+    }
+    Entry outlier = findOutlierEntry();
+    if (outlier != null) {
+      this.lastEviction = clock.instant();
+      outlier.getManagedChannel().enterIdle();
+    }
+  }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPoolHealthCheckerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPoolHealthCheckerTest.java
new file mode 100644
index 0000000000..a62cd4d10e
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPoolHealthCheckerTest.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.gaxx.grpc;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.api.core.SettableApiFuture;
+import com.google.bigtable.v2.PingAndWarmResponse;
+import com.google.cloud.bigtable.data.v2.stub.BigtableChannelPrimer;
+import com.google.cloud.bigtable.gaxx.grpc.BigtableChannelPool.Entry;
+import com.google.cloud.bigtable.gaxx.grpc.ChannelPoolHealthChecker.ProbeResult;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.testing.TestingExecutors;
+import io.grpc.ManagedChannel;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+/**
+ * Unit tests for {@link ChannelPoolHealthChecker}: probe bookkeeping, history pruning, outlier
+ * selection and the pool-wide circuit breaker.
+ */
+@RunWith(JUnit4.class)
+public class ChannelPoolHealthCheckerTest {
+  @Rule public MockitoRule mockitoRule = MockitoJUnit.rule();
+  @Mock private BigtableChannelPrimer mockPrimer;
+  private ListeningScheduledExecutorService executor;
+  @Mock private Clock mockClock;
+  private ChannelPoolHealthChecker healthChecker;
+  private List<Entry> channelList;
+
+  @Before
+  public void setUp() {
+    // Same-thread executor: probe completion listeners run inline on the test thread.
+    executor = TestingExecutors.sameThreadScheduledExecutor();
+    channelList = new ArrayList<>();
+    Supplier<ImmutableList<Entry>> entrySupplier = () -> ImmutableList.copyOf(channelList);
+
+    healthChecker = new ChannelPoolHealthChecker(entrySupplier, mockPrimer, executor, mockClock);
+
+    // Default the clock to a fixed time
+    Mockito.when(mockClock.instant()).thenReturn(Instant.parse("2025-08-01T10:00:00Z"));
+  }
+
+  // Helper method to create test entries
+  private Entry createTestEntry() {
+    ManagedChannel mockChannel = Mockito.mock(ManagedChannel.class);
+    return new Entry(mockChannel);
+  }
+
+  @After
+  public void tearDown() {
+    executor.shutdownNow();
+  }
+
+  /** A probe whose future completes normally is counted as one success. */
+  @Test
+  public void testOnComplete_successUpdatesCounters() {
+    Entry entry = createTestEntry();
+    channelList.add(entry);
+
+    SettableApiFuture<PingAndWarmResponse> successFuture = SettableApiFuture.create();
+    Mockito.when(mockPrimer.sendPrimeRequestsAsync(entry.getManagedChannel()))
+        .thenReturn(successFuture);
+
+    healthChecker.runProbes();
+
+    successFuture.set(PingAndWarmResponse.getDefaultInstance());
+
+    assertThat(entry.successfulProbesInWindow.get()).isEqualTo(1);
+    assertThat(entry.failedProbesInWindow.get()).isEqualTo(0);
+  }
+
+  /** A probe whose future is cancelled is counted as one failure. */
+  @Test
+  public void testOnComplete_cancellationIsFailure() {
+    Entry entry = createTestEntry();
+    channelList.add(entry);
+
+    SettableApiFuture<PingAndWarmResponse> hangingFuture = SettableApiFuture.create();
+    Mockito.when(mockPrimer.sendPrimeRequestsAsync(entry.getManagedChannel()))
+        .thenReturn(hangingFuture);
+
+    healthChecker.runProbes();
+
+    hangingFuture.cancel(true);
+
+    assertThat(entry.failedProbesInWindow.get()).isEqualTo(1);
+    assertThat(entry.successfulProbesInWindow.get()).isEqualTo(0);
+  }
+
+  /** Results older than the window are removed and their counters decremented. */
+  @Test
+  public void testPruning_removesOldProbesAndCounters() {
+    Entry entry = createTestEntry();
+    healthChecker.addProbeResult(entry, new ProbeResult(mockClock.instant(), false));
+    assertThat(entry.failedProbesInWindow.get()).isEqualTo(1);
+
+    // Move the clock past the 5 minute window so the recorded probe falls out of it.
+    Instant newTime = mockClock.instant().plus(Duration.ofMinutes(6));
+    Mockito.when(mockClock.instant()).thenReturn(newTime);
+    healthChecker.pruneHistoryFor(entry); // Manually call for direct testing
+
+    assertThat(entry.probeHistory).isEmpty();
+    assertThat(entry.failedProbesInWindow.get()).isEqualTo(0);
+  }
+
+  /** With exactly one unhealthy entry in the pool, that entry is the one moved to idle. */
+  @Test
+  public void testEviction_selectsUnhealthyChannel() {
+    Entry healthyEntry = createTestEntry();
+    Entry badEntry = createTestEntry();
+    Entry worseEntry = createTestEntry();
+
+    // A channel needs at least 4 probes to be considered for eviction
+    healthyEntry.successfulProbesInWindow.set(10); // 0% failure -> healthy
+    badEntry.failedProbesInWindow.set(3); // 3/13 = 23% failure -> healthy
+    badEntry.successfulProbesInWindow.set(10);
+    worseEntry.failedProbesInWindow.set(10); // 10/10 = 100% failure -> unhealthy
+
+    channelList.addAll(Arrays.asList(healthyEntry, badEntry, worseEntry));
+
+    healthChecker.detectAndRemoveOutlierEntries();
+
+    // Assert that only the unhealthy channel was evicted
+    Mockito.verify(worseEntry.getManagedChannel()).enterIdle();
+    Mockito.verify(badEntry.getManagedChannel(), Mockito.never()).enterIdle();
+    Mockito.verify(healthyEntry.getManagedChannel(), Mockito.never()).enterIdle();
+  }
+
+  /** With two unhealthy entries, only the one with more failed probes is moved to idle. */
+  @Test
+  public void testEviction_selectsMostUnhealthyChannel() {
+    Entry healthyEntry = createTestEntry();
+    Entry badEntry = createTestEntry();
+    Entry worseEntry = createTestEntry();
+
+    // A channel needs at least 4 probes to be considered for eviction
+    healthyEntry.successfulProbesInWindow.set(10); // 0% failure -> healthy
+    badEntry.failedProbesInWindow.set(8); // 8/13 = 61% failure -> unhealthy
+    badEntry.successfulProbesInWindow.set(5);
+    worseEntry.failedProbesInWindow.set(10); // 10/10 = 100% failure -> most unhealthy
+
+    channelList.addAll(Arrays.asList(healthyEntry, badEntry, worseEntry));
+
+    healthChecker.detectAndRemoveOutlierEntries();
+
+    // 2/3 channels (67%) are unhealthy, which stays below the 70% circuit breaker, so one
+    // eviction happens. Assert that only the most unhealthy channel was evicted.
+    Mockito.verify(worseEntry.getManagedChannel()).enterIdle();
+    Mockito.verify(badEntry.getManagedChannel(), Mockito.never()).enterIdle();
+    Mockito.verify(healthyEntry.getManagedChannel(), Mockito.never()).enterIdle();
+  }
+
+  /** When every entry is unhealthy the circuit breaker stops any eviction. */
+  @Test
+  public void testCircuitBreaker_preventsEviction() {
+    Entry entry1 = createTestEntry();
+    Entry entry2 = createTestEntry();
+    Entry entry3 = createTestEntry();
+    channelList.addAll(Arrays.asList(entry1, entry2, entry3));
+
+    // Set failure counts to exceed 60% SINGLE_CHANNEL_FAILURE_PERCENT_THRESHOLD with at least
+    // MIN_PROBES_FOR_EVALUATION (4) failures
+    for (Entry entry : channelList) {
+      entry.failedProbesInWindow.set(4); // 4 failures, 0 successes = 100% failure rate
+    }
+
+    healthChecker.detectAndRemoveOutlierEntries();
+
+    // The circuit breaker should engage because 3/3 channels (100%) are unhealthy,
+    // which is greater than the 70% POOLWIDE_BAD_CHANNEL_CIRCUITBREAKER_PERCENT threshold.
+    Mockito.verify(entry1.getManagedChannel(), Mockito.never()).enterIdle();
+    Mockito.verify(entry2.getManagedChannel(), Mockito.never()).enterIdle();
+    Mockito.verify(entry3.getManagedChannel(), Mockito.never()).enterIdle();
+  }
+}