From ae0ed5a05d5222bbafec6716a2e7a15ba02bd763 Mon Sep 17 00:00:00 2001 From: Weihan Kong Date: Mon, 6 Oct 2025 10:08:18 +0800 Subject: [PATCH] Add CSM for batch write flow control --- .../RateLimitingServerStreamingCallable.java | 28 ++++++++++++---- .../data/v2/stub/metrics/BigtableTracer.java | 22 +++++++++++++ .../stub/metrics/BuiltinMetricsConstants.java | 25 ++++++++++++++- .../v2/stub/metrics/BuiltinMetricsTracer.java | 32 ++++++++++++++++++- .../metrics/BuiltinMetricsTracerFactory.java | 22 ++++++++++++- .../data/v2/stub/metrics/CompositeTracer.java | 15 +++++++++ 6 files changed, 134 insertions(+), 10 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java index c3b0f94ec7..6fd5884db3 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; import javax.annotation.Nonnull; +import javax.annotation.Nullable; class RateLimitingServerStreamingCallable extends ServerStreamingCallable { @@ -69,6 +70,8 @@ class RateLimitingServerStreamingCallable private final ServerStreamingCallable innerCallable; + private BigtableTracer bigtableTracer; + RateLimitingServerStreamingCallable( @Nonnull ServerStreamingCallable innerCallable) { this.limiter = new ConditionalRateLimiter(DEFAULT_QPS); @@ -84,8 +87,8 @@ public void call( limiter.acquire(); stopwatch.stop(); if (context.getTracer() instanceof BigtableTracer) { - ((BigtableTracer) context.getTracer()) - .batchRequestThrottled(stopwatch.elapsed(TimeUnit.NANOSECONDS)); + bigtableTracer = (BigtableTracer) context.getTracer(); + bigtableTracer.batchRequestThrottled(stopwatch.elapsed(TimeUnit.NANOSECONDS)); } RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(responseObserver); innerCallable.call(request, innerObserver, context); @@ -158,12 +161,19 @@ public double getRate() { * @param rate The new rate of the rate limiter. * @param period The period during which rate should not be updated again and the rate limiter * should not be disabled. + * @param bigtableTracer The tracer for exporting client-side metrics. */ - public void trySetRate(double rate, Duration period) { + public void trySetRate( + double rate, + Duration period, + BigtableTracer bigtableTracer, + double cappedFactor, + @Nullable Throwable status) { Instant nextTime = nextRateUpdateTime.get(); Instant now = Instant.now(); if (now.isBefore(nextTime)) { + bigtableTracer.addBatchWriteFlowControlFactor(cappedFactor, status, false); return; } @@ -171,6 +181,7 @@ public void trySetRate(double rate, Duration period) { if (!nextRateUpdateTime.compareAndSet(nextTime, newNextTime)) { // Someone else updated it already. + bigtableTracer.addBatchWriteFlowControlFactor(cappedFactor, status, false); return; } final double oldRate = limiter.getRate(); @@ -183,6 +194,8 @@ public void trySetRate(double rate, Duration period) { + " with period " + period.getSeconds() + " seconds."); + bigtableTracer.setBatchWriteFlowControlTargetQps(rate); + bigtableTracer.addBatchWriteFlowControlFactor(cappedFactor, status, true); } @VisibleForTesting @@ -236,7 +249,8 @@ protected void onResponseImpl(MutateRowsResponse response) { RateLimitInfo info = response.getRateLimitInfo(); updateQps( info.getFactor(), - Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod()))); + Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())), + null); } else { limiter.tryDisable(); } @@ -250,7 +264,7 @@ protected void onErrorImpl(Throwable t) { if (t instanceof DeadlineExceededException || t instanceof UnavailableException || t instanceof ResourceExhaustedException) { - updateQps(MIN_FACTOR, DEFAULT_PERIOD); + updateQps(MIN_FACTOR, DEFAULT_PERIOD, t); } outerObserver.onError(t); } @@ -260,11 +274,11 @@ protected void onCompleteImpl() { outerObserver.onComplete(); } - private void updateQps(double factor, Duration period) { + private void updateQps(double factor, Duration period, @Nullable Throwable t) { double cappedFactor = Math.min(Math.max(factor, MIN_FACTOR), MAX_FACTOR); double currentRate = limiter.getRate(); double cappedRate = Math.min(Math.max(currentRate * cappedFactor, MIN_QPS), MAX_QPS); - limiter.trySetRate(cappedRate, period); + limiter.trySetRate(cappedRate, period, bigtableTracer, cappedFactor, t); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java index 083b5dabc9..524b94ddd5 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java @@ -16,6 +16,7 @@ package com.google.cloud.bigtable.data.v2.stub.metrics; import com.google.api.core.BetaApi; +import com.google.api.core.InternalApi; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.tracing.ApiTracer; import com.google.api.gax.tracing.BaseApiTracer; @@ -115,4 +116,25 @@ public void grpcMessageSent() { public void setTotalTimeoutDuration(Duration totalTimeoutDuration) { // noop } + + /** + * Record the target QPS for batch write flow control. + * + * @param targetQps The new target QPS for the client. + */ + @InternalApi + public void setBatchWriteFlowControlTargetQps(double targetQps) {} + + /** + * Record the factors received from server-side for batch write flow control. The factors are + * capped by min and max allowed factor values. Status and whether the factor was actually applied + * are also recorded. + * + * @param factor Capped factor from server-side. For non-OK response, min factor is used. + * @param status Status of the request. + * @param applied Whether the factor was actually applied. + */ + @InternalApi + public void addBatchWriteFlowControlFactor( + double factor, @Nullable Throwable status, boolean applied) {} } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsConstants.java index 78ed689cc3..98bbef2d47 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsConstants.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsConstants.java @@ -49,6 +49,7 @@ public class BuiltinMetricsConstants { static final AttributeKey METHOD_KEY = AttributeKey.stringKey("method"); static final AttributeKey STATUS_KEY = AttributeKey.stringKey("status"); static final AttributeKey CLIENT_UID_KEY = AttributeKey.stringKey("client_uid"); + static final AttributeKey APPLIED_KEY = AttributeKey.booleanKey("applied"); static final AttributeKey TRANSPORT_TYPE = AttributeKey.stringKey("transport_type"); static final AttributeKey TRANSPORT_REGION = AttributeKey.stringKey("transport_region"); @@ -70,6 +71,9 @@ public class BuiltinMetricsConstants { static final String REMAINING_DEADLINE_NAME = "remaining_deadline"; static final String CLIENT_BLOCKING_LATENCIES_NAME = "throttling_latencies"; static final String PER_CONNECTION_ERROR_COUNT_NAME = "per_connection_error_count"; + static final String BATCH_WRITE_FLOW_CONTROL_TARGET_QPS_NAME = + "batch_write_flow_control_target_qps"; + static final String BATCH_WRITE_FLOW_CONTROL_FACTOR_NAME = "batch_write_flow_control_factor"; // Start allow list of metrics that will be exported as internal public static final Map> GRPC_METRICS = @@ -140,6 +144,9 @@ public class BuiltinMetricsConstants { 500_000.0, 1_000_000.0)); + private static final Aggregation AGGREGATION_BATCH_WRITE_FLOW_CONTROL_FACTOR_HISTOGRAM = + Aggregation.explicitBucketHistogram(ImmutableList.of(0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3)); + static final Set COMMON_ATTRIBUTES = ImmutableSet.of( BIGTABLE_PROJECT_ID_KEY, @@ -286,7 +293,23 @@ public static Map getAllViews() { .addAll(COMMON_ATTRIBUTES) .add(STREAMING_KEY, STATUS_KEY) .build()); - + defineView( + views, + BATCH_WRITE_FLOW_CONTROL_TARGET_QPS_NAME, + Aggregation.sum(), + InstrumentType.GAUGE, + "1", + ImmutableSet.builder().addAll(COMMON_ATTRIBUTES).build()); + defineView( + views, + BATCH_WRITE_FLOW_CONTROL_FACTOR_NAME, + AGGREGATION_BATCH_WRITE_FLOW_CONTROL_FACTOR_HISTOGRAM, + InstrumentType.HISTOGRAM, + "1", + ImmutableSet.builder() + .addAll(COMMON_ATTRIBUTES) + .add(STATUS_KEY, APPLIED_KEY) + .build()); return views.build(); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java index 1f95224185..78e6c8266e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -17,6 +17,7 @@ import static com.google.api.gax.tracing.ApiTracerFactory.OperationType; import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeDuration; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.APPLIED_KEY; import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLIENT_NAME_KEY; import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLUSTER_ID_KEY; import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.METHOD_KEY; @@ -41,6 +42,7 @@ import com.google.gson.reflect.TypeToken; import io.grpc.Deadline; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleGauge; import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.LongCounter; import java.time.Duration; @@ -136,6 +138,8 @@ static TransportAttrs create(@Nullable String locality, @Nullable String backend private final DoubleHistogram remainingDeadlineHistogram; private final LongCounter connectivityErrorCounter; private final LongCounter retryCounter; + private final DoubleGauge batchWriteFlowControlTargetQps; + private final DoubleHistogram batchWriteFlowControlFactorHistogram; BuiltinMetricsTracer( OperationType operationType, @@ -150,7 +154,9 @@ static TransportAttrs create(@Nullable String locality, @Nullable String backend DoubleHistogram applicationBlockingLatenciesHistogram, DoubleHistogram deadlineHistogram, LongCounter connectivityErrorCounter, - LongCounter retryCounter) { + LongCounter retryCounter, + DoubleGauge batchWriteFlowControlTargetQps, + DoubleHistogram batchWriteFlowControlFactorHistogram) { this.operationType = operationType; this.spanName = spanName; this.baseAttributes = attributes; @@ -165,6 +171,8 @@ static TransportAttrs create(@Nullable String locality, @Nullable String backend this.remainingDeadlineHistogram = deadlineHistogram; this.connectivityErrorCounter = connectivityErrorCounter; this.retryCounter = retryCounter; + this.batchWriteFlowControlTargetQps = batchWriteFlowControlTargetQps; + this.batchWriteFlowControlFactorHistogram = batchWriteFlowControlFactorHistogram; } @Override @@ -496,4 +504,26 @@ private static double convertToMs(long nanoSeconds) { double toMs = 1e-6; return nanoSeconds * toMs; } + + @Override + public void setBatchWriteFlowControlTargetQps(double targetQps) { + Attributes attributes = baseAttributes.toBuilder().put(METHOD_KEY, spanName.toString()).build(); + + batchWriteFlowControlTargetQps.set(targetQps, attributes); + } + + @Override + public void addBatchWriteFlowControlFactor( + double factor, @Nullable Throwable status, boolean applied) { + String statusStr = status == null ? "OK" : Util.extractStatus(status); + + Attributes attributes = + baseAttributes.toBuilder() + .put(METHOD_KEY, spanName.toString()) + .put(STATUS_KEY, statusStr) + .put(APPLIED_KEY, applied) + .build(); + + batchWriteFlowControlFactorHistogram.record(factor, attributes); + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java index 174a023b6f..eb8089b1c6 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java @@ -18,6 +18,8 @@ import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.APPLICATION_BLOCKING_LATENCIES_NAME; import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ATTEMPT_LATENCIES2_NAME; import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ATTEMPT_LATENCIES_NAME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.BATCH_WRITE_FLOW_CONTROL_FACTOR_NAME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.BATCH_WRITE_FLOW_CONTROL_TARGET_QPS_NAME; import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLIENT_BLOCKING_LATENCIES_NAME; import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CONNECTIVITY_ERROR_COUNT_NAME; import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.FIRST_RESPONSE_LATENCIES_NAME; @@ -34,6 +36,7 @@ import com.google.api.gax.tracing.SpanName; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleGauge; import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.Meter; @@ -61,6 +64,8 @@ public class BuiltinMetricsTracerFactory extends BaseApiTracerFactory { private final DoubleHistogram remainingDeadlineHistogram; private final LongCounter connectivityErrorCounter; private final LongCounter retryCounter; + private final DoubleGauge batchWriteFlowControlTargetQps; + private final DoubleHistogram batchWriteFlowControlFactorHistogram; public static BuiltinMetricsTracerFactory create( OpenTelemetry openTelemetry, Attributes attributes) throws IOException { @@ -147,6 +152,19 @@ public static BuiltinMetricsTracerFactory create( .setDescription("The number of additional RPCs sent after the initial attempt.") .setUnit(COUNT) .build(); + batchWriteFlowControlTargetQps = + meter + .gaugeBuilder(BATCH_WRITE_FLOW_CONTROL_TARGET_QPS_NAME) + .setDescription("The current target QPS of the client under batch write flow control.") + .setUnit("1") + .build(); + batchWriteFlowControlFactorHistogram = + meter + .histogramBuilder(BATCH_WRITE_FLOW_CONTROL_FACTOR_NAME) + .setDescription( + "The distribution of batch write flow control factors received from the server.") + .setUnit("1") + .build(); } @Override @@ -164,6 +182,8 @@ public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType op applicationBlockingLatenciesHistogram, remainingDeadlineHistogram, connectivityErrorCounter, - retryCounter); + retryCounter, + batchWriteFlowControlTargetQps, + batchWriteFlowControlFactorHistogram); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java index 5922530e8b..5a19767d27 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java @@ -266,4 +266,19 @@ public void setTotalTimeoutDuration(java.time.Duration totalTimeoutDuration) { tracer.setTotalTimeoutDuration(totalTimeoutDuration); } } + + @Override + public void setBatchWriteFlowControlTargetQps(double targetQps) { + for (BigtableTracer tracer : bigtableTracers) { + tracer.setBatchWriteFlowControlTargetQps(targetQps); + } + } + + @Override + public void addBatchWriteFlowControlFactor( + double factor, @Nullable Throwable t, boolean applied) { + for (BigtableTracer tracer : bigtableTracers) { + tracer.addBatchWriteFlowControlFactor(factor, t, applied); + } + } }