Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

class RateLimitingServerStreamingCallable
extends ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> {
Expand Down Expand Up @@ -69,6 +70,8 @@ class RateLimitingServerStreamingCallable

private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable;

private BigtableTracer bigtableTracer;

RateLimitingServerStreamingCallable(
@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable) {
this.limiter = new ConditionalRateLimiter(DEFAULT_QPS);
Expand All @@ -84,8 +87,8 @@ public void call(
limiter.acquire();
stopwatch.stop();
if (context.getTracer() instanceof BigtableTracer) {
((BigtableTracer) context.getTracer())
.batchRequestThrottled(stopwatch.elapsed(TimeUnit.NANOSECONDS));
bigtableTracer = (BigtableTracer) context.getTracer();
bigtableTracer.batchRequestThrottled(stopwatch.elapsed(TimeUnit.NANOSECONDS));
}
RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(responseObserver);
innerCallable.call(request, innerObserver, context);
Expand Down Expand Up @@ -158,19 +161,27 @@ public double getRate() {
* @param rate The new rate of the rate limiter.
* @param period The period during which rate should not be updated again and the rate limiter
* should not be disabled.
* @param bigtableTracer The tracer for exporting client-side metrics.
*/
public void trySetRate(double rate, Duration period) {
public void trySetRate(
double rate,
Duration period,
BigtableTracer bigtableTracer,
double cappedFactor,
@Nullable Throwable status) {
Instant nextTime = nextRateUpdateTime.get();
Instant now = Instant.now();

if (now.isBefore(nextTime)) {
bigtableTracer.addBatchWriteFlowControlFactor(cappedFactor, status, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check here that bigtableTracer is not null? I don't think you need status, it doesn't mean anything for this metric? Also the QPS is not updated, do we want to update the metric?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm good point, I guess in case context.getTracer() instanceof BigtableTracer is false? How do we deal with it now? Looks like we don't log error?

Recall that status is to record to indicate whether the factor is from the server or due to errors. One theory for the original issue was that a bunch of errors results in min_factor and throttles the client. We'd want to confirm that theory and know which error it is in the future.

We want to report this factor distribution even if it's not applied to target QPS, thus the field "applied" in the metric.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We skip exporting the tracer if it's not an instance of BigtableTracer ( which should never happen). I think you can add a sanity check for not null.

return;
}

Instant newNextTime = now.plusSeconds(period.getSeconds());

if (!nextRateUpdateTime.compareAndSet(nextTime, newNextTime)) {
// Someone else updated it already.
bigtableTracer.addBatchWriteFlowControlFactor(cappedFactor, status, false);
return;
}
final double oldRate = limiter.getRate();
Expand All @@ -183,6 +194,8 @@ public void trySetRate(double rate, Duration period) {
+ " with period "
+ period.getSeconds()
+ " seconds.");
bigtableTracer.setBatchWriteFlowControlTargetQps(rate);
bigtableTracer.addBatchWriteFlowControlFactor(cappedFactor, status, true);
}

@VisibleForTesting
Expand Down Expand Up @@ -236,7 +249,8 @@ protected void onResponseImpl(MutateRowsResponse response) {
RateLimitInfo info = response.getRateLimitInfo();
updateQps(
info.getFactor(),
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())));
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())),
null);
} else {
limiter.tryDisable();
}
Expand All @@ -250,7 +264,7 @@ protected void onErrorImpl(Throwable t) {
if (t instanceof DeadlineExceededException
|| t instanceof UnavailableException
|| t instanceof ResourceExhaustedException) {
updateQps(MIN_FACTOR, DEFAULT_PERIOD);
updateQps(MIN_FACTOR, DEFAULT_PERIOD, t);
}
outerObserver.onError(t);
}
Expand All @@ -260,11 +274,11 @@ protected void onCompleteImpl() {
outerObserver.onComplete();
}

private void updateQps(double factor, Duration period) {
private void updateQps(double factor, Duration period, @Nullable Throwable t) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe instead of throwable pass in the status. So you don't need to do the null check, you can just have OK status if the update is from a response

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But updateQps is used by both onResponseImpl and onErrorImpl

double cappedFactor = Math.min(Math.max(factor, MIN_FACTOR), MAX_FACTOR);
double currentRate = limiter.getRate();
double cappedRate = Math.min(Math.max(currentRate * cappedFactor, MIN_QPS), MAX_QPS);
limiter.trySetRate(cappedRate, period);
limiter.trySetRate(cappedRate, period, bigtableTracer, cappedFactor, t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.BaseApiTracer;
Expand Down Expand Up @@ -115,4 +116,25 @@ public void grpcMessageSent() {
public void setTotalTimeoutDuration(Duration totalTimeoutDuration) {
// noop
}

/**
* Record the target QPS for batch write flow control.
*
* @param targetQps The new target QPS for the client.
*/
@InternalApi
public void setBatchWriteFlowControlTargetQps(double targetQps) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add @InternalApi annotation here and the other method.


/**
* Record the factors received from server-side for batch write flow control. The factors are
* capped by min and max allowed factor values. Status and whether the factor was actually applied
* are also recorded.
*
* @param factor Capped factor from server-side. For non-OK response, min factor is used.
* @param status Status of the request.
* @param applied Whether the factor was actually applied.
*/
@InternalApi
public void addBatchWriteFlowControlFactor(
double factor, @Nullable Throwable status, boolean applied) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class BuiltinMetricsConstants {
static final AttributeKey<String> METHOD_KEY = AttributeKey.stringKey("method");
static final AttributeKey<String> STATUS_KEY = AttributeKey.stringKey("status");
static final AttributeKey<String> CLIENT_UID_KEY = AttributeKey.stringKey("client_uid");
static final AttributeKey<Boolean> APPLIED_KEY = AttributeKey.booleanKey("applied");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe rename it to BATCH_WRITE_QPS_APPLIED_KEY


static final AttributeKey<String> TRANSPORT_TYPE = AttributeKey.stringKey("transport_type");
static final AttributeKey<String> TRANSPORT_REGION = AttributeKey.stringKey("transport_region");
Expand All @@ -70,6 +71,9 @@ public class BuiltinMetricsConstants {
static final String REMAINING_DEADLINE_NAME = "remaining_deadline";
static final String CLIENT_BLOCKING_LATENCIES_NAME = "throttling_latencies";
static final String PER_CONNECTION_ERROR_COUNT_NAME = "per_connection_error_count";
static final String BATCH_WRITE_FLOW_CONTROL_TARGET_QPS_NAME =
"batch_write_flow_control_target_qps";
static final String BATCH_WRITE_FLOW_CONTROL_FACTOR_NAME = "batch_write_flow_control_factor";

// Start allow list of metrics that will be exported as internal
public static final Map<String, Set<String>> GRPC_METRICS =
Expand Down Expand Up @@ -140,6 +144,9 @@ public class BuiltinMetricsConstants {
500_000.0,
1_000_000.0));

private static final Aggregation AGGREGATION_BATCH_WRITE_FLOW_CONTROL_FACTOR_HISTOGRAM =
Aggregation.explicitBucketHistogram(ImmutableList.of(0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3));

static final Set<AttributeKey> COMMON_ATTRIBUTES =
ImmutableSet.of(
BIGTABLE_PROJECT_ID_KEY,
Expand Down Expand Up @@ -286,7 +293,23 @@ public static Map<InstrumentSelector, View> getAllViews() {
.addAll(COMMON_ATTRIBUTES)
.add(STREAMING_KEY, STATUS_KEY)
.build());

defineView(
views,
BATCH_WRITE_FLOW_CONTROL_TARGET_QPS_NAME,
Aggregation.sum(),
InstrumentType.GAUGE,
"1",
ImmutableSet.<AttributeKey>builder().addAll(COMMON_ATTRIBUTES).build());
defineView(
views,
BATCH_WRITE_FLOW_CONTROL_FACTOR_NAME,
AGGREGATION_BATCH_WRITE_FLOW_CONTROL_FACTOR_HISTOGRAM,
InstrumentType.HISTOGRAM,
"1",
ImmutableSet.<AttributeKey>builder()
.addAll(COMMON_ATTRIBUTES)
.add(STATUS_KEY, APPLIED_KEY)
.build());
return views.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.google.api.gax.tracing.ApiTracerFactory.OperationType;
import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeDuration;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.APPLIED_KEY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLIENT_NAME_KEY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLUSTER_ID_KEY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.METHOD_KEY;
Expand All @@ -41,6 +42,7 @@
import com.google.gson.reflect.TypeToken;
import io.grpc.Deadline;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleGauge;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
import java.time.Duration;
Expand Down Expand Up @@ -136,6 +138,8 @@ static TransportAttrs create(@Nullable String locality, @Nullable String backend
private final DoubleHistogram remainingDeadlineHistogram;
private final LongCounter connectivityErrorCounter;
private final LongCounter retryCounter;
private final DoubleGauge batchWriteFlowControlTargetQps;
private final DoubleHistogram batchWriteFlowControlFactorHistogram;

BuiltinMetricsTracer(
OperationType operationType,
Expand All @@ -150,7 +154,9 @@ static TransportAttrs create(@Nullable String locality, @Nullable String backend
DoubleHistogram applicationBlockingLatenciesHistogram,
DoubleHistogram deadlineHistogram,
LongCounter connectivityErrorCounter,
LongCounter retryCounter) {
LongCounter retryCounter,
DoubleGauge batchWriteFlowControlTargetQps,
DoubleHistogram batchWriteFlowControlFactorHistogram) {
this.operationType = operationType;
this.spanName = spanName;
this.baseAttributes = attributes;
Expand All @@ -165,6 +171,8 @@ static TransportAttrs create(@Nullable String locality, @Nullable String backend
this.remainingDeadlineHistogram = deadlineHistogram;
this.connectivityErrorCounter = connectivityErrorCounter;
this.retryCounter = retryCounter;
this.batchWriteFlowControlTargetQps = batchWriteFlowControlTargetQps;
this.batchWriteFlowControlFactorHistogram = batchWriteFlowControlFactorHistogram;
}

@Override
Expand Down Expand Up @@ -496,4 +504,26 @@ private static double convertToMs(long nanoSeconds) {
double toMs = 1e-6;
return nanoSeconds * toMs;
}

@Override
public void setBatchWriteFlowControlTargetQps(double targetQps) {
Attributes attributes = baseAttributes.toBuilder().put(METHOD_KEY, spanName.toString()).build();

batchWriteFlowControlTargetQps.set(targetQps, attributes);
}

@Override
public void addBatchWriteFlowControlFactor(
double factor, @Nullable Throwable status, boolean applied) {
String statusStr = status == null ? "OK" : Util.extractStatus(status);

Attributes attributes =
baseAttributes.toBuilder()
.put(METHOD_KEY, spanName.toString())
.put(STATUS_KEY, statusStr)
.put(APPLIED_KEY, applied)
.build();

batchWriteFlowControlFactorHistogram.record(factor, attributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.APPLICATION_BLOCKING_LATENCIES_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ATTEMPT_LATENCIES2_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ATTEMPT_LATENCIES_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.BATCH_WRITE_FLOW_CONTROL_FACTOR_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.BATCH_WRITE_FLOW_CONTROL_TARGET_QPS_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLIENT_BLOCKING_LATENCIES_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CONNECTIVITY_ERROR_COUNT_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.FIRST_RESPONSE_LATENCIES_NAME;
Expand All @@ -34,6 +36,7 @@
import com.google.api.gax.tracing.SpanName;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleGauge;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
Expand Down Expand Up @@ -61,6 +64,8 @@ public class BuiltinMetricsTracerFactory extends BaseApiTracerFactory {
private final DoubleHistogram remainingDeadlineHistogram;
private final LongCounter connectivityErrorCounter;
private final LongCounter retryCounter;
private final DoubleGauge batchWriteFlowControlTargetQps;
private final DoubleHistogram batchWriteFlowControlFactorHistogram;

public static BuiltinMetricsTracerFactory create(
OpenTelemetry openTelemetry, Attributes attributes) throws IOException {
Expand Down Expand Up @@ -147,6 +152,19 @@ public static BuiltinMetricsTracerFactory create(
.setDescription("The number of additional RPCs sent after the initial attempt.")
.setUnit(COUNT)
.build();
batchWriteFlowControlTargetQps =
meter
.gaugeBuilder(BATCH_WRITE_FLOW_CONTROL_TARGET_QPS_NAME)
.setDescription("The current target QPS of the client under batch write flow control.")
.setUnit("1")
.build();
batchWriteFlowControlFactorHistogram =
meter
.histogramBuilder(BATCH_WRITE_FLOW_CONTROL_FACTOR_NAME)
.setDescription(
"The distribution of batch write flow control factors received from the server.")
.setUnit("1")
.build();
}

@Override
Expand All @@ -164,6 +182,8 @@ public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType op
applicationBlockingLatenciesHistogram,
remainingDeadlineHistogram,
connectivityErrorCounter,
retryCounter);
retryCounter,
batchWriteFlowControlTargetQps,
batchWriteFlowControlFactorHistogram);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,19 @@ public void setTotalTimeoutDuration(java.time.Duration totalTimeoutDuration) {
tracer.setTotalTimeoutDuration(totalTimeoutDuration);
}
}

@Override
public void setBatchWriteFlowControlTargetQps(double targetQps) {
for (BigtableTracer tracer : bigtableTracers) {
tracer.setBatchWriteFlowControlTargetQps(targetQps);
}
}

@Override
public void addBatchWriteFlowControlFactor(
double factor, @Nullable Throwable t, boolean applied) {
for (BigtableTracer tracer : bigtableTracers) {
tracer.addBatchWriteFlowControlFactor(factor, t, applied);
}
}
}
Loading